Skip to content

Commit

Permalink
ratelimit: add pending task metrics in runner
Browse files Browse the repository at this point in the history
Signed-off-by: nolouch <[email protected]>
  • Loading branch information
nolouch committed Apr 29, 2024
1 parent aae410f commit 863ff09
Show file tree
Hide file tree
Showing 2 changed files with 12 additions and 2 deletions.
7 changes: 5 additions & 2 deletions pkg/ratelimit/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,10 @@ import (
"github.com/prometheus/client_golang/prometheus"
)

const nameStr = "runner_name"
const (
nameStr = "runner_name"
taskStr = "task_type"
)

var (
RunnerTaskMaxWaitingDuration = prometheus.NewGaugeVec(
Expand All @@ -35,7 +38,7 @@ var (
Subsystem: "ratelimit",
Name: "runner_task_pending_tasks",
Help: "The number of pending tasks in the runner.",
}, []string{nameStr})
}, []string{nameStr, taskStr})
RunnerTaskFailedTasks = prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: "pd",
Expand Down
7 changes: 7 additions & 0 deletions pkg/ratelimit/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ type ConcurrentRunner struct {
pendingMu sync.Mutex
stopChan chan struct{}
wg sync.WaitGroup
pendingTaskCount map[string]int64
failedTaskCount prometheus.Counter
maxWaitingDuration prometheus.Gauge
}
Expand All @@ -66,6 +67,7 @@ func NewConcurrentRunner(name string, maxPendingDuration time.Duration) *Concurr
taskChan: make(chan *Task),
pendingTasks: make([]*Task, 0, initialCapacity),
failedTaskCount: RunnerTaskFailedTasks.WithLabelValues(name),
pendingTaskCount: make(map[string]int64),
maxWaitingDuration: RunnerTaskMaxWaitingDuration.WithLabelValues(name),
}
return s
Expand Down Expand Up @@ -109,6 +111,9 @@ func (s *ConcurrentRunner) Start() {
if len(s.pendingTasks) > 0 {
maxDuration = time.Since(s.pendingTasks[0].submittedAt)
}
for name, cnt := range s.pendingTaskCount {
RunnerTaskPendingTasks.WithLabelValues(s.name, name).Set(float64(cnt))
}
s.pendingMu.Unlock()
s.maxWaitingDuration.Set(maxDuration.Seconds())
}
Expand All @@ -132,6 +137,7 @@ func (s *ConcurrentRunner) processPendingTasks() {
select {
case s.taskChan <- task:
s.pendingTasks = s.pendingTasks[1:]
s.pendingTaskCount[task.Opts.TaskName]--
return
default:
return
Expand Down Expand Up @@ -167,6 +173,7 @@ func (s *ConcurrentRunner) RunTask(ctx context.Context, opt TaskOpts, f func(con
}
task.submittedAt = time.Now()
s.pendingTasks = append(s.pendingTasks, task)
s.pendingTaskCount[opt.TaskName]++
}
return nil
}
Expand Down

0 comments on commit 863ff09

Please sign in to comment.