Skip to content

Commit

Permalink
feat(redis): redis 构造时区兼容 close #2701
Browse files Browse the repository at this point in the history
  • Loading branch information
mikluo authored and zhangzhw8 committed Dec 20, 2023
1 parent b347241 commit d32dd02
Show file tree
Hide file tree
Showing 7 changed files with 160 additions and 118 deletions.
74 changes: 43 additions & 31 deletions dbm-services/redis/db-tools/dbactuator/models/myredis/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -1608,62 +1608,74 @@ func (db *RedisClient) AdminSet(key, val string) (ret string, err error) {
}

// GetTendisplusHeartbeat 获取tendisplus 心跳数据
/* for example:
> adminget 1.1.1.1:heartbeat
1) 1) "0"
2) "2021-06-01 16:47:00"
2) 1) "1"
2) "2021-06-01 16:47:00"
3) 1) "2"
2) "2021-06-01 16:47:00"
4) 1) "3"
2) "2021-06-01 16:47:00"
5) 1) "4"
2) "2021-06-01 16:47:00"
6) 1) "5"
2) "2021-06-01 16:47:00"
7) 1) "6"
2) "2021-06-01 16:47:00"
8) 1) "7"
2) "2021-06-01 16:47:00"
9) 1) "8"
2) "2021-06-01 16:47:00"
/*
adminget xxx_30002:heartbeat
1) 1) "0"
2) "1702415993"
2) 1) "1"
2) "1702415993"
3) 1) "2"
2) "1702415993"
4) 1) "3"
2) "1702415993"
5) 1) "4"
2) "1702415993"
6) 1) "5"
2) "1702415993"
7) 1) "6"
2) "1702415993"
8) 1) "7"
2) "1702415993"
9) 1) "8"
2) "1702415993"
10) 1) "9"
2) "2021-06-01 16:47:00"
2) "1702415993"
*/
func (db *RedisClient) GetTendisplusHeartbeat(key string) (heartbeat map[int]time.Time, err error) {
// 命令'adminget ',只能用 普通redis client
if db.InstanceClient == nil {
err = fmt.Errorf("'adminget' redis:%s must create a standalone client", db.Addr)
mylog.Logger.Error(err.Error())
return
return nil, err
}

heartbeat = make(map[int]time.Time)
cmd := []interface{}{"adminget", key}
adminGetRet, err := db.InstanceClient.Do(context.TODO(), cmd...).Result()
if err != nil {
err = fmt.Errorf("redis:%s 'adminget %s' fail,err:%v\n", db.Addr, key, err)
err = fmt.Errorf("redis:%s 'adminget %s' fail, err: %v", db.Addr, key, err)
mylog.Logger.Error(err.Error())
return heartbeat, err
return nil, err
}

adminGetRets, ok := adminGetRet.([]interface{})
if ok == false {
err = fmt.Errorf("GetTendisplusHeartbeat 'adminget %s' result not []interface{},nodeAddr:%s", key, db.Addr)
if !ok {
err = fmt.Errorf("GetTendisplusHeartbeat 'adminget %s' result not []interface{}, nodeAddr: %s", key, db.Addr)
mylog.Logger.Error(err.Error())
return heartbeat, err
return nil, err
}

var storeID int
var value, storeIDStr string
var storeIDStr string
msg := fmt.Sprintf("检查目的集群是否有源集群的心跳数据 adminGetRets: %v", adminGetRets)
mylog.Logger.Info(msg)

for _, confItem := range adminGetRets {
conf01 := confItem.([]interface{})
if conf01[1] == nil {
if len(conf01) != 2 {
continue
}
storeIDStr = conf01[0].(string)
value = conf01[1].(string)
valueInt, err := strconv.ParseInt(conf01[1].(string), 10, 64)
if err != nil {
mylog.Logger.Error("时间解析失败:%v", err)
continue
}
storeID, _ = strconv.Atoi(storeIDStr)
heartbeat[storeID], _ = time.ParseInLocation(consts.UnixtimeLayout, value, time.Local)
t := time.Unix(valueInt, 0)
heartbeat[storeID] = t
}

return heartbeat, nil
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@ const (
UnixtimeLayout = "2006-01-02 15:04:05"
FilenameTimeLayout = "20060102-150405"
FilenameDayLayout = "20060102"
UnixtimeLayoutZone = "2006-01-02T15:04:05-07:00"
)

// account
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,10 +76,10 @@ func NewFullbackPull(sourceIP, filehead, rollbackTime,
KvstoreNums: kvstoreNums,
TendisType: tendisType,
}
layout := "2006-01-02 15:04:05"
ret.RollbackDstTime, ret.Err = time.ParseInLocation(layout, rollbackTime, time.Local)
ret.RollbackDstTime, ret.Err = time.ParseInLocation(consts.UnixtimeLayoutZone, rollbackTime, time.Local)
if ret.Err != nil {
ret.Err = fmt.Errorf("rollbackTime:%s time.parese fail,err:%s,layout:%s", rollbackTime, ret.Err, layout)
ret.Err = fmt.Errorf("rollbackTime:%s time.parese fail,err:%s,consts.UnixtimeLayoutZone:%s",
rollbackTime, ret.Err, consts.UnixtimeLayoutZone)
mylog.Logger.Error(ret.Err.Error())
return ret
}
Expand Down Expand Up @@ -136,7 +136,6 @@ func (full *TendisFullBackPull) GetFullFilesSpecTimeRange(fullFileList []FileDet
lastDateReg1 = regexp.MustCompile(`^.*?(\d+-\d+).rdb`)
}

layout1 := "20060102-150405"
for _, str01 := range fullFileList {
back01 := &TendisFullBackItem{}
taskID, _ := strconv.Atoi(str01.TaskID)
Expand Down Expand Up @@ -173,7 +172,7 @@ func (full *TendisFullBackPull) GetFullFilesSpecTimeRange(fullFileList []FileDet
full.Err = err
return
}
bkCreateTime, err01 := time.ParseInLocation(layout1, match01[1], time.Local)
bkCreateTime, err01 := time.ParseInLocation(consts.FilenameTimeLayout, match01[1], time.Local)

// 3-TENDISPLUS-FULL-slave-127.0.0.x-30002-20230810-050140.tar
// 3-TENDISSSD-FULL-slave-127.0.0.x-30000-20230809-105510-52447.tar
Expand All @@ -183,7 +182,8 @@ func (full *TendisFullBackPull) GetFullFilesSpecTimeRange(fullFileList []FileDet
}

if err01 != nil {
full.Err = fmt.Errorf("backup file createTime:%s time.parese fail,err:%s,layout1:%s", match01[1], err01, layout1)
full.Err = fmt.Errorf("backup file createTime:%s time.parese fail,err:%s,consts.FilenameTimeLayout:%s",
match01[1], err01, consts.FilenameTimeLayout)
mylog.Logger.Error(full.Err.Error())
return
}
Expand Down Expand Up @@ -248,10 +248,9 @@ func (full *TendisFullBackPull) GetTendisFullbackNearestRkTime(fullFileList []Fi

}
}
layout := "2006-01-02 15:04:05"
if nearestFullbk == nil {
full.Err = fmt.Errorf("filename 正则:%s 最近%d天内没找到小于回档目标时间[%s]的全备",
full.FileHead, LastNDaysFullBack(), full.RollbackDstTime.Local().Format(layout))
full.FileHead, LastNDaysFullBack(), full.RollbackDstTime.Local().Format(consts.UnixtimeLayoutZone))
mylog.Logger.Error(full.Err.Error())
return
}
Expand All @@ -271,9 +270,8 @@ func (full *TendisFullBackPull) GetTendisFullbackNearestRkTime(fullFileList []Fi
}
return
}
layout := "2006-01-02 15:04:05"
full.Err = fmt.Errorf("GetTendisFullbackNearestRkTime: filename 正则:%s 最近%d天内没找到小于回档目标时间[%s]的全备",
full.FileHead, LastNDaysFullBack(), full.RollbackDstTime.Local().Format(layout))
full.FileHead, LastNDaysFullBack(), full.RollbackDstTime.Local().Format(consts.UnixtimeLayoutZone))
mylog.Logger.Error(full.Err.Error())

return
Expand Down Expand Up @@ -912,6 +910,11 @@ func (full *TendisFullBackPull) PullFullbackDecompressed() {
return
}
mylog.Logger.Info("localBkIsExists:%v,localBkIsCompelete:%v ", localBkIsExists, localBkIsCompelete)
// 这里解压 ,不然会走到 else if localBkIsExists == true && decpDirIsExists == true 那里才是第一次解压
full.Decompressed()
if full.Err != nil {
return
}
// 已解压的备份文件
decpDirIsExists, decpIsCompelete, _ := full.CheckDecompressedDirIsOK()
if full.Err != nil {
Expand All @@ -921,6 +924,9 @@ func (full *TendisFullBackPull) PullFullbackDecompressed() {

//1: 本地全备.tar, 全备(已解压)文件夹 均不存在 =>报错,需要flow重新下载;
if localBkIsExists == false && decpDirIsExists == false {
mylog.Logger.Info("localBkIsExists:%v,localBkIsCompelete:%v,decpDirIsExists:%v,decpIsCompelete:%v ",
localBkIsExists, localBkIsCompelete, decpDirIsExists, decpIsCompelete)
mylog.Logger.Info("1: 本地全备.tar, 全备(已解压)文件夹 均不存在 =>报错,需要flow重新下载")
full.RetryDownloadFiles()
if full.Err != nil {
return
Expand All @@ -930,9 +936,14 @@ func (full *TendisFullBackPull) PullFullbackDecompressed() {
//2. 本地全备.tar不存在、全备(已解压)文件夹 存在:
//- 全备(已解压)文件夹 完整 => 直接返回;
//- 全备(已解压)文件夹 不完整 => 删除全备(已解压)文件夹,报错,需要flow重新下载;
mylog.Logger.Info("localBkIsExists:%v,localBkIsCompelete:%v,decpDirIsExists:%v,decpIsCompelete:%v ",
localBkIsExists, localBkIsCompelete, decpDirIsExists, decpIsCompelete)
mylog.Logger.Info("2. 本地全备.tar不存在、全备(已解压)文件夹 存在")
if decpIsCompelete == true {
mylog.Logger.Info("2. 全备(已解压)文件夹 完整 => 直接返回;")
return
}
mylog.Logger.Info("2.全备(已解压)文件夹 不完整 => 删除全备(已解压)文件夹,报错,需要flow重新下载")
full.RmDecompressedDir()
if full.Err != nil {
return
Expand All @@ -946,10 +957,16 @@ func (full *TendisFullBackPull) PullFullbackDecompressed() {
//3. 本地全备.tar存在、全备(已解压)文件夹 不存在:
//- 本地全备.tar 完整, => 解压;
//- 本地全备.tar 不完整 => 删除 本地全备.tar,报错,需要flow重新下载;;
mylog.Logger.Info("localBkIsExists:%v,localBkIsCompelete:%v,decpDirIsExists:%v,decpIsCompelete:%v ",
localBkIsExists, localBkIsCompelete, decpDirIsExists, decpIsCompelete)

if localBkIsCompelete == true {
mylog.Logger.Info("3. 本地全备.tar 完整, => 解压")
full.Decompressed()
return
}
mylog.Logger.Info("3.本地全备.tar 不完整 => 删除 本地全备.tar,报错,需要flow重新下载")

full.RmLocalBakcupFile()
if full.Err != nil {
return
Expand All @@ -961,16 +978,27 @@ func (full *TendisFullBackPull) PullFullbackDecompressed() {

} else if localBkIsExists == true && decpDirIsExists == true {
//4. 本地全备.tar存在、全备(已解压)文件夹 存在
// 本地解压目录完整则返回
//- 本地全备.tar 完整 => 删除 全备(已解压)文件夹,重新解压(有理由怀疑上一次解压失败了,本地全备.tar 来不及删除)
//- 本地全备.tar 不完整 => 删除本地全备.tar + 删除 全备(已解压)文件夹, 报错,需要flow重新下载;;
//- 本地全备.tar 不完整 => 删除本地全备.tar + 删除 全备(已解压)文件夹, 报错,需要flow重新下载;
mylog.Logger.Info("localBkIsExists:%v,localBkIsCompelete:%v,decpDirIsExists:%v,decpIsCompelete:%v ",
localBkIsExists, localBkIsCompelete, decpDirIsExists, decpIsCompelete)

if decpIsCompelete == true {
mylog.Logger.Info("4. 本地解压目录完整则返回")
return

}
if localBkIsCompelete == true {
mylog.Logger.Info("4. 本地全备.tar 完整 => 删除 全备(已解压)文件夹,重新解压(有理由怀疑上一次解压失败了,本地全备.tar 来不及删除)")
full.RmDecompressedDir()
if full.Err != nil {
return
}
full.Decompressed()
return
}
mylog.Logger.Info("4. 本地全备.tar 不完整 => 删除本地全备.tar + 删除 全备(已解压)文件夹, 报错,需要flow重新下载")
full.RmDecompressedDir()
if full.Err != nil {
return
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -800,8 +800,7 @@ func (task *TendisInsRecoverTask) PullIncrbackup() {
task.Err = incrBack.Err
return
}
layout := "2006-01-02 15:04:05"
rbDstTime, _ := time.ParseInLocation(layout, task.RecoveryTimePoint, time.Local)
rbDstTime, _ := time.ParseInLocation(consts.UnixtimeLayoutZone, task.RecoveryTimePoint, time.Local)
// 回档目标时间 比 用户填写的时间多1秒
// (因为binlog_tool的--end-datetime参数,--end-datetime这个时间点的binlog是不会被应用的)
rbDstTime = rbDstTime.Add(1 * time.Second)
Expand Down Expand Up @@ -831,8 +830,8 @@ func (task *TendisInsRecoverTask) PullIncrbackup() {
}
// task.runtime.Logger.Info("kvstore:%d ,backupMeta:%v", i, backupMeta)
// kvstore 维度的 的拉取备份文件任务,每个kvstore都是一个任务,因为kvstore的开始时间不一样
incrBack.NewRocksDBIncrBack(i, backupMeta.BinlogPos+1, backupMeta.StartTime.Local().Format(layout),
rbDstTime.Local().Format(layout), task.NeWTempIP, task.RecoverDir, task.RecoverDir)
incrBack.NewRocksDBIncrBack(i, backupMeta.BinlogPos+1, backupMeta.StartTime.Local().Format(consts.UnixtimeLayoutZone),
rbDstTime.Local().Format(consts.UnixtimeLayoutZone), task.NeWTempIP, task.RecoverDir, task.RecoverDir)
if incrBack.Err != nil {
task.Err = incrBack.Err
return
Expand Down Expand Up @@ -906,7 +905,10 @@ func (task *TendisInsRecoverTask) CheckRollbackResult() error {
defer redisCli.Close()
// 检查目的集群是否有源集群的心跳数据
srcHearbeatKey := task.GetTendisplusHearbeatKey(task.SourceIP, task.SourcePort)
mylog.Logger.Info("srcHearbeatKey:%s", srcHearbeatKey)
srcNodeHearbeat, err := redisCli.GetTendisplusHeartbeat(srcHearbeatKey)
msg = fmt.Sprintf("检查目的集群是否有源集群的心跳数据:%v", srcNodeHearbeat)
mylog.Logger.Info(msg)
if err != nil {
task.Err = err
return task.Err
Expand All @@ -924,7 +926,7 @@ func (task *TendisInsRecoverTask) CheckRollbackResult() error {
return task.Err
}

rollbackDstTime, _ := time.ParseInLocation(consts.UnixtimeLayout, task.RecoveryTimePoint, time.Local)
rollbackDstTime, _ := time.ParseInLocation(consts.UnixtimeLayoutZone, task.RecoveryTimePoint, time.Local)
var hearbeatVal time.Time
var ok bool
var errList []string
Expand All @@ -944,16 +946,16 @@ func (task *TendisInsRecoverTask) CheckRollbackResult() error {
if symbol != "" {
msg = fmt.Sprintf("目的tendisplus:%s 源tendisplus:%s rocksdbid:%d 回档到时间:%s %s 目的时间:%s",
redisAddr, srcRedisAddr, i, symbol,
hearbeatVal.Local().Format(consts.UnixtimeLayout),
rollbackDstTime.Local().Format(consts.UnixtimeLayout))
hearbeatVal.Local().Format(consts.UnixtimeLayoutZone),
rollbackDstTime.Local().Format(consts.UnixtimeLayoutZone))
errList = append(errList)
mylog.Logger.Error(msg)
continue
}
msg = fmt.Sprintf("目的tendisplus:%s 源tendisplus:%s rocksdbid:%d 回档到时间:%s =~ 目的时间:%s",
redisAddr, srcRedisAddr, i,
hearbeatVal.Local().Format(consts.UnixtimeLayout),
rollbackDstTime.Local().Format(consts.UnixtimeLayout))
hearbeatVal.Local().Format(consts.UnixtimeLayoutZone),
rollbackDstTime.Local().Format(consts.UnixtimeLayoutZone))
mylog.Logger.Info(msg)
}
if len(errList) > 0 {
Expand All @@ -972,8 +974,7 @@ func (task *TendisInsRecoverTask) SSDPullIncrbackup() {
task.runtime.Logger.Info("Source fileName:%s,DstAddr:%s", fileName, redisAddr)
// 节点维度增备信息:fileName 过滤,task.SourceIP 备份的源IP

layout := "2006-01-02 15:04:05"
rbDstTime, _ := time.ParseInLocation(layout, task.RecoveryTimePoint, time.Local)
rbDstTime, _ := time.ParseInLocation(consts.UnixtimeLayoutZone, task.RecoveryTimePoint, time.Local)
// 回档目标时间 比 用户填写的时间多1秒
// (因为binlog_tool的--end-datetime参数,--end-datetime这个时间点的binlog是不会被应用的)
rbDstTime = rbDstTime.Add(1 * time.Second)
Expand All @@ -983,8 +984,9 @@ func (task *TendisInsRecoverTask) SSDPullIncrbackup() {
// startTime 拉取增备的开始时间 -> 全备份的开始时间
// endTime 拉取增备份的结束时间 -> 回档时间
ssdIncrBackup := NewTredisRocksDBIncrBack(fileName, task.SourceIP, task.FullBackup.ResultFullbackup[0].StartPos,
task.FullBackup.ResultFullbackup[0].BackupStart.Local().Format(layout),
rbDstTime.Local().Format(layout), task.NeWTempIP, task.RecoverDir, task.RecoverDir, task.RecoveryTimePoint)
task.FullBackup.ResultFullbackup[0].BackupStart.Local().Format(consts.UnixtimeLayoutZone),
rbDstTime.Local().Format(consts.UnixtimeLayoutZone), task.NeWTempIP,
task.RecoverDir, task.RecoverDir, task.RecoveryTimePoint)
if ssdIncrBackup.Err != nil {
task.Err = ssdIncrBackup.Err
return
Expand Down
Loading

0 comments on commit d32dd02

Please sign in to comment.