From 863ff09c02ecf32c8ec5a350c809b00b81ecfe5c Mon Sep 17 00:00:00 2001 From: nolouch Date: Sun, 28 Apr 2024 15:47:48 +0800 Subject: [PATCH] ratelimit: add pending task metrics in runner Signed-off-by: nolouch --- pkg/ratelimit/metrics.go | 7 +++++-- pkg/ratelimit/runner.go | 7 +++++++ 2 files changed, 12 insertions(+), 2 deletions(-) diff --git a/pkg/ratelimit/metrics.go b/pkg/ratelimit/metrics.go index 3c5020554a87..5d4443a1cc4a 100644 --- a/pkg/ratelimit/metrics.go +++ b/pkg/ratelimit/metrics.go @@ -18,7 +18,10 @@ import ( "github.com/prometheus/client_golang/prometheus" ) -const nameStr = "runner_name" +const ( + nameStr = "runner_name" + taskStr = "task_type" +) var ( RunnerTaskMaxWaitingDuration = prometheus.NewGaugeVec( @@ -35,7 +38,7 @@ var ( Subsystem: "ratelimit", Name: "runner_task_pending_tasks", Help: "The number of pending tasks in the runner.", - }, []string{nameStr}) + }, []string{nameStr, taskStr}) RunnerTaskFailedTasks = prometheus.NewCounterVec( prometheus.CounterOpts{ Namespace: "pd", diff --git a/pkg/ratelimit/runner.go b/pkg/ratelimit/runner.go index c4f2d5bc5ac6..2d33b3f12ac8 100644 --- a/pkg/ratelimit/runner.go +++ b/pkg/ratelimit/runner.go @@ -54,6 +54,7 @@ type ConcurrentRunner struct { pendingMu sync.Mutex stopChan chan struct{} wg sync.WaitGroup + pendingTaskCount map[string]int64 failedTaskCount prometheus.Counter maxWaitingDuration prometheus.Gauge } @@ -66,6 +67,7 @@ func NewConcurrentRunner(name string, maxPendingDuration time.Duration) *Concurr taskChan: make(chan *Task), pendingTasks: make([]*Task, 0, initialCapacity), failedTaskCount: RunnerTaskFailedTasks.WithLabelValues(name), + pendingTaskCount: make(map[string]int64), maxWaitingDuration: RunnerTaskMaxWaitingDuration.WithLabelValues(name), } return s @@ -109,6 +111,9 @@ func (s *ConcurrentRunner) Start() { if len(s.pendingTasks) > 0 { maxDuration = time.Since(s.pendingTasks[0].submittedAt) } + for name, cnt := range s.pendingTaskCount { + RunnerTaskPendingTasks.WithLabelValues(s.name, name).Set(float64(cnt)) + } s.pendingMu.Unlock() s.maxWaitingDuration.Set(maxDuration.Seconds()) } @@ -132,6 +137,7 @@ func (s *ConcurrentRunner) processPendingTasks() { select { case s.taskChan <- task: s.pendingTasks = s.pendingTasks[1:] + s.pendingTaskCount[task.Opts.TaskName]-- return default: return @@ -167,6 +173,7 @@ func (s *ConcurrentRunner) RunTask(ctx context.Context, opt TaskOpts, f func(con } task.submittedAt = time.Now() s.pendingTasks = append(s.pendingTasks, task) + s.pendingTaskCount[opt.TaskName]++ } return nil }