Skip to content

Commit

Permalink
fix(redis): redis_dts设置自动清理180天前已完成任务的目录 TencentBlueKing#7232
Browse files Browse the repository at this point in the history
  • Loading branch information
lukemakeit committed Oct 10, 2024
1 parent adf657a commit 9277d6d
Show file tree
Hide file tree
Showing 3 changed files with 158 additions and 0 deletions.
13 changes: 13 additions & 0 deletions dbm-services/redis/redis-dts/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,14 @@ import (
"sync"

"dbm-services/redis/redis-dts/config"
"dbm-services/redis/redis-dts/pkg/cleanOldTasksDir"
"dbm-services/redis/redis-dts/pkg/constvar"
"dbm-services/redis/redis-dts/pkg/dtsJob"
"dbm-services/redis/redis-dts/pkg/osPerf"
"dbm-services/redis/redis-dts/tclog"
"dbm-services/redis/redis-dts/util"

"github.com/robfig/cron/v3"
"github.com/spf13/viper"
)

Expand Down Expand Up @@ -70,6 +72,17 @@ func main() {
osPerf.WatchDtsSvrPerf()
}()

// 添加定时任务,每天凌晨2点清理180天以前的job目录
c := cron.New(
cron.WithLogger(tclog.GlobCronLogger),
)
_, err = c.AddJob("0 2 * * *", cron.NewChain(cron.SkipIfStillRunning(tclog.GlobCronLogger)).
Then(cleanOldTasksDir.GetGlobCleanOldTasksDir()))
if err != nil {
log.Fatal(err)
}
c.Start()

jobers := make([]dtsJob.DtsJober, 0, 3)
jobers = append(jobers, dtsJob.NewTendisSSDDtsJob(constvar.GetBkCloudID(), localIP,
constvar.GetZoneName(), tclog.Logger, wg))
Expand Down
86 changes: 86 additions & 0 deletions dbm-services/redis/redis-dts/pkg/cleanOldTasksDir/job.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
// Package cleanOldTasksDir TODO
package cleanOldTasksDir

import (
"fmt"
"os"
"path/filepath"
"strings"
"sync"
"time"

"dbm-services/redis/redis-dts/tclog"
"dbm-services/redis/redis-dts/util"
)

var globCleanOldTasksDir *Job
var cleanOnce sync.Once

// Job TODO
type Job struct {
}

// GetGlobCleanOldTasksDir get globCleanOldTasksDir
func GetGlobCleanOldTasksDir() *Job {
cleanOnce.Do(func() {
globCleanOldTasksDir = &Job{}
})
return globCleanOldTasksDir
}

// Run 运行
func (job *Job) Run() {
mydir, err := util.CurrentExecutePath()
if err != nil {
tclog.Logger.Error(err.Error())
return
}
taskDir := filepath.Join(mydir, "tasks")

// 180天前的时间
threshold := time.Now().Local().AddDate(0, 0, -180)

// 读取目录内容
entries, err := os.ReadDir(taskDir)
if err != nil {
tclog.Logger.Error(fmt.Sprintf("os.ReadDir() %s return err:%v\n", taskDir, err))
return
}
for _, entry := range entries {
if entry.IsDir() {
// 获取目录完整路径
dirPath := filepath.Join(taskDir, entry.Name())

// 获取目录的文件信息
dirInfo, err := os.Stat(dirPath)
if err != nil {
tclog.Logger.Error(fmt.Sprintf("os.Stat() %s return err:%v\n", dirPath, err))
continue
}
// 判断是否是60天前的目录
if dirInfo.ModTime().Before(threshold) {
if job.isDtsTasksRunning(entry.Name()) {
tclog.Logger.Info(fmt.Sprintf("目录:%s 180天前创建的,但是依然有tasks还在运行,跳过删除", entry.Name()))
continue
}
// 删除目录
rmCmd := fmt.Sprintf("cd %s && rm -rf %s", taskDir, entry.Name())
tclog.Logger.Info(fmt.Sprintf("删除180天前创建的目录:%s", rmCmd))
util.RunLocalCmd("bash", []string{"-c", rmCmd}, "", nil, 1*time.Hour, tclog.Logger)
}
}
}
}

func (job *Job) isDtsTasksRunning(taskSubDirName string) bool {
psCmd := fmt.Sprintf("ps -ef | grep %s | grep -v grep | wc -l", taskSubDirName)
ret, err := util.RunLocalCmd("bash", []string{"-c", psCmd}, "", nil, 1*time.Hour, tclog.Logger)
if err != nil {
return false
}
ret = strings.TrimSpace(ret)
if ret == "0" {
return false
}
return true
}
59 changes: 59 additions & 0 deletions dbm-services/redis/redis-dts/tclog/tclog.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,11 @@
package tclog

import (
"fmt"
"strings"
"time"

"github.com/robfig/cron/v3"
"github.com/spf13/viper"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
Expand All @@ -13,6 +16,9 @@ import (
// Logger is a global log descripter
var Logger *zap.Logger

// GlobCronLogger 适配robfig/cron logger
var GlobCronLogger *cronLogAdapter

// timeEncoder format log time
func timeEncoder(t time.Time, enc zapcore.PrimitiveArrayEncoder) {
enc.AppendString(t.Format("2006-01-02 15:04:05.000 -07:00"))
Expand Down Expand Up @@ -91,6 +97,9 @@ func InitMainlog() {
// zapcore.NewCore(zapcore.NewJSONEncoder(newEncoderConfig()), zapcore.AddSync(os.Stdout), level),
)
Logger = zap.New(core, zap.AddCaller())

GlobCronLogger = &cronLogAdapter{}
GlobCronLogger.Logger = Logger
}

// NewFileLogger 新建一个logger
Expand All @@ -115,3 +124,53 @@ func NewFileLogger(logFile string) *zap.Logger {
)
return zap.New(core, zap.AddCaller())
}

// 无实际作用,仅确保实现了 cron.Logger 接口
var _ cron.Logger = (*cronLogAdapter)(nil)

// cronLogAdapter 适配器,目标兼容 go.uber.org/zap.Logger 和 robfig/cron.Logger的接口
type cronLogAdapter struct {
*zap.Logger
}

// Error error
func (l *cronLogAdapter) Error(err error, msg string, keysAndValues ...interface{}) {
keysAndValues = formatTimes(keysAndValues)
l.Error(err, fmt.Sprintf(formatString(len(keysAndValues)+2), append([]interface{}{msg, "error", err},
keysAndValues...)...))
}

// Info info
func (l *cronLogAdapter) Info(msg string, keysAndValues ...interface{}) {
keysAndValues = formatTimes(keysAndValues)
l.Logger.Info(fmt.Sprintf(formatString(len(keysAndValues)), append([]interface{}{msg}, keysAndValues...)...))
}

// formatString returns a logfmt-like format string for the number of
// key/values.
func formatString(numKeysAndValues int) string {
var sb strings.Builder
sb.WriteString("%s")
if numKeysAndValues > 0 {
sb.WriteString(", ")
}
for i := 0; i < numKeysAndValues/2; i++ {
if i > 0 {
sb.WriteString(", ")
}
sb.WriteString("%v=%v")
}
return sb.String()
}

// formatTimes formats any time.Time values as RFC3339.
func formatTimes(keysAndValues []interface{}) []interface{} {
var formattedArgs []interface{}
for _, arg := range keysAndValues {
if t, ok := arg.(time.Time); ok {
arg = t.Format(time.RFC3339)
}
formattedArgs = append(formattedArgs, arg)
}
return formattedArgs
}

0 comments on commit 9277d6d

Please sign in to comment.