diff --git a/dbm-services/redis/db-tools/dbmon/pkg/consts/consts.go b/dbm-services/redis/db-tools/dbmon/pkg/consts/consts.go index e5277f5771..c473dff5c7 100644 --- a/dbm-services/redis/db-tools/dbmon/pkg/consts/consts.go +++ b/dbm-services/redis/db-tools/dbmon/pkg/consts/consts.go @@ -3,7 +3,7 @@ package consts // version const ( - BkDbmonVersion = "v0.15" + BkDbmonVersion = "v0.16" ) const ( diff --git a/dbm-services/redis/db-tools/dbmon/pkg/redistaillog/tail_task.go b/dbm-services/redis/db-tools/dbmon/pkg/redistaillog/tail_task.go index 6acb709d36..6d9281b6d4 100644 --- a/dbm-services/redis/db-tools/dbmon/pkg/redistaillog/tail_task.go +++ b/dbm-services/redis/db-tools/dbmon/pkg/redistaillog/tail_task.go @@ -5,6 +5,7 @@ import ( "fmt" "io" "path/filepath" + "regexp" "strconv" "strings" "sync" @@ -21,6 +22,20 @@ import ( "dbm-services/redis/db-tools/dbmon/util" ) +const ( + MaxAllowedLogCountPerDuration = 100 +) + +var ( + twemproxyLogStart *regexp.Regexp + logLimiter *util.FixedWindowLimiter +) + +func init() { + twemproxyLogStart = regexp.MustCompile(`^\[\d{4}-\d{2}-\d{2}`) + logLimiter = util.NewFixedWindowLimiter(MaxAllowedLogCountPerDuration, time.Minute) +} + // BaseSchema schema type BaseSchema struct { BkBizID string `json:"bk_biz_id"` @@ -257,6 +272,14 @@ func (task *TailTask) BackgroundTailLog() { if !task.filterLogLine(line.Text) { continue } + // twemproxy resp 格式 多行,,,[2024-11-26 16:45:25.582] + if task.Role == consts.MetaRoleTwemproxy && !twemproxyLogStart.MatchString(line.Text) { + continue + } + // 限流,每分钟 最多允许上报N条日志 + if !logLimiter.TryAcquire() { + continue + } recordItem.CreateTime = line.Time.Local().Format(time.RFC3339) recordItem.Data = line.Text tmpBytes, _ := json.Marshal(recordItem) diff --git a/dbm-services/redis/db-tools/dbmon/util/limiter.go b/dbm-services/redis/db-tools/dbmon/util/limiter.go new file mode 100644 index 0000000000..192c556640 --- /dev/null +++ b/dbm-services/redis/db-tools/dbmon/util/limiter.go @@ -0,0 +1,42 @@ +package util + +import ( + "sync" + "time" +) + +// FixedWindowLimiter 固定窗口限流器 +type FixedWindowLimiter struct { + limit int // 窗口请求上限 + window time.Duration // 窗口时间大小 + counter int // 计数器 + lastTime time.Time // 上一次请求的时间 + mutex sync.Mutex // 避免并发问题 +} + +func NewFixedWindowLimiter(limit int, window time.Duration) *FixedWindowLimiter { + return &FixedWindowLimiter{ + limit: limit, + window: window, + lastTime: time.Now(), + } +} + +func (l *FixedWindowLimiter) TryAcquire() bool { + l.mutex.Lock() + defer l.mutex.Unlock() + // 获取当前时间 + now := time.Now() + // 如果当前窗口失效,计数器清0,开启新的窗口 + if now.Sub(l.lastTime) > l.window { + l.counter = 0 + l.lastTime = now + } + // 若到达窗口请求上限,请求失败 + if l.counter >= l.limit { + return false + } + // 若没到窗口请求上限,计数器+1,请求成功 + l.counter++ + return true +}