diff --git a/dbm-services/redis/db-tools/dbactuator/example/redis_replicas_force_resync.md b/dbm-services/redis/db-tools/dbactuator/example/redis_replicas_force_resync.md
new file mode 100644
index 0000000000..113c71c0d1
--- /dev/null
+++ b/dbm-services/redis/db-tools/dbactuator/example/redis_replicas_force_resync.md
@@ -0,0 +1,17 @@
+### redis_replicas_force_resync
+redis slave 强制重同步.
+
+
+```
+./dbactuator_redis --uid={{uid}} --root_id={{root_id}} --node_id={{node_id}} --version_id={{version_id}} --atom-job-list="redis_replicas_force_resync" --data_dir=/path/to/data --backup_dir=/path/to/backup --payload='{{payload_base64}}'
+```
+
+`--data_dir`、`--backup_dir` 可以留空.
+
+原始payload
+```json
+{
+    "slave_ip":"xx.xx.xx.xx",
+    "slave_ports":[30000,30001]
+}
+```
\ No newline at end of file
diff --git a/dbm-services/redis/db-tools/dbactuator/models/myredis/client.go b/dbm-services/redis/db-tools/dbactuator/models/myredis/client.go
index b59654098e..d2c9b74dd6 100644
--- a/dbm-services/redis/db-tools/dbactuator/models/myredis/client.go
+++ b/dbm-services/redis/db-tools/dbactuator/models/myredis/client.go
@@ -1215,6 +1215,37 @@ func (db *RedisClient) ClusterMeet(ip, port string) (ret string, err error) {
 	return
 }
 
+// ClusterMeetAndUtilFinish runs 'cluster meet ip port' and then polls 'cluster nodes' every 3s
+// until ip:port is listed. NOTE(review): the polling loop has no upper bound on the wait.
+func (db *RedisClient) ClusterMeetAndUtilFinish(ip, port string) (err error) {
+	// 'cluster meet' is only issued through a standalone (non-cluster) redis client
+	if db.InstanceClient == nil {
+		err = fmt.Errorf("ClusterMeetAndUtilFinish redis:%s must create a standalone client", db.Addr)
+		mylog.Logger.Error(err.Error())
+		return
+	}
+	mylog.Logger.Info("redis(%s) 'cluster meet %s %s' start", db.Addr, ip, port)
+	_, err = db.ClusterMeet(ip, port)
+	if err != nil {
+		return
+	}
+	targetAddr := fmt.Sprintf("%s:%s", ip, port)
+	var addrMapToNodes map[string]*ClusterNodeData
+	for {
+		addrMapToNodes, err = db.GetAddrMapToNodes()
+		if err != nil {
+			return
+		}
+		if _, ok := addrMapToNodes[targetAddr]; ok {
+			mylog.Logger.Info("redis:%s cluster meet %s %s success", db.Addr, ip, port)
+			break
+		}
+		mylog.Logger.Info("redis:%s cluster meet %s %s done,but not in 'cluster nodes'", db.Addr, ip, port)
+		time.Sleep(3 * time.Second)
+	}
+	return nil
+}
+
 // ClusterAddSlots 添加slots, 'cluster addslots 'command
 func (db *RedisClient) ClusterAddSlots(slots []int) (ret string, err error) {
 	// 执行 cluster addslots 命令只能用 普通redis client
diff --git a/dbm-services/redis/db-tools/dbactuator/pkg/atomjobs/atomredis/redis_replicas_force_resync.go b/dbm-services/redis/db-tools/dbactuator/pkg/atomjobs/atomredis/redis_replicas_force_resync.go
new file mode 100644
index 0000000000..7f3051eda2
--- /dev/null
+++ b/dbm-services/redis/db-tools/dbactuator/pkg/atomjobs/atomredis/redis_replicas_force_resync.go
@@ -0,0 +1,377 @@
+package atomredis
+
+import (
+	"encoding/json"
+	"fmt"
+	"os"
+	"path/filepath"
+	"strconv"
+	"time"
+
+	"github.com/go-playground/validator/v10"
+
+	"dbm-services/redis/db-tools/dbactuator/models/myredis"
+	"dbm-services/redis/db-tools/dbactuator/pkg/consts"
+	"dbm-services/redis/db-tools/dbactuator/pkg/jobruntime"
+	"dbm-services/redis/db-tools/dbactuator/pkg/util"
+)
+
+// ReplicaResyncParams is the job payload: one slave host and the slave ports on it
+type ReplicaResyncParams struct {
+	SlaveIP    string `json:"slave_ip" validate:"required"`
+	SlavePorts []int  `json:"slave_ports" validate:"required"`
+}
+
+// ReplicasForceResync atom job: detach the slaves, flush them, then resync them from their masters
+type ReplicasForceResync struct {
+	runtime         *jobruntime.JobGenericRuntime
+	params          ReplicaResyncParams
+	slaveAddrMapCli map[string]*myredis.RedisClient
+	slavePairsMap   map[string]ReplicaItem
+}
+
+// compile-time check that ReplicasForceResync implements jobruntime.JobRunner
+var _ jobruntime.JobRunner = (*ReplicasForceResync)(nil)
+
+// NewReplicasForceResync new
+func NewReplicasForceResync() jobruntime.JobRunner {
+	return &ReplicasForceResync{}
+}
+
+// Init decodes the payload into job.params and validates it
+func (job *ReplicasForceResync) Init(m *jobruntime.JobGenericRuntime) error {
+	job.runtime = m
+	err := json.Unmarshal([]byte(job.runtime.PayloadDecoded), &job.params)
+	if err != nil {
+		job.runtime.Logger.Error(fmt.Sprintf("json.Unmarshal failed,err:%+v", err))
+		return err
+	}
+	// validate params
+	validate := validator.New()
+	err = validate.Struct(job.params)
+	if err != nil {
+		if _, ok := err.(*validator.InvalidValidationError); ok {
+			job.runtime.Logger.Error("ReplicasForceResync Init params validate failed,err:%v,params:%+v",
+				err, job.params)
+			return err
+		}
+		for _, err := range err.(validator.ValidationErrors) {
+			job.runtime.Logger.Error("ReplicasForceResync Init params validate failed,err:%v,params:%+v",
+				err, job.params)
+			return err
+		}
+	}
+	return nil
+}
+
+// Name atom job name
+func (job *ReplicasForceResync) Name() string {
+	return "redis_replicas_force_resync"
+}
+
+// getReplicaPairsFile returns the path of the master/slave pairs file, creating its directory if missing
+func (job *ReplicasForceResync) getReplicaPairsFile() string {
+	file := fmt.Sprintf("%s_%s_master_replica_pairs", job.runtime.UID, job.params.SlaveIP)
+	saveDir := filepath.Join(consts.GetRedisBackupDir(), "dbbak", "replicas_force_resync")
+	if !util.FileExists(saveDir) {
+		util.MkDirsIfNotExists([]string{saveDir})
+		util.LocalDirChownMysql(saveDir)
+	}
+	fullPath := filepath.Join(saveDir, file)
+	return fullPath
+}
+
+// checkReplicaParisFileExistsAndNotEmpty reports whether the pairs file exists and is not empty:
+// false when the file does not exist or has size 0,
+// true when the file exists and has content
+func (job *ReplicasForceResync) checkReplicaParisFileExistsAndNotEmpty() (ok bool, err error) {
+	fullPath := job.getReplicaPairsFile()
+	if !util.FileExists(fullPath) {
+		return false, nil
+	}
+	statInfo, err := os.Stat(fullPath)
+	if err != nil {
+		err = fmt.Errorf("os.Stat failed,err:%v,fullPath:%s", err, fullPath)
+		job.runtime.Logger.Error(err.Error())
+		return false, err
+	}
+	if statInfo.Size() == 0 {
+		return false, nil
+	}
+	return true, nil
+}
+
+// writeSlavePairsToFile persists job.slavePairsMap (the master/slave relations) as JSON to the pairs file
+func (job *ReplicasForceResync) writeSlavePairsToFile() (err error) {
+	pairFile := job.getReplicaPairsFile()
+	pairData, err := json.Marshal(job.slavePairsMap)
+	if err != nil {
+		err = fmt.Errorf("json.Marshal failed,err:%v,pairData:%+v", err, job.slavePairsMap)
+		job.runtime.Logger.Error(err.Error())
+		return err
+	}
+	err = os.WriteFile(pairFile, pairData, 0644)
+	if err != nil {
+		err = fmt.Errorf("os.WriteFile failed,err:%v,pairFile:%s,pairData:%s", err, pairFile, pairData)
+		job.runtime.Logger.Error(err.Error())
+		return err
+	}
+	util.LocalDirChownMysql(pairFile)
+	job.runtime.Logger.Info(fmt.Sprintf("writeSlavePairsToFile to %s ok", pairFile))
+	return nil
+}
+
+// getSlavePairsFromFile reads the pairs file and adds the entries missing from job.slavePairsMap
+func (job *ReplicasForceResync) getSlavePairsFromFile() (err error) {
+	pairFile := job.getReplicaPairsFile()
+	if !util.FileExists(pairFile) {
+		err = fmt.Errorf("file:%s not exists", pairFile)
+		job.runtime.Logger.Error(err.Error())
+		return err
+	}
+	pairData, err := os.ReadFile(pairFile)
+	if err != nil {
+		err = fmt.Errorf("os.ReadFile failed,err:%v,pairFile:%s", err, pairFile)
+		job.runtime.Logger.Error(err.Error())
+		return err
+	}
+	pairDecoded := make(map[string]ReplicaItem)
+	err = json.Unmarshal(pairData, &pairDecoded)
+	if err != nil {
+		err = fmt.Errorf("json.Unmarshal failed,err:%v,pairData:%s", err, pairData)
+		job.runtime.Logger.Error(err.Error())
+		return err
+	}
+	for k, v := range pairDecoded {
+		if _, ok := job.slavePairsMap[k]; !ok {
+			job.slavePairsMap[k] = v
+		}
+	}
+	job.runtime.Logger.Info(fmt.Sprintf("completeSlavePairs from %s ok", pairFile))
+	return nil
+}
+
+// Run Command Run
+func (job *ReplicasForceResync) Run() (err error) {
+	util.StopBkDbmon()
+	defer util.StartBkDbmon()
+
+	job.slavePairsMap = make(map[string]ReplicaItem)
+	// deferred before connecting, so clients opened before a failure are closed as well
+	defer job.allInstDisconnect()
+	err = job.slaveInstsAbleToConnect()
+	if err != nil {
+		return err
+	}
+
+	// slaveof no one
+	err = job.slaveofNoOneAndUtilToMaster()
+	if err != nil {
+		return err
+	}
+	// slave flushall
+	err = job.slaveFlushall()
+	if err != nil {
+		return err
+	}
+	err = job.clusterMeetAndUtilFinish()
+	if err != nil {
+		return err
+	}
+	// slaveof master_ip master_port
+	err = job.ReplicaResync()
+	if err != nil {
+		return err
+	}
+	return nil
+}
+
+// slaveInstsAbleToConnect connects to every slave port, records master/slave pairs and saves them to file
+func (job *ReplicasForceResync) slaveInstsAbleToConnect() (err error) {
+	instsAddrs := make([]string, 0, len(job.params.SlavePorts))
+	job.slaveAddrMapCli = make(map[string]*myredis.RedisClient, len(job.params.SlavePorts))
+	var addr, password, role, masterHost, masterPortStr string
+	pairFileOK, err := job.checkReplicaParisFileExistsAndNotEmpty()
+	if err != nil {
+		return err
+	}
+	allInstancesSlave := true
+	for _, port := range job.params.SlavePorts {
+		addr = fmt.Sprintf("%s:%d", job.params.SlaveIP, port)
+		instsAddrs = append(instsAddrs, addr)
+		password, err = myredis.GetRedisPasswdFromConfFile(port)
+		if err != nil {
+			return err
+		}
+		cli, err := myredis.NewRedisClientWithTimeout(addr, password, 0,
+			consts.TendisTypeRedisInstance, 5*time.Second)
+		if err != nil {
+			return err
+		}
+		// stored right away so allInstDisconnect also closes it when a later step fails
+		job.slaveAddrMapCli[addr] = cli
+		role, err = cli.GetRole()
+		if err != nil {
+			return err
+		}
+		if role != consts.RedisSlaveRole {
+			if !pairFileOK {
+				// a non-slave instance with no saved pairs file means this is the first run;
+				// on the first run every instance must still be a slave,
+				// so this case is reported as an error
+				err = fmt.Errorf("redis instance(%s) role:%s,not slave role", addr, role)
+				job.runtime.Logger.Error(err.Error())
+				return err
+			}
+			allInstancesSlave = false
+		} else {
+			// get master_host and master_port
+			masterHost, masterPortStr, _, _, err = cli.GetMasterData()
+			if err != nil {
+				return err
+			}
+			masterPort, convErr := strconv.Atoi(masterPortStr)
+			if convErr != nil {
+				err = fmt.Errorf("redis instance(%s) master_port:%q invalid,err:%v", addr, masterPortStr, convErr)
+				job.runtime.Logger.Error(err.Error())
+				return err
+			}
+			job.slavePairsMap[addr] = ReplicaItem{
+				MasterIP:      masterHost,
+				MasterPort:    masterPort,
+				MasterAuth:    password,
+				SlaveIP:       job.params.SlaveIP,
+				SlavePort:     port,
+				SlavePassword: password,
+			}
+		}
+	}
+	if !allInstancesSlave {
+		if !pairFileOK {
+			err = fmt.Errorf("slave instances not all slave role,and %s not exist", job.getReplicaPairsFile())
+			job.runtime.Logger.Error(err.Error())
+			return err
+		}
+		err = job.getSlavePairsFromFile()
+		if err != nil {
+			return err
+		}
+	}
+	err = job.writeSlavePairsToFile()
+	if err != nil {
+		return err
+	}
+	job.runtime.Logger.Info("all slave instances able to connect,(%+v)", instsAddrs)
+	return nil
+}
+
+// allInstDisconnect closes every client stored in job.slaveAddrMapCli
+func (job *ReplicasForceResync) allInstDisconnect() {
+	for _, cli := range job.slaveAddrMapCli {
+		cli.Close()
+	}
+}
+
+// slaveofNoOneAndUtilToMaster runs 'cluster reset' or 'slaveof no one', then waits for the master role
+func (job *ReplicasForceResync) slaveofNoOneAndUtilToMaster() (err error) {
+	var role, msg string
+	var isClusterEnabled bool
+	for _, cli := range job.slaveAddrMapCli {
+		isClusterEnabled, err = cli.IsClusterEnabled()
+		if err != nil {
+			return err
+		}
+		if isClusterEnabled {
+			msg = "cluster reset"
+			job.runtime.Logger.Info(fmt.Sprintf("slave(%s) run cluster reset", cli.Addr))
+			err = cli.ClusterReset()
+		} else {
+			msg = "slaveof no one"
+			job.runtime.Logger.Info(fmt.Sprintf("slave(%s) run 'slaveof no one'", cli.Addr))
+			_, err = cli.SlaveOf("no", "one")
+		}
+		if err != nil {
+			return err
+		}
+		// poll every 3s until the role is master; NOTE(review): no upper bound on the wait
+		for {
+			time.Sleep(3 * time.Second)
+			role, err = cli.GetRole()
+			if err != nil {
+				return err
+			}
+			if role == consts.RedisMasterRole {
+				job.runtime.Logger.Info("after %s, slave(%s) role:%s", msg, cli.Addr, role)
+				break
+			}
+			job.runtime.Logger.Info("slave(%s) role:%s,retry...", cli.Addr, role)
+		}
+	}
+	return nil
+}
+
+// slaveFlushall runs the (renamed) flushall command on every connected instance
+func (job *ReplicasForceResync) slaveFlushall() (err error) {
+	cmd := []string{consts.CacheFlushAllRename}
+	for _, cli := range job.slaveAddrMapCli {
+		job.runtime.Logger.Info("slave(%s) flushall...", cli.Addr)
+		_, err = cli.DoCommand(cmd, 0)
+		if err != nil {
+			return err
+		}
+	}
+	return nil
+}
+
+// clusterMeetAndUtilFinish makes every cluster-enabled instance 'cluster meet' its recorded master
+func (job *ReplicasForceResync) clusterMeetAndUtilFinish() (err error) {
+	var isClusterEnabled bool
+	for _, cli := range job.slaveAddrMapCli {
+		isClusterEnabled, err = cli.IsClusterEnabled()
+		if err != nil {
+			return err
+		}
+		if !isClusterEnabled {
+			job.runtime.Logger.Info("slave(%s) cluster disbaled,skip cluster meet...", cli.Addr)
+			continue
+		}
+		pair, ok := job.slavePairsMap[cli.Addr]
+		if !ok {
+			err = fmt.Errorf("slave(%s) master not found in slavePairsMap,can not cluster meet", cli.Addr)
+			job.runtime.Logger.Error(err.Error())
+			return err
+		}
+		err = cli.ClusterMeetAndUtilFinish(pair.MasterIP, strconv.Itoa(pair.MasterPort))
+		if err != nil {
+			return err
+		}
+	}
+	return nil
+}
+
+// ReplicaResync builds one ReplicaTask per entry of job.slavePairsMap and runs them as a group
+func (job *ReplicasForceResync) ReplicaResync() (err error) {
+	resyncTasks := make([]*ReplicaTask, 0, len(job.slavePairsMap))
+	for _, item := range job.slavePairsMap {
+		resyncTasks = append(resyncTasks, &ReplicaTask{
+			ReplicaItem: item,
+			runtime:     job.runtime,
+		})
+	}
+	job.runtime.Logger.Info("slave start resync...resyncTasks:%s", util.ToString(resyncTasks))
+	err = GroupRunReplicaTasksAndWait(resyncTasks, job.runtime)
+	if err != nil {
+		return err
+	}
+	return nil
+}
+
+// Retry times
+func (job *ReplicasForceResync) Retry() uint {
+	return 2
+}
+
+// Rollback rollback
+func (job *ReplicasForceResync) Rollback() error {
+	return nil
+}
diff --git a/dbm-services/redis/db-tools/dbactuator/pkg/jobmanager/jobmanager.go b/dbm-services/redis/db-tools/dbactuator/pkg/jobmanager/jobmanager.go
index cb722be478..755fa6a47f 100644
--- a/dbm-services/redis/db-tools/dbactuator/pkg/jobmanager/jobmanager.go
+++ b/dbm-services/redis/db-tools/dbactuator/pkg/jobmanager/jobmanager.go
@@ -205,6 +205,7 @@ func (m *JobGenericManager) atomjobsMapperLoading() {
 	m.atomJobMapper[atomproxy.NewPredixyAddModulesCmds().Name()] = atomproxy.NewPredixyAddModulesCmds
 	m.atomJobMapper[atomredis.NewRedisReshape().Name()] = atomredis.NewRedisReshape
 	m.atomJobMapper[atomredis.NewClusterResetFlushMeet().Name()] = atomredis.NewClusterResetFlushMeet
+	m.atomJobMapper[atomredis.NewReplicasForceResync().Name()] = atomredis.NewReplicasForceResync
 	// 老备份系统
 	// m.atomJobMapper[atomredis.NewRedisDataRecover().Name()] = atomredis.NewRedisDataRecover
 	m.atomJobMapper[atomredis.NewRedisDataStructure().Name()] = atomredis.NewRedisDataStructure
diff --git a/dbm-services/redis/db-tools/dbmon/pkg/redismaxmemory/redismaxmemory.go b/dbm-services/redis/db-tools/dbmon/pkg/redismaxmemory/redismaxmemory.go
index 8168ea16e8..cb66c797df 100644
--- a/dbm-services/redis/db-tools/dbmon/pkg/redismaxmemory/redismaxmemory.go
+++ b/dbm-services/redis/db-tools/dbmon/pkg/redismaxmemory/redismaxmemory.go
@@ -130,7 +130,7 @@ func (job *Job) Run() {
 }
 
 // GetRedisUsedMemory 并发获得redis实例的used_memory
-func (job *Job) GetRedisUsedMemory() (err error) {
+func (job *Job) GetRedisUsedMemory() {
 	job.SortUsedMemItems = []*RedisUsedMemItem{}
 	for _, server := range job.Conf.Servers {
 		if !consts.IsRedisMetaRole(server.MetaRole) {
@@ -151,7 +151,8 @@ func (job *Job) GetRedisUsedMemory() (err error) {
 		redisItem := item
 		redisItem.GetPassword()
 		if redisItem.Err != nil {
-			return redisItem.Err
+			job.Err = redisItem.Err
+			return
 		}
 	}
 	// concurrently get multi redis used memory
@@ -213,7 +214,8 @@ func (job *Job) GetRedisUsedMemory() (err error) {
 		redisItem := item
 		if redisItem.Err != nil {
 			mylog.Logger.Error(redisItem.Err.Error())
-			return redisItem.Err
+			job.Err = redisItem.Err
+			return
 		}
 		job.UsedMemSum += redisItem.UsedMemory
 		job.addrToUsedMem[redisItem.Addr()] = redisItem.UsedMemory
@@ -221,7 +223,7 @@ func (job *Job) GetRedisUsedMemory() (err error) {
 	sort.Slice(job.SortUsedMemItems, func(i, j int) bool {
 		return job.SortUsedMemItems[i].UsedMemory < job.SortUsedMemItems[j].UsedMemory
 	})
-	return nil
+	return
 }
 
 // DisConnectAllRedis disconnect all redis client connection in job.SortUsedMemItems
@@ -463,15 +465,25 @@ func (job *Job) CalculateRedisMaxMemory() {
 
 	// 获取最大的used_memory的redis节点
 	maxUsedMemItem := job.SortUsedMemItems[len(job.SortUsedMemItems)-1]
+	var sumMaxmemory int64
 	for _, item := range job.SortUsedMemItems {
 		redisItem := item
 		redisItem.MaxMemory = int64(float64(redisItem.UsedMemory)/float64(job.UsedMemSum)*float64(job.OsAvailMem-
 			job.UsedMemSum) + float64(redisItem.UsedMemory))
+		sumMaxmemory = sumMaxmemory + redisItem.MaxMemory
 		if maxUsedMemItem.UsedMemory > 10*redisItem.UsedMemory {
 			redisItem.MaxMemory = redisItem.MaxMemory + 500*consts.MiByte
+			// the extra 500MiB given to a very small redis is not counted in sumMaxmemory
 		}
 	}
 
+	if sumMaxmemory > job.OsAvailMem {
+		// the computed maxmemory values add up to more than the available host memory: report an error
+		job.Err = fmt.Errorf("sumMaxmemory:%s > OsAvailMem:%s",
+			util.SizeToHumanStr(sumMaxmemory), util.SizeToHumanStr(job.OsAvailMem))
+		mylog.Logger.Error(job.Err.Error())
+		return
+	}
 	return
 }
 
@@ -548,16 +560,16 @@ var resourceSpecOsMem = map[string]int64{
 	"8g_avail":   consts.GiByte * 8 * 75 / 100, // 6GB可用
 	"16g_min":    consts.GiByte * 15,
 	"16g_max":    consts.GiByte * 16,
-	"16g_avail":  consts.GiByte * 16 * 85 / 100, // 13.6GB可用
+	"16g_avail":  consts.GiByte * 16 * 81 / 100, // 12.96GB available
 	"32g_min":    consts.GiByte * 30,
 	"32g_max":    consts.GiByte * 32,
 	"32g_avail":  consts.GiByte * 32 * 85 / 100, // 27.2GB可用
 	"64g_min":    consts.GiByte * 61,
 	"64g_max":    consts.GiByte * 64,
-	"64g_avail":  consts.GiByte * 64 * 9 / 10, // 57.6GB可用
+	"64g_avail":  consts.GiByte * 64 * 87 / 100, // 55.68GB available
 	"128g_min":   consts.GiByte * 125,
 	"128g_max":   consts.GiByte * 128,
-	"128g_avail": consts.GiByte * 128 * 9 / 10, // 115.2GB可用
+	"128g_avail": consts.GiByte * 128 * 90 / 100, // 115.2GB available
 }
 
 // GetLocalHostAvailMemory TODO
diff --git a/dbm-ui/backend/flow/consts.py b/dbm-ui/backend/flow/consts.py
index a3de9650d3..424d548506 100644
--- a/dbm-ui/backend/flow/consts.py
+++ b/dbm-ui/backend/flow/consts.py
@@ -486,6 +486,7 @@ class RedisActuatorActionEnum(str, StructuredEnum):
     PREDIXY_ADD_MODULES_CMDS = EnumField("predixy_add_modules_cmds", _("predixy_add_modules_cmds"))
     RESHAPE = EnumField("reshape", _("reshape"))
     CLUSTER_RESET_FLUSH_MEET = EnumField("cluster_reset_flush_meet", _("cluster_reset_flush_meet"))
+    REPLICAS_FORCE_RESYNC = EnumField("replicas_force_resync", _("replicas_force_resync"))
 
 
 class MongoDBActuatorActionEnum(str, StructuredEnum):
diff --git a/dbm-ui/backend/flow/engine/bamboo/scene/redis/redis_replicas_force_resync.py b/dbm-ui/backend/flow/engine/bamboo/scene/redis/redis_replicas_force_resync.py
new file mode 100644
index 0000000000..308a33f64a
--- /dev/null
+++ b/dbm-ui/backend/flow/engine/bamboo/scene/redis/redis_replicas_force_resync.py
@@ -0,0 +1,103 @@
+# -*- coding: utf-8 -*-
+"""
+TencentBlueKing is pleased to support the open source community by making 蓝鲸智云-DB管理系统(BlueKing-BK-DBM) available.
+Copyright (C) 2017-2023 THL A29 Limited, a Tencent company. All rights reserved.
+Licensed under the MIT License (the "License"); you may not use this file except in compliance with the License.
+You may obtain a copy of the License at https://opensource.org/licenses/MIT
+Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+specific language governing permissions and limitations under the License.
+"""
+import logging.config
+from copy import deepcopy
+from dataclasses import asdict
+from typing import Dict, Optional
+
+from django.utils.translation import ugettext as _
+
+from backend.configuration.constants import DBType
+from backend.db_meta.models import Cluster
+from backend.flow.engine.bamboo.scene.common.builder import Builder, SubBuilder
+from backend.flow.engine.bamboo.scene.common.get_file_list import GetFileList
+from backend.flow.plugins.components.collections.redis.exec_actuator_script import ExecuteDBActuatorScriptComponent
+from backend.flow.plugins.components.collections.redis.get_redis_payload import GetRedisActPayloadComponent
+from backend.flow.plugins.components.collections.redis.trans_flies import TransFileComponent
+from backend.flow.utils.redis.redis_act_playload import RedisActPayload
+from backend.flow.utils.redis.redis_context_dataclass import ActKwargs, CommonContext
+from backend.flow.utils.redis.redis_proxy_util import async_multi_clusters_precheck, get_cluster_info_by_cluster_id
+
+logger = logging.getLogger("flow")
+
+
+class RedisReplicasForceResyncSceneFlow(object):
+    def __init__(self, root_id: str, data: Optional[Dict]):
+        """
+        @param root_id : root_id defined by the task flow
+        @param data : parameters passed in by the ticket, as a dict
+        """
+        self.root_id = root_id
+        self.data = data
+        self.precheck(self.data["cluster_ids"])
+
+    @staticmethod
+    def precheck(cluster_ids: list):
+        async_multi_clusters_precheck(cluster_ids)
+
+    def replicas_force_resync(self):
+        """
+        force slaves to resync
+        self.data (Dict):
+        {
+            "bk_biz_id": "",
+            "bk_cloud_id":0,
+            "cluster_ids":[1,2,3],
+        }
+        """
+        redis_pipeline = Builder(root_id=self.root_id, data=self.data)
+        trans_files = GetFileList(db_type=DBType.Redis)
+        act_kwargs = ActKwargs()
+        act_kwargs.set_trans_data_dataclass = CommonContext.__name__
+        act_kwargs.file_list = trans_files.redis_base()
+        act_kwargs.is_update_trans_data = True
+        act_kwargs.bk_cloud_id = self.data["bk_cloud_id"]
+
+        sub_pipelines = []
+        for cluster_id in self.data["cluster_ids"]:
+            cluster = Cluster.objects.get(id=cluster_id)
+            cluster_info = get_cluster_info_by_cluster_id(cluster_id=cluster.id)
+            sub_pipeline = SubBuilder(root_id=self.root_id, data=self.data)
+            cluster_kwargs = deepcopy(act_kwargs)
+
+            sub_pipeline.add_act(
+                act_name=_("初始化配置"),
+                act_component_code=GetRedisActPayloadComponent.code,
+                kwargs=asdict(cluster_kwargs),
+            )
+
+            cluster_kwargs.exec_ip = cluster_info["slave_ips"]
+            sub_pipeline.add_act(
+                act_name=_("下发介质包"),
+                act_component_code=TransFileComponent.code,
+                kwargs=asdict(cluster_kwargs),
+            )
+
+            cluster_kwargs.cluster = {}
+            acts_list = []
+            for slave_ip, slave_ports in cluster_info["slave_ports"].items():
+                cluster_kwargs.exec_ip = slave_ip
+                cluster_kwargs.cluster = {"slave_ip": slave_ip, "slave_ports": slave_ports}
+                cluster_kwargs.get_redis_payload_func = RedisActPayload.redis_replicas_force_resync.__name__
+                acts_list.append(
+                    {
+                        "act_name": _("slave:{} 强制重同步").format(slave_ip),
+                        "act_component_code": ExecuteDBActuatorScriptComponent.code,
+                        "kwargs": asdict(cluster_kwargs),
+                    }
+                )
+            sub_pipeline.add_parallel_acts(acts_list=acts_list)
+            sub_pipelines.append(
+                sub_pipeline.build_sub_process(sub_name=_("集群{} slave强制重同步").format(cluster_info["immute_domain"]))
+            )
+
+        redis_pipeline.add_parallel_sub_pipeline(sub_flow_list=sub_pipelines)
+        redis_pipeline.run_pipeline()
diff --git a/dbm-ui/backend/flow/engine/controller/redis.py b/dbm-ui/backend/flow/engine/controller/redis.py
index d94be79cfa..05efbad31a 100644
--- a/dbm-ui/backend/flow/engine/controller/redis.py
+++ b/dbm-ui/backend/flow/engine/controller/redis.py
@@ -50,6 +50,7 @@
 )
 from backend.flow.engine.bamboo.scene.redis.redis_proxy_scale import RedisProxyScaleFlow
 from backend.flow.engine.bamboo.scene.redis.redis_remove_dts_server import RedisRemoveDtsServerFlow
+from backend.flow.engine.bamboo.scene.redis.redis_replicas_force_resync import RedisReplicasForceResyncSceneFlow
 from backend.flow.engine.bamboo.scene.redis.redis_reupload_old_backup_records import RedisReuploadOldBackupRecordsFlow
 from backend.flow.engine.bamboo.scene.redis.redis_slots_migrate import RedisSlotsMigrateFlow
 from backend.flow.engine.bamboo.scene.redis.redis_storages_client_conns_kill import (
@@ -401,3 +402,10 @@ def tendisplus_lightning_data(self):
         """
         flow = TendisPlusLightningData(root_id=self.root_id, data=self.ticket_data)
         flow.lightning_data_flow()
+
+    def redis_replicas_force_resync_scene(self):
+        """
+        tendis: force slaves to rebuild their replication relationship
+        """
+        flow = RedisReplicasForceResyncSceneFlow(root_id=self.root_id, data=self.ticket_data)
+        flow.replicas_force_resync()
diff --git a/dbm-ui/backend/flow/utils/redis/redis_act_playload.py b/dbm-ui/backend/flow/utils/redis/redis_act_playload.py
index c13fea6684..bbe2656320 100644
--- a/dbm-ui/backend/flow/utils/redis/redis_act_playload.py
+++ b/dbm-ui/backend/flow/utils/redis/redis_act_playload.py
@@ -2464,3 +2464,17 @@ def redis_clsuter_reset_flush_meet(self, **kwargs) -> dict:
                 "reset_flush_meet_params": params["reset_flush_meet_params"],
             },
         }
+
+    def redis_replicas_force_resync(self, **kwargs) -> dict:
+        """
+        redis replicas force resync
+        """
+        params = kwargs["params"]
+        return {
+            "db_type": DBActuatorTypeEnum.Redis.value,
+            "action": DBActuatorTypeEnum.Redis.value + "_" + RedisActuatorActionEnum.REPLICAS_FORCE_RESYNC.value,
+            "payload": {
+                "slave_ip": params["slave_ip"],
+                "slave_ports": params["slave_ports"],
+            },
+        }