Skip to content

Commit

Permalink
Merge pull request #6 from Pix4D/style_task_queue
Browse files Browse the repository at this point in the history
Improve sytle of task_queue.go.
  • Loading branch information
aledegano authored May 7, 2020
2 parents bed5fda + d1c239a commit 5b744f2
Show file tree
Hide file tree
Showing 3 changed files with 37 additions and 29 deletions.
62 changes: 35 additions & 27 deletions atc/db/task_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package db
import (
"database/sql"
"fmt"
"math"
"time"

"code.cloudfoundry.org/lager"
Expand All @@ -12,6 +11,8 @@ import (

//go:generate counterfeiter . TaskQueue

// A TaskQueue handles multiple queues for the tasks in the database.
// It is used along the placement strategy: limit-active-tasks
type TaskQueue interface {
FindOrAppend(string, string, int, string, lager.Logger) (int, int, error)
FindQueue(string) (string, int, string, error)
Expand All @@ -25,24 +26,26 @@ type taskQueue struct {
conn Conn
}

// NewTaskQueue initializes a queue with a database connection
func NewTaskQueue(conn Conn) TaskQueue {
return &taskQueue{
conn: conn,
}
}

func (queue *taskQueue) FindOrAppend(id string, platform string, teamId int, workerTag string, logger lager.Logger) (position int, length int, err error) {
func (queue *taskQueue) FindOrAppend(id string, platform string, teamID int, workerTag string, logger lager.Logger) (int, int, error) {
var length int
// Returns the position and the total queue length for a given id
exPlatform, exTeamId, exWorkerTag, err := queue.FindQueue(id)
exPlatform, exTeamID, exWorkerTag, err := queue.FindQueue(id)
if err != nil && err != sql.ErrNoRows {
return 0, 0, err
}
// Check that the id is not already present in a different queue, remove it from that queue in that case
if exPlatform != platform || exTeamId != teamId || exWorkerTag != workerTag {
if exPlatform != platform || exTeamID != teamID || exWorkerTag != workerTag {
logger.Info(fmt.Sprintf("%s.already-present-in-different-queue", id))
queue.Dequeue(id, logger)
}
position, err = queue.Position(id)
position, err := queue.Position(id)
if err != nil {
return 0, 0, err
}
Expand All @@ -53,7 +56,7 @@ func (queue *taskQueue) FindOrAppend(id string, platform string, teamId int, wor
}
} else { // Append to the queue, then check its position and total queue length
_, err := psql.Insert("tasks_queue").
Values(id, platform, teamId, workerTag, sq.Expr("now()")).
Values(id, platform, teamID, workerTag, sq.Expr("now()")).
RunWith(queue.conn).
Exec()
if err != nil {
Expand Down Expand Up @@ -82,23 +85,23 @@ func (queue *taskQueue) Dequeue(id string, logger lager.Logger) {
}
}

func (queue *taskQueue) Position(id string) (position int, err error) {
func (queue *taskQueue) Position(id string) (int, error) {
// Return 0 if the id is not present,
// its position if found, where 1 is the front of the queue,
// an error in any other case.
platform, teamId, workerTag, err := queue.FindQueue(id)
var position int
platform, teamID, workerTag, err := queue.FindQueue(id)
if err != nil {
if err == sql.ErrNoRows {
return 0, nil
} else {
return 0, err
}
return 0, err
}
tasks_positions := psql.Select("row_number() over (order by insert_time), id").
tasksPositions := psql.Select("row_number() over (order by insert_time), id").
From("tasks_queue").
Where(sq.Eq{"platform": platform, "team_id": teamId, "worker_tag": workerTag})
Where(sq.Eq{"platform": platform, "team_id": teamID, "worker_tag": workerTag})
err = psql.Select("row_number").
FromSelect(tasks_positions, "subq").
FromSelect(tasksPositions, "subq").
Where(sq.Eq{"id": id}).
RunWith(queue.conn).
QueryRow().
Expand All @@ -109,31 +112,36 @@ func (queue *taskQueue) Position(id string) (position int, err error) {
return position, nil
}

func (queue *taskQueue) FindQueue(id string) (platform string, teamId int, workerTag string, err error) {
err = psql.Select("platform, team_id, worker_tag").
func (queue *taskQueue) FindQueue(id string) (string, int, string, error) {
var (
platform string
teamID int
workerTag string
)
err := psql.Select("platform, team_id, worker_tag").
From("tasks_queue").
Where(sq.Eq{"id": id}).
RunWith(queue.conn).
QueryRow().
Scan(&platform, &teamId, &workerTag)
Scan(&platform, &teamID, &workerTag)
if err != nil {
return "", 0, "", err
}
return platform, teamId, workerTag, nil
return platform, teamID, workerTag, nil
}

func (queue *taskQueue) Length(id string) (length int, err error) {
platform, teamId, workerTag, err := queue.FindQueue(id)
func (queue *taskQueue) Length(id string) (int, error) {
var length int
platform, teamID, workerTag, err := queue.FindQueue(id)
if err != nil {
if err == sql.ErrNoRows {
return 0, nil
} else {
return 0, err
}
return 0, err
}
err = psql.Select("count(*)").
From("tasks_queue").
Where(sq.Eq{"platform": platform, "team_id": teamId, "worker_tag": workerTag}).
Where(sq.Eq{"platform": platform, "team_id": teamID, "worker_tag": workerTag}).
RunWith(queue.conn).
QueryRow().
Scan(&length)
Expand All @@ -143,17 +151,17 @@ func (queue *taskQueue) Length(id string) (length int, err error) {
return length, nil
}

func (queue *taskQueue) Elapsed(id string) (duration int, err error) {
var insert_time time.Time
err = psql.Select("insert_time").
func (queue *taskQueue) Elapsed(id string) (int, error) {
var insertTime time.Time
err := psql.Select("insert_time").
From("tasks_queue").
Where(sq.Eq{"id": id}).
RunWith(queue.conn).
QueryRow().
Scan(&insert_time)
Scan(&insertTime)
if err != nil {
return 0, err
}
duration = int(math.Round(time.Now().Sub(insert_time).Seconds()))
duration := int(time.Since(insertTime).Seconds()) + 1
return duration, err
}
2 changes: 1 addition & 1 deletion atc/db/task_queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -249,7 +249,7 @@ var _ = Describe("TaskQueue", func() {
_, err := dbConn.Exec("INSERT INTO tasks_queue(id, platform, team_id, worker_tag, insert_time) VALUES ('foo_id', 'foo_platform', 42, 'foo_tag', '2020-01-01 00:00:00.0+00')")
Expect(err).ToNot(HaveOccurred())
})
It("the elapsed time in the queue in non-null", func() {
It("reports the elapsed time in the queue as positive", func() {
duration, err := taskQueue.Elapsed("foo_id")
Expect(err).ToNot(HaveOccurred())
Expect(duration).To(BeNumerically(">", 0))
Expand Down
2 changes: 1 addition & 1 deletion atc/metric/emitter/prometheus.go
Original file line number Diff line number Diff line change
Expand Up @@ -360,7 +360,7 @@ func (config *PrometheusConfig) NewEmitter() (metric.Emitter, error) {
Subsystem: "taskqueue",
Name: "queue_duration",
Help: "Queue duration time in seconds",
Buckets: []float64{1, 30, 60, 120, 180, 240, 300, 600, 1200},
Buckets: []float64{1, 15, 30, 60, 120, 180, 240, 300, 600, 1200},
},
[]string{"platform", "team", "tags"},
)
Expand Down

0 comments on commit 5b744f2

Please sign in to comment.