Skip to content

Commit

Permalink
fix(redis): redis dts_server在修复redis-sync状态异常后,进程始终存在的bug #3498
Browse files Browse the repository at this point in the history
  • Loading branch information
lukemakeit authored and zhangzhw8 committed Mar 27, 2024
1 parent e4f9fcc commit 7d2a97b
Show file tree
Hide file tree
Showing 5 changed files with 79 additions and 68 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ type RedisInstallParams struct {
Databases int `json:"databases" validate:"required"`
RedisConfConfigs map[string]string `json:"redis_conf_configs" validate:"required"`
DbType string `json:"db_type" validate:"required"`
MaxMemory uint64 `json:"maxmemory" validate:"required"`
MaxMemory uint64 `json:"maxmemory"`
}

// RedisInstall redis install atomjob
Expand Down
30 changes: 25 additions & 5 deletions dbm-services/redis/redis-dts/pkg/constvar/constvar.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ const (
// Tendisplus DTS task types.
const (
TendisplusMakeSyncTaskType = "tendisplusMakeSync"
// TendisplusSendBulkTaskType is the task type for syncing existing (full) data.
// Full-data sync and incremental sync are kept as separate task types because
// full-data sync uses a lot of memory while incremental sync does not.
TendisplusSendBulkTaskType = "tendisplusSendBulk"
TendisplusSendIncrTaskType = "tendisplusSendIncr"
)
Expand Down Expand Up @@ -146,31 +146,36 @@ var ListKeyFileReg = regexp.MustCompile(TredisdumpListRegMatch)

// redis-sync 操作状态
const (
// RedisSyncPauseTodo TODO
// pause,'SYNCADMIN stop'
RedisSyncPauseTodo = "SyncPauseTodo"
RedisSyncPauseFail = "SyncPauseFail"
RedisSyncPauseSucc = "SyncPauseSucc"

// RedisSyncResumeTodo TODO
// resume,'SYNCADMIN start'
RedisSyncResumeTodo = "SyncResumeTodo"
RedisSyncResumeFail = "SyncResumeFail"
RedisSyncResumeSucc = "SyncResumeSucc"

// RedisSyncUpgradeTodo TODO
// upgrade,upgrade redis-sync binary
RedisSyncUpgradeTodo = "SyncUpgradeTodo"
RedisSyncUpgradeFail = "SyncUpgradeFail"
RedisSyncUpgradeSucc = "SyncUpgradeSucc"

// RedisSyncStopTodo TODO
// Stop,kill redis-sync process
RedisSyncStopTodo = "SyncStopTodo"
RedisSyncStopFail = "SyncStopFail"
RedisSyncStopSucc = "SyncStopSucc"

// force kill migrate task
// RedisForceKillTaskTodo kill migrate task
RedisForceKillTaskTodo = "ForceKillTaskTodo"
RedisForceKillTaskFail = "ForceKillTaskFail"
RedisForceKillTaskSuccess = "ForceKillTaskSucc"

// ReSyncFromSpecTimeTodo TODO
// resync from specific time
ReSyncFromSpecTimeTodo = "ReSyncFromSpecTimeTodo"
ReSyncFromSpecTimeFail = "ReSyncFromSpecTimeFail"
Expand All @@ -185,6 +190,7 @@ const (
// remote services' name
const (
DtsRemoteTendisxk8s = "dtsRemoteTendisxk8s"
// K8sIsDtsSvrInBlacklistURL TODO
// tendisk8s micro service
K8sIsDtsSvrInBlacklistURL = "/tendisxk8s/cluster/tendis-dts/is-dts-server-in-blacklist"
K8sDtsLockKeyURL = "/tendisxk8s/cluster/tendis-dts/dts-lock-key"
Expand Down Expand Up @@ -235,10 +241,24 @@ func ZonenameTransform(zoneName string) string {

// DTS write modes (write_mode).
const (
// WriteModeDeleteAndWriteToRedis deletes the redis key of the same name first,
// then writes (e.g. del $key + hset $key).
WriteModeDeleteAndWriteToRedis = "delete_and_write_to_redis"
// WriteModeKeepAndAppendToRedis keeps the redis key of the same name and
// appends to it (e.g. hset $key).
WriteModeKeepAndAppendToRedis = "keep_and_append_to_redis"
// WriteModeFlushallAndWriteToRedis clears all data in the target cluster first,
// then writes (e.g. flushall + hset $key).
WriteModeFlushallAndWriteToRedis = "flushall_and_write_to_redis"
)

// DTS copy types: identify the source and destination kind of a copy job.
const (
	// CopyTypeOneAppDiffCluster copies between different clusters of the same business.
	CopyTypeOneAppDiffCluster = "one_app_diff_cluster"
	// CopyTypeDiffAppDiffCluster copies between clusters of different businesses.
	CopyTypeDiffAppDiffCluster = "diff_app_diff_cluster"
	// CopyTypeCopyToOtherSystem copies from a business cluster to a third-party system.
	CopyTypeCopyToOtherSystem = "copy_to_other_system"
	// CopyTypeUserBuiltToDBM copies from a user-built cluster into a business cluster.
	CopyTypeUserBuiltToDBM = "user_built_to_dbm"
	// CopyTypeUserBuiltToDBM_built_to_dbm is the original, misnamed spelling of
	// CopyTypeUserBuiltToDBM. It is kept with the same value so existing
	// references keep compiling.
	//
	// Deprecated: use CopyTypeUserBuiltToDBM instead.
	CopyTypeUserBuiltToDBM_built_to_dbm = CopyTypeUserBuiltToDBM
	// CopyTypeCopyFromRollbackInstance copies from a constructed (rollback) instance
	// into a business cluster.
	CopyTypeCopyFromRollbackInstance = "copy_from_rollback_instance"
)
Original file line number Diff line number Diff line change
Expand Up @@ -666,18 +666,6 @@ func (task *MakeCacheSyncTask) WatchShake() {
task.Logger.Info(fmt.Sprintf("end %q ...", task.RowData.SyncOperate))
return
}
// upgrade redis-shake
if task.RowData.SyncOperate == constvar.RedisSyncUpgradeTodo {
task.Logger.Info(fmt.Sprintf("start execute %q ...", task.RowData.SyncOperate))
task.UpgradeShakeMedia()
if task.Err != nil {
return
}
task.SetSyncOperate(constvar.RedisSyncUpgradeSucc)
task.UpdateDbAndLogLocal(constvar.RedisSyncUpgradeSucc + "...")
task.Logger.Info(fmt.Sprintf("end %q ...", task.RowData.SyncOperate))
continue
}
metric := task.GetShakeMerics()
if task.Err != nil {
return
Expand Down Expand Up @@ -778,6 +766,7 @@ func (task *MakeCacheSyncTask) GetShakeMerics() *RedisShakeMetric {
break
}
if task.Err != nil {
task.Err = fmt.Errorf("redis-shake错误退出")
return nil
}
shakeMeric := []RedisShakeMetric{}
Expand Down
36 changes: 28 additions & 8 deletions dbm-services/redis/redis-dts/pkg/dtsTask/tendisplus/makeSync.go
Original file line number Diff line number Diff line change
Expand Up @@ -719,12 +719,25 @@ func (task *MakeSyncTask) WatchSync() {
task.SetTaskType(constvar.TendisplusSendBulkTaskType)
task.UpdateRow()

retryTimes := 0
for {
if retryTimes > 6 {
// 如果连续重试多次获取redis-sync状态都是异常的,则认为失败,kill redis-sync,退出
task.RedisSyncStop()
task.Logger.Error(fmt.Sprintf("watch redis-sync fail,err:%v,retryTimes:%d", task.Err, retryTimes))
return
}
time.Sleep(10 * time.Second)
row01, err := tendisdb.GetTaskByID(task.RowData.ID, task.Logger)
if err != nil {
// 调用接口获取task row信息失败,不是redis-sync状态失败
// 不用kill redis-sync,直接返回即可
task.Err = err
return
retryTimes++
if retryTimes > 3 {
return
}
continue
}
task.RowData = row01
if task.RowData.KillSyncer == 1 ||
Expand Down Expand Up @@ -755,7 +768,8 @@ func (task *MakeSyncTask) WatchSync() {
}
syncInfoMap := task.RedisSyncInfo("")
if task.Err != nil {
return
retryTimes++
continue
}
redisIP := syncInfoMap["redis_ip"]
redisPort := syncInfoMap["redis_port"]
Expand All @@ -765,30 +779,33 @@ func (task *MakeSyncTask) WatchSync() {
task.SetMessage(task.Err.Error())
task.SetStatus(-1)
task.UpdateDbAndLogLocal(task.Err.Error())
return
retryTimes++
continue
}
syncState := syncInfoMap["sync_redis_state"]
if syncState != constvar.SyncOnlineState {
task.Err = fmt.Errorf("redis-sync(%s:%d) sync-redis-state:%s != %s",
"127.0.0.1", task.RowData.SyncerPort, syncState, constvar.SyncOnlineState)
task.SetStatus(-1)
task.UpdateDbAndLogLocal(task.Err.Error())
// return
retryTimes++
continue
}
infoRepl, err := tenSlaveCli.TendisplusInfoRepl()
if err != nil {
task.Err = err
task.SetStatus(-1)
task.UpdateDbAndLogLocal(task.Err.Error())
return
retryTimes++
continue
}
if len(infoRepl.RocksdbSlaveList) == 0 {
task.Err = fmt.Errorf("tendisplus slave(%s) 'info replication' not found rocksdb slaves",
task.GetSrcRedisAddr())
task.SetStatus(-1)
task.UpdateDbAndLogLocal(task.Err.Error())
return
retryTimes++
continue
}
var myRockSlave *myredis.InfoReplRocksdbSlave = nil
for _, slave01 := range infoRepl.RocksdbSlaveList {
Expand All @@ -803,7 +820,8 @@ func (task *MakeSyncTask) WatchSync() {
task.SetStatus(-1)
task.UpdateDbAndLogLocal(task.Err.Error())
task.Logger.Info(infoRepl.String())
return
retryTimes++
continue
}
if myRockSlave.State != constvar.TendisplusReplSendbulk &&
myRockSlave.State != constvar.TendisplusReplOnline {
Expand All @@ -812,7 +830,8 @@ func (task *MakeSyncTask) WatchSync() {
constvar.TendisplusReplSendbulk, constvar.TendisplusReplOnline)
task.SetStatus(-1)
task.UpdateDbAndLogLocal(task.Err.Error())
return
retryTimes++
continue
}
task.SetStatus(1)
if myRockSlave.State == constvar.TendisplusReplSendbulk {
Expand All @@ -822,5 +841,6 @@ func (task *MakeSyncTask) WatchSync() {
task.SetTaskType(constvar.TendisplusSendIncrTaskType)
task.UpdateDbAndLogLocal("增量同步中,binlog_pos:%d,lag:%d", myRockSlave.BinlogPos, myRockSlave.Lag)
}
retryTimes = 0
}
}
66 changes: 24 additions & 42 deletions dbm-services/redis/redis-dts/pkg/dtsTask/tendisssd/makeSync.go
Original file line number Diff line number Diff line change
Expand Up @@ -364,12 +364,25 @@ func (task *MakeSyncTask) WatchSync() {
}

lastSeqAndTime := dtsTask.SyncSeqItem{}
retryTimes := 0
for {
if retryTimes > 6 {
// 如果连续重试多次获取redis-sync状态都是异常的,则认为失败,kill redis-sync,退出
task.RedisSyncStop()
task.Logger.Error(fmt.Sprintf("watch redis-sync fail,err:%v,retryTimes:%d", task.Err, retryTimes))
return
}
time.Sleep(10 * time.Second)

task.RefreshRowData()
if task.Err != nil {
return
// 调用接口获取task row信息失败,不是redis-sync状态失败
// 不用kill redis-sync,直接返回即可
retryTimes++
if retryTimes > 3 {
return
}
continue
}
if task.RowData.KillSyncer == 1 ||
task.RowData.SyncOperate == constvar.RedisSyncStopTodo ||
Expand All @@ -396,51 +409,17 @@ func (task *MakeSyncTask) WatchSync() {
task.Logger.Info(fmt.Sprintf("end %q ...", task.RowData.SyncOperate))
return
}
// pause and resume redis-sync
if task.RowData.SyncOperate == constvar.RedisSyncPauseTodo {
task.Logger.Info(fmt.Sprintf("start execute %q ...", task.RowData.SyncOperate))
task.PauseAndResumeSync()
task.Logger.Info(fmt.Sprintf("end %q ...", task.RowData.SyncOperate))
if task.Err != nil {
return
}
continue
}
// upgrade redis-sync
if task.RowData.SyncOperate == constvar.RedisSyncUpgradeTodo {
task.Logger.Info(fmt.Sprintf("start execute %q ...", task.RowData.SyncOperate))
task.UpgradeSyncMedia()
if task.Err != nil {
return
}
task.SetSyncOperate(constvar.RedisSyncUpgradeSucc)
task.UpdateDbAndLogLocal(constvar.RedisSyncUpgradeSucc + "...")

task.Logger.Info(fmt.Sprintf("end %q ...", task.RowData.SyncOperate))
continue
}
// resync from specific time
if task.RowData.SyncOperate == constvar.ReSyncFromSpecTimeTodo {
task.Logger.Info(fmt.Sprintf("start execute %q ...", task.RowData.SyncOperate))
task.ReSyncFromSpecTime(task.RowData.ResyncFromTime.Time)
if task.Err != nil {
return
}
task.SetSyncOperate(constvar.ReSyncFromSpecTimeSucc)
task.UpdateDbAndLogLocal(constvar.ReSyncFromSpecTimeSucc + "...")

task.Logger.Info(fmt.Sprintf("end %q ...", task.RowData.SyncOperate))
continue
}
syncInfoMap := task.RedisSyncInfo("tendis-ssd")
if task.Err != nil {
return
retryTimes++
continue
}
binlogLag, _ := strconv.ParseInt(syncInfoMap["tendis_binlog_lag"], 10, 64)
if binlogLag < 0 { // 说明 redis-sync没有正常运行
task.SetTendisBinlogLag(binlogLag)
task.SetStatus(-1)
task.UpdateDbAndLogLocal("redis-sync 同步异常,binlog延迟:%d", binlogLag)
retryTimes++
continue
}
tendisIP := syncInfoMap["tendis-ssd_ip"]
Expand All @@ -452,13 +431,15 @@ func (task *MakeSyncTask) WatchSync() {
task.RowData.SrcIP, task.RowData.SrcPort)
task.SetStatus(-1)
task.UpdateDbAndLogLocal(task.Err.Error())
return
retryTimes++
continue
}
if binlogLag < 600 && slaveLogCountDecr == false {
// 如果redis-sync同步延迟在600s以内,则修改srcSlave slave-log-keep-count=1200w,避免告警
task.ChangeSrcSSDKeepCount(1200 * 10000)
if task.Err != nil {
return
retryTimes++
continue
}
slaveLogCountDecr = true
}
Expand All @@ -479,12 +460,12 @@ func (task *MakeSyncTask) WatchSync() {
task.SetMessage(task.Err.Error())
task.SetStatus(-1)
task.UpdateRow()
// not return
// 只是保存binlog位置失败了,继续
}
// 如果redis-sync seq 60分钟没有任何变化,则代表同步hang住了
// 例外情况:
// 1. 回档临时环境,不会有心跳写入,tendis_last_seq不会变;
if nowSeq.Seq == lastSeqAndTime.Seq && jobRows[0].SrcClusterType != constvar.UserTwemproxyType {
if nowSeq.Seq == lastSeqAndTime.Seq && jobRows[0].DtsCopyType != constvar.CopyTypeCopyFromRollbackInstance {
if time.Now().Local().Sub(lastSeqAndTime.Time.Time).Minutes() > 60 {
task.Err = fmt.Errorf("binlog seq 已经60分钟未更新,redis-sync是否hang住?")
task.SetStatus(-1)
Expand All @@ -494,6 +475,7 @@ func (task *MakeSyncTask) WatchSync() {
} else {
lastSeqAndTime = nowSeq
}
retryTimes = 0
}
}

Expand Down

0 comments on commit 7d2a97b

Please sign in to comment.