diff --git a/dbm-services/redis/db-tools/dbactuator/example/bkdbmon_install.example.md b/dbm-services/redis/db-tools/dbactuator/example/bkdbmon_install.example.md index 4d18a8bf7d..2b740cd205 100644 --- a/dbm-services/redis/db-tools/dbactuator/example/bkdbmon_install.example.md +++ b/dbm-services/redis/db-tools/dbactuator/example/bkdbmon_install.example.md @@ -59,9 +59,9 @@ bk-dbmon安装: "bk_cloud_id":"246", "app":"testapp", "app_name":"测试app", - "cluster_domain":"tendisx.aaaa.testapp.db", + "cluster_domain":"cache.aaaa.testapp.db", "cluster_name":"aaaa", - "cluster_type":"PredixyTendisplusCluster", + "cluster_type":"TwemproxyRedisInstance", "meta_role":"redis_master", "server_ip":"127.0.0.1", "server_ports":[ @@ -69,7 +69,14 @@ bk-dbmon安装: 30001, 30002, 30003 - ] + ], + "server_shards":{ + "a.a.a.a:12000":"0-104999", + "a.a.a.a:12001":"105000-209999", + "a.a.a.a:12002":"210000-314999", + "a.a.a.a:12003":"315000-419999" + }, + "cache_backup_mode":"rdb" }, { "bk_biz_id":"200500194", @@ -86,7 +93,9 @@ bk-dbmon安装: 31001, 31002, 31003 - ] + ], + "server_shards":{}, + "cache_backup_mode":"" } ] } diff --git a/dbm-services/redis/db-tools/dbactuator/models/myredis/client.go b/dbm-services/redis/db-tools/dbactuator/models/myredis/client.go index 2d0fd16530..1d0e12da77 100644 --- a/dbm-services/redis/db-tools/dbactuator/models/myredis/client.go +++ b/dbm-services/redis/db-tools/dbactuator/models/myredis/client.go @@ -712,8 +712,8 @@ func (db *RedisClient) Sscan(keyname string, cursor uint64, match string, count fields, retCursor, err = db.InstanceClient.SScan(context.TODO(), keyname, cursor, match, count).Result() } if err != nil && err != redis.Nil { - mylog.Logger.Error("Redis 'sscan %s %d match %s count %s' command fail,err:%v,addr:%s", keyname, cursor, match, count, - err, db.Addr) + mylog.Logger.Error("Redis 'sscan %s %d match %s count %d' command fail,err:%v,addr:%s", + keyname, cursor, match, count, err, db.Addr) return fields, 0, err } return fields, retCursor, nil @@ 
-1761,3 +1761,33 @@ func (db *RedisClient) GetClusterNodesStr() (ret string, err error) { } return } + +// RedisClusterGetMasterNode 获取master节点信息(如果 addr是master则返回它的node信息,否则找到它的masterID,进而找到master的node信息) +func (db *RedisClient) RedisClusterGetMasterNode(addr string) (masterNode *ClusterNodeData, err error) { + addrToNodes, err := db.GetAddrMapToNodes() + if err != nil { + return + } + myNode, ok := addrToNodes[addr] + if !ok { + err = fmt.Errorf("addr:%s not found in cluster nodes", addr) + mylog.Logger.Error(err.Error()) + return + } + if myNode.GetRole() == consts.RedisMasterRole { + masterNode = myNode + return + } + masterNodeID := myNode.MasterID + idToNode, err := db.GetNodeIDMapToNodes() + if err != nil { + return + } + masterNode, ok = idToNode[masterNodeID] + if !ok { + err = fmt.Errorf("masterNodeID:%s not found in cluster nodes", masterNodeID) + mylog.Logger.Error(err.Error()) + return + } + return +} diff --git a/dbm-services/redis/db-tools/dbactuator/pkg/atomjobs/atomredis/bkdbmon_install.go b/dbm-services/redis/db-tools/dbactuator/pkg/atomjobs/atomredis/bkdbmon_install.go index 158f25e9a0..6352bc8c76 100644 --- a/dbm-services/redis/db-tools/dbactuator/pkg/atomjobs/atomredis/bkdbmon_install.go +++ b/dbm-services/redis/db-tools/dbactuator/pkg/atomjobs/atomredis/bkdbmon_install.go @@ -22,17 +22,19 @@ import ( // ConfServerItem servers配置项 type ConfServerItem struct { - BkBizID string `json:"bk_biz_id" yaml:"bk_biz_id" validate:"required"` - BkCloudID int64 `json:"bk_cloud_id" yaml:"bk_cloud_id"` - App string `json:"app" yaml:"app" validate:"required"` - AppName string `json:"app_name" yaml:"app_name" validate:"required"` - ClusterDomain string `json:"cluster_domain" yaml:"cluster_domain" validate:"required"` - ClusterName string `json:"cluster_name" yaml:"cluster_name" validate:"required"` - ClusterType string `json:"cluster_type" yaml:"cluster_type" validate:"required"` - MetaRole string `json:"meta_role" yaml:"meta_role" validate:"required"` - 
ServerIP string `json:"server_ip" yaml:"server_ip" validate:"required"` - ServerPorts []int `json:"server_ports" yaml:"server_ports" validate:"required"` - Shard string `json:"shard" yaml:"shard"` + BkBizID string `json:"bk_biz_id" yaml:"bk_biz_id" validate:"required"` + BkCloudID int64 `json:"bk_cloud_id" yaml:"bk_cloud_id"` + App string `json:"app" yaml:"app" validate:"required"` + AppName string `json:"app_name" yaml:"app_name" validate:"required"` + ClusterDomain string `json:"cluster_domain" yaml:"cluster_domain" validate:"required"` + ClusterName string `json:"cluster_name" yaml:"cluster_name" validate:"required"` + ClusterType string `json:"cluster_type" yaml:"cluster_type" validate:"required"` + MetaRole string `json:"meta_role" yaml:"meta_role" validate:"required"` + ServerIP string `json:"server_ip" yaml:"server_ip" validate:"required"` + ServerPorts []int `json:"server_ports" yaml:"server_ports" validate:"required"` + ServerShards map[string]string `json:"server_shards" yaml:"server_shards"` + CacheBackupMode string `json:"cache_backup_mode" yaml:"cache_backup_mode"` // aof or rdb + Shard string `json:"shard" yaml:"shard"` } // BkDbmonInstallParams 安装参数 @@ -90,7 +92,7 @@ func (job *BkDbmonInstall) Init(m *jobruntime.JobGenericRuntime) error { } } for _, svrItem := range job.params.Servers { - if len(svrItem.ServerPorts) >= 0 { + if len(svrItem.ServerPorts) > 0 { if svrItem.ServerIP == "" { job.runtime.Logger.Error("BkDbmonInstall Init params validate failed,err:ServerIP is empty") return fmt.Errorf("ServerIP is empty") diff --git a/dbm-services/redis/db-tools/dbactuator/pkg/atomjobs/atomredis/redis_backup.go b/dbm-services/redis/db-tools/dbactuator/pkg/atomjobs/atomredis/redis_backup.go index ee20d99f07..78410bd28e 100644 --- a/dbm-services/redis/db-tools/dbactuator/pkg/atomjobs/atomredis/redis_backup.go +++ b/dbm-services/redis/db-tools/dbactuator/pkg/atomjobs/atomredis/redis_backup.go @@ -21,6 +21,7 @@ import ( "github.com/go-playground/validator/v10" 
"github.com/gofrs/flock" + "gopkg.in/yaml.v2" ) // TendisSSDSetLogCount tendisSSD设置log参数 @@ -250,8 +251,9 @@ type BackupTask struct { BackupFile string `json:"backup_file"` // 备份的目标文件,如果文件过大会切割成多个 BackupFileSize int64 `json:"backup_file_size"` // 备份文件大小(已切割 or 已压缩 or 已打包) BackupTaskID string `json:"backup_taskid"` - BackupMD5 string `json:"backup_md5"` // 目前为空 - BackupTag string `json:"backup_tag"` // REDIS_FULL or REDIS_BINLOG + BackupMD5 string `json:"backup_md5"` // 目前为空 + BackupTag string `json:"backup_tag"` // REDIS_FULL or REDIS_BINLOG + ShardValue string `json:"shard_value"` // shard值 // 全备尽管会切成多个文件,但其生成的起始时间、结束时间一样 StartTime customtime.CustomTime `json:"start_time"` // 生成全备的起始时间 EndTime customtime.CustomTime `json:"end_time"` // //生成全备的结束时间 @@ -359,6 +361,10 @@ func (task *BackupTask) GoFullBakcup() { if task.Err != nil { return } + task.getRedisShardVal() + if task.Err != nil { + return + } // 如果有备份正在执行,则先等待其完成 task.Err = task.Cli.WaitForBackupFinish() @@ -469,6 +475,54 @@ func (task *BackupTask) PrecheckDisk() { task.Addr(), task.DataSize/1024/1024, task.BackupDir, bakDiskUsg.AvailSize/1024/1024)) } +func (task *BackupTask) getRedisShardVal() { + var enabled bool + var masterNode *myredis.ClusterNodeData + enabled, task.Err = task.Cli.IsClusterEnabled() + if task.Err != nil { + return + } + if enabled { + masterNode, task.Err = task.Cli.RedisClusterGetMasterNode(task.Addr()) + if task.Err != nil { + return + } + task.ShardValue = masterNode.SlotSrcStr + return + } + if util.FileExists(consts.BkDbmonConfFile) { + confData, err := os.ReadFile(consts.BkDbmonConfFile) + if err != nil { + err = fmt.Errorf("read file(%s) fail,err:%v", consts.BkDbmonConfFile, err) + mylog.Logger.Warn(err.Error()) + return + } + if !strings.Contains(string(confData), "server_shards:") { + return + } + type servers struct { + Servers []struct { + ServerShards map[string]string `yaml:"server_shards"` + } `yaml:"servers"` + } + var serversObj servers + err = yaml.Unmarshal(confData, 
&serversObj) + if err != nil { + err = fmt.Errorf("yaml.Unmarshal fail,err:%v", err) + mylog.Logger.Warn(err.Error()) + return + } + for _, server := range serversObj.Servers { + for ipPort, shardVal := range server.ServerShards { + if ipPort == task.Addr() { + task.ShardValue = shardVal + return + } + } + } + } +} + // RedisInstanceBackup redis(cache)实例备份 func (task *BackupTask) RedisInstanceBackup() { var srcFile string @@ -625,7 +679,7 @@ func (task *BackupTask) TendisSSDInstanceBackup() { mylog.Logger.Error(task.Err.Error()) return } - task.BackupFile = filepath.Join(task.BackupDir, tarFile) + task.BackupFile = tarFile task.GetBakFilesSize() if task.Err != nil { return diff --git a/dbm-services/redis/db-tools/dbactuator/tests/clustertest/twemproxy_swtich.go b/dbm-services/redis/db-tools/dbactuator/tests/clustertest/twemproxy_swtich.go index d7ba7bb1da..b56ad50769 100644 --- a/dbm-services/redis/db-tools/dbactuator/tests/clustertest/twemproxy_swtich.go +++ b/dbm-services/redis/db-tools/dbactuator/tests/clustertest/twemproxy_swtich.go @@ -68,8 +68,10 @@ func TwemproxyCacheSwitch(serverIP, SetBkDbmonPkg(bkdbmonPkgName, bkdbmonPkgMd5). SetDbtoolsPkg(dbtoolsPkgName, dbtoolsPkgMd5). SetBackupConf(). - AppendMasterServer(serverIP, consts.TestRedisMasterStartPort, consts.TestRedisInstanceNum). - AppendMasterServer(serverIP, consts.TestSyncRedisMasterStartPort, consts.TestRedisInstanceNum) + AppendMasterServer(serverIP, consts.TestRedisMasterStartPort, consts.TestRedisInstanceNum, + consts.TendisTypeTwemproxyRedisInstance). + AppendMasterServer(serverIP, consts.TestSyncRedisMasterStartPort, consts.TestRedisInstanceNum, + consts.TendisTypeTwemproxyRedisInstance) if installTest.Err != nil { return installTest.Err } @@ -172,8 +174,10 @@ func TwemproxyCacheSwitchRestoreEnv(serverIP, SetBkDbmonPkg(bkdbmonPkgName, bkdbmonPkgMd5). SetDbtoolsPkg(dbtoolsPkgName, dbtoolsPkgMd5). SetBackupConf(). 
- AppendMasterServer(serverIP, consts.TestRedisMasterStartPort, consts.TestRedisInstanceNum). - AppendMasterServer(serverIP, consts.TestSyncRedisMasterStartPort, consts.TestRedisInstanceNum) + AppendMasterServer(serverIP, consts.TestRedisMasterStartPort, consts.TestRedisInstanceNum, + consts.TendisTypeTwemproxyRedisInstance). + AppendMasterServer(serverIP, consts.TestSyncRedisMasterStartPort, consts.TestRedisInstanceNum, + consts.TendisTypeTwemproxyRedisInstance) if installTest.Err != nil { return installTest.Err } diff --git a/dbm-services/redis/db-tools/dbactuator/tests/redistest/bkdbmon_install.go b/dbm-services/redis/db-tools/dbactuator/tests/redistest/bkdbmon_install.go index 9cea01261d..c87cf4c73c 100644 --- a/dbm-services/redis/db-tools/dbactuator/tests/redistest/bkdbmon_install.go +++ b/dbm-services/redis/db-tools/dbactuator/tests/redistest/bkdbmon_install.go @@ -94,8 +94,35 @@ func (test *BkDBmonInstallTest) SetBackupConf() *BkDBmonInstallTest { return test } +func getRedisClusterServerShargs(ip string, startPort, instNum int, dbType string) (shards map[string]string) { + if !consts.IsTwemproxyClusterType(dbType) { + return + } + if startPort == 0 { + startPort = consts.TestRedisMasterStartPort + } + if instNum == 0 { + instNum = 4 + } + var port int + var instStr string + var segStart int + var segEnd int + shards = make(map[string]string) + segStep := (consts.TwemproxyMaxSegment + 1) / instNum + for i := 0; i < instNum; i++ { + segStart = i * segStep + segEnd = (i+1)*segStep - 1 + port = startPort + i + instStr = fmt.Sprintf("%s:%d", ip, port) + shards[instStr] = fmt.Sprintf("%d-%d", segStart, segEnd) + } + return +} + // AppendMasterServer append master server -func (test *BkDBmonInstallTest) AppendMasterServer(masterIP string, startPort, instNum int) *BkDBmonInstallTest { +func (test *BkDBmonInstallTest) AppendMasterServer(masterIP string, startPort, instNum int, + dbType string) *BkDBmonInstallTest { if test.Err != nil { return test } @@ -112,10 
+139,15 @@ func (test *BkDBmonInstallTest) AppendMasterServer(masterIP string, startPort, i svrItem := atomredis.ConfServerItem{ BkBizID: "200500194", BkCloudID: 246, + App: "testapp", + AppName: "测试app", ClusterDomain: "tendisx.aaaa.testapp.db", + ClusterName: "aaaa", + ClusterType: dbType, MetaRole: consts.MetaRoleRedisMaster, ServerIP: masterIP, ServerPorts: ports, + ServerShards: getRedisClusterServerShargs(masterIP, startPort, instNum, dbType), } test.Servers = append(test.Servers, svrItem) return test @@ -139,7 +171,8 @@ func (test *BkDBmonInstallTest) OnlyAEmptyServer(ip string) *BkDBmonInstallTest } // AppendSlaveServer append slave server -func (test *BkDBmonInstallTest) AppendSlaveServer(slaveIP string, startPort, instNum int) *BkDBmonInstallTest { +func (test *BkDBmonInstallTest) AppendSlaveServer(slaveIP string, startPort, instNum int, + dbType string) *BkDBmonInstallTest { if test.Err != nil { return test } @@ -160,10 +193,11 @@ func (test *BkDBmonInstallTest) AppendSlaveServer(slaveIP string, startPort, ins AppName: "测试app", ClusterDomain: "tendisx.aaaa.testapp.db", ClusterName: "aaaa", - ClusterType: consts.TendisTypePredixyTendisplusCluster, + ClusterType: dbType, MetaRole: consts.MetaRoleRedisSlave, ServerIP: slaveIP, ServerPorts: ports, + ServerShards: getRedisClusterServerShargs(slaveIP, startPort, instNum, dbType), } test.Servers = append(test.Servers, svrItem) return test @@ -236,8 +270,8 @@ func BkDbmonInstall(serverIP, dbtoolsPkgName, dbtoolsPkgMd5, bkdbmonPkgName, bkd SetBkDbmonPkg(bkdbmonPkgName, bkdbmonPkgMd5). SetDbtoolsPkg(dbtoolsPkgName, dbtoolsPkgMd5). SetBackupConf(). - AppendMasterServer(serverIP, masterStartPort, consts.TestRedisInstanceNum). - AppendSlaveServer(serverIP, slaveStartPort, consts.TestRedisInstanceNum) + AppendMasterServer(serverIP, masterStartPort, consts.TestRedisInstanceNum, dbType). 
+ AppendSlaveServer(serverIP, slaveStartPort, consts.TestRedisInstanceNum, dbType) if bkdbmonTest.Err != nil { return } diff --git a/dbm-services/redis/db-tools/dbactuator/tests/redistest/redis_install.go b/dbm-services/redis/db-tools/dbactuator/tests/redistest/redis_install.go index b155f1118e..411efbdb33 100644 --- a/dbm-services/redis/db-tools/dbactuator/tests/redistest/redis_install.go +++ b/dbm-services/redis/db-tools/dbactuator/tests/redistest/redis_install.go @@ -404,9 +404,6 @@ func (test *RedisInstallTest) SetTendisplusRedisConf() { "scanCntIndexMgr": "10000", "truncateBinlogIntervalMs": "100", "minbinlogkeepsec": "1800", - "binlogdelrange": "500000", - "migrate-gc-enabled": "false", - "deletefilesinrange-for-binlog": "1", "incrpushthreadnum": "10", "rename-command": `config confxx rename-command flushdb cleandb diff --git a/dbm-services/redis/db-tools/dbactuator/tests/test.sh b/dbm-services/redis/db-tools/dbactuator/tests/test.sh index 21ca002225..437b87b5e3 100644 --- a/dbm-services/redis/db-tools/dbactuator/tests/test.sh +++ b/dbm-services/redis/db-tools/dbactuator/tests/test.sh @@ -3,8 +3,8 @@ repoUser="" repoPassword="" -tendisplusPkgName="tendisplus-2.5.0-rocksdb-v6.23.3.tgz" -tendisplusPkgMd5="573fac8917f3cb6d73d4913471a6eacc" +tendisplusPkgName="tendisplus-2.6.0-rocksdb-v6.23.3.tgz" +tendisplusPkgMd5="eaf90d7072740fd232b157d9cb32a425" redisPkgName="redis-6.2.7.tar.gz" redisPkgMd5="1fc9e5c3a044ce523844a6f2717e5ac3" @@ -17,14 +17,14 @@ tendisssdPkgMd5="7bfe87efbe017c689c3f4a11bb2a8be9" predixyPkgName="predixy-1.4.0.tar.gz" predixyPkgMd5="24aba4a96dcf7f8581d2fde89d062455" -twemproxyPkgName="twemproxy-0.4.1-v23.tar.gz" -twemproxyPkgMd5="41850e44bebfce84ebd4d0cf4cce6833" +twemproxyPkgName="twemproxy-0.4.1-v27.tar.gz" +twemproxyPkgMd5="b7fcec49a43da9fdb5acde0a42287d43" dbtoolsPkgName="dbtools.tar.gz" dbtoolsPkgMd5="ced0fa280c63cb31536fefc1845f3ff0" -bkdbmonPkgName="bk-dbmon-v0.9.tar.gz" -bkdbmonPkgMd5="a579e2ffd74259f3dd66d23a10a170ba" 
+bkdbmonPkgName="bk-dbmon-v0.12.tar.gz" +bkdbmonPkgMd5="2a3a51c3b4a7dce4300e894e19f2f0ea" repoUrl="" @@ -144,7 +144,7 @@ if [[ -e $localBkDbmonPkgName ]]; then localBkDbmonPkgMd5=$(md5sum $localBkDbmonPkgName | awk '{print $1}') fi -wgetCmd="wget --user=$repoUser --password=$repoPassword $repoUrl/tendisplus/Tendisplus-2.5/$tendisplusPkgName -O $localTendisplusPkgName" +wgetCmd="wget --user=$repoUser --password=$repoPassword $repoUrl/tendisplus/Tendisplus-2.6/$tendisplusPkgName -O $localTendisplusPkgName" if [[ ! -e $localTendisplusPkgName ]]; then echo $wgetCmd $wgetCmd diff --git a/dbm-services/redis/db-tools/dbmon/cmd/root.go b/dbm-services/redis/db-tools/dbmon/cmd/root.go index ff4854fba5..c7b84a524f 100644 --- a/dbm-services/redis/db-tools/dbmon/cmd/root.go +++ b/dbm-services/redis/db-tools/dbmon/cmd/root.go @@ -78,6 +78,8 @@ var rootCmd = &cobra.Command{ log.Panicf("reportHistoryClear addjob fail,entryID:%d,err:%v\n", entryID, err) return } + mylog.Logger.Info(fmt.Sprintf("create cron GlobHistoryClearJob success,entryID:%d", entryID)) + if config.GlobalConf.RedisFullBackup.Cron != "" { entryID, err = c.AddJob(config.GlobalConf.RedisFullBackup.Cron, cron.NewChain(cron.SkipIfStillRunning(mylog.AdapterLog)).Then(redisfullbackup.GlobRedisFullBackupJob)) @@ -85,6 +87,7 @@ var rootCmd = &cobra.Command{ log.Panicf("fullbackup addjob fail,entryID:%d,err:%v\n", entryID, err) return } + mylog.Logger.Info(fmt.Sprintf("create cron GlobRedisFullBackupJob success,entryID:%d", entryID)) } if config.GlobalConf.RedisBinlogBackup.Cron != "" { entryID, err = c.AddJob(config.GlobalConf.RedisBinlogBackup.Cron, @@ -93,12 +96,15 @@ var rootCmd = &cobra.Command{ log.Panicf("binlogbackup addjob fail,entryID:%d,err:%v\n", entryID, err) return } + mylog.Logger.Info(fmt.Sprintf("create cron GlobRedisBinlogBakJob success,entryID:%d", entryID)) + entryID, err = c.AddJob(config.GlobalConf.RedisBinlogBackup.Cron, 
cron.NewChain(cron.SkipIfStillRunning(mylog.AdapterLog)).Then(redisfullbackup.GlobRedisFullCheckJob)) if err != nil { log.Panicf("fullcheck addjob fail,entryID:%d,err:%v\n", entryID, err) return } + mylog.Logger.Info(fmt.Sprintf("create cron GlobRedisFullCheckJob success,entryID:%d", entryID)) } if config.GlobalConf.RedisHeartbeat.Cron != "" { entryID, err = c.AddJob(config.GlobalConf.RedisHeartbeat.Cron, @@ -107,6 +113,7 @@ var rootCmd = &cobra.Command{ fmt.Printf("heartbeat addjob fail,entryID:%d,err:%v\n", entryID, err) return } + mylog.Logger.Info(fmt.Sprintf("create cron GlobRedisHeartbeatJob success,entryID:%d", entryID)) } if config.GlobalConf.RedisMonitor.Cron != "" { entryID, err = c.AddJob(config.GlobalConf.RedisMonitor.Cron, @@ -115,6 +122,7 @@ var rootCmd = &cobra.Command{ fmt.Printf("monitor addjob fail,entryID:%d,err:%v\n", entryID, err) return } + mylog.Logger.Info(fmt.Sprintf("create cron GlobRedisMonitorJob success,entryID:%d", entryID)) } if config.GlobalConf.KeyLifeCycle.Cron != "" { entryID, err = c.AddJob(config.GlobalConf.KeyLifeCycle.Cron, @@ -123,6 +131,7 @@ var rootCmd = &cobra.Command{ fmt.Printf("keylifecycle addjob fail,entryID:%d,err:%v\n", entryID, err) return } + mylog.Logger.Info(fmt.Sprintf("create cron GlobRedisKeyLifeCycleJob success,entryID:%d", entryID)) } } else if hasMongo { diff --git a/dbm-services/redis/db-tools/dbmon/config/config.go b/dbm-services/redis/db-tools/dbmon/config/config.go index ba9853b52c..ab987f887c 100644 --- a/dbm-services/redis/db-tools/dbmon/config/config.go +++ b/dbm-services/redis/db-tools/dbmon/config/config.go @@ -12,17 +12,19 @@ import ( // ConfServerItem servers配置项 type ConfServerItem struct { - BkBizID string `json:"bk_biz_id" mapstructure:"bk_biz_id"` - BkCloudID int64 `json:"bk_cloud_id" mapstructure:"bk_cloud_id"` - App string `json:"app" mapstructure:"app"` - AppName string `json:"app_name" mapstructure:"app_name"` - ClusterDomain string `json:"cluster_domain" mapstructure:"cluster_domain"` - 
ClusterName string `json:"cluster_name" mapstructure:"cluster_name"` - ClusterType string `json:"cluster_type" mapstructure:"cluster_type"` - MetaRole string `json:"meta_role" mapstructure:"meta_role"` - ServerIP string `json:"server_ip" mapstructure:"server_ip"` - ServerPorts []int `json:"server_ports" mapstructure:"server_ports"` - Shard string `json:"shard" mapstructure:"shard"` + BkBizID string `json:"bk_biz_id" mapstructure:"bk_biz_id"` + BkCloudID int64 `json:"bk_cloud_id" mapstructure:"bk_cloud_id"` + App string `json:"app" mapstructure:"app"` + AppName string `json:"app_name" mapstructure:"app_name"` + ClusterDomain string `json:"cluster_domain" mapstructure:"cluster_domain"` + ClusterName string `json:"cluster_name" mapstructure:"cluster_name"` + ClusterType string `json:"cluster_type" mapstructure:"cluster_type"` + MetaRole string `json:"meta_role" mapstructure:"meta_role"` + ServerIP string `json:"server_ip" mapstructure:"server_ip"` + ServerPorts []int `json:"server_ports" mapstructure:"server_ports"` + ServerShards map[string]string `json:"server_shards" mapstructure:"server_shards"` + CacheBackupMode string `json:"cache_backup_mode" mapstructure:"cache_backup_mode"` // aof or rdb + Shard string `json:"shard" mapstructure:"shard"` } // ConfRedisFullBackup 全备配置 diff --git a/dbm-services/redis/db-tools/dbmon/models/myredis/client.go b/dbm-services/redis/db-tools/dbmon/models/myredis/client.go index 583d367416..adcac28b14 100644 --- a/dbm-services/redis/db-tools/dbmon/models/myredis/client.go +++ b/dbm-services/redis/db-tools/dbmon/models/myredis/client.go @@ -1328,6 +1328,36 @@ func (db *RedisClient) IsTendisSSDReplicaStatusOk(masterIP, masterPort string) ( return } +// RedisClusterGetMasterNode 获取master节点信息(如果 addr是master则返回它的node信息,否则找到它的masterID,进而找到master的node信息) +func (db *RedisClient) RedisClusterGetMasterNode(addr string) (masterNode *ClusterNodeData, err error) { + addrToNodes, err := db.GetAddrMapToNodes() + if err != nil { + return + } + 
myNode, ok := addrToNodes[addr] + if !ok { + err = fmt.Errorf("addr:%s not found in cluster nodes", addr) + mylog.Logger.Error(err.Error()) + return + } + if myNode.GetRole() == consts.RedisMasterRole { + masterNode = myNode + return + } + masterNodeID := myNode.MasterID + idToNode, err := db.GetNodeIDMapToNodes() + if err != nil { + return + } + masterNode, ok = idToNode[masterNodeID] + if !ok { + err = fmt.Errorf("masterNodeID:%s not found in cluster nodes", masterNodeID) + mylog.Logger.Error(err.Error()) + return + } + return +} + // MaxMemory 'confxx get maxmemory' func (db *RedisClient) MaxMemory() (maxmemory uint64, err error) { var confRet map[string]string diff --git a/dbm-services/redis/db-tools/dbmon/pkg/consts/consts.go b/dbm-services/redis/db-tools/dbmon/pkg/consts/consts.go index afddb1b24a..ea4d840b53 100644 --- a/dbm-services/redis/db-tools/dbmon/pkg/consts/consts.go +++ b/dbm-services/redis/db-tools/dbmon/pkg/consts/consts.go @@ -162,6 +162,9 @@ const ( BackupStatusToBakSysSuccess = "to_backup_system_success" BackupStatusFailed = "failed" BackupStatusLocalSuccess = "local_success" + + CacheBackupModeAof = "aof" + CacheBackupModeRdb = "rdb" ) const ( diff --git a/dbm-services/redis/db-tools/dbmon/pkg/redisbinlogbackup/job.go b/dbm-services/redis/db-tools/dbmon/pkg/redisbinlogbackup/job.go index ba9ed7d044..314bff5771 100644 --- a/dbm-services/redis/db-tools/dbmon/pkg/redisbinlogbackup/job.go +++ b/dbm-services/redis/db-tools/dbmon/pkg/redisbinlogbackup/job.go @@ -64,17 +64,6 @@ func (job *Job) Run() { // if job.Err != nil { // return // } - - // 检查历史备份任务状态 并 删除过旧的本地文件 - for _, svrItem := range job.Conf.Servers { - if !consts.IsRedisMetaRole(svrItem.MetaRole) { - continue - } - for _, port := range svrItem.ServerPorts { - job.CheckOldBinlogBackupStatus(port) - job.DeleteTooOldBinlogbackup(port) - } - } job.createTasks() if job.Err != nil { return @@ -88,6 +77,17 @@ func (job *Job) Run() { continue } } + + // 检查历史备份任务状态 并 删除过旧的本地文件 + for _, svrItem := 
range job.Conf.Servers { + if !consts.IsRedisMetaRole(svrItem.MetaRole) { + continue + } + for _, port := range svrItem.ServerPorts { + job.CheckOldBinlogBackupStatus(port) + job.DeleteTooOldBinlogbackup(port) + } + } } // GetRealBackupDir 获取本地binlog保存路径 @@ -110,6 +110,7 @@ func (job *Job) createTasks() { var task *Task var password string var taskBackupDir string + var instStr string job.Tasks = []*Task{} for _, svrItem := range job.Conf.Servers { @@ -121,13 +122,15 @@ func (job *Job) createTasks() { if job.Err != nil { return } + instStr = fmt.Sprintf("%s:%d", svrItem.ServerIP, port) taskBackupDir = filepath.Join(job.RealBackupDir, "binlog", strconv.Itoa(port)) util.MkDirsIfNotExists([]string{taskBackupDir}) util.LocalDirChownMysql(taskBackupDir) task = NewBinlogBackupTask(svrItem.BkBizID, svrItem.BkCloudID, svrItem.ClusterDomain, svrItem.ServerIP, port, password, job.Conf.RedisBinlogBackup.ToBackupSystem, - taskBackupDir, job.Conf.RedisBinlogBackup.OldFileLeftDay, job.Reporter) + taskBackupDir, svrItem.ServerShards[instStr], + job.Conf.RedisBinlogBackup.OldFileLeftDay, job.Reporter) job.Tasks = append(job.Tasks, task) } } diff --git a/dbm-services/redis/db-tools/dbmon/pkg/redisbinlogbackup/task.go b/dbm-services/redis/db-tools/dbmon/pkg/redisbinlogbackup/task.go index 99bb586f67..936c90cc92 100644 --- a/dbm-services/redis/db-tools/dbmon/pkg/redisbinlogbackup/task.go +++ b/dbm-services/redis/db-tools/dbmon/pkg/redisbinlogbackup/task.go @@ -50,8 +50,9 @@ type Task struct { StartTime customtime.CustomTime `json:"start_time"` // binlog文件生成时间(非压缩) EndTime customtime.CustomTime `json:"end_time"` // binlog文件最后修改时间(非压缩) BackupTaskID string `json:"backup_taskid"` - BackupMD5 string `json:"backup_md5"` // 目前为空 - BackupTag string `json:"backup_tag"` // REDIS_BINLOG + BackupMD5 string `json:"backup_md5"` // 目前为空 + BackupTag string `json:"backup_tag"` // REDIS_BINLOG + ShardValue string `json:"shard_value"` // shard值 Status string `json:"status"` Message string 
`json:"message"` Cli *myredis.RedisClient `json:"-"` @@ -63,7 +64,7 @@ type Task struct { // NewBinlogBackupTask new binlog backup task func NewBinlogBackupTask(bkBizID string, bkCloudID int64, domain, ip string, port int, - password, toBackupSys, backupDir string, oldFileLeftDay int, + password, toBackupSys, backupDir, shardValue string, oldFileLeftDay int, reporter report.Reporter) *Task { ret := &Task{ @@ -79,6 +80,7 @@ func NewBinlogBackupTask(bkBizID string, bkCloudID int64, domain, ip string, por BackupDir: backupDir, BackupTag: consts.RedisBinlogTAG, reporter: reporter, + ShardValue: shardValue, } ret.backupClient = backupsys.NewIBSBackupClient(consts.IBSBackupClient, consts.RedisBinlogTAG) // ret.backupClient, ret.Err = backupsys.NewCosBackupClient(consts.COSBackupClient, "", consts.RedisBinlogTAG) @@ -112,6 +114,11 @@ func (task *Task) BackupLocalBinlogs() { return } + task.reGetShardValWhenClusterEnabled() + if task.Err != nil { + return + } + // 获取文件锁 lockFile := fmt.Sprintf("lock.%s.%d", task.ServerIP, task.ServerPort) lockFile = filepath.Join(task.BackupDir, lockFile) @@ -191,6 +198,23 @@ func (task *Task) newConnect() { return } +func (task *Task) reGetShardValWhenClusterEnabled() { + var enabled bool + var masterNode *myredis.ClusterNodeData + enabled, task.Err = task.Cli.IsClusterEnabled() + if task.Err != nil { + return + } + if !enabled { + return + } + masterNode, task.Err = task.Cli.RedisClusterGetMasterNode(task.Addr()) + if task.Err != nil { + return + } + task.ShardValue = masterNode.SlotSrcStr +} + type tendisBinlogItem struct { File string `json:"file"` // full path KvStoreIdx int `json:"kvstoreidx"` diff --git a/dbm-services/redis/db-tools/dbmon/pkg/redisfullbackup/backupjob.go b/dbm-services/redis/db-tools/dbmon/pkg/redisfullbackup/backupjob.go index 5f931ee3c0..6667a5163d 100644 --- a/dbm-services/redis/db-tools/dbmon/pkg/redisfullbackup/backupjob.go +++ b/dbm-services/redis/db-tools/dbmon/pkg/redisfullbackup/backupjob.go @@ -66,17 +66,6 
@@ func (job *Job) Run() { // if job.Err != nil { // return // } - - // 检查历史备份任务状态 并 删除过旧的本地文件 - for _, svrItem := range job.Conf.Servers { - if !consts.IsRedisMetaRole(svrItem.MetaRole) { - continue - } - for _, port := range svrItem.ServerPorts { - job.CheckOldFullbackupStatus(port) - job.DeleteTooOldFullbackup(port) - } - } job.createTasks() if job.Err != nil { return @@ -90,6 +79,17 @@ func (job *Job) Run() { continue } } + + // 检查历史备份任务状态 并 删除过旧的本地文件 + for _, svrItem := range job.Conf.Servers { + if !consts.IsRedisMetaRole(svrItem.MetaRole) { + continue + } + for _, port := range svrItem.ServerPorts { + job.CheckOldFullbackupStatus(port) + job.DeleteTooOldFullbackup(port) + } + } } // GetRealBackupDir 获取本地全备保存路径 @@ -115,9 +115,11 @@ func (job *Job) GetReporter() { func (job *Job) createTasks() { var task *BackupTask var password string + var instStr string mylog.Logger.Info(fmt.Sprintf("start create fullback tasks,Servers:%s", util.ToString(job.Conf.Servers))) job.Tasks = []*BackupTask{} + for _, svrItem := range job.Conf.Servers { if !consts.IsRedisMetaRole(svrItem.MetaRole) { continue @@ -127,11 +129,13 @@ func (job *Job) createTasks() { if job.Err != nil { return } + instStr = fmt.Sprintf("%s:%d", svrItem.ServerIP, port) task = NewFullBackupTask(svrItem.BkBizID, svrItem.BkCloudID, svrItem.ClusterDomain, svrItem.ServerIP, port, password, - job.Conf.RedisFullBackup.ToBackupSystem, consts.NormalBackupType, job.RealBackupDir, + job.Conf.RedisFullBackup.ToBackupSystem, + consts.NormalBackupType, svrItem.CacheBackupMode, job.RealBackupDir, job.Conf.RedisFullBackup.TarSplit, job.Conf.RedisFullBackup.TarSplitPartSize, - job.Reporter) + svrItem.ServerShards[instStr], job.Reporter) job.Tasks = append(job.Tasks, task) } } diff --git a/dbm-services/redis/db-tools/dbmon/pkg/redisfullbackup/task.go b/dbm-services/redis/db-tools/dbmon/pkg/redisfullbackup/task.go index 1c03ff2fc7..51c84b39a3 100644 --- a/dbm-services/redis/db-tools/dbmon/pkg/redisfullbackup/task.go +++ 
b/dbm-services/redis/db-tools/dbmon/pkg/redisfullbackup/task.go @@ -39,6 +39,7 @@ type BackupTask struct { ToBackupSystem string `json:"-"` DbType string `json:"db_type"` // RedisInstance or TendisplusInstance or TendisSSDInstance BackupType string `json:"-"` // 常规备份、下线备份 + CacheBackupMode string `json:"-"` // aof or rdb RealRole string `json:"role"` DataSize uint64 `json:"-"` // redis实例数据大小 DataDir string `json:"-"` @@ -48,10 +49,11 @@ type BackupTask struct { BackupFile string `json:"backup_file"` // 备份的目标文件,如果文件过大会切割成多个 BackupFileSize int64 `json:"backup_file_size"` // 备份文件大小(已切割 or 已压缩 or 已打包) BackupTaskID string `json:"backup_taskid"` - BackupMD5 string `json:"backup_md5"` // 目前为空 - BackupTag string `json:"backup_tag"` // REDIS_FULL or REDIS_BINLOG - StartTime customtime.CustomTime `json:"start_time"` // 生成全备的起始时间 - EndTime customtime.CustomTime `json:"end_time"` // //生成全备的结束时间 + BackupMD5 string `json:"backup_md5"` // 目前为空 + BackupTag string `json:"backup_tag"` // REDIS_FULL + ShardValue string `json:"shard_value"` // shard值 + StartTime customtime.CustomTime `json:"start_time"` // 生成全备的起始时间 + EndTime customtime.CustomTime `json:"end_time"` // //生成全备的结束时间 Status string `json:"status"` Message string `json:"message"` Cli *myredis.RedisClient `json:"-"` @@ -63,7 +65,7 @@ type BackupTask struct { // NewFullBackupTask new backup task func NewFullBackupTask(bkBizID string, bkCloudID int64, domain, ip string, port int, password, - toBackupSys, backupType, backupDir string, tarSplit bool, tarSplitSize string, + toBackupSys, backupType, cacheBackupMode, backupDir string, tarSplit bool, tarSplitSize, shardValue string, reporter report.Reporter) *BackupTask { ret := &BackupTask{ ReportType: consts.RedisFullBackupReportType, @@ -75,12 +77,14 @@ func NewFullBackupTask(bkBizID string, bkCloudID int64, domain, ip string, port Password: password, ToBackupSystem: toBackupSys, BackupType: backupType, + CacheBackupMode: cacheBackupMode, BackupDir: backupDir, TarSplit: tarSplit, 
TarSplitPartSize: tarSplitSize, BackupTaskID: "", BackupMD5: "", BackupTag: consts.RedisFullBackupTAG, + ShardValue: shardValue, reporter: reporter, } ret.backupClient = backupsys.NewIBSBackupClient(consts.IBSBackupClient, consts.RedisFullBackupTAG) @@ -104,7 +108,6 @@ func (task *BackupTask) ToString() string { // BakcupToLocal 执行备份task,备份到本地 func (task *BackupTask) BakcupToLocal() { - var infoRet map[string]string var connSlaves int var locked bool task.newConnect() @@ -113,13 +116,14 @@ func (task *BackupTask) BakcupToLocal() { } defer task.Cli.Close() - infoRet, task.Err = task.Cli.Info("replication") - if task.Err != nil { - return - } - connSlaves, _ = strconv.Atoi(infoRet["connectedSlaves"]) + connSlaves, task.Err = task.Cli.ConnectedSlaves() // 如果是redis_master且对应的slave大于0,则跳过备份 if task.RealRole == consts.RedisMasterRole && connSlaves > 0 { + mylog.Logger.Info(fmt.Sprintf("redis(%s) is master and has slaves,skip backup", task.Addr())) + return + } + task.reGetShardValWhenClusterEnabled() + if task.Err != nil { return } @@ -271,6 +275,23 @@ func (task *BackupTask) PrecheckDisk() { task.Addr(), task.DataSize/1024/1024, task.BackupDir, bakDiskUsg.AvailSize/1024/1024)) } +func (task *BackupTask) reGetShardValWhenClusterEnabled() { + var enabled bool + var masterNode *myredis.ClusterNodeData + enabled, task.Err = task.Cli.IsClusterEnabled() + if task.Err != nil { + return + } + if !enabled { + return + } + masterNode, task.Err = task.Cli.RedisClusterGetMasterNode(task.Addr()) + if task.Err != nil { + return + } + task.ShardValue = masterNode.SlotSrcStr +} + // RedisInstanceBackup redis(cache)实例备份 func (task *BackupTask) RedisInstanceBackup() { var srcFile string @@ -279,7 +300,8 @@ func (task *BackupTask) RedisInstanceBackup() { var fileSize int64 nowtime := time.Now().Local().Format(consts.FilenameTimeLayout) task.StartTime.Time = time.Now().Local() - if task.RealRole == consts.RedisMasterRole { + if task.RealRole == consts.RedisMasterRole || + 
task.CacheBackupMode == consts.CacheBackupModeRdb { // redis master backup rdb confMap, task.Err = task.Cli.ConfigGet("dbfilename") if task.Err != nil { return } @@ -426,7 +448,7 @@ func (task *BackupTask) TendisSSDInstanceBackup() { mylog.Logger.Error(task.Err.Error()) return } - task.BackupFile = filepath.Join(task.BackupDir, tarFile) + task.BackupFile = tarFile task.GetBakFilesSize() if task.Err != nil { return diff --git a/dbm-services/redis/db-tools/dbmon/pkg/redismonitor/redis_task.go b/dbm-services/redis/db-tools/dbmon/pkg/redismonitor/redis_task.go index 821de3bff8..7fbb600dee 100644 --- a/dbm-services/redis/db-tools/dbmon/pkg/redismonitor/redis_task.go +++ b/dbm-services/redis/db-tools/dbmon/pkg/redismonitor/redis_task.go @@ -304,6 +304,17 @@ func (task *RedisMonitorTask) CheckPersist() { continue } appendonly, _ = confmap["appendonly"] + if task.ServerConf.CacheBackupMode == consts.CacheBackupModeRdb { + if strings.ToLower(appendonly) == "yes" { + // 如果集群是rdb备份,但是aof开启,则关闭aof + _, task.Err = cliItem.ConfigSet("appendonly", "no") + if task.Err != nil { + continue + } + cliItem.ConfigRewrite() + } + continue + } if strings.ToLower(appendonly) == "no" { msg = fmt.Sprintf("redis_slave(%s) appendonly==%s", cliItem.Addr, appendonly) mylog.Logger.Warn(msg) diff --git a/dbm-services/redis/redis-dts/pkg/dtsTask/tendisssd/backupFileFetch.go b/dbm-services/redis/redis-dts/pkg/dtsTask/tendisssd/backupFileFetch.go index 829eee37a0..d5633a03b1 100644 --- a/dbm-services/redis/redis-dts/pkg/dtsTask/tendisssd/backupFileFetch.go +++ b/dbm-services/redis/redis-dts/pkg/dtsTask/tendisssd/backupFileFetch.go @@ -3,10 +3,11 @@ package tendisssd import ( "path/filepath" + "dbm-services/redis/db-tools/dbactuator/pkg/util" "dbm-services/redis/redis-dts/models/mysql/tendisdb" "dbm-services/redis/redis-dts/pkg/constvar" "dbm-services/redis/redis-dts/pkg/dtsTask" - "dbm-services/redis/redis-dts/pkg/remoteOperation" + "dbm-services/redis/redis-dts/pkg/scrdbclient" ) // BakcupFileFetchTask
备份拉取task @@ -65,18 +66,49 @@ func (task *BakcupFileFetchTask) Execute() { } // 从srcIP上拉取备份文件 - var absCli remoteOperation.RemoteOperation - absCli, task.Err = remoteOperation.NewIAbsClientByEnvVars(task.RowData.SrcIP, task.Logger) + // var absCli remoteOperation.RemoteOperation + // absCli, task.Err = remoteOperation.NewIAbsClientByEnvVars(task.RowData.SrcIP, task.Logger) + // if task.Err != nil { + // return + // } + // task.Err = absCli.RemoteDownload( + // filepath.Dir(task.RowData.TendisbackupFile), + // task.TaskDir, + // filepath.Base(task.RowData.TendisbackupFile), + // constvar.GetABSPullBwLimit(), + // ) + // if task.Err != nil { + // return + // } + var localIP string + localIP, task.Err = util.GetLocalIP() if task.Err != nil { return } - task.Err = absCli.RemoteDownload( - filepath.Dir(task.RowData.TendisbackupFile), - task.TaskDir, - filepath.Base(task.RowData.TendisbackupFile), - constvar.GetABSPullBwLimit(), - ) - if task.Err != nil { + cli, err := scrdbclient.NewClient(constvar.BkDbm, task.Logger) + if err != nil { + task.Err = err + return + } + param := scrdbclient.TransferFileReq{} + param.SourceList = append(param.SourceList, scrdbclient.TransferFileSourceItem{ + BkCloudID: int(task.RowData.BkCloudID), + IP: task.RowData.SrcIP, + Account: "mysql", + FileList: []string{ + task.RowData.TendisbackupFile + string(filepath.Separator), + }, + }) + param.TargetAccount = "mysql" + param.TargetDir = task.TaskDir + param.TargetIPList = append(param.TargetIPList, scrdbclient.IPItem{ + BkCloudID: int(task.RowData.BkCloudID), + IP: localIP, + }) + param.Timeout = 2 * 86400 + err = cli.SendNew(param, 5) + if err != nil { + task.Err = err return } diff --git a/dbm-ui/backend/flow/engine/bamboo/scene/redis/atom_jobs/redis_install.py b/dbm-ui/backend/flow/engine/bamboo/scene/redis/atom_jobs/redis_install.py index df7721ea8c..c676c53f3c 100644 --- a/dbm-ui/backend/flow/engine/bamboo/scene/redis/atom_jobs/redis_install.py +++ 
b/dbm-ui/backend/flow/engine/bamboo/scene/redis/atom_jobs/redis_install.py @@ -144,6 +144,8 @@ def RedisBatchInstallAtomJob(root_id, ticket_data, sub_kwargs: ActKwargs, param: "cluster_name": act_kwargs.cluster["cluster_name"], "cluster_type": act_kwargs.cluster["cluster_type"], "cluster_domain": act_kwargs.cluster["immute_domain"], + "server_shards": param.get("server_shards", {}), + "cache_backup_mode": param.get("cache_backup_mode", ""), } ] act_kwargs.get_redis_payload_func = RedisActPayload.bkdbmon_install.__name__ diff --git a/dbm-ui/backend/flow/engine/bamboo/scene/redis/atom_jobs/redis_makesync.py b/dbm-ui/backend/flow/engine/bamboo/scene/redis/atom_jobs/redis_makesync.py index 623aef17d8..507b93bdfe 100644 --- a/dbm-ui/backend/flow/engine/bamboo/scene/redis/atom_jobs/redis_makesync.py +++ b/dbm-ui/backend/flow/engine/bamboo/scene/redis/atom_jobs/redis_makesync.py @@ -70,6 +70,7 @@ def RedisMakeSyncAtomJob(root_id, ticket_data, sub_kwargs: ActKwargs, params: Di "bk_biz_id": str(act_kwargs.cluster["bk_biz_id"]), "bk_cloud_id": int(act_kwargs.cluster["bk_cloud_id"]), "server_ports": [], + "server_shards": {}, "cluster_domain": act_kwargs.cluster["immute_domain"], } ] @@ -108,6 +109,8 @@ def RedisMakeSyncAtomJob(root_id, ticket_data, sub_kwargs: ActKwargs, params: Di "meta_role": InstanceRole.REDIS_SLAVE.value, # 可能是master/slave 角色 "server_ip": params["sync_dst1"], "server_ports": server_ports, + "server_shards": params.get("server_shards", {}), + "cache_backup_mode": params.get("cache_backup_mode", ""), "cluster_name": act_kwargs.cluster["cluster_name"], "cluster_type": act_kwargs.cluster["cluster_type"], "cluster_domain": act_kwargs.cluster["immute_domain"], diff --git a/dbm-ui/backend/flow/engine/bamboo/scene/redis/redis_cluster_add_slave.py b/dbm-ui/backend/flow/engine/bamboo/scene/redis/redis_cluster_add_slave.py index a8b8c4f24a..77cdeeffbd 100644 --- a/dbm-ui/backend/flow/engine/bamboo/scene/redis/redis_cluster_add_slave.py +++ 
b/dbm-ui/backend/flow/engine/bamboo/scene/redis/redis_cluster_add_slave.py @@ -30,6 +30,7 @@ from backend.flow.plugins.components.collections.redis.redis_db_meta import RedisDBMetaComponent from backend.flow.utils.redis.redis_context_dataclass import ActKwargs, CommonContext from backend.flow.utils.redis.redis_db_meta import RedisDBMeta +from backend.flow.utils.redis.redis_proxy_util import get_cache_backup_mode, get_twemproxy_cluster_server_shards logger = logging.getLogger("flow") @@ -121,6 +122,19 @@ def add_slave_flow(self): cluster_kwargs.cluster.update(cluster_info) cluster_kwargs.cluster["created_by"] = self.data["created_by"] + newslave_to_master = {} + for host_pair in input_item["pairs"]: + master_ip = host_pair["redis_master"]["ip"] + for new_slave_item in host_pair["redis_slave"]: + for port in cluster_info["master_ports"][master_ip]: + newslave_to_master[ + "{}{}{}".format(new_slave_item["ip"], IP_PORT_DIVIDER, port) + ] = "{}{}{}".format(master_ip, IP_PORT_DIVIDER, port) + + twemproxy_server_shards = get_twemproxy_cluster_server_shards( + bk_biz_id, input_item["cluster_id"], newslave_to_master + ) + sub_pipeline.add_act( act_name=_("初始化配置-{}".format(cluster_info["immute_domain"])), act_component_code=GetRedisActPayloadComponent.code, @@ -142,6 +156,8 @@ def add_slave_flow(self): "instance_numb": len(cluster_info["master_ports"][master_ip]), "spec_id": input_item["resource_spec"][master_ip].get("id", 0), "spec_config": input_item["resource_spec"][master_ip], + "server_shards": twemproxy_server_shards.get(new_slave_item["ip"], {}), + "cache_backup_mode": get_cache_backup_mode(bk_biz_id, input_item["cluster_id"]), }, ) child_pipelines.append(install_builder) @@ -156,6 +172,8 @@ def add_slave_flow(self): "origin_1": master_ip, "sync_dst1": new_slave_item["ip"], "ins_link": [], + "server_shards": twemproxy_server_shards.get(new_slave_item["ip"], {}), + "cache_backup_mode": get_cache_backup_mode(bk_biz_id, input_item["cluster_id"]), } for port in 
cluster_info["master_ports"][master_ip]: sync_param["ins_link"].append({"origin_1": str(port), "sync_dst1": str(port)}) diff --git a/dbm-ui/backend/flow/engine/bamboo/scene/redis/redis_cluster_apply_flow.py b/dbm-ui/backend/flow/engine/bamboo/scene/redis/redis_cluster_apply_flow.py index b404ad366b..98fb474887 100644 --- a/dbm-ui/backend/flow/engine/bamboo/scene/redis/redis_cluster_apply_flow.py +++ b/dbm-ui/backend/flow/engine/bamboo/scene/redis/redis_cluster_apply_flow.py @@ -11,6 +11,7 @@ import copy import json import logging.config +from collections import defaultdict from dataclasses import asdict from typing import Dict, Optional @@ -30,6 +31,7 @@ from backend.flow.utils.redis.redis_act_playload import RedisActPayload from backend.flow.utils.redis.redis_context_dataclass import ActKwargs, CommonContext, DnsKwargs from backend.flow.utils.redis.redis_db_meta import RedisDBMeta +from backend.flow.utils.redis.redis_proxy_util import get_cache_backup_mode from backend.flow.utils.redis.redis_util import check_domain logger = logging.getLogger("flow") @@ -87,7 +89,7 @@ def __pre_check(self, proxy_ips, master_ips, slave_ips, group_num, shard_num, se if len(m) != 0: raise Exception("[{}] is used.".format(m)) - def cal_twemproxy_serveres(self, master_ips, shard_num, inst_num, name) -> list: + def cal_twemproxy_serveres(self, master_ips, slave_ips, shard_num, inst_num, name): """ 计算twemproxy的servers 列表 - redisip:redisport:1 app beginSeg-endSeg 1 @@ -98,7 +100,9 @@ def cal_twemproxy_serveres(self, master_ips, shard_num, inst_num, name) -> list: # 计算分片 servers = [] - for _index, ip in enumerate(master_ips): + twemproxy_server_shards = defaultdict(dict) + for _index, master_ip in enumerate(master_ips): + slave_ip = slave_ips[_index] for inst_no in range(0, inst_num): port = DEFAULT_REDIS_START_PORT + inst_no begin_seg = seg_no * seg_num @@ -109,8 +113,12 @@ def cal_twemproxy_serveres(self, master_ips, shard_num, inst_num, name) -> list: if begin_seg >= 
DEFAULT_TWEMPROXY_SEG_TOTOL_NUM or end_seg >= DEFAULT_TWEMPROXY_SEG_TOTOL_NUM: raise Exception("cal_twemproxy_serveres error. pleace check params") seg_no = seg_no + 1 - servers.append("{}:{} {} {}-{} {}".format(ip, port, name, begin_seg, end_seg, 1)) - return servers + servers.append("{}:{} {} {}-{} {}".format(master_ip, port, name, begin_seg, end_seg, 1)) + master_inst = "{}:{}".format(master_ip, port) + slave_inst = "{}:{}".format(slave_ip, port) + twemproxy_server_shards[master_ip][master_inst] = "{}-{}".format(begin_seg, end_seg) + twemproxy_server_shards[slave_ip][slave_inst] = "{}-{}".format(begin_seg, end_seg) + return servers, twemproxy_server_shards def deploy_redis_cluster_flow(self): """ @@ -128,7 +136,9 @@ def deploy_redis_cluster_flow(self): ins_num = self.data["shard_num"] // self.data["group_num"] ports = list(map(lambda i: i + DEFAULT_REDIS_START_PORT, range(ins_num))) - servers = self.cal_twemproxy_serveres(master_ips, self.data["shard_num"], ins_num, self.data["cluster_name"]) + servers, twemproxy_server_shards = self.cal_twemproxy_serveres( + master_ips, slave_ips, self.data["shard_num"], ins_num, self.data["cluster_name"] + ) self.__pre_check( proxy_ips, @@ -170,6 +180,8 @@ def deploy_redis_cluster_flow(self): params["spec_id"] = int(self.data["resource_spec"]["master"]["id"]) params["spec_config"] = self.data["resource_spec"]["master"] params["meta_role"] = InstanceRole.REDIS_MASTER.value + params["server_shards"] = twemproxy_server_shards[ip] + params["cache_backup_mode"] = get_cache_backup_mode(self.data["bk_biz_id"], 0) sub_builder = RedisBatchInstallAtomJob(self.root_id, self.data, act_kwargs, params) sub_pipelines.append(sub_builder) for ip in slave_ips: @@ -180,6 +192,8 @@ def deploy_redis_cluster_flow(self): params["spec_id"] = int(self.data["resource_spec"]["slave"]["id"]) params["spec_config"] = self.data["resource_spec"]["slave"] params["meta_role"] = InstanceRole.REDIS_SLAVE.value + params["server_shards"] = 
twemproxy_server_shards[ip] + params["cache_backup_mode"] = get_cache_backup_mode(self.data["bk_biz_id"], 0) sub_builder = RedisBatchInstallAtomJob(self.root_id, self.data, act_kwargs, params) sub_pipelines.append(sub_builder) redis_pipeline.add_parallel_sub_pipeline(sub_flow_list=sub_pipelines) diff --git a/dbm-ui/backend/flow/engine/bamboo/scene/redis/redis_cluster_data_copy.py b/dbm-ui/backend/flow/engine/bamboo/scene/redis/redis_cluster_data_copy.py index 20258045b4..a6ddb37486 100644 --- a/dbm-ui/backend/flow/engine/bamboo/scene/redis/redis_cluster_data_copy.py +++ b/dbm-ui/backend/flow/engine/bamboo/scene/redis/redis_cluster_data_copy.py @@ -81,7 +81,11 @@ from backend.flow.utils.redis.redis_act_playload import RedisActPayload from backend.flow.utils.redis.redis_context_dataclass import ActKwargs, RedisDtsContext, RedisDtsOnlineSwitchContext from backend.flow.utils.redis.redis_db_meta import RedisDBMeta -from backend.flow.utils.redis.redis_proxy_util import check_cluster_proxy_backends_consistent +from backend.flow.utils.redis.redis_proxy_util import ( + check_cluster_proxy_backends_consistent, + get_cache_backup_mode, + get_twemproxy_cluster_server_shards, +) from backend.utils.time import datetime2str logger = logging.getLogger("flow") @@ -899,11 +903,18 @@ def online_switch_flow(self): # 但是在确定flow 流程静态信息时,这些 master/slave 依然属于 dst_cluster # 所以这里获取的是 dst_cluster的 master/slave ip ports src_master_slave_ports = self.__get_cluster_master_slave_ports(int(job_row.app), job_row.dst_cluster_id) + src_twemproxy_server_shards = get_twemproxy_cluster_server_shards( + int(job_row.app), job_row.dst_cluster_id, {} + ) acts_list = [] for ip, ports in src_master_slave_ports["master_ports"].items(): act_kwargs.cluster["servers"][0]["server_ip"] = ip act_kwargs.cluster["servers"][0]["server_ports"] = ports act_kwargs.cluster["servers"][0]["meta_role"] = InstanceRole.REDIS_MASTER.value + act_kwargs.cluster["servers"][0]["server_shards"] = src_twemproxy_server_shards.get(ip, 
{}) + act_kwargs.cluster["servers"][0]["cache_backup_mode"] = get_cache_backup_mode( + int(job_row.app), job_row.dst_cluster_id + ) act_kwargs.exec_ip = ip act_kwargs.get_redis_payload_func = RedisActPayload.bkdbmon_install.__name__ acts_list.append( @@ -917,6 +928,10 @@ def online_switch_flow(self): act_kwargs.cluster["servers"][0]["server_ip"] = ip act_kwargs.cluster["servers"][0]["server_ports"] = ports act_kwargs.cluster["servers"][0]["meta_role"] = InstanceRole.REDIS_SLAVE.value + act_kwargs.cluster["servers"][0]["server_shards"] = src_twemproxy_server_shards.get(ip, {}) + act_kwargs.cluster["servers"][0]["cache_backup_mode"] = get_cache_backup_mode( + int(job_row.app), job_row.dst_cluster_id + ) act_kwargs.exec_ip = ip act_kwargs.get_redis_payload_func = RedisActPayload.bkdbmon_install.__name__ acts_list.append( @@ -945,11 +960,18 @@ def online_switch_flow(self): # 但是在确定flow 流程静态信息时,这些 master/slave 依然属于 src_cluster # 所以这里获取的是 src_cluster的 master/slave ip ports dst_master_slave_ports = self.__get_cluster_master_slave_ports(int(job_row.app), job_row.src_cluster_id) + dst_twemproxy_server_shards = get_twemproxy_cluster_server_shards( + int(job_row.app), job_row.src_cluster_id, {} + ) acts_list = [] for ip, ports in dst_master_slave_ports["master_ports"].items(): act_kwargs.cluster["servers"][0]["server_ip"] = ip act_kwargs.cluster["servers"][0]["server_ports"] = ports act_kwargs.cluster["servers"][0]["meta_role"] = InstanceRole.REDIS_MASTER.value + act_kwargs.cluster["servers"][0]["server_shards"] = dst_twemproxy_server_shards.get(ip, {}) + act_kwargs.cluster["servers"][0]["cache_backup_mode"] = get_cache_backup_mode( + int(job_row.app), job_row.src_cluster_id + ) act_kwargs.exec_ip = ip act_kwargs.get_redis_payload_func = RedisActPayload.bkdbmon_install.__name__ acts_list.append( @@ -963,6 +985,10 @@ def online_switch_flow(self): act_kwargs.cluster["servers"][0]["server_ip"] = ip act_kwargs.cluster["servers"][0]["server_ports"] = ports 
act_kwargs.cluster["servers"][0]["meta_role"] = InstanceRole.REDIS_SLAVE.value + act_kwargs.cluster["servers"][0]["server_shards"] = dst_twemproxy_server_shards.get(ip, {}) + act_kwargs.cluster["servers"][0]["cache_backup_mode"] = get_cache_backup_mode( + int(job_row.app), job_row.src_cluster_id + ) act_kwargs.exec_ip = ip act_kwargs.get_redis_payload_func = RedisActPayload.bkdbmon_install.__name__ acts_list.append( diff --git a/dbm-ui/backend/flow/plugins/components/collections/redis/redis_trans_files.py b/dbm-ui/backend/flow/plugins/components/collections/redis/redis_trans_files.py index 1e4db0d629..5a45203622 100644 --- a/dbm-ui/backend/flow/plugins/components/collections/redis/redis_trans_files.py +++ b/dbm-ui/backend/flow/plugins/components/collections/redis/redis_trans_files.py @@ -51,7 +51,7 @@ def _execute(self, data, parent_data) -> bool: source_ip_list = {} file_list = [] for backup_inst in backup_infos: - file_list.extend(backup_inst["backup_files"]) + file_list.append(backup_inst["backup_file"]) if not source_ip_list.get(backup_inst["server_ip"]): source_ip_list[backup_inst["server_ip"]] = True self.log_info("get backup files {}:{}".format(source_ip_list.keys(), file_list)) diff --git a/dbm-ui/backend/flow/utils/redis/redis_proxy_util.py b/dbm-ui/backend/flow/utils/redis/redis_proxy_util.py index 96f48a09e6..dc0da319a0 100644 --- a/dbm-ui/backend/flow/utils/redis/redis_proxy_util.py +++ b/dbm-ui/backend/flow/utils/redis/redis_proxy_util.py @@ -10,13 +10,16 @@ """ import hashlib import logging.config +from collections import defaultdict from typing import Dict, List, Tuple from backend.components import DBConfigApi, DRSApi from backend.components.dbconfig.constants import FormatType, LevelName +from backend.constants import IP_PORT_DIVIDER +from backend.db_meta.enums import ClusterType, InstanceRole from backend.db_meta.models import Cluster -from backend.db_services.redis.util import is_predixy_proxy_type, is_twemproxy_proxy_type -from 
backend.flow.consts import ConfigTypeEnum +from backend.db_services.redis.util import is_predixy_proxy_type, is_redis_instance_type, is_twemproxy_proxy_type +from backend.flow.consts import DEFAULT_DB_MODULE_ID, ConfigFileEnum, ConfigTypeEnum logger = logging.getLogger("flow") @@ -219,3 +222,88 @@ def check_cluster_proxy_backends_consistent(cluster_id: int, cluster_password: s raise Exception( "proxy[{}->{}] backends is not same".format(sorted_md5[0]["proxy_addr"], sorted_md5[-1]["proxy_addr"]) ) + + +def get_twemproxy_cluster_server_shards(bk_biz_id: int, cluster_id: int, other_to_master: dict) -> dict: + """ + 获取twemproxy集群的server_shards + :param bk_biz_id: 业务id + :param cluster_id: 集群id + :param other_to_master: other实例 到 master实例的映射关系,格式为{a.a.a.a:30000 : b.b.b.b:30000} + """ + cluster = Cluster.objects.get(id=cluster_id, bk_biz_id=bk_biz_id) + if not is_twemproxy_proxy_type(cluster.cluster_type): + return {} + twemproxy_server_shards = defaultdict(dict) + ipport_to_segment = {} + for row in cluster.nosqlstoragesetdtl_set.all(): + ipport = row.instance.machine.ip + IP_PORT_DIVIDER + str(row.instance.port) + ipport_to_segment[ipport] = row.seg_range + + for master_obj in cluster.storageinstance_set.filter(instance_role=InstanceRole.REDIS_MASTER.value): + if master_obj.as_ejector and master_obj.as_ejector.first(): + slave_obj = master_obj.as_ejector.get().receiver + master_ipport = master_obj.machine.ip + IP_PORT_DIVIDER + str(master_obj.port) + slave_ipport = slave_obj.machine.ip + IP_PORT_DIVIDER + str(slave_obj.port) + + twemproxy_server_shards[master_obj.machine.ip][master_ipport] = ipport_to_segment[master_ipport] + twemproxy_server_shards[slave_obj.machine.ip][slave_ipport] = ipport_to_segment[master_ipport] + + for other_ipport, master_ipport in other_to_master.items(): + if master_ipport not in ipport_to_segment: + raise Exception( + "master ipport {} not found seg_range, not belong cluster:{}??".format( + master_ipport, cluster.immute_domain + ) + ) + 
other_list = other_ipport.split(IP_PORT_DIVIDER) + other_ip = other_list[0] + twemproxy_server_shards[other_ip][other_ipport] = ipport_to_segment[master_ipport] + return twemproxy_server_shards + + +def get_cache_backup_mode(bk_biz_id: int, cluster_id: int) -> str: + """ + 获取集群的cache_backup_mode + :param bk_biz_id: 业务id + :param cluster_id: 集群id + """ + query_params = { + "bk_biz_id": str(bk_biz_id), + "level_name": LevelName.CLUSTER.value, + "level_value": "", + "level_info": {"module": str(DEFAULT_DB_MODULE_ID)}, + "conf_file": ConfigFileEnum.FullBackup.value, + "conf_type": ConfigTypeEnum.Config.value, + "namespace": ClusterType.TendisTwemproxyRedisInstance.value, + "format": FormatType.MAP.value, + } + if cluster_id == 0: + # 获取业务级别的配置 + query_params["level_name"] = LevelName.APP.value + query_params["level_value"] = str(bk_biz_id) + resp = DBConfigApi.query_conf_item(params=query_params) + if resp["content"]: + return resp["content"].get("cache_backup_mode", "") + cluster: Cluster = None + try: + cluster = Cluster.objects.get(id=cluster_id, bk_biz_id=bk_biz_id) + except Cluster.DoesNotExist: + # 获取业务级别的配置 + query_params["level_name"] = LevelName.APP.value + query_params["level_value"] = str(bk_biz_id) + resp = DBConfigApi.query_conf_item(params=query_params) + if resp["content"]: + return resp["content"].get("cache_backup_mode", "") + if not is_redis_instance_type(cluster.cluster_type): + return "" + # 获取集群级别的配置 + query_params["level_name"] = LevelName.CLUSTER.value + query_params["level_value"] = cluster.immute_domain + query_params["namespace"] = cluster.cluster_type + try: + resp = DBConfigApi.query_conf_item(params=query_params) + if resp["content"]: + return resp["content"].get("cache_backup_mode", "") + except Exception: + return "aof"