From 990454bd24d7907102ea246a0bb604381ae435cd Mon Sep 17 00:00:00 2001 From: Alessandro Degano Date: Thu, 28 Nov 2019 08:58:06 +0100 Subject: [PATCH] Emit tasks queue metric when leaving the queue. This is necessary to report the length of the queues as zero when the metric is not present (e.g. when there are no tasks in a given queue). Signed-off-by: Alessandro Degano --- atc/worker/client.go | 18 ++++++++++++++++-- 1 file changed, 16 insertions(+), 2 deletions(-) diff --git a/atc/worker/client.go b/atc/worker/client.go index 217b7d97bb1..7ac9577adc2 100644 --- a/atc/worker/client.go +++ b/atc/worker/client.go @@ -332,7 +332,7 @@ func (client *client) chooseTaskWorker( workerTags := strings.Join(tags, "_") queue_position, qlen, err := client.taskQueue.FindOrAppend(taskId, workerSpec.Platform, teamId, workerTags, logger) if !alreadyQueued { - defer client.taskQueue.Dequeue(taskId, logger) + defer dequeue(client.taskQueue, taskId, workerSpec.Platform, teamId, workerTags, logger) alreadyQueued = true } if err != nil { @@ -419,7 +419,6 @@ func (client *client) chooseTaskWorker( } } } - break } @@ -434,6 +433,21 @@ func decreaseActiveTasks(logger lager.Logger, w Worker) { } } +func dequeue(queue db.TaskQueue, taskId string, platform string, teamId int, workerTags string, logger lager.Logger) { + queue.Dequeue(taskId, logger) + qlen, err := queue.Length(taskId) + if err != nil { + logger.Error("failed-to-fetch-queue-length", err) + return + } + metric.TaskQueue{ + Length: qlen, + Platform: platform, + Team: teamId, + WorkerTags: workerTags, + }.Emit(logger) +} + type processStatus struct { processStatus int processErr error