Skip to content

Commit

Permalink
Emit tasks queue metric when leaving the queue.
Browse files Browse the repository at this point in the history
This is necessary to report the length of the queues
as zero when the metric is not present (e.g. when there
are no tasks in a given queue).

Signed-off-by: Alessandro Degano <[email protected]>
  • Loading branch information
Alessandro Degano committed Nov 29, 2019
1 parent bebda25 commit 990454b
Showing 1 changed file with 16 additions and 2 deletions.
18 changes: 16 additions & 2 deletions atc/worker/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -332,7 +332,7 @@ func (client *client) chooseTaskWorker(
workerTags := strings.Join(tags, "_")
queue_position, qlen, err := client.taskQueue.FindOrAppend(taskId, workerSpec.Platform, teamId, workerTags, logger)
if !alreadyQueued {
defer client.taskQueue.Dequeue(taskId, logger)
defer dequeue(client.taskQueue, taskId, workerSpec.Platform, teamId, workerTags, logger)
alreadyQueued = true
}
if err != nil {
Expand Down Expand Up @@ -419,7 +419,6 @@ func (client *client) chooseTaskWorker(
}
}
}

break
}

Expand All @@ -434,6 +433,21 @@ func decreaseActiveTasks(logger lager.Logger, w Worker) {
}
}

func dequeue(queue db.TaskQueue, taskId string, platform string, teamId int, workerTags string, logger lager.Logger) {
queue.Dequeue(taskId, logger)
qlen, err := queue.Length(taskId)
if err != nil {
logger.Error("failed-to-fetch-queue-length", err)
return
}
metric.TaskQueue{
Length: qlen,
Platform: platform,
Team: teamId,
WorkerTags: workerTags,
}.Emit(logger)
}

type processStatus struct {
processStatus int
processErr error
Expand Down

0 comments on commit 990454b

Please sign in to comment.