Skip to content

Commit

Permalink
fix(redis): 备份上传失败但是flow认为成功问题 TencentBlueKing#7899
Browse files: browse the repository at this point in the history
  • Loading branch information
OMG-By committed Nov 14, 2024
1 parent 67a834e commit 37990c5
Show file tree
Hide file tree
Showing 2 changed files with 43 additions and 39 deletions.
Original file line number | Diff line number | Diff line change
Expand Up @@ -164,50 +164,52 @@ func (job *RedisBackup) Run() (err error) {

bakTasks = append(bakTasks, task)
}
// 串行备份
// 串行触发备份和上传备份系统任务
var bakTaskIDs []string
for _, task := range bakTasks {
bakTask := task
bakTask.GoFullBakcup()
if bakTask.Err != nil {
return bakTask.Err
}
bakTaskIDs = append(bakTaskIDs, bakTask.BackupTaskID)
}

// 上下文输出内容
job.runtime.PipeContextData = bakTasks

// if job.params.WithoutToBackupSys {
// job.runtime.Logger.Info("not transfer to backup system")
// return
// }
// // 并行上传
// wg := sync.WaitGroup{}
// genChan := make(chan *BackupTask)
// limit := 3 // 并发度3
// for worker := 0; worker < limit; worker++ {
// wg.Add(1)
// go func() {
// defer wg.Done()
// for taskItem := range genChan {
// taskItem.TransferToBackupSystem()
// }
// }()
// }
// go func() {
// // 关闭genChan,以便让所有goroutine退出
// defer close(genChan)
// for _, task := range bakTasks {
// bakTask := task
// genChan <- bakTask
// }
// }()
// wg.Wait()
// for _, task := range bakTasks {
// bakTask := task
// if bakTask.Err != nil {
// return bakTask.Err
// }
// }
// 如果是永久备份,那么需要检查上传结果情况
// 结束时间依赖上传时间最长的那个,所以没必要去并发探测
if job.params.BackupType == consts.ForeverBackupType {
job.runtime.Logger.Info(fmt.Sprintf("backup type is[%s], check upload status, task ids [%+v]",
job.params.BackupType, bakTaskIDs))
for _, taskId := range bakTaskIDs {
// 半个小时还没上传成功则认为失败了
for i := 0; i < 60; i++ {
// taskStatus>4,上传失败;
// taskStatus==4,上传成功;
// taskStatus<4,上传中;
taskStatus, statusMsg, err := job.backupClient.TaskStatus(taskId)
// 查询接口失败
if err != nil {
job.runtime.Logger.Error(fmt.Sprintf("taskid(%+v) upload error, err:%+v", taskId, err))
break
}
if taskStatus > 4 {
job.runtime.Logger.Error(fmt.Sprintf("上传失败,statusCode:%d,err:%s", taskStatus, statusMsg))
break
} else if taskStatus < 4 {
job.runtime.Logger.Info(fmt.Sprintf("taskid(%+v)上传中.... statusCode:%d", taskId, taskStatus))
// 每30s去探测一次
time.Sleep(30 * time.Second)
continue
} else if taskStatus == 4 {
job.runtime.Logger.Info(fmt.Sprintf("taskid(%+v)上传成功", taskId))
break
}
}
}
}

return nil
}
Expand Down Expand Up @@ -851,13 +853,13 @@ func (task *BackupTask) TransferToBackupSystem() {
var msg string
cliFileInfo, err := os.Stat(consts.COSBackupClient)
if err != nil {
err = fmt.Errorf("os.stat(%s) failed,err:%v", consts.COSBackupClient, err)
mylog.Logger.Error(err.Error())
task.Err = fmt.Errorf("os.stat(%s) failed,err:%v", consts.COSBackupClient, err)
mylog.Logger.Error(task.Err.Error())
return
}
if !util.IsExecOther(cliFileInfo.Mode().Perm()) {
err = fmt.Errorf("%s is unable to execute by other", consts.COSBackupClient)
mylog.Logger.Error(err.Error())
task.Err = fmt.Errorf("%s is unable to execute by other", consts.COSBackupClient)
mylog.Logger.Error(task.Err.Error())
return
}
mylog.Logger.Info(fmt.Sprintf("redis(%s) backupFiles:%+v start upload backupSystem", task.Addr(), task.BackupFile))
Expand All @@ -873,7 +875,9 @@ func (task *BackupTask) TransferToBackupSystem() {

// BackupRecordSaveToLocalDB 备份信息记录到本地sqlite中
func (task *BackupTask) BackupRecordSaveToLocalDB() {
if task.sqdb == nil {
mylog.Logger.Info(fmt.Sprintf("(%s)备份记录记录到本地sqlite中....", task.Addr()))
if task.sqdb == nil || task.Err != nil {
mylog.Logger.Info(fmt.Sprintf("(%s)sqlite为空或者task存在err,跳过本地记录", task.Addr()))
return
}
task.RedisFullbackupHistorySchema.ID = 0 // 重置为0,以便gorm自增
Expand Down
Original file line number | Diff line number | Diff line change
Expand Up @@ -240,7 +240,7 @@ func (job *RedisShutdown) BackupDir(port int) {
}
job.runtime.Logger.Info("redis port[%d] backup dir to doing....", port)
mvCmd := fmt.Sprintf("mv %s/%d %s/shutdown_%d_%s", job.RealDataDir, port,
job.RedisBackupDir, port, time.Now().Format("20060102150405"))
job.RealDataDir, port, time.Now().Format("20060102150405"))
job.runtime.Logger.Info(mvCmd)
cmd := []string{"su", consts.MysqlAaccount, "-c", mvCmd}
_, err := util.RunLocalCmd(cmd[0], cmd[1:], "",
Expand Down

0 comments on commit 37990c5

Please sign in to comment.