diff --git a/dbm-services/redis/redis-dts/main.go b/dbm-services/redis/redis-dts/main.go index 092a589eea..12b410d35a 100644 --- a/dbm-services/redis/redis-dts/main.go +++ b/dbm-services/redis/redis-dts/main.go @@ -9,12 +9,14 @@ import ( "sync" "dbm-services/redis/redis-dts/config" + "dbm-services/redis/redis-dts/pkg/cleanOldTasksDir" "dbm-services/redis/redis-dts/pkg/constvar" "dbm-services/redis/redis-dts/pkg/dtsJob" "dbm-services/redis/redis-dts/pkg/osPerf" "dbm-services/redis/redis-dts/tclog" "dbm-services/redis/redis-dts/util" + "github.com/robfig/cron/v3" "github.com/spf13/viper" ) @@ -70,6 +72,17 @@ func main() { osPerf.WatchDtsSvrPerf() }() + // 添加定时任务,每天凌晨2点清理180天以前的job目录 + c := cron.New( + cron.WithLogger(tclog.GlobCronLogger), + ) + _, err = c.AddJob("0 2 * * *", cron.NewChain(cron.SkipIfStillRunning(tclog.GlobCronLogger)). + Then(cleanOldTasksDir.GetGlobCleanOldTasksDir())) + if err != nil { + log.Fatal(err) + } + c.Start() + jobers := make([]dtsJob.DtsJober, 0, 3) jobers = append(jobers, dtsJob.NewTendisSSDDtsJob(constvar.GetBkCloudID(), localIP, constvar.GetZoneName(), tclog.Logger, wg)) diff --git a/dbm-services/redis/redis-dts/pkg/cleanOldTasksDir/job.go b/dbm-services/redis/redis-dts/pkg/cleanOldTasksDir/job.go new file mode 100644 index 0000000000..0ceffb0f04 --- /dev/null +++ b/dbm-services/redis/redis-dts/pkg/cleanOldTasksDir/job.go @@ -0,0 +1,86 @@ +// Package cleanOldTasksDir TODO +package cleanOldTasksDir + +import ( + "fmt" + "os" + "path/filepath" + "strings" + "sync" + "time" + + "dbm-services/redis/redis-dts/tclog" + "dbm-services/redis/redis-dts/util" +) + +var globCleanOldTasksDir *Job +var cleanOnce sync.Once + +// Job TODO +type Job struct { +} + +// GetGlobCleanOldTasksDir get globCleanOldTasksDir +func GetGlobCleanOldTasksDir() *Job { + cleanOnce.Do(func() { + globCleanOldTasksDir = &Job{} + }) + return globCleanOldTasksDir +} + +// Run 运行 +func (job *Job) Run() { + mydir, err := util.CurrentExecutePath() + if err != nil { + tclog.Logger.Error(err.Error()) + return + } + taskDir := filepath.Join(mydir, "tasks") + + // 180天前的时间 + threshold := time.Now().Local().AddDate(0, 0, -180) + + // 读取目录内容 + entries, err := os.ReadDir(taskDir) + if err != nil { + tclog.Logger.Error(fmt.Sprintf("os.ReadDir() %s return err:%v\n", taskDir, err)) + return + } + for _, entry := range entries { + if entry.IsDir() { + // 获取目录完整路径 + dirPath := filepath.Join(taskDir, entry.Name()) + + // 获取目录的文件信息 + dirInfo, err := os.Stat(dirPath) + if err != nil { + tclog.Logger.Error(fmt.Sprintf("os.Stat() %s return err:%v\n", dirPath, err)) + continue + } + // 判断是否是60天前的目录 + if dirInfo.ModTime().Before(threshold) { + if job.isDtsTasksRunning(entry.Name()) { + tclog.Logger.Info(fmt.Sprintf("目录:%s 180天前创建的,但是依然有tasks还在运行,跳过删除", entry.Name())) + continue + } + // 删除目录 + rmCmd := fmt.Sprintf("cd %s && rm -rf %s", taskDir, entry.Name()) + tclog.Logger.Info(fmt.Sprintf("删除180天前创建的目录:%s", rmCmd)) + util.RunLocalCmd("bash", []string{"-c", rmCmd}, "", nil, 1*time.Hour, tclog.Logger) + } + } + } +} + +func (job *Job) isDtsTasksRunning(taskSubDirName string) bool { + psCmd := fmt.Sprintf("ps -ef | grep %s | grep -v grep | wc -l", taskSubDirName) + ret, err := util.RunLocalCmd("bash", []string{"-c", psCmd}, "", nil, 1*time.Hour, tclog.Logger) + if err != nil { + return false + } + ret = strings.TrimSpace(ret) + if ret == "0" { + return false + } + return true +} diff --git a/dbm-services/redis/redis-dts/tclog/tclog.go b/dbm-services/redis/redis-dts/tclog/tclog.go index 1dd1db42d9..78b5190cf8 100644 --- a/dbm-services/redis/redis-dts/tclog/tclog.go +++ b/dbm-services/redis/redis-dts/tclog/tclog.go @@ -2,8 +2,11 @@ package tclog import ( + "fmt" + "strings" "time" + "github.com/robfig/cron/v3" "github.com/spf13/viper" "go.uber.org/zap" "go.uber.org/zap/zapcore" @@ -13,6 +16,9 @@ import ( // Logger is a global log descripter var Logger *zap.Logger +// GlobCronLogger 适配robfig/cron logger +var GlobCronLogger *cronLogAdapter + // timeEncoder format log time func timeEncoder(t time.Time, enc zapcore.PrimitiveArrayEncoder) { enc.AppendString(t.Format("2006-01-02 15:04:05.000 -07:00")) @@ -91,6 +97,9 @@ func InitMainlog() { // zapcore.NewCore(zapcore.NewJSONEncoder(newEncoderConfig()), zapcore.AddSync(os.Stdout), level), ) Logger = zap.New(core, zap.AddCaller()) + + GlobCronLogger = &cronLogAdapter{} + GlobCronLogger.Logger = Logger } // NewFileLogger 新建一个logger @@ -115,3 +124,53 @@ func NewFileLogger(logFile string) *zap.Logger { ) return zap.New(core, zap.AddCaller()) } + +// 无实际作用,仅确保实现了 cron.Logger 接口 +var _ cron.Logger = (*cronLogAdapter)(nil) + +// cronLogAdapter 适配器,目标兼容 go.uber.org/zap.Logger 和 robfig/cron.Logger的接口 +type cronLogAdapter struct { + *zap.Logger +} + +// Error error +func (l *cronLogAdapter) Error(err error, msg string, keysAndValues ...interface{}) { + keysAndValues = formatTimes(keysAndValues) + l.Error(err, fmt.Sprintf(formatString(len(keysAndValues)+2), append([]interface{}{msg, "error", err}, + keysAndValues...)...)) +} + +// Info info +func (l *cronLogAdapter) Info(msg string, keysAndValues ...interface{}) { + keysAndValues = formatTimes(keysAndValues) + l.Logger.Info(fmt.Sprintf(formatString(len(keysAndValues)), append([]interface{}{msg}, keysAndValues...)...)) +} + +// formatString returns a logfmt-like format string for the number of +// key/values. +func formatString(numKeysAndValues int) string { + var sb strings.Builder + sb.WriteString("%s") + if numKeysAndValues > 0 { + sb.WriteString(", ") + } + for i := 0; i < numKeysAndValues/2; i++ { + if i > 0 { + sb.WriteString(", ") + } + sb.WriteString("%v=%v") + } + return sb.String() +} + +// formatTimes formats any time.Time values as RFC3339. +func formatTimes(keysAndValues []interface{}) []interface{} { + var formattedArgs []interface{} + for _, arg := range keysAndValues { + if t, ok := arg.(time.Time); ok { + arg = t.Format(time.RFC3339) + } + formattedArgs = append(formattedArgs, arg) + } + return formattedArgs +}