From 7d2a97bcc1b57f52ea164f1e3346a57aa99897b7 Mon Sep 17 00:00:00 2001 From: lukemakeit <2302063437@qq.com> Date: Fri, 8 Mar 2024 16:20:06 +0800 Subject: [PATCH] =?UTF-8?q?fix(redis):=20redis=20dts=5Fserver=E5=9C=A8?= =?UTF-8?q?=E4=BF=AE=E5=A4=8Dredis-sync=E7=8A=B6=E6=80=81=E5=BC=82?= =?UTF-8?q?=E5=B8=B8=E5=90=8E,=E8=BF=9B=E7=A8=8B=E5=A7=8B=E7=BB=88?= =?UTF-8?q?=E5=AD=98=E5=9C=A8=E7=9A=84bug=20#3498?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../pkg/atomjobs/atomredis/redis_install.go | 2 +- .../redis/redis-dts/pkg/constvar/constvar.go | 30 +++++++-- .../pkg/dtsTask/rediscache/makeCacheSync.go | 13 +--- .../pkg/dtsTask/tendisplus/makeSync.go | 36 +++++++--- .../pkg/dtsTask/tendisssd/makeSync.go | 66 +++++++------------ 5 files changed, 79 insertions(+), 68 deletions(-) diff --git a/dbm-services/redis/db-tools/dbactuator/pkg/atomjobs/atomredis/redis_install.go b/dbm-services/redis/db-tools/dbactuator/pkg/atomjobs/atomredis/redis_install.go index 3a150bc09f..9d0a0ee547 100644 --- a/dbm-services/redis/db-tools/dbactuator/pkg/atomjobs/atomredis/redis_install.go +++ b/dbm-services/redis/db-tools/dbactuator/pkg/atomjobs/atomredis/redis_install.go @@ -34,7 +34,7 @@ type RedisInstallParams struct { Databases int `json:"databases" validate:"required"` RedisConfConfigs map[string]string `json:"redis_conf_configs" validate:"required"` DbType string `json:"db_type" validate:"required"` - MaxMemory uint64 `json:"maxmemory" validate:"required"` + MaxMemory uint64 `json:"maxmemory"` } // RedisInstall redis install atomjob diff --git a/dbm-services/redis/redis-dts/pkg/constvar/constvar.go b/dbm-services/redis/redis-dts/pkg/constvar/constvar.go index ca8c57d574..03f1a34f98 100644 --- a/dbm-services/redis/redis-dts/pkg/constvar/constvar.go +++ b/dbm-services/redis/redis-dts/pkg/constvar/constvar.go @@ -99,7 +99,7 @@ const ( // tendisplus task type const ( TendisplusMakeSyncTaskType = "tendisplusMakeSync" - // 将存量数据同步 与 增量数据同步分开,原因是 存量数据同步讲占用较多内存,增量不占用内存 + // TendisplusSendBulkTaskType 将存量数据同步 与 增量数据同步分开,原因是 存量数据同步讲占用较多内存,增量不占用内存 TendisplusSendBulkTaskType = "tendisplusSendBulk" TendisplusSendIncrTaskType = "tendisplusSendIncr" ) @@ -146,31 +146,36 @@ var ListKeyFileReg = regexp.MustCompile(TredisdumpListRegMatch) // redis-sync 操作状态 const ( + // RedisSyncPauseTodo TODO // pause,'SYNCADMIN stop' RedisSyncPauseTodo = "SyncPauseTodo" RedisSyncPauseFail = "SyncPauseFail" RedisSyncPauseSucc = "SyncPauseSucc" + // RedisSyncResumeTodo TODO // resume,'SYNCADMIN start' RedisSyncResumeTodo = "SyncResumeTodo" RedisSyncResumeFail = "SyncResumeFail" RedisSyncResumeSucc = "SyncResumeSucc" + // RedisSyncUpgradeTodo TODO // upgrade,upgrade redis-sync binary RedisSyncUpgradeTodo = "SyncUpgradeTodo" RedisSyncUpgradeFail = "SyncUpgradeFail" RedisSyncUpgradeSucc = "SyncUpgradeSucc" + // RedisSyncStopTodo TODO // Stop,kill redis-sync proccess RedisSyncStopTodo = "SyncStopTodo" RedisSyncStopFail = "SyncStopFail" RedisSyncStopSucc = "SyncStopSucc" - // force kill migrate task + // RedisForceKillTaskTodo kill migrate task RedisForceKillTaskTodo = "ForceKillTaskTodo" RedisForceKillTaskFail = "ForceKillTaskFail" RedisForceKillTaskSuccess = "ForceKillTaskSucc" + // ReSyncFromSpecTimeTodo TODO // resyunc from specific time ReSyncFromSpecTimeTodo = "ReSyncFromSpecTimeTodo" ReSyncFromSpecTimeFail = "ReSyncFromSpecTimeFail" @@ -185,6 +190,7 @@ const ( // remote services' name const ( DtsRemoteTendisxk8s = "dtsRemoteTendisxk8s" + // K8sIsDtsSvrInBlacklistURL TODO // tendisk8s mico service K8sIsDtsSvrInBlacklistURL = "/tendisxk8s/cluster/tendis-dts/is-dts-server-in-blacklist" K8sDtsLockKeyURL = "/tendisxk8s/cluster/tendis-dts/dts-lock-key" @@ -235,10 +241,24 @@ func ZonenameTransform(zoneName string) string { // dts write_mode const ( - // 先删除同名redis key, 再执行写入(如:del $key + hset $key) + // WriteModeDeleteAndWriteToRedis 先删除同名redis key, 再执行写入(如:del $key + hset $key) WriteModeDeleteAndWriteToRedis = "delete_and_write_to_redis" - // 保留同名redis key,追加写入(如hset $key) + // WriteModeKeepAndAppendToRedis 保留同名redis key,追加写入(如hset $key) WriteModeKeepAndAppendToRedis = "keep_and_append_to_redis" - // 先清空目标集群所有数据,在写入(如flushall + hset $key) + // WriteModeFlushallAndWriteToRedis 先清空目标集群所有数据,在写入(如flushall + hset $key) WriteModeFlushallAndWriteToRedis = "flushall_and_write_to_redis" ) + +// dts copy type +const ( + // CopyTypeOneAppDiffCluster 同业务不同集群 + CopyTypeOneAppDiffCluster = "one_app_diff_cluster" + // CopyTypeDiffAppDiffCluster 不同业务不同集群 + CopyTypeDiffAppDiffCluster = "diff_app_diff_cluster" + // CopyTypeCopyToOtherSystem 业务内至第三方 + CopyTypeCopyToOtherSystem = "copy_to_other_system" + // CopyTypeUserBuiltToDBM_built_to_dbm 自建集群至业务内 + CopyTypeUserBuiltToDBM_built_to_dbm = "user_built_to_dbm" + // CopyTypeCopyFromRollbackInstance 构造实例至业务内 + CopyTypeCopyFromRollbackInstance = "copy_from_rollback_instance" +) diff --git a/dbm-services/redis/redis-dts/pkg/dtsTask/rediscache/makeCacheSync.go b/dbm-services/redis/redis-dts/pkg/dtsTask/rediscache/makeCacheSync.go index 303fe7156a..47bb08a41f 100644 --- a/dbm-services/redis/redis-dts/pkg/dtsTask/rediscache/makeCacheSync.go +++ b/dbm-services/redis/redis-dts/pkg/dtsTask/rediscache/makeCacheSync.go @@ -666,18 +666,6 @@ func (task *MakeCacheSyncTask) WatchShake() { task.Logger.Info(fmt.Sprintf("end %q ...", task.RowData.SyncOperate)) return } - // upgrade redis-shake - if task.RowData.SyncOperate == constvar.RedisSyncUpgradeTodo { - task.Logger.Info(fmt.Sprintf("start execute %q ...", task.RowData.SyncOperate)) - task.UpgradeShakeMedia() - if task.Err != nil { - return - } - task.SetSyncOperate(constvar.RedisSyncUpgradeSucc) - task.UpdateDbAndLogLocal(constvar.RedisSyncUpgradeSucc + "...") - task.Logger.Info(fmt.Sprintf("end %q ...", task.RowData.SyncOperate)) - continue - } metric := task.GetShakeMerics() if task.Err != nil { return @@ -778,6 +766,7 @@ func (task *MakeCacheSyncTask) GetShakeMerics() *RedisShakeMetric { break } if task.Err != nil { + task.Err = fmt.Errorf("redis-shake错误退出") return nil } shakeMeric := []RedisShakeMetric{} diff --git a/dbm-services/redis/redis-dts/pkg/dtsTask/tendisplus/makeSync.go b/dbm-services/redis/redis-dts/pkg/dtsTask/tendisplus/makeSync.go index dcdddaf042..85c65ed382 100644 --- a/dbm-services/redis/redis-dts/pkg/dtsTask/tendisplus/makeSync.go +++ b/dbm-services/redis/redis-dts/pkg/dtsTask/tendisplus/makeSync.go @@ -719,12 +719,25 @@ func (task *MakeSyncTask) WatchSync() { task.SetTaskType(constvar.TendisplusSendBulkTaskType) task.UpdateRow() + retryTimes := 0 for { + if retryTimes > 6 { + // 如果连续重试多次获取redis-sync状态都是异常的,则认为失败,kill redis-sync,退出 + task.RedisSyncStop() + task.Logger.Error(fmt.Sprintf("watch redis-sync fail,err:%v,retryTimes:%d", task.Err, retryTimes)) + return + } time.Sleep(10 * time.Second) row01, err := tendisdb.GetTaskByID(task.RowData.ID, task.Logger) if err != nil { + // 调用接口获取task row信息失败,不是redis-sync状态失败 + // 不用kill redis-sync,直接返回即可 task.Err = err - return + retryTimes++ + if retryTimes > 3 { + return + } + continue } task.RowData = row01 if task.RowData.KillSyncer == 1 || @@ -755,7 +768,8 @@ func (task *MakeSyncTask) WatchSync() { } syncInfoMap := task.RedisSyncInfo("") if task.Err != nil { - return + retryTimes++ + continue } redisIP := syncInfoMap["redis_ip"] redisPort := syncInfoMap["redis_port"] @@ -765,7 +779,8 @@ func (task *MakeSyncTask) WatchSync() { task.SetMessage(task.Err.Error()) task.SetStatus(-1) task.UpdateDbAndLogLocal(task.Err.Error()) - return + retryTimes++ + continue } syncState := syncInfoMap["sync_redis_state"] if syncState != constvar.SyncOnlineState { @@ -773,7 +788,7 @@ func (task *MakeSyncTask) WatchSync() { "127.0.0.1", task.RowData.SyncerPort, syncState, constvar.SyncOnlineState) task.SetStatus(-1) task.UpdateDbAndLogLocal(task.Err.Error()) - // return + retryTimes++ continue } infoRepl, err := tenSlaveCli.TendisplusInfoRepl() @@ -781,14 +796,16 @@ func (task *MakeSyncTask) WatchSync() { task.Err = err task.SetStatus(-1) task.UpdateDbAndLogLocal(task.Err.Error()) - return + retryTimes++ + continue } if len(infoRepl.RocksdbSlaveList) == 0 { task.Err = fmt.Errorf("tendisplus slave(%s) 'info replication' not found rocksdb slaves", task.GetSrcRedisAddr()) task.SetStatus(-1) task.UpdateDbAndLogLocal(task.Err.Error()) - return + retryTimes++ + continue } var myRockSlave *myredis.InfoReplRocksdbSlave = nil for _, slave01 := range infoRepl.RocksdbSlaveList { @@ -803,7 +820,8 @@ func (task *MakeSyncTask) WatchSync() { task.SetStatus(-1) task.UpdateDbAndLogLocal(task.Err.Error()) task.Logger.Info(infoRepl.String()) - return + retryTimes++ + continue } if myRockSlave.State != constvar.TendisplusReplSendbulk && myRockSlave.State != constvar.TendisplusReplOnline { @@ -812,7 +830,8 @@ func (task *MakeSyncTask) WatchSync() { constvar.TendisplusReplSendbulk, constvar.TendisplusReplOnline) task.SetStatus(-1) task.UpdateDbAndLogLocal(task.Err.Error()) - return + retryTimes++ + continue } task.SetStatus(1) if myRockSlave.State == constvar.TendisplusReplSendbulk { @@ -822,5 +841,6 @@ func (task *MakeSyncTask) WatchSync() { task.SetTaskType(constvar.TendisplusSendIncrTaskType) task.UpdateDbAndLogLocal("增量同步中,binlog_pos:%d,lag:%d", myRockSlave.BinlogPos, myRockSlave.Lag) } + retryTimes = 0 } } diff --git a/dbm-services/redis/redis-dts/pkg/dtsTask/tendisssd/makeSync.go b/dbm-services/redis/redis-dts/pkg/dtsTask/tendisssd/makeSync.go index bd164d3a60..85758d159c 100644 --- a/dbm-services/redis/redis-dts/pkg/dtsTask/tendisssd/makeSync.go +++ b/dbm-services/redis/redis-dts/pkg/dtsTask/tendisssd/makeSync.go @@ -364,12 +364,25 @@ func (task *MakeSyncTask) WatchSync() { } lastSeqAndTime := dtsTask.SyncSeqItem{} + retryTimes := 0 for { + if retryTimes > 6 { + // 如果连续重试多次获取redis-sync状态都是异常的,则认为失败,kill redis-sync,退出 + task.RedisSyncStop() + task.Logger.Error(fmt.Sprintf("watch redis-sync fail,err:%v,retryTimes:%d", task.Err, retryTimes)) + return + } time.Sleep(10 * time.Second) task.RefreshRowData() if task.Err != nil { - return + // 调用接口获取task row信息失败,不是redis-sync状态失败 + // 不用kill redis-sync,直接返回即可 + retryTimes++ + if retryTimes > 3 { + return + } + continue } if task.RowData.KillSyncer == 1 || task.RowData.SyncOperate == constvar.RedisSyncStopTodo || @@ -396,51 +409,17 @@ func (task *MakeSyncTask) WatchSync() { task.Logger.Info(fmt.Sprintf("end %q ...", task.RowData.SyncOperate)) return } - // pause and resume redis-sync - if task.RowData.SyncOperate == constvar.RedisSyncPauseTodo { - task.Logger.Info(fmt.Sprintf("start execute %q ...", task.RowData.SyncOperate)) - task.PauseAndResumeSync() - task.Logger.Info(fmt.Sprintf("end %q ...", task.RowData.SyncOperate)) - if task.Err != nil { - return - } - continue - } - // upgrade redis-sync - if task.RowData.SyncOperate == constvar.RedisSyncUpgradeTodo { - task.Logger.Info(fmt.Sprintf("start execute %q ...", task.RowData.SyncOperate)) - task.UpgradeSyncMedia() - if task.Err != nil { - return - } - task.SetSyncOperate(constvar.RedisSyncUpgradeSucc) - task.UpdateDbAndLogLocal(constvar.RedisSyncUpgradeSucc + "...") - - task.Logger.Info(fmt.Sprintf("end %q ...", task.RowData.SyncOperate)) - continue - } - // resync from specific time - if task.RowData.SyncOperate == constvar.ReSyncFromSpecTimeTodo { - task.Logger.Info(fmt.Sprintf("start execute %q ...", task.RowData.SyncOperate)) - task.ReSyncFromSpecTime(task.RowData.ResyncFromTime.Time) - if task.Err != nil { - return - } - task.SetSyncOperate(constvar.ReSyncFromSpecTimeSucc) - task.UpdateDbAndLogLocal(constvar.ReSyncFromSpecTimeSucc + "...") - - task.Logger.Info(fmt.Sprintf("end %q ...", task.RowData.SyncOperate)) - continue - } syncInfoMap := task.RedisSyncInfo("tendis-ssd") if task.Err != nil { - return + retryTimes++ + continue } binlogLag, _ := strconv.ParseInt(syncInfoMap["tendis_binlog_lag"], 10, 64) if binlogLag < 0 { // 说明 redis-sync没有正常运行 task.SetTendisBinlogLag(binlogLag) task.SetStatus(-1) task.UpdateDbAndLogLocal("redis-sync 同步异常,binlog延迟:%d", binlogLag) + retryTimes++ continue } tendisIP := syncInfoMap["tendis-ssd_ip"] @@ -452,13 +431,15 @@ func (task *MakeSyncTask) WatchSync() { task.RowData.SrcIP, task.RowData.SrcPort) task.SetStatus(-1) task.UpdateDbAndLogLocal(task.Err.Error()) - return + retryTimes++ + continue } if binlogLag < 600 && slaveLogCountDecr == false { // 如果redis-sync同步延迟在600s以内,则修改srcSlave slave-log-keep-count=1200w,避免告警 task.ChangeSrcSSDKeepCount(1200 * 10000) if task.Err != nil { - return + retryTimes++ + continue } slaveLogCountDecr = true } @@ -479,12 +460,12 @@ func (task *MakeSyncTask) WatchSync() { task.SetMessage(task.Err.Error()) task.SetStatus(-1) task.UpdateRow() - // not return + // 只是保存binlog位置失败了,继续 } // 如果redis-sync seq 60分钟没有任何变化,则代表同步hang住了 // 例外情况: // 1. 回档临时环境,不会有心跳写入,tendis_last_seq不会变; - if nowSeq.Seq == lastSeqAndTime.Seq && jobRows[0].SrcClusterType != constvar.UserTwemproxyType { + if nowSeq.Seq == lastSeqAndTime.Seq && jobRows[0].DtsCopyType != constvar.CopyTypeCopyFromRollbackInstance { if time.Now().Local().Sub(lastSeqAndTime.Time.Time).Minutes() > 60 { task.Err = fmt.Errorf("binlog seq 已经60分钟未更新,redis-sync是否hang住?") task.SetStatus(-1) @@ -494,6 +475,7 @@ func (task *MakeSyncTask) WatchSync() { } else { lastSeqAndTime = nowSeq } + retryTimes = 0 } }