From c8778ac475fc12a12e64b0c33407b8494a692c3c Mon Sep 17 00:00:00 2001
From: lukemakeit <2302063437@qq.com>
Date: Fri, 27 Sep 2024 15:25:28 +0800
Subject: [PATCH] =?UTF-8?q?feat(redis):=20redis=E5=BC=BA=E5=88=B6=E9=87=8D?=
 =?UTF-8?q?=E5=BB=BAslave=E5=90=8C=E6=AD=A5=E5=85=B3=E7=B3=BB=20#7135?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

---
 .../example/redis_replicas_force_resync.md    |  17 +
 .../dbactuator/models/myredis/client.go       |  30 ++
 .../atomredis/redis_replicas_force_resync.go  | 365 ++++++++++++++++++
 .../dbactuator/pkg/jobmanager/jobmanager.go   |   1 +
 .../pkg/redismaxmemory/redismaxmemory.go      |  26 +-
 dbm-ui/backend/flow/consts.py                 |   1 +
 .../redis/redis_replicas_force_resync.py      | 104 +++++
 .../backend/flow/engine/controller/redis.py   |   8 +
 .../flow/utils/redis/redis_act_playload.py    |  14 +
 9 files changed, 559 insertions(+), 7 deletions(-)
 create mode 100644 dbm-services/redis/db-tools/dbactuator/example/redis_replicas_force_resync.md
 create mode 100644 dbm-services/redis/db-tools/dbactuator/pkg/atomjobs/atomredis/redis_replicas_force_resync.go
 create mode 100644 dbm-ui/backend/flow/engine/bamboo/scene/redis/redis_replicas_force_resync.py

diff --git a/dbm-services/redis/db-tools/dbactuator/example/redis_replicas_force_resync.md b/dbm-services/redis/db-tools/dbactuator/example/redis_replicas_force_resync.md
new file mode 100644
index 0000000000..113c71c0d1
--- /dev/null
+++ b/dbm-services/redis/db-tools/dbactuator/example/redis_replicas_force_resync.md
@@ -0,0 +1,17 @@
+### redis_replicas_force_resync
+redis slave 强制重同步.
+
+
+```
+./dbactuator_redis  --uid={{uid}} --root_id={{root_id}} --node_id={{node_id}} --version_id={{version_id}} --atom-job-list="redis_replicas_force_resync" --data_dir=/path/to/data  --backup_dir=/path/to/backup --payload='{{payload_base64}}'
+```
+
+`--data_dir`、`--backup_dir` 可以留空.  
+
+原始payload
+```json
+{
+    "slave_ip":"xx.xx.xx.xx",
+    "slave_ports":[30000,30001]
+}
+```
\ No newline at end of file
diff --git a/dbm-services/redis/db-tools/dbactuator/models/myredis/client.go b/dbm-services/redis/db-tools/dbactuator/models/myredis/client.go
index b59654098e..d2c9b74dd6 100644
--- a/dbm-services/redis/db-tools/dbactuator/models/myredis/client.go
+++ b/dbm-services/redis/db-tools/dbactuator/models/myredis/client.go
@@ -1215,6 +1215,36 @@ func (db *RedisClient) ClusterMeet(ip, port string) (ret string, err error) {
 	return
 }
 
+// ClusterMeetAndUtilFinish sends 'cluster meet ip port' and then polls 'cluster nodes' every 3s until the target address appears.
+func (db *RedisClient) ClusterMeetAndUtilFinish(ip, port string) (err error) {
+	// 执行 cluster addslots 命令只能用 普通redis client
+	if db.InstanceClient == nil {
+		err = fmt.Errorf("ClusterMeetAndUtilFinish redis:%s must create a standalone client", db.Addr)
+		mylog.Logger.Error(err.Error())
+		return
+	}
+	mylog.Logger.Info("redis(%s) 'cluster meet %s %s' start", db.Addr, ip, port)
+	_, err = db.ClusterMeet(ip, port)
+	if err != nil {
+		return
+	}
+	targetAddr := fmt.Sprintf("%s:%s", ip, port)
+	var addrMapToNodes map[string]*ClusterNodeData
+	for {
+		addrMapToNodes, err = db.GetAddrMapToNodes()
+		if err != nil {
+			return
+		}
+		if _, ok := addrMapToNodes[targetAddr]; ok {
+			mylog.Logger.Info("redis:%s cluster meet %s %s success", db.Addr, ip, port)
+			break
+		}
+		mylog.Logger.Info("redis:%s cluster meet %s %s done,but not in 'cluster nodes'", db.Addr, ip, port)
+		time.Sleep(3 * time.Second)
+	}
+	return nil
+}
+
 // ClusterAddSlots 添加slots, 'cluster addslots 'command
 func (db *RedisClient) ClusterAddSlots(slots []int) (ret string, err error) {
 	// 执行 cluster addslots 命令只能用 普通redis client
diff --git a/dbm-services/redis/db-tools/dbactuator/pkg/atomjobs/atomredis/redis_replicas_force_resync.go b/dbm-services/redis/db-tools/dbactuator/pkg/atomjobs/atomredis/redis_replicas_force_resync.go
new file mode 100644
index 0000000000..7f3051eda2
--- /dev/null
+++ b/dbm-services/redis/db-tools/dbactuator/pkg/atomjobs/atomredis/redis_replicas_force_resync.go
@@ -0,0 +1,365 @@
+package atomredis
+
+import (
+	"encoding/json"
+	"fmt"
+	"os"
+	"path/filepath"
+	"strconv"
+	"time"
+
+	"github.com/go-playground/validator/v10"
+
+	"dbm-services/redis/db-tools/dbactuator/models/myredis"
+	"dbm-services/redis/db-tools/dbactuator/pkg/consts"
+	"dbm-services/redis/db-tools/dbactuator/pkg/jobruntime"
+	"dbm-services/redis/db-tools/dbactuator/pkg/util"
+)
+
+// ReplicaResyncParams params
+type ReplicaResyncParams struct {
+	SlaveIP    string `json:"slave_ip" validate:"required"`
+	SlavePorts []int  `json:"slave_ports" validate:"required"`
+}
+
+// ReplicasForceResync is the atom job that forces redis slaves to do a full resync from their masters.
+type ReplicasForceResync struct {
+	runtime         *jobruntime.JobGenericRuntime
+	params          ReplicaResyncParams
+	slaveAddrMapCli map[string]*myredis.RedisClient
+	slavePairsMap   map[string]ReplicaItem
+}
+
+// 无实际作用,仅确保实现了 jobruntime.JobRunner 接口
+var _ jobruntime.JobRunner = (*ReplicasForceResync)(nil)
+
+// NewReplicasForceResync new
+func NewReplicasForceResync() jobruntime.JobRunner {
+	return &ReplicasForceResync{}
+}
+
+// Init prepare run env
+func (job *ReplicasForceResync) Init(m *jobruntime.JobGenericRuntime) error {
+	job.runtime = m
+	err := json.Unmarshal([]byte(job.runtime.PayloadDecoded), &job.params)
+	if err != nil {
+		job.runtime.Logger.Error(fmt.Sprintf("json.Unmarshal failed,err:%+v", err))
+		return err
+	}
+	// 参数有效性检查
+	validate := validator.New()
+	err = validate.Struct(job.params)
+	if err != nil {
+		if _, ok := err.(*validator.InvalidValidationError); ok {
+			job.runtime.Logger.Error("ReplicasForceResync Init params validate failed,err:%v,params:%+v",
+				err, job.params)
+			return err
+		}
+		for _, err := range err.(validator.ValidationErrors) {
+			job.runtime.Logger.Error("ReplicasForceResync Init params validate failed,err:%v,params:%+v",
+				err, job.params)
+			return err
+		}
+	}
+	return nil
+}
+
+// Name 原子任务名
+func (job *ReplicasForceResync) Name() string {
+	return "redis_replicas_force_resync"
+}
+
+// getReplicaPairsFile 获取 master slave pairs 文件
+func (job *ReplicasForceResync) getReplicaPairsFile() string {
+	file := fmt.Sprintf("%s_%s_master_replica_pairs", job.runtime.UID, job.params.SlaveIP)
+	saveDir := filepath.Join(consts.GetRedisBackupDir(), "dbbak", "replicas_force_resync")
+	if !util.FileExists(saveDir) {
+		util.MkDirsIfNotExists([]string{saveDir})
+		util.LocalDirChownMysql(saveDir)
+	}
+	fullPath := filepath.Join(saveDir, file)
+	return fullPath
+}
+
+// checkReplicaParisFileExistsAndNotEmpty 检查文件是否存在且不为空
+// 如果文件不存在或为空,则返回 false
+// 如果文件存在且不为空,则返回 true
+func (job *ReplicasForceResync) checkReplicaParisFileExistsAndNotEmpty() (ok bool, err error) {
+	fullPath := job.getReplicaPairsFile()
+	if !util.FileExists(fullPath) {
+		return false, nil
+	}
+	statInfo, err := os.Stat(fullPath)
+	if err != nil {
+		err = fmt.Errorf("os.Stat failed,err:%v,fullPath:%s", err, fullPath)
+		job.runtime.Logger.Error(err.Error())
+		return false, err
+	}
+	if statInfo.Size() == 0 {
+		return false, nil
+	}
+	return true, nil
+}
+
+// writeSlavePairsToFile 将主从关系写入文件
+func (job *ReplicasForceResync) writeSlavePairsToFile() (err error) {
+	pairFile := job.getReplicaPairsFile()
+	pairData, err := json.Marshal(job.slavePairsMap)
+	if err != nil {
+		err = fmt.Errorf("json.Marshal failed,err:%v,pairData:%+v", err, job.slavePairsMap)
+		job.runtime.Logger.Error(err.Error())
+		return err
+	}
+	err = os.WriteFile(pairFile, pairData, 0644)
+	if err != nil {
+		err = fmt.Errorf("os.WriteFile failed,err:%v,pairFile:%s,pairData:%s", err, pairFile, pairData)
+		job.runtime.Logger.Error(err.Error())
+		return err
+	}
+	util.LocalDirChownMysql(pairFile)
+	job.runtime.Logger.Info(fmt.Sprintf("writeSlavePairsToFile to %s ok", pairFile))
+	return nil
+}
+
+// getSlavePairsFromFile 从slave pairs文件中读取数据,并填充到job.slavePairsMap中,得到主从关系
+func (job *ReplicasForceResync) getSlavePairsFromFile() (err error) {
+	pairFile := job.getReplicaPairsFile()
+	if !util.FileExists(pairFile) {
+		err = fmt.Errorf("file:%s not exists", pairFile)
+		job.runtime.Logger.Error(err.Error())
+		return err
+	}
+	pairData, err := os.ReadFile(pairFile)
+	if err != nil {
+		err = fmt.Errorf("os.ReadFile failed,err:%v,pairFile:%s", err, pairFile)
+		job.runtime.Logger.Error(err.Error())
+		return err
+	}
+	pairDecoded := make(map[string]ReplicaItem)
+	err = json.Unmarshal(pairData, &pairDecoded)
+	if err != nil {
+		err = fmt.Errorf("json.Unmarshal failed,err:%v,pairData:%s", err, pairData)
+		job.runtime.Logger.Error(err.Error())
+		return err
+	}
+	for k, v := range pairDecoded {
+		if _, ok := job.slavePairsMap[k]; !ok {
+			job.slavePairsMap[k] = v
+		}
+	}
+	job.runtime.Logger.Info(fmt.Sprintf("completeSlavePairs from %s ok", pairFile))
+	return nil
+}
+
+// Run executes the force-resync workflow: stop dbmon, slaveof no one / cluster reset, flushall, cluster meet, resync, restart dbmon.
+func (job *ReplicasForceResync) Run() (err error) {
+	util.StopBkDbmon()
+	defer util.StartBkDbmon()
+
+	job.slavePairsMap = make(map[string]ReplicaItem)
+	err = job.slaveInstsAbleToConnect()
+	if err != nil {
+		return err
+	}
+	defer job.allInstDisconnect()
+
+	// slaveof no one
+	err = job.slaveofNoOneAndUtilToMaster()
+	if err != nil {
+		return err
+	}
+	// slave flushall
+	err = job.slaveFlushall()
+	if err != nil {
+		return err
+	}
+	err = job.clusterMeetAndUtilFinish()
+	if err != nil {
+		return err
+	}
+	// slaveof master_ip master_port
+	err = job.ReplicaResync()
+	if err != nil {
+		return err
+	}
+	return nil
+}
+
+// slaveInstsAbleToConnect 检查所有slave可连接
+func (job *ReplicasForceResync) slaveInstsAbleToConnect() (err error) {
+	instsAddrs := make([]string, 0, len(job.params.SlavePorts))
+	job.slaveAddrMapCli = make(map[string]*myredis.RedisClient, len(job.params.SlavePorts))
+	var addr, password, role, masterHost, masterPortStr string
+	pairFileOK, err := job.checkReplicaParisFileExistsAndNotEmpty()
+	if err != nil {
+		return err
+	}
+	allInstancesSlave := true
+	for _, port := range job.params.SlavePorts {
+		addr = fmt.Sprintf("%s:%d", job.params.SlaveIP, port)
+		instsAddrs = append(instsAddrs, addr)
+		password, err = myredis.GetRedisPasswdFromConfFile(port)
+		if err != nil {
+			return err
+		}
+		cli, err := myredis.NewRedisClientWithTimeout(addr, password, 0,
+			consts.TendisTypeRedisInstance, 5*time.Second)
+		if err != nil {
+			return err
+		}
+		role, err = cli.GetRole()
+		if err != nil {
+			return err
+		}
+		if role != consts.RedisSlaveRole {
+			if !pairFileOK {
+				// 如果存在非slave角色,同时 不存在slave pairs 文件
+				// 代表是第一次运行,这种情况报错
+				// 第一次运行,slave关系必须是ok的
+				err = fmt.Errorf("redis instance(%s) role:%s,not slave role", addr, role)
+				job.runtime.Logger.Error(err.Error())
+				return err
+			}
+			allInstancesSlave = false
+		} else {
+			// 获取 master_host and master_port
+			masterHost, masterPortStr, _, _, err = cli.GetMasterData()
+			if err != nil {
+				return err
+			}
+			masterPort, _ := strconv.Atoi(masterPortStr)
+			job.slavePairsMap[addr] = ReplicaItem{
+				MasterIP:      masterHost,
+				MasterPort:    masterPort,
+				MasterAuth:    password,
+				SlaveIP:       job.params.SlaveIP,
+				SlavePort:     port,
+				SlavePassword: password,
+			}
+		}
+		job.slaveAddrMapCli[addr] = cli
+	}
+	if !allInstancesSlave {
+		if !pairFileOK {
+			err = fmt.Errorf("slave instances not all slave role,and %s not exist", job.getReplicaPairsFile())
+			job.runtime.Logger.Error(err.Error())
+			return err
+		}
+		err = job.getSlavePairsFromFile()
+		if err != nil {
+			return err
+		}
+	}
+	err = job.writeSlavePairsToFile()
+	if err != nil {
+		return err
+	}
+	job.runtime.Logger.Info("all slave instances able to connect,(%+v)", instsAddrs)
+	return nil
+}
+
+// allInstDisconnect 所有实例断开连接
+func (job *ReplicasForceResync) allInstDisconnect() {
+	for _, cli := range job.slaveAddrMapCli {
+		cli.Close()
+	}
+}
+
+// slaveofNoOneAndUtilToMaster runs 'slaveof no one' (or 'cluster reset' when cluster-enabled) and waits until each instance reports the master role
+func (job *ReplicasForceResync) slaveofNoOneAndUtilToMaster() (err error) {
+	var role, msg string
+	var isClusterEnabled bool
+	for _, cli := range job.slaveAddrMapCli {
+		isClusterEnabled, err = cli.IsClusterEnabled()
+		if err != nil {
+			return err
+		}
+		if isClusterEnabled {
+			msg = "cluster reset"
+			job.runtime.Logger.Info(fmt.Sprintf("slave(%s) run cluster reset", cli.Addr))
+			err = cli.ClusterReset()
+		} else {
+			msg = "slaveof no one"
+			job.runtime.Logger.Info(fmt.Sprintf("slave(%s) run 'slaveof no one'", cli.Addr))
+			_, err = cli.SlaveOf("no", "one")
+		}
+		if err != nil {
+			return err
+		}
+		// wait util slave to master
+		for {
+			time.Sleep(3 * time.Second)
+			role, err = cli.GetRole()
+			if err != nil {
+				return err
+			}
+			if role == consts.RedisMasterRole {
+				job.runtime.Logger.Info("after %s, slave(%s) role:%s", msg, cli.Addr, role)
+				break
+			}
+			job.runtime.Logger.Info("slave(%s) role:%s,retry...", cli.Addr, role)
+		}
+	}
+	return nil
+}
+
+// slaveFlushall slaveof flushall
+func (job *ReplicasForceResync) slaveFlushall() (err error) {
+	cmd := []string{consts.CacheFlushAllRename}
+	for _, cli := range job.slaveAddrMapCli {
+		job.runtime.Logger.Info("slave(%s) flushall...", cli.Addr)
+		_, err = cli.DoCommand(cmd, 0)
+		if err != nil {
+			return err
+		}
+	}
+	return nil
+}
+
+func (job *ReplicasForceResync) clusterMeetAndUtilFinish() (err error) {
+	var isClusterEnabled bool
+	for _, cli := range job.slaveAddrMapCli {
+		isClusterEnabled, err = cli.IsClusterEnabled()
+		if err != nil {
+			return err
+		}
+		if !isClusterEnabled {
+			job.runtime.Logger.Info("slave(%s) cluster disbaled,skip cluster meet...", cli.Addr)
+			continue
+		}
+		masterIP := job.slavePairsMap[cli.Addr].MasterIP
+		masterPort := job.slavePairsMap[cli.Addr].MasterPort
+		err = cli.ClusterMeetAndUtilFinish(masterIP, strconv.Itoa(masterPort))
+		if err != nil {
+			return err
+		}
+	}
+	return nil
+}
+
+// ReplicaResync slaveof $master_ip $master_port and wait until master_link_status is ok
+func (job *ReplicasForceResync) ReplicaResync() (err error) {
+	resyncTasks := make([]*ReplicaTask, 0, len(job.slavePairsMap))
+	for _, item := range job.slavePairsMap {
+		resyncTasks = append(resyncTasks, &ReplicaTask{
+			ReplicaItem: item,
+			runtime:     job.runtime,
+		})
+	}
+	job.runtime.Logger.Info("slave start resync...resyncTasks:%s", util.ToString(resyncTasks))
+	err = GroupRunReplicaTasksAndWait(resyncTasks, job.runtime)
+	if err != nil {
+		return err
+	}
+	return nil
+}
+
+// Retry times
+func (job *ReplicasForceResync) Retry() uint {
+	return 2
+}
+
+// Rollback rollback
+func (job *ReplicasForceResync) Rollback() error {
+	return nil
+}
diff --git a/dbm-services/redis/db-tools/dbactuator/pkg/jobmanager/jobmanager.go b/dbm-services/redis/db-tools/dbactuator/pkg/jobmanager/jobmanager.go
index cb722be478..755fa6a47f 100644
--- a/dbm-services/redis/db-tools/dbactuator/pkg/jobmanager/jobmanager.go
+++ b/dbm-services/redis/db-tools/dbactuator/pkg/jobmanager/jobmanager.go
@@ -205,6 +205,7 @@ func (m *JobGenericManager) atomjobsMapperLoading() {
 		m.atomJobMapper[atomproxy.NewPredixyAddModulesCmds().Name()] = atomproxy.NewPredixyAddModulesCmds
 		m.atomJobMapper[atomredis.NewRedisReshape().Name()] = atomredis.NewRedisReshape
 		m.atomJobMapper[atomredis.NewClusterResetFlushMeet().Name()] = atomredis.NewClusterResetFlushMeet
+		m.atomJobMapper[atomredis.NewReplicasForceResync().Name()] = atomredis.NewReplicasForceResync
 		// 老备份系统
 		// m.atomJobMapper[atomredis.NewRedisDataRecover().Name()] = atomredis.NewRedisDataRecover
 		m.atomJobMapper[atomredis.NewRedisDataStructure().Name()] = atomredis.NewRedisDataStructure
diff --git a/dbm-services/redis/db-tools/dbmon/pkg/redismaxmemory/redismaxmemory.go b/dbm-services/redis/db-tools/dbmon/pkg/redismaxmemory/redismaxmemory.go
index 8168ea16e8..cb66c797df 100644
--- a/dbm-services/redis/db-tools/dbmon/pkg/redismaxmemory/redismaxmemory.go
+++ b/dbm-services/redis/db-tools/dbmon/pkg/redismaxmemory/redismaxmemory.go
@@ -130,7 +130,7 @@ func (job *Job) Run() {
 }
 
 // GetRedisUsedMemory 并发获得redis实例的used_memory
-func (job *Job) GetRedisUsedMemory() (err error) {
+func (job *Job) GetRedisUsedMemory() {
 	job.SortUsedMemItems = []*RedisUsedMemItem{}
 	for _, server := range job.Conf.Servers {
 		if !consts.IsRedisMetaRole(server.MetaRole) {
@@ -151,7 +151,8 @@ func (job *Job) GetRedisUsedMemory() (err error) {
 		redisItem := item
 		redisItem.GetPassword()
 		if redisItem.Err != nil {
-			return redisItem.Err
+			job.Err = redisItem.Err
+			return
 		}
 	}
 	// concurrently get multi redis used memory
@@ -213,7 +214,8 @@ func (job *Job) GetRedisUsedMemory() (err error) {
 		redisItem := item
 		if redisItem.Err != nil {
 			mylog.Logger.Error(redisItem.Err.Error())
-			return redisItem.Err
+			job.Err = redisItem.Err
+			return
 		}
 		job.UsedMemSum += redisItem.UsedMemory
 		job.addrToUsedMem[redisItem.Addr()] = redisItem.UsedMemory
@@ -221,7 +223,7 @@ func (job *Job) GetRedisUsedMemory() (err error) {
 	sort.Slice(job.SortUsedMemItems, func(i, j int) bool {
 		return job.SortUsedMemItems[i].UsedMemory < job.SortUsedMemItems[j].UsedMemory
 	})
-	return nil
+	return
 }
 
 // DisConnectAllRedis disconnect all redis client connection in job.SortUsedMemItems
@@ -463,15 +465,25 @@ func (job *Job) CalculateRedisMaxMemory() {
 
 	// 获取最大的used_memory的redis节点
 	maxUsedMemItem := job.SortUsedMemItems[len(job.SortUsedMemItems)-1]
+	var sumMaxmemory int64 = 0
 	for _, item := range job.SortUsedMemItems {
 		redisItem := item
 		redisItem.MaxMemory = int64(float64(redisItem.UsedMemory)/float64(job.UsedMemSum)*float64(job.OsAvailMem-
 			job.UsedMemSum) +
 			float64(redisItem.UsedMemory))
+		sumMaxmemory = sumMaxmemory + redisItem.MaxMemory
 		if maxUsedMemItem.UsedMemory > 10*redisItem.UsedMemory {
 			redisItem.MaxMemory = redisItem.MaxMemory + 500*consts.MiByte
+			// the extra 500MiB granted to very small instances is intentionally NOT counted into sumMaxmemory
 		}
 	}
+	if sumMaxmemory > job.OsAvailMem {
+		// 如果sumMaxmemory大于OsAvailMem,则说明计算出来的maxmemory总和大于系统可用内存,则直接报错
+		job.Err = fmt.Errorf("sumMaxmemory:%s > OsAvailMem:%s",
+			util.SizeToHumanStr(sumMaxmemory), util.SizeToHumanStr(job.OsAvailMem))
+		mylog.Logger.Error(job.Err.Error())
+		return
+	}
 	return
 }
 
@@ -548,16 +560,16 @@ var resourceSpecOsMem = map[string]int64{
 	"8g_avail":   consts.GiByte * 8 * 75 / 100, // 6GB可用
 	"16g_min":    consts.GiByte * 15,
 	"16g_max":    consts.GiByte * 16,
-	"16g_avail":  consts.GiByte * 16 * 85 / 100, // 13.6GB可用
+	"16g_avail":  consts.GiByte * 16 * 81 / 100, // 13GB可用
 	"32g_min":    consts.GiByte * 30,
 	"32g_max":    consts.GiByte * 32,
 	"32g_avail":  consts.GiByte * 32 * 85 / 100, // 27.2GB可用
 	"64g_min":    consts.GiByte * 61,
 	"64g_max":    consts.GiByte * 64,
-	"64g_avail":  consts.GiByte * 64 * 9 / 10, // 57.6GB可用
+	"64g_avail":  consts.GiByte * 64 * 87 / 100, // 55.68GB可用
 	"128g_min":   consts.GiByte * 125,
 	"128g_max":   consts.GiByte * 128,
-	"128g_avail": consts.GiByte * 128 * 9 / 10, // 115.2GB可用
+	"128g_avail": consts.GiByte * 128 * 90 / 100, // 115.2GB可用
 }
 
 // GetLocalHostAvailMemory TODO
diff --git a/dbm-ui/backend/flow/consts.py b/dbm-ui/backend/flow/consts.py
index b84acdbf83..ccc6b694bc 100644
--- a/dbm-ui/backend/flow/consts.py
+++ b/dbm-ui/backend/flow/consts.py
@@ -483,6 +483,7 @@ class RedisActuatorActionEnum(str, StructuredEnum):
     PREDIXY_ADD_MODULES_CMDS = EnumField("predixy_add_modules_cmds", _("predixy_add_modules_cmds"))
     RESHAPE = EnumField("reshape", _("reshape"))
     CLUSTER_RESET_FLUSH_MEET = EnumField("cluster_reset_flush_meet", _("cluster_reset_flush_meet"))
+    REPLICAS_FORCE_RESYNC = EnumField("replicas_force_resync", _("replicas_force_resync"))
 
 
 class MongoDBActuatorActionEnum(str, StructuredEnum):
diff --git a/dbm-ui/backend/flow/engine/bamboo/scene/redis/redis_replicas_force_resync.py b/dbm-ui/backend/flow/engine/bamboo/scene/redis/redis_replicas_force_resync.py
new file mode 100644
index 0000000000..308a33f64a
--- /dev/null
+++ b/dbm-ui/backend/flow/engine/bamboo/scene/redis/redis_replicas_force_resync.py
@@ -0,0 +1,104 @@
+# -*- coding: utf-8 -*-
+"""
+TencentBlueKing is pleased to support the open source community by making 蓝鲸智云-DB管理系统(BlueKing-BK-DBM) available.
+Copyright (C) 2017-2023 THL A29 Limited, a Tencent company. All rights reserved.
+Licensed under the MIT License (the "License"); you may not use this file except in compliance with the License.
+You may obtain a copy of the License at https://opensource.org/licenses/MIT
+Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+specific language governing permissions and limitations under the License.
+"""
+import logging.config
+from copy import deepcopy
+from dataclasses import asdict
+from typing import Dict, Optional
+
+from django.utils.translation import ugettext as _
+
+from backend.configuration.constants import DBType
+from backend.db_meta.models import Cluster
+from backend.flow.engine.bamboo.scene.common.builder import Builder, SubBuilder
+from backend.flow.engine.bamboo.scene.common.get_file_list import GetFileList
+from backend.flow.plugins.components.collections.redis.exec_actuator_script import ExecuteDBActuatorScriptComponent
+from backend.flow.plugins.components.collections.redis.get_redis_payload import GetRedisActPayloadComponent
+from backend.flow.plugins.components.collections.redis.trans_flies import TransFileComponent
+from backend.flow.utils.redis.redis_act_playload import RedisActPayload
+from backend.flow.utils.redis.redis_context_dataclass import ActKwargs, CommonContext
+from backend.flow.utils.redis.redis_proxy_util import async_multi_clusters_precheck, get_cluster_info_by_cluster_id
+
+logger = logging.getLogger("flow")
+
+
+class RedisReplicasForceResyncSceneFlow(object):
+    def __init__(self, root_id: str, data: Optional[Dict]):
+        """
+        @param root_id : 任务流程定义的root_id
+        @param data : 单据传递过来的参数列表,是dict格式
+        """
+        self.root_id = root_id
+        self.data = data
+        self.precheck(self.data["cluster_ids"])
+
+    @staticmethod
+    def precheck(cluster_ids: list):
+        async_multi_clusters_precheck(cluster_ids)
+
+    def replicas_force_resync(self):
+        """
+        slaves 强制重同步
+        self.data (Dict):
+        {
+            "bk_biz_id": "",
+            "bk_cloud_id":0,
+            "cluster_ids":[1,2,3],
+        }
+        """
+        redis_pipeline = Builder(root_id=self.root_id, data=self.data)
+        trans_files = GetFileList(db_type=DBType.Redis)
+        act_kwargs = ActKwargs()
+        act_kwargs.set_trans_data_dataclass = CommonContext.__name__
+        act_kwargs.file_list = trans_files.redis_base()
+        act_kwargs.is_update_trans_data = True
+        act_kwargs.bk_cloud_id = self.data["bk_cloud_id"]
+
+        redis_pipeline = Builder(root_id=self.root_id, data=self.data)
+        sub_pipelines = []
+        for cluster_id in self.data["cluster_ids"]:
+            cluster = Cluster.objects.get(id=cluster_id)
+            cluster_info = get_cluster_info_by_cluster_id(cluster_id=cluster.id)
+            sub_pipeline = SubBuilder(root_id=self.root_id, data=self.data)
+            cluster_kwargs = deepcopy(act_kwargs)
+
+            sub_pipeline.add_act(
+                act_name=_("初始化配置"),
+                act_component_code=GetRedisActPayloadComponent.code,
+                kwargs=asdict(cluster_kwargs),
+            )
+
+            cluster_kwargs.exec_ip = cluster_info["slave_ips"]
+            sub_pipeline.add_act(
+                act_name=_("下发介质包"),
+                act_component_code=TransFileComponent.code,
+                kwargs=asdict(cluster_kwargs),
+            )
+
+            cluster_kwargs.cluster = {}
+            acts_list = []
+            for slave_ip, slave_ports in cluster_info["slave_ports"].items():
+                cluster_kwargs.exec_ip = slave_ip
+                cluster_kwargs.cluster = {"slave_ip": slave_ip, "slave_ports": slave_ports}
+                cluster_kwargs.get_redis_payload_func = RedisActPayload.redis_replicas_force_resync.__name__
+                acts_list.append(
+                    {
+                        "act_name": _("slave:{} 强制重同步").format(slave_ip),
+                        "act_component_code": ExecuteDBActuatorScriptComponent.code,
+                        "kwargs": asdict(cluster_kwargs),
+                    }
+                )
+            sub_pipeline.add_parallel_acts(acts_list=acts_list)
+            sub_pipelines.append(
+                sub_pipeline.build_sub_process(sub_name=_("集群{} slave强制重同步".format(cluster_info["immute_domain"])))
+            )
+
+            redis_pipeline.add_parallel_sub_pipeline(sub_flow_list=sub_pipelines)
+        redis_pipeline.run_pipeline()
diff --git a/dbm-ui/backend/flow/engine/controller/redis.py b/dbm-ui/backend/flow/engine/controller/redis.py
index d94be79cfa..05efbad31a 100644
--- a/dbm-ui/backend/flow/engine/controller/redis.py
+++ b/dbm-ui/backend/flow/engine/controller/redis.py
@@ -50,6 +50,7 @@
 )
 from backend.flow.engine.bamboo.scene.redis.redis_proxy_scale import RedisProxyScaleFlow
 from backend.flow.engine.bamboo.scene.redis.redis_remove_dts_server import RedisRemoveDtsServerFlow
+from backend.flow.engine.bamboo.scene.redis.redis_replicas_force_resync import RedisReplicasForceResyncSceneFlow
 from backend.flow.engine.bamboo.scene.redis.redis_reupload_old_backup_records import RedisReuploadOldBackupRecordsFlow
 from backend.flow.engine.bamboo.scene.redis.redis_slots_migrate import RedisSlotsMigrateFlow
 from backend.flow.engine.bamboo.scene.redis.redis_storages_client_conns_kill import (
@@ -401,3 +402,10 @@ def tendisplus_lightning_data(self):
         """
         flow = TendisPlusLightningData(root_id=self.root_id, data=self.ticket_data)
         flow.lightning_data_flow()
+
+    def redis_replicas_force_resync_scene(self):
+        """
+        tendis 强制slave重建同步关系
+        """
+        flow = RedisReplicasForceResyncSceneFlow(root_id=self.root_id, data=self.ticket_data)
+        flow.replicas_force_resync()
diff --git a/dbm-ui/backend/flow/utils/redis/redis_act_playload.py b/dbm-ui/backend/flow/utils/redis/redis_act_playload.py
index c13fea6684..bbe2656320 100644
--- a/dbm-ui/backend/flow/utils/redis/redis_act_playload.py
+++ b/dbm-ui/backend/flow/utils/redis/redis_act_playload.py
@@ -2464,3 +2464,17 @@ def redis_clsuter_reset_flush_meet(self, **kwargs) -> dict:
                 "reset_flush_meet_params": params["reset_flush_meet_params"],
             },
         }
+
+    def redis_replicas_force_resync(self, **kwargs) -> dict:
+        """
+        redis replicas force resync
+        """
+        params = kwargs["params"]
+        return {
+            "db_type": DBActuatorTypeEnum.Redis.value,
+            "action": DBActuatorTypeEnum.Redis.value + "_" + RedisActuatorActionEnum.REPLICAS_FORCE_RESYNC.value,
+            "payload": {
+                "slave_ip": params["slave_ip"],
+                "slave_ports": params["slave_ports"],
+            },
+        }