From 37990c5968831aa04ef1c7a72a0d78619520a127 Mon Sep 17 00:00:00 2001 From: OMG-By <504094596@qq.com> Date: Thu, 14 Nov 2024 15:29:28 +0800 Subject: [PATCH] =?UTF-8?q?fix(redis):=20=E5=A4=87=E4=BB=BD=E4=B8=8A?= =?UTF-8?q?=E4=BC=A0=E5=A4=B1=E8=B4=A5=E4=BD=86=E6=98=AFflow=E8=AE=A4?= =?UTF-8?q?=E4=B8=BA=E6=88=90=E5=8A=9F=E9=97=AE=E9=A2=98=20#7899?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../pkg/atomjobs/atomredis/redis_backup.go | 80 ++++++++++--------- .../pkg/atomjobs/atomredis/redis_shutdown.go | 2 +- 2 files changed, 43 insertions(+), 39 deletions(-) diff --git a/dbm-services/redis/db-tools/dbactuator/pkg/atomjobs/atomredis/redis_backup.go b/dbm-services/redis/db-tools/dbactuator/pkg/atomjobs/atomredis/redis_backup.go index daa8ff1d29..b25b0cc3f2 100644 --- a/dbm-services/redis/db-tools/dbactuator/pkg/atomjobs/atomredis/redis_backup.go +++ b/dbm-services/redis/db-tools/dbactuator/pkg/atomjobs/atomredis/redis_backup.go @@ -164,50 +164,52 @@ func (job *RedisBackup) Run() (err error) { bakTasks = append(bakTasks, task) } - // 串行备份 + // 串行触发备份和上传备份系统任务 + var bakTaskIDs []string for _, task := range bakTasks { bakTask := task bakTask.GoFullBakcup() if bakTask.Err != nil { return bakTask.Err } + bakTaskIDs = append(bakTaskIDs, bakTask.BackupTaskID) } // 上下文输出内容 job.runtime.PipeContextData = bakTasks - // if job.params.WithoutToBackupSys { - // job.runtime.Logger.Info("not transfer to backup system") - // return - // } - // // 并行上传 - // wg := sync.WaitGroup{} - // genChan := make(chan *BackupTask) - // limit := 3 // 并发度3 - // for worker := 0; worker < limit; worker++ { - // wg.Add(1) - // go func() { - // defer wg.Done() - // for taskItem := range genChan { - // taskItem.TransferToBackupSystem() - // } - // }() - // } - // go func() { - // // 关闭genChan,以便让所有goroutine退出 - // defer close(genChan) - // for _, task := range bakTasks { - // bakTask := task - // genChan <- bakTask - // } - // }() - // wg.Wait() - // for _, task := range bakTasks { - // bakTask := task - // if bakTask.Err != nil { - // return bakTask.Err - // } - // } + // 如果是永久备份,那么需要检查上传结果情况 + // 结束时间依赖上传时间最长的那个,所以没必要去并发探测 + if job.params.BackupType == consts.ForeverBackupType { + job.runtime.Logger.Info(fmt.Sprintf("backup type is[%s], check upload status, task ids [%+v]", + job.params.BackupType, bakTaskIDs)) + for _, taskId := range bakTaskIDs { + // 半个小时还没上传成功则认为失败了 + for i := 0; i < 60; i++ { + // taskStatus>4,上传失败; + // taskStatus==4,上传成功; + // taskStatus<4,上传中; + taskStatus, statusMsg, err := job.backupClient.TaskStatus(taskId) + // 查询接口失败 + if err != nil { + job.runtime.Logger.Error(fmt.Sprintf("taskid(%+v) upload error, err:%+v", taskId, err)) + break + } + if taskStatus > 4 { + job.runtime.Logger.Error(fmt.Sprintf("上传失败,statusCode:%d,err:%s", taskStatus, statusMsg)) + break + } else if taskStatus < 4 { + job.runtime.Logger.Info(fmt.Sprintf("taskid(%+v)上传中.... statusCode:%d", taskId, taskStatus)) + // 每30s去探测一次 + time.Sleep(30 * time.Second) + continue + } else if taskStatus == 4 { + job.runtime.Logger.Info(fmt.Sprintf("taskid(%+v)上传成功", taskId)) + break + } + } + } + } return nil } @@ -851,13 +853,13 @@ func (task *BackupTask) TransferToBackupSystem() { var msg string cliFileInfo, err := os.Stat(consts.COSBackupClient) if err != nil { - err = fmt.Errorf("os.stat(%s) failed,err:%v", consts.COSBackupClient, err) - mylog.Logger.Error(err.Error()) + task.Err = fmt.Errorf("os.stat(%s) failed,err:%v", consts.COSBackupClient, err) + mylog.Logger.Error(task.Err.Error()) return } if !util.IsExecOther(cliFileInfo.Mode().Perm()) { - err = fmt.Errorf("%s is unable to execute by other", consts.COSBackupClient) - mylog.Logger.Error(err.Error()) + task.Err = fmt.Errorf("%s is unable to execute by other", consts.COSBackupClient) + mylog.Logger.Error(task.Err.Error()) return } mylog.Logger.Info(fmt.Sprintf("redis(%s) backupFiles:%+v start upload backupSystem", task.Addr(), task.BackupFile)) @@ -873,7 +875,9 @@ func (task *BackupTask) TransferToBackupSystem() { // BackupRecordSaveToLocalDB 备份信息记录到本地sqlite中 func (task *BackupTask) BackupRecordSaveToLocalDB() { - if task.sqdb == nil { + mylog.Logger.Info(fmt.Sprintf("(%s)备份记录记录到本地sqlite中....", task.Addr())) + if task.sqdb == nil || task.Err != nil { + mylog.Logger.Info(fmt.Sprintf("(%s)sqlite为空或者task存在err,跳过本地记录", task.Addr())) return } task.RedisFullbackupHistorySchema.ID = 0 // 重置为0,以便gorm自增 diff --git a/dbm-services/redis/db-tools/dbactuator/pkg/atomjobs/atomredis/redis_shutdown.go b/dbm-services/redis/db-tools/dbactuator/pkg/atomjobs/atomredis/redis_shutdown.go index ada0c5816d..0b18c2ce1a 100644 --- a/dbm-services/redis/db-tools/dbactuator/pkg/atomjobs/atomredis/redis_shutdown.go +++ b/dbm-services/redis/db-tools/dbactuator/pkg/atomjobs/atomredis/redis_shutdown.go @@ -240,7 +240,7 @@ func (job *RedisShutdown) BackupDir(port int) { } job.runtime.Logger.Info("redis port[%d] backup dir to doing....", port) mvCmd := fmt.Sprintf("mv %s/%d %s/shutdown_%d_%s", job.RealDataDir, port, - job.RedisBackupDir, port, time.Now().Format("20060102150405")) + job.RealDataDir, port, time.Now().Format("20060102150405")) job.runtime.Logger.Info(mvCmd) cmd := []string{"su", consts.MysqlAaccount, "-c", mvCmd} _, err := util.RunLocalCmd(cmd[0], cmd[1:], "",