From 95f84473caae913be81ded1b108419ece9427372 Mon Sep 17 00:00:00 2001
From: Alessandro Degano
Date: Wed, 9 Oct 2019 10:53:18 +0200
Subject: [PATCH] Queue up the tasks in limit-active-tasks.

Provide a DB-based queueing system for tasks when using the
limit-active-tasks placement strategy.

Ordering is by insertion time: first come, first served. Tasks are
assigned to separate queues keyed by platform, team, and tags.

Update the Squirrel package to pick up the bugfix for `FromSelect`
not working properly.

The design of the Postgres-based queue was inspired by this talk:
https://www.pgcon.org/2016/schedule/attachments/414_queues-pgcon-2016.pdf

Signed-off-by: Alessandro Degano
---
 atc/atccmd/command.go                         |   6 +-
 atc/db/dbfakes/fake_task_queue.go             | 413 ++++++++++++++++++
 .../1570689623_tasks_queue.down.sql           |   3 +
 .../migrations/1570689623_tasks_queue.up.sql  |   9 +
 atc/db/task_queue.go                          | 142 ++++++
 atc/db/task_queue_test.go                     | 244 +++++++++++
 atc/metric/emitter/prometheus.go              |  41 ++
 atc/metric/metrics.go                         |  23 +
 atc/worker/client.go                          | 115 ++++-
 atc/worker/client_test.go                     |  10 +-
 atc/worker/pool.go                            |  23 +
 atc/worker/worker.go                          |   5 +
 atc/worker/workerfakes/fake_pool.go           |  80 ++++
 atc/worker/workerfakes/fake_worker.go         |  64 +++
 go.mod                                        |   4 +-
 go.sum                                        |   7 +-
 16 files changed, 1156 insertions(+), 33 deletions(-)
 create mode 100644 atc/db/dbfakes/fake_task_queue.go
 create mode 100644 atc/db/migration/migrations/1570689623_tasks_queue.down.sql
 create mode 100644 atc/db/migration/migrations/1570689623_tasks_queue.up.sql
 create mode 100644 atc/db/task_queue.go
 create mode 100644 atc/db/task_queue_test.go

diff --git a/atc/atccmd/command.go b/atc/atccmd/command.go
index 4832f0b2cb3..5ca13bdc914 100644
--- a/atc/atccmd/command.go
+++ b/atc/atccmd/command.go
@@ -596,7 +596,8 @@ func (cmd *RunCommand) constructAPIMembers(
 	)
 
 	pool := worker.NewPool(workerProvider)
-	workerClient := worker.NewClient(pool, workerProvider)
+	taskQueue := db.NewTaskQueue(dbConn)
+	workerClient := worker.NewClient(pool, workerProvider, taskQueue)
 
 	credsManagers := cmd.CredentialManagers
 	dbPipelineFactory := db.NewPipelineFactory(dbConn, lockFactory)
@@ -763,7 +764,8 @@ func (cmd *RunCommand) constructBackendMembers(
 	)
 
 	pool := worker.NewPool(workerProvider)
-	workerClient := worker.NewClient(pool, workerProvider)
+	taskQueue := db.NewTaskQueue(dbConn)
+	workerClient := worker.NewClient(pool, workerProvider, taskQueue)
 
 	defaultLimits, err := cmd.parseDefaultLimits()
 	if err != nil {
diff --git a/atc/db/dbfakes/fake_task_queue.go b/atc/db/dbfakes/fake_task_queue.go
new file mode 100644
index 00000000000..d01312ba474
--- /dev/null
+++ b/atc/db/dbfakes/fake_task_queue.go
@@ -0,0 +1,413 @@
+// Code generated by counterfeiter. DO NOT EDIT.
+package dbfakes + +import ( + "sync" + + "code.cloudfoundry.org/lager" + "github.com/concourse/concourse/atc/db" +) + +type FakeTaskQueue struct { + DequeueStub func(string, lager.Logger) + dequeueMutex sync.RWMutex + dequeueArgsForCall []struct { + arg1 string + arg2 lager.Logger + } + FindOrAppendStub func(string, string, int, string, lager.Logger) (int, int, error) + findOrAppendMutex sync.RWMutex + findOrAppendArgsForCall []struct { + arg1 string + arg2 string + arg3 int + arg4 string + arg5 lager.Logger + } + findOrAppendReturns struct { + result1 int + result2 int + result3 error + } + findOrAppendReturnsOnCall map[int]struct { + result1 int + result2 int + result3 error + } + FindQueueStub func(string) (string, int, string, error) + findQueueMutex sync.RWMutex + findQueueArgsForCall []struct { + arg1 string + } + findQueueReturns struct { + result1 string + result2 int + result3 string + result4 error + } + findQueueReturnsOnCall map[int]struct { + result1 string + result2 int + result3 string + result4 error + } + LengthStub func(string) (int, error) + lengthMutex sync.RWMutex + lengthArgsForCall []struct { + arg1 string + } + lengthReturns struct { + result1 int + result2 error + } + lengthReturnsOnCall map[int]struct { + result1 int + result2 error + } + PositionStub func(string) (int, error) + positionMutex sync.RWMutex + positionArgsForCall []struct { + arg1 string + } + positionReturns struct { + result1 int + result2 error + } + positionReturnsOnCall map[int]struct { + result1 int + result2 error + } + invocations map[string][][]interface{} + invocationsMutex sync.RWMutex +} + +func (fake *FakeTaskQueue) Dequeue(arg1 string, arg2 lager.Logger) { + fake.dequeueMutex.Lock() + fake.dequeueArgsForCall = append(fake.dequeueArgsForCall, struct { + arg1 string + arg2 lager.Logger + }{arg1, arg2}) + fake.recordInvocation("Dequeue", []interface{}{arg1, arg2}) + fake.dequeueMutex.Unlock() + if fake.DequeueStub != nil { + fake.DequeueStub(arg1, arg2) + } +} + +func (fake *FakeTaskQueue) DequeueCallCount() int { + fake.dequeueMutex.RLock() + defer fake.dequeueMutex.RUnlock() + return len(fake.dequeueArgsForCall) +} + +func (fake *FakeTaskQueue) DequeueCalls(stub func(string, lager.Logger)) { + fake.dequeueMutex.Lock() + defer fake.dequeueMutex.Unlock() + fake.DequeueStub = stub +} + +func (fake *FakeTaskQueue) DequeueArgsForCall(i int) (string, lager.Logger) { + fake.dequeueMutex.RLock() + defer fake.dequeueMutex.RUnlock() + argsForCall := fake.dequeueArgsForCall[i] + return argsForCall.arg1, argsForCall.arg2 +} + +func (fake *FakeTaskQueue) FindOrAppend(arg1 string, arg2 string, arg3 int, arg4 string, arg5 lager.Logger) (int, int, error) { + fake.findOrAppendMutex.Lock() + ret, specificReturn := fake.findOrAppendReturnsOnCall[len(fake.findOrAppendArgsForCall)] + fake.findOrAppendArgsForCall = append(fake.findOrAppendArgsForCall, struct { + arg1 string + arg2 string + arg3 int + arg4 string + arg5 lager.Logger + }{arg1, arg2, arg3, arg4, arg5}) + fake.recordInvocation("FindOrAppend", []interface{}{arg1, arg2, arg3, arg4, arg5}) + fake.findOrAppendMutex.Unlock() + if fake.FindOrAppendStub != nil { + return fake.FindOrAppendStub(arg1, arg2, arg3, arg4, arg5) + } + if specificReturn { + return ret.result1, ret.result2, ret.result3 + } + fakeReturns := fake.findOrAppendReturns + return fakeReturns.result1, fakeReturns.result2, fakeReturns.result3 +} + +func (fake *FakeTaskQueue) FindOrAppendCallCount() int { + fake.findOrAppendMutex.RLock() + defer fake.findOrAppendMutex.RUnlock() + return 
len(fake.findOrAppendArgsForCall) +} + +func (fake *FakeTaskQueue) FindOrAppendCalls(stub func(string, string, int, string, lager.Logger) (int, int, error)) { + fake.findOrAppendMutex.Lock() + defer fake.findOrAppendMutex.Unlock() + fake.FindOrAppendStub = stub +} + +func (fake *FakeTaskQueue) FindOrAppendArgsForCall(i int) (string, string, int, string, lager.Logger) { + fake.findOrAppendMutex.RLock() + defer fake.findOrAppendMutex.RUnlock() + argsForCall := fake.findOrAppendArgsForCall[i] + return argsForCall.arg1, argsForCall.arg2, argsForCall.arg3, argsForCall.arg4, argsForCall.arg5 +} + +func (fake *FakeTaskQueue) FindOrAppendReturns(result1 int, result2 int, result3 error) { + fake.findOrAppendMutex.Lock() + defer fake.findOrAppendMutex.Unlock() + fake.FindOrAppendStub = nil + fake.findOrAppendReturns = struct { + result1 int + result2 int + result3 error + }{result1, result2, result3} +} + +func (fake *FakeTaskQueue) FindOrAppendReturnsOnCall(i int, result1 int, result2 int, result3 error) { + fake.findOrAppendMutex.Lock() + defer fake.findOrAppendMutex.Unlock() + fake.FindOrAppendStub = nil + if fake.findOrAppendReturnsOnCall == nil { + fake.findOrAppendReturnsOnCall = make(map[int]struct { + result1 int + result2 int + result3 error + }) + } + fake.findOrAppendReturnsOnCall[i] = struct { + result1 int + result2 int + result3 error + }{result1, result2, result3} +} + +func (fake *FakeTaskQueue) FindQueue(arg1 string) (string, int, string, error) { + fake.findQueueMutex.Lock() + ret, specificReturn := fake.findQueueReturnsOnCall[len(fake.findQueueArgsForCall)] + fake.findQueueArgsForCall = append(fake.findQueueArgsForCall, struct { + arg1 string + }{arg1}) + fake.recordInvocation("FindQueue", []interface{}{arg1}) + fake.findQueueMutex.Unlock() + if fake.FindQueueStub != nil { + return fake.FindQueueStub(arg1) + } + if specificReturn { + return ret.result1, ret.result2, ret.result3, ret.result4 + } + fakeReturns := fake.findQueueReturns + return fakeReturns.result1, fakeReturns.result2, fakeReturns.result3, fakeReturns.result4 +} + +func (fake *FakeTaskQueue) FindQueueCallCount() int { + fake.findQueueMutex.RLock() + defer fake.findQueueMutex.RUnlock() + return len(fake.findQueueArgsForCall) +} + +func (fake *FakeTaskQueue) FindQueueCalls(stub func(string) (string, int, string, error)) { + fake.findQueueMutex.Lock() + defer fake.findQueueMutex.Unlock() + fake.FindQueueStub = stub +} + +func (fake *FakeTaskQueue) FindQueueArgsForCall(i int) string { + fake.findQueueMutex.RLock() + defer fake.findQueueMutex.RUnlock() + argsForCall := fake.findQueueArgsForCall[i] + return argsForCall.arg1 +} + +func (fake *FakeTaskQueue) FindQueueReturns(result1 string, result2 int, result3 string, result4 error) { + fake.findQueueMutex.Lock() + defer fake.findQueueMutex.Unlock() + fake.FindQueueStub = nil + fake.findQueueReturns = struct { + result1 string + result2 int + result3 string + result4 error + }{result1, result2, result3, result4} +} + +func (fake *FakeTaskQueue) FindQueueReturnsOnCall(i int, result1 string, result2 int, result3 string, result4 error) { + fake.findQueueMutex.Lock() + defer fake.findQueueMutex.Unlock() + fake.FindQueueStub = nil + if fake.findQueueReturnsOnCall == nil { + fake.findQueueReturnsOnCall = make(map[int]struct { + result1 string + result2 int + result3 string + result4 error + }) + } + fake.findQueueReturnsOnCall[i] = struct { + result1 string + result2 int + result3 string + result4 error + }{result1, result2, result3, result4} +} + +func (fake *FakeTaskQueue) 
Length(arg1 string) (int, error) { + fake.lengthMutex.Lock() + ret, specificReturn := fake.lengthReturnsOnCall[len(fake.lengthArgsForCall)] + fake.lengthArgsForCall = append(fake.lengthArgsForCall, struct { + arg1 string + }{arg1}) + fake.recordInvocation("Length", []interface{}{arg1}) + fake.lengthMutex.Unlock() + if fake.LengthStub != nil { + return fake.LengthStub(arg1) + } + if specificReturn { + return ret.result1, ret.result2 + } + fakeReturns := fake.lengthReturns + return fakeReturns.result1, fakeReturns.result2 +} + +func (fake *FakeTaskQueue) LengthCallCount() int { + fake.lengthMutex.RLock() + defer fake.lengthMutex.RUnlock() + return len(fake.lengthArgsForCall) +} + +func (fake *FakeTaskQueue) LengthCalls(stub func(string) (int, error)) { + fake.lengthMutex.Lock() + defer fake.lengthMutex.Unlock() + fake.LengthStub = stub +} + +func (fake *FakeTaskQueue) LengthArgsForCall(i int) string { + fake.lengthMutex.RLock() + defer fake.lengthMutex.RUnlock() + argsForCall := fake.lengthArgsForCall[i] + return argsForCall.arg1 +} + +func (fake *FakeTaskQueue) LengthReturns(result1 int, result2 error) { + fake.lengthMutex.Lock() + defer fake.lengthMutex.Unlock() + fake.LengthStub = nil + fake.lengthReturns = struct { + result1 int + result2 error + }{result1, result2} +} + +func (fake *FakeTaskQueue) LengthReturnsOnCall(i int, result1 int, result2 error) { + fake.lengthMutex.Lock() + defer fake.lengthMutex.Unlock() + fake.LengthStub = nil + if fake.lengthReturnsOnCall == nil { + fake.lengthReturnsOnCall = make(map[int]struct { + result1 int + result2 error + }) + } + fake.lengthReturnsOnCall[i] = struct { + result1 int + result2 error + }{result1, result2} +} + +func (fake *FakeTaskQueue) Position(arg1 string) (int, error) { + fake.positionMutex.Lock() + ret, specificReturn := fake.positionReturnsOnCall[len(fake.positionArgsForCall)] + fake.positionArgsForCall = append(fake.positionArgsForCall, struct { + arg1 string + }{arg1}) + fake.recordInvocation("Position", []interface{}{arg1}) + fake.positionMutex.Unlock() + if fake.PositionStub != nil { + return fake.PositionStub(arg1) + } + if specificReturn { + return ret.result1, ret.result2 + } + fakeReturns := fake.positionReturns + return fakeReturns.result1, fakeReturns.result2 +} + +func (fake *FakeTaskQueue) PositionCallCount() int { + fake.positionMutex.RLock() + defer fake.positionMutex.RUnlock() + return len(fake.positionArgsForCall) +} + +func (fake *FakeTaskQueue) PositionCalls(stub func(string) (int, error)) { + fake.positionMutex.Lock() + defer fake.positionMutex.Unlock() + fake.PositionStub = stub +} + +func (fake *FakeTaskQueue) PositionArgsForCall(i int) string { + fake.positionMutex.RLock() + defer fake.positionMutex.RUnlock() + argsForCall := fake.positionArgsForCall[i] + return argsForCall.arg1 +} + +func (fake *FakeTaskQueue) PositionReturns(result1 int, result2 error) { + fake.positionMutex.Lock() + defer fake.positionMutex.Unlock() + fake.PositionStub = nil + fake.positionReturns = struct { + result1 int + result2 error + }{result1, result2} +} + +func (fake *FakeTaskQueue) PositionReturnsOnCall(i int, result1 int, result2 error) { + fake.positionMutex.Lock() + defer fake.positionMutex.Unlock() + fake.PositionStub = nil + if fake.positionReturnsOnCall == nil { + fake.positionReturnsOnCall = make(map[int]struct { + result1 int + result2 error + }) + } + fake.positionReturnsOnCall[i] = struct { + result1 int + result2 error + }{result1, result2} +} + +func (fake *FakeTaskQueue) Invocations() map[string][][]interface{} { + 
fake.invocationsMutex.RLock() + defer fake.invocationsMutex.RUnlock() + fake.dequeueMutex.RLock() + defer fake.dequeueMutex.RUnlock() + fake.findOrAppendMutex.RLock() + defer fake.findOrAppendMutex.RUnlock() + fake.findQueueMutex.RLock() + defer fake.findQueueMutex.RUnlock() + fake.lengthMutex.RLock() + defer fake.lengthMutex.RUnlock() + fake.positionMutex.RLock() + defer fake.positionMutex.RUnlock() + copiedInvocations := map[string][][]interface{}{} + for key, value := range fake.invocations { + copiedInvocations[key] = value + } + return copiedInvocations +} + +func (fake *FakeTaskQueue) recordInvocation(key string, args []interface{}) { + fake.invocationsMutex.Lock() + defer fake.invocationsMutex.Unlock() + if fake.invocations == nil { + fake.invocations = map[string][][]interface{}{} + } + if fake.invocations[key] == nil { + fake.invocations[key] = [][]interface{}{} + } + fake.invocations[key] = append(fake.invocations[key], args) +} + +var _ db.TaskQueue = new(FakeTaskQueue) diff --git a/atc/db/migration/migrations/1570689623_tasks_queue.down.sql b/atc/db/migration/migrations/1570689623_tasks_queue.down.sql new file mode 100644 index 00000000000..041d8ddfe3b --- /dev/null +++ b/atc/db/migration/migrations/1570689623_tasks_queue.down.sql @@ -0,0 +1,3 @@ +BEGIN; + DROP TABLE IF EXISTS tasks_queue; +COMMIT; diff --git a/atc/db/migration/migrations/1570689623_tasks_queue.up.sql b/atc/db/migration/migrations/1570689623_tasks_queue.up.sql new file mode 100644 index 00000000000..0e249c3da44 --- /dev/null +++ b/atc/db/migration/migrations/1570689623_tasks_queue.up.sql @@ -0,0 +1,9 @@ +BEGIN; + CREATE TABLE tasks_queue ( + "id" text NOT NULL PRIMARY KEY, + "platform" text NOT NULL, + "team_id" serial NOT NULL, + "worker_tag" text NOT NULL, + "insert_time" timestamp with time zone DEFAULT now() NOT NULL + ); +COMMIT; diff --git a/atc/db/task_queue.go b/atc/db/task_queue.go new file mode 100644 index 00000000000..24db0bc98c0 --- /dev/null +++ b/atc/db/task_queue.go @@ -0,0 +1,142 @@ +package db + +import ( + "database/sql" + "fmt" + + "code.cloudfoundry.org/lager" + sq "github.com/Masterminds/squirrel" +) + +//go:generate counterfeiter . 
TaskQueue
+
+type TaskQueue interface {
+	FindOrAppend(string, string, int, string, lager.Logger) (int, int, error)
+	FindQueue(string) (string, int, string, error)
+	Dequeue(string, lager.Logger)
+	Length(string) (int, error)
+	Position(string) (int, error)
+}
+
+type taskQueue struct {
+	conn Conn
+}
+
+func NewTaskQueue(conn Conn) TaskQueue {
+	return &taskQueue{
+		conn: conn,
+	}
+}
+
+// FindOrAppend returns the queue position and the total queue length for the
+// given id, appending the id to its queue first if it is not queued yet.
+func (queue *taskQueue) FindOrAppend(id string, platform string, teamId int, workerTag string, logger lager.Logger) (position int, length int, err error) {
+	exPlatform, exTeamId, exWorkerTag, err := queue.FindQueue(id)
+	if err != nil && err != sql.ErrNoRows {
+		return 0, 0, err
+	}
+	// If the id is already queued with different attributes, remove it from
+	// that queue before re-queueing it here.
+	if err == nil && (exPlatform != platform || exTeamId != teamId || exWorkerTag != workerTag) {
+		logger.Info(fmt.Sprintf("%s.already-present-in-different-queue", id))
+		queue.Dequeue(id, logger)
+	}
+	position, err = queue.Position(id)
+	if err != nil {
+		return 0, 0, err
+	}
+	if position > 0 { // Already in the queue: report position and total length
+		length, err = queue.Length(id)
+		if err != nil {
+			return 0, 0, err
+		}
+	} else { // Append to the queue, then look up position and total length
+		_, err := psql.Insert("tasks_queue").
+			Columns("id", "platform", "team_id", "worker_tag", "insert_time").
+			Values(id, platform, teamId, workerTag, sq.Expr("now()")).
+			RunWith(queue.conn).
+			Exec()
+		if err != nil {
+			return 0, 0, err
+		}
+
+		position, err = queue.Position(id)
+		if err != nil {
+			return 0, 0, err
+		}
+		length, err = queue.Length(id)
+		if err != nil {
+			return 0, 0, err
+		}
+	}
+	return position, length, nil
+}
+
+func (queue *taskQueue) Dequeue(id string, logger lager.Logger) {
+	_, err := psql.Delete("tasks_queue").
+		Where(sq.Eq{"id": id}).
+		RunWith(queue.conn).
+		Exec()
+	if err != nil {
+		logger.Error("failed-to-dequeue-task", err)
+	}
+}
+
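+// Position returns 0 when the id is not queued, or the id's 1-based rank in
+// its queue (1 is the front of the queue). The Squirrel builders below
+// generate SQL roughly equivalent to this sketch:
+//
+//	SELECT row_number FROM (
+//	    SELECT row_number() OVER (ORDER BY insert_time), id
+//	    FROM tasks_queue
+//	    WHERE platform = $1 AND team_id = $2 AND worker_tag = $3
+//	) AS subq
+//	WHERE id = $4;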
+func (queue *taskQueue) Position(id string) (position int, err error) {
+	platform, teamId, workerTag, err := queue.FindQueue(id)
+	if err != nil {
+		if err == sql.ErrNoRows {
+			return 0, nil
+		}
+		return 0, err
+	}
+	taskPositions := psql.Select("row_number() over (order by insert_time), id").
+		From("tasks_queue").
+		Where(sq.Eq{"platform": platform, "team_id": teamId, "worker_tag": workerTag})
+	err = psql.Select("row_number").
+		FromSelect(taskPositions, "subq").
+		Where(sq.Eq{"id": id}).
+		RunWith(queue.conn).
+		QueryRow().
+		Scan(&position)
+	if err != nil {
+		return 0, err
+	}
+	return position, nil
+}
+
+func (queue *taskQueue) FindQueue(id string) (platform string, teamId int, workerTag string, err error) {
+	err = psql.Select("platform, team_id, worker_tag").
+		From("tasks_queue").
+		Where(sq.Eq{"id": id}).
+		RunWith(queue.conn).
+		QueryRow().
+		Scan(&platform, &teamId, &workerTag)
+	if err != nil {
+		return "", 0, "", err
+	}
+	return platform, teamId, workerTag, nil
+}
+
+func (queue *taskQueue) Length(id string) (length int, err error) {
+	platform, teamId, workerTag, err := queue.FindQueue(id)
+	if err != nil {
+		if err == sql.ErrNoRows {
+			return 0, nil
+		}
+		return 0, err
+	}
+	err = psql.Select("count(*)").
+		From("tasks_queue").
+		Where(sq.Eq{"platform": platform, "team_id": teamId, "worker_tag": workerTag}).
+		RunWith(queue.conn).
+		QueryRow().
+		Scan(&length)
+	if err != nil {
+		return 0, err
+	}
+	return length, nil
+}
diff --git a/atc/db/task_queue_test.go b/atc/db/task_queue_test.go
new file mode 100644
index 00000000000..ff684b8e76e
--- /dev/null
+++ b/atc/db/task_queue_test.go
@@ -0,0 +1,244 @@
+package db_test
+
+import (
+	"database/sql"
+	"time"
+
+	"code.cloudfoundry.org/lager/lagertest"
+	"github.com/concourse/concourse/atc/db"
+	"github.com/lib/pq"
+
+	. "github.com/onsi/ginkgo"
+	. "github.com/onsi/gomega"
+)
+
+var _ = Describe("TaskQueue", func() {
+	var (
+		listener *pq.Listener
+		dbConn   db.Conn
+
+		logger *lagertest.TestLogger
+
+		taskQueue db.TaskQueue
+	)
+
+	BeforeEach(func() {
+		postgresRunner.Truncate()
+
+		listener = pq.NewListener(postgresRunner.DataSourceName(), time.Second, time.Minute, nil)
+		Eventually(listener.Ping, 5*time.Second).ShouldNot(HaveOccurred())
+
+		logger = lagertest.NewTestLogger("test")
+		dbConn = postgresRunner.OpenConn()
+
+		taskQueue = db.NewTaskQueue(dbConn)
+	})
+
+	AfterEach(func() {
+		err := dbConn.Close()
+		Expect(err).NotTo(HaveOccurred())
+
+		err = listener.Close()
+		Expect(err).NotTo(HaveOccurred())
+	})
+
+	Describe("queue length", func() {
+		Context("when the id is not on any queue", func() {
+			It("returns that the length of the queue is 0", func() {
+				length, err := taskQueue.Length("foo")
+				Expect(length).To(BeZero())
+				Expect(err).NotTo(HaveOccurred())
+			})
+		})
+		Context("when the id is the only element in the queue", func() {
+			BeforeEach(func() {
+				_, err := dbConn.Exec("INSERT INTO tasks_queue(id, platform, team_id, worker_tag, insert_time) VALUES ('foo_id', 'foo_platform', 42, 'foo_tag', NOW())")
+				Expect(err).ToNot(HaveOccurred())
+			})
+			It("returns that the length of the queue is 1", func() {
+				length, err := taskQueue.Length("foo_id")
+				Expect(length).To(Equal(1))
+				Expect(err).ToNot(HaveOccurred())
+			})
+		})
+		Context("when 3 elements are in the same queue", func() {
+			BeforeEach(func() {
+				for _, id := range []string{"foo", "bar", "baz"} {
+					_, err := dbConn.Exec("INSERT INTO tasks_queue(id, platform, team_id, worker_tag, insert_time) VALUES ($1, 'foo_platform', 42, 'foo_tag', NOW())", id)
+					Expect(err).ToNot(HaveOccurred())
+				}
+			})
+			It("returns that the length of the queue containing one of the elements is 3", func() {
+				length, err := taskQueue.Length("bar")
+				Expect(length).To(Equal(3))
+				Expect(err).ToNot(HaveOccurred())
+			})
+			It("returns that the length of the queue for a different element is 0", func() {
+				length, err := taskQueue.Length("blah")
+				Expect(length).To(Equal(0))
+				Expect(err).ToNot(HaveOccurred())
+			})
+		})
+	})
+
+	Describe("queue position", func() {
+		Context("when an element is not yet in the queue", func() {
+			It("returns that its position is 0", func() {
+				pos, err := taskQueue.Position("foo")
+				Expect(pos).To(Equal(0))
+				Expect(err).ToNot(HaveOccurred())
+			})
+		})
+		Context("when the id is the only element in the queue", func() {
+			BeforeEach(func() {
+				_, err := dbConn.Exec("INSERT INTO tasks_queue(id, platform, team_id, worker_tag, insert_time) VALUES ('foo_id', 'foo_platform', 42, 'foo_tag', NOW())")
+				Expect(err).ToNot(HaveOccurred())
+			})
+			It("returns that its position is 1", func() {
+				pos, err := taskQueue.Position("foo_id")
+				Expect(pos).To(Equal(1))
+				Expect(err).ToNot(HaveOccurred())
+			})
+		})
+		Context("when 3 elements are in the same queue", func() {
+			BeforeEach(func() {
+				for _, id := range []string{"foo", "bar", "baz"} {
+					_, err := dbConn.Exec("INSERT INTO tasks_queue(id, platform, team_id, worker_tag, insert_time) VALUES ($1, 'foo_platform', 42, 'foo_tag', NOW())", id)
+					Expect(err).ToNot(HaveOccurred())
+				}
+				_, err := dbConn.Exec("INSERT INTO tasks_queue(id, platform, team_id, worker_tag, insert_time) VALUES ('extraneous', 'ext_platform', 21, 'foo_tag', NOW())")
+				Expect(err).ToNot(HaveOccurred())
+			})
+			It("returns that the position of the second element is 2", func() {
+				pos, err := taskQueue.Position("bar")
+				Expect(pos).To(Equal(2))
+				Expect(err).ToNot(HaveOccurred())
+			})
+			It("returns that the position of an element in a different queue is 1", func() {
+				pos, err := taskQueue.Position("extraneous")
+				Expect(pos).To(Equal(1))
+				Expect(err).ToNot(HaveOccurred())
+			})
+		})
+	})
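+
+	// FindOrAppend is find-or-append: a second call for an id that is already
+	// queued finds the existing entry instead of appending a duplicate, so it
+	// reports the same position and length again.
+	Describe("Find or append called twice", func() {
+		It("returns the same position and length on the second call", func() {
+			pos, length, err := taskQueue.FindOrAppend("foo", "foo_platform", 42, "foo_tag", logger)
+			Expect(err).ToNot(HaveOccurred())
+			Expect(pos).To(Equal(1))
+			Expect(length).To(Equal(1))
+
+			pos, length, err = taskQueue.FindOrAppend("foo", "foo_platform", 42, "foo_tag", logger)
+			Expect(err).ToNot(HaveOccurred())
+			Expect(pos).To(Equal(1))
+			Expect(length).To(Equal(1))
+		})
+	})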
+
+	Describe("Find queue", func() {
+		Context("when an element is already present in a queue", func() {
+			BeforeEach(func() {
+				_, err := dbConn.Exec("INSERT INTO tasks_queue(id, platform, team_id, worker_tag, insert_time) VALUES ('foo_id', 'foo_platform', 42, 'foo_tag', NOW())")
+				Expect(err).ToNot(HaveOccurred())
+			})
+			It("is found in the correct queue", func() {
+				platform, teamId, workerTag, err := taskQueue.FindQueue("foo_id")
+				Expect(err).ToNot(HaveOccurred())
+				Expect(platform).To(Equal("foo_platform"))
+				Expect(teamId).To(Equal(42))
+				Expect(workerTag).To(Equal("foo_tag"))
+			})
+		})
+		Context("when an element is in no queue", func() {
+			It("returns an error", func() {
+				_, _, _, err := taskQueue.FindQueue("foo_id")
+				Expect(err).To(HaveOccurred())
+				Expect(err).To(Equal(sql.ErrNoRows))
+			})
+		})
+	})
+
+	Describe("Find or append to queue", func() {
+		Context("when an element is added to an empty queue for the first time", func() {
+			BeforeEach(func() {
+				pos, length, err := taskQueue.FindOrAppend("foo", "foo_platform", 42, "foo_tag", logger)
+				Expect(err).ToNot(HaveOccurred())
+				Expect(pos).To(Equal(1))
+				Expect(length).To(Equal(1))
+			})
+			It("appears in the database", func() {
+				var id string
+				err := dbConn.QueryRow(`SELECT id FROM tasks_queue`).Scan(&id)
+				Expect(err).NotTo(HaveOccurred())
+				Expect(id).To(Equal("foo"))
+			})
+		})
+		Context("when an element is appended to a queue with three elements", func() {
+			BeforeEach(func() {
+				for _, id := range []string{"foo", "bar", "baz"} {
+					_, err := dbConn.Exec("INSERT INTO tasks_queue(id, platform, team_id, worker_tag, insert_time) VALUES ($1, 'foo_platform', 42, 'foo_tag', NOW())", id)
+					Expect(err).ToNot(HaveOccurred())
+				}
+				pos, length, err := taskQueue.FindOrAppend("blah", "foo_platform", 42, "foo_tag", logger)
+				Expect(err).ToNot(HaveOccurred())
+				Expect(pos).To(Equal(4))
+				Expect(length).To(Equal(4))
+			})
+			It("appears in the database", func() {
+				var id string
+				err := dbConn.QueryRow(`SELECT id FROM tasks_queue WHERE id = 'blah'`).Scan(&id)
+				Expect(err).NotTo(HaveOccurred())
+				Expect(id).To(Equal("blah"))
+			})
+			Context("when a new element is added to a different queue", func() {
+				It("does not interfere with the existing queue", func() {
+					pos, length, err := taskQueue.FindOrAppend("new_foo", "blah_platform", 42, "foo_tag", logger)
+					Expect(err).ToNot(HaveOccurred())
+					Expect(pos).To(Equal(1))
+					Expect(length).To(Equal(1))
+
+					var oldQueueLen int
+					err = dbConn.QueryRow(`SELECT COUNT(*) FROM tasks_queue WHERE platform = 'foo_platform'`).Scan(&oldQueueLen)
+					Expect(err).ToNot(HaveOccurred())
+					Expect(oldQueueLen).To(Equal(4))
+				})
+			})
+		})
+		Context("when an element already exists in a queue", func() {
+			BeforeEach(func() {
+				_, err := dbConn.Exec("INSERT INTO tasks_queue(id, platform, team_id, worker_tag, insert_time) VALUES ('foo_id', 'foo_platform', 42, 'foo_tag', NOW())")
+				Expect(err).ToNot(HaveOccurred())
+			})
+			Context("when the same element is added to a different queue", func() {
+				It("is removed from the existing queue, logging a warning", func() {
+					pos, length, err := taskQueue.FindOrAppend("foo_id", "diff_plat", 42, "foo_tag", logger)
+					Expect(err).ToNot(HaveOccurred())
+					Expect(pos).To(Equal(1))
+					Expect(length).To(Equal(1))
+					Expect(logger.LogMessages()).To(ContainElement("test.foo_id.already-present-in-different-queue"))
+				})
+			})
+		})
+	})
+
+	Describe("Dequeue", func() {
+		Context("when an element is removed from the queue", func() {
+			Context("if the element is present", func() {
+				BeforeEach(func() {
+					_, err := dbConn.Exec("INSERT INTO tasks_queue(id, platform, team_id, worker_tag, insert_time) VALUES ('foo_id', 'foo_platform', 42, 'foo_tag', NOW())")
+					Expect(err).ToNot(HaveOccurred())
+				})
+				It("is removed from the queue", func() {
+					taskQueue.Dequeue("foo_id", logger)
+					Expect(logger.LogMessages()).To(Equal([]string{}))
+
+					var count int
+					err := dbConn.QueryRow(`SELECT COUNT(*) FROM tasks_queue WHERE id = 'foo_id'`).Scan(&count)
+					Expect(err).ToNot(HaveOccurred())
+					Expect(count).To(BeZero())
+				})
+			})
+		})
+	})
+})
diff --git a/atc/metric/emitter/prometheus.go b/atc/metric/emitter/prometheus.go
index 37fa1cb90a3..a868c3b78ff 100644
--- a/atc/metric/emitter/prometheus.go
+++ b/atc/metric/emitter/prometheus.go
@@ -49,6 +49,7 @@ type PrometheusEmitter struct {
 	workerUnknownVolumes *prometheus.GaugeVec
 	workerTasks          *prometheus.GaugeVec
 	workersRegistered    *prometheus.GaugeVec
+	taskQueue            *prometheus.GaugeVec
 
 	workerContainersLabels map[string]map[string]prometheus.Labels
 	workerVolumesLabels    map[string]map[string]prometheus.Labels
@@ -329,6 +330,17 @@ func (config *PrometheusConfig) NewEmitter() (metric.Emitter, error) {
 	)
 	prometheus.MustRegister(resourceChecksVec)
 
+	taskQueue := prometheus.NewGaugeVec(
+		prometheus.GaugeOpts{
+			Namespace: "concourse",
+			Subsystem: "taskqueue",
+			Name:      "tasks_queue",
+			Help:      "Current number of tasks in queue",
+		},
+		[]string{"platform", "team", "tags"},
+	)
+	prometheus.MustRegister(taskQueue)
+
 	listener, err := net.Listen("tcp", config.bind())
 	if err != nil {
 		return nil, err
@@ -372,6 +384,7 @@ func (config *PrometheusConfig) NewEmitter() (metric.Emitter, error) {
 		workerTasks:             workerTasks,
 		workerUnknownContainers: workerUnknownContainers,
 		workerUnknownVolumes:    workerUnknownVolumes,
+		taskQueue:               taskQueue,
 	}
 	go emitter.periodicMetricGC()
@@ -422,6 +435,8 @@ func (emitter *PrometheusEmitter) Emit(logger lager.Logger, event metric.Event)
 		emitter.databaseMetrics(logger, event)
 	case "resource checked":
 		emitter.resourceMetric(logger, event)
+	case "tasks queue":
+		emitter.taskQueueMetric(logger, event)
 	default:
 		// unless we have a specific metric, we do nothing
 	}
@@ -771,6 +786,32 @@ func (emitter *PrometheusEmitter) resourceMetric(logger lager.Logger, event metr
 	emitter.resourceChecksVec.WithLabelValues(team, pipeline).Inc()
 }
 
+func (emitter *PrometheusEmitter) taskQueueMetric(logger lager.Logger, event metric.Event) {
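+	// The event carries the queue length as its Value and the queue key
+	// (platform, team, tags) in its Attributes, as emitted by metric.TaskQueue.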
+	value, ok := event.Value.(int)
+	if !ok {
+		logger.Error("tasks-queue-length-type-mismatch", fmt.Errorf("expected event.Value to be an int"))
+		return
+	}
+	platform, exists := event.Attributes["platform"]
+	if !exists {
+		logger.Error("failed-to-find-platform-in-event", fmt.Errorf("expected platform to exist in event.Attributes"))
+		return
+	}
+	team, exists := event.Attributes["team"]
+	if !exists {
+		logger.Error("failed-to-find-team-in-event", fmt.Errorf("expected team to exist in event.Attributes"))
+		return
+	}
+	tags, exists := event.Attributes["tags"]
+	if !exists {
+		logger.Error("failed-to-find-tags-in-event", fmt.Errorf("expected tags to exist in event.Attributes"))
+		return
+	}
+
+	emitter.taskQueue.WithLabelValues(platform, team, tags).Set(float64(value))
+}
+
 // updateLastSeen tracks for each worker when it last received a metric event.
 func (emitter *PrometheusEmitter) updateLastSeen(event metric.Event) {
 	emitter.mu.Lock()
diff --git a/atc/metric/metrics.go b/atc/metric/metrics.go
index 347f0c10553..ee1c4f0cbbe 100644
--- a/atc/metric/metrics.go
+++ b/atc/metric/metrics.go
@@ -644,3 +644,26 @@ func (event WorkersState) Emit(logger lager.Logger) {
 		)
 	}
 }
+
+type TaskQueue struct {
+	Length     int
+	Platform   string
+	Team       int
+	WorkerTags string
+}
+
+func (event TaskQueue) Emit(logger lager.Logger) {
+	emit(
+		logger.Session("tasks-queue"),
+		Event{
+			Name:  "tasks queue",
+			Value: event.Length,
+			State: EventStateOK,
+			Attributes: map[string]string{
+				"platform": event.Platform,
+				"team":     strconv.Itoa(event.Team),
+				"tags":     event.WorkerTags,
+			},
+		},
+	)
+}
diff --git a/atc/worker/client.go b/atc/worker/client.go
index fa7547e0c8c..217b7d97bb1 100644
--- a/atc/worker/client.go
+++ b/atc/worker/client.go
@@ -5,7 +5,9 @@ import (
 	"fmt"
 	"io"
 	"path"
+	"sort"
 	"strconv"
+	"strings"
 	"time"
 
 	"code.cloudfoundry.org/garden"
@@ -13,8 +15,9 @@ import (
 	"github.com/concourse/concourse/atc"
 	"github.com/concourse/concourse/atc/db"
 	"github.com/concourse/concourse/atc/db/lock"
+	"github.com/concourse/concourse/atc/metric"
 	"github.com/concourse/concourse/atc/runtime"
-	multierror "github.com/hashicorp/go-multierror"
+	"github.com/hashicorp/go-multierror"
 )
 
 const taskProcessID = "task"
@@ -41,16 +44,18 @@ type Client interface {
 	) TaskResult
 }
 
-func NewClient(pool Pool, provider WorkerProvider) *client {
+func NewClient(pool Pool, provider WorkerProvider, taskQueue db.TaskQueue) *client {
 	return &client{
-		pool:     pool,
-		provider: provider,
+		pool:      pool,
+		provider:  provider,
+		taskQueue: taskQueue,
 	}
 }
 
 type client struct {
-	pool     Pool
-	provider WorkerProvider
+	pool      Pool
+	provider  WorkerProvider
+	taskQueue db.TaskQueue
 }
 
 type TaskResult struct {
@@ -136,6 +141,7 @@ func (client *client) RunTaskStep(
 		lockFactory,
 		owner,
 		containerSpec,
+		metadata,
 		workerSpec,
 		processSpec.StdoutWriter,
 	)
@@ -254,6 +260,7 @@ func (client *client) chooseTaskWorker(
 	lockFactory lock.LockFactory,
 	owner db.ContainerOwner,
 	containerSpec ContainerSpec,
+	metadata db.ContainerMetadata,
 	workerSpec WorkerSpec,
 	outputWriter io.Writer,
 ) (Worker, error) {
@@ -263,8 +270,21 @@ func (client *client) chooseTaskWorker(
 		elapsed           time.Duration
 		err               error
 		existingContainer bool
+		waitTimer         time.Duration
+		dedicatedWorker   bool
+		teamId            int
+		alreadyQueued     bool
 	)
 
+	if strategy.ModifiesActiveTasks() {
+		waitTimer = 5 * time.Second // Workers polling frequency
+		dedicatedWorker, err = client.pool.TeamDedicatedWorkers(logger, workerSpec)
+		if err != nil {
+			return nil, err
+		}
+	}
+
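+	// Queueing overview: each iteration below runs while holding the global
+	// active-tasks lock. A task joins a DB-backed FIFO queue keyed by
+	// platform, team and tags, and only the task at the front of its queue
+	// may claim a worker; all others release the lock and retry after
+	// waitTimer.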
 	for {
 		if strategy.ModifiesActiveTasks() {
 			var acquired bool
@@ -272,11 +292,24 @@
 			if err != nil {
 				return nil, err
 			}
-
 			if !acquired {
 				time.Sleep(time.Second)
 				continue
 			}
+
+			// Check whether the task has been aborted and return early in that case.
+			select {
+			case <-ctx.Done():
+				logger.Info("aborted-waiting-worker")
+				err = activeTasksLock.Release()
+				if err != nil {
+					return nil, err
+				}
+				return nil, ctx.Err()
+			default:
+			}
+
+			// If the container already lives on a worker, skip the queue.
 			existingContainer, err = client.pool.ContainerInWorker(logger, owner, containerSpec, workerSpec)
 			if err != nil {
 				release_err := activeTasksLock.Release()
@@ -285,6 +318,55 @@
 				}
 				return nil, err
 			}
+
+			if !existingContainer {
+				// Enter the queue, or just look up the current position if
+				// the task is already queued.
+				taskId := fmt.Sprintf("%d_%d_%d", metadata.PipelineID, metadata.JobID, metadata.BuildID)
+				if dedicatedWorker {
+					teamId = workerSpec.TeamID
+				} else {
+					teamId = 0
+				}
+				tags := containerSpec.Tags
+				sort.Strings(tags)
+				workerTags := strings.Join(tags, "_")
+				queuePosition, queueLength, err := client.taskQueue.FindOrAppend(taskId, workerSpec.Platform, teamId, workerTags, logger)
+				if !alreadyQueued {
+					// Make sure the task eventually leaves the queue, even on error.
+					defer client.taskQueue.Dequeue(taskId, logger)
+					alreadyQueued = true
+				}
+				if err != nil {
+					logger.Error("failed-to-enqueue-task", err)
+					if release_err := activeTasksLock.Release(); release_err != nil {
+						logger.Error("failed-to-release-lock", release_err)
+					}
+					return nil, err
+				}
+				metric.TaskQueue{
+					Length:     queueLength,
+					Platform:   workerSpec.Platform,
+					Team:       teamId,
+					WorkerTags: workerTags,
+				}.Emit(logger)
+
+				if queuePosition > 1 {
+					if elapsed%time.Minute == 0 { // Every minute report that the task is still waiting
+						_, err := outputWriter.Write([]byte(fmt.Sprintf("The task is in queue: %d/%d.\n", queuePosition, queueLength)))
+						if err != nil {
+							logger.Error("failed-to-report-status", err)
+						}
+					}
+					release_err := activeTasksLock.Release()
+					if release_err != nil {
+						return nil, release_err
+					}
+					elapsed += waitTimer
+					time.Sleep(waitTimer)
+					continue
+				}
+			}
 		}
 
 		chosenWorker, err = client.pool.FindOrChooseWorkerForContainer(
@@ -300,19 +382,6 @@
 		)
 		if err != nil {
 			return nil, err
 		}
 
 		if strategy.ModifiesActiveTasks() {
-			waitWorker := time.Duration(5 * time.Second) // Workers polling frequency
-
-			select {
-			case <-ctx.Done():
-				logger.Info("aborted-waiting-worker")
-				err = activeTasksLock.Release()
-				if err != nil {
-					return nil, err
-				}
-				return nil, ctx.Err()
-			default:
-			}
-
 			if chosenWorker == nil {
 				err = activeTasksLock.Release()
 				if err != nil {
@@ -320,14 +389,14 @@
 				}
 
 				if elapsed%time.Duration(time.Minute) == 0 { // Every minute report that it is still waiting
-					_, err := outputWriter.Write([]byte("All workers are busy at the moment, please stand-by.\n"))
+					_, err := outputWriter.Write([]byte("The task is at the front of the queue, please stand by.\n"))
 					if err != nil {
 						logger.Error("failed-to-report-status", err)
 					}
 				}
 
-				elapsed += waitWorker
-				time.Sleep(waitWorker)
+				elapsed += waitTimer
+				time.Sleep(waitTimer)
 				continue
 			}
diff --git a/atc/worker/client_test.go b/atc/worker/client_test.go
index db2f79366a8..3c62f2df10b 100644
--- a/atc/worker/client_test.go
+++ b/atc/worker/client_test.go
@@ -9,6 +9,7 @@ import (
 
 	"code.cloudfoundry.org/garden"
 	"code.cloudfoundry.org/garden/gardenfakes"
"github.com/concourse/concourse/atc" + "github.com/concourse/concourse/atc/db/dbfakes" "github.com/concourse/concourse/atc/db/lock/lockfakes" "github.com/concourse/concourse/atc/exec/execfakes" "github.com/concourse/concourse/atc/runtime" @@ -32,14 +33,16 @@ var _ = Describe("Client", func() { client worker.Client fakeLock *lockfakes.FakeLock fakeLockFactory *lockfakes.FakeLockFactory + fakeTaskQueue *dbfakes.FakeTaskQueue ) BeforeEach(func() { logger = lagertest.NewTestLogger("test") fakePool = new(workerfakes.FakePool) fakeProvider = new(workerfakes.FakeWorkerProvider) + fakeTaskQueue = new(dbfakes.FakeTaskQueue) - client = worker.NewClient(fakePool, fakeProvider) + client = worker.NewClient(fakePool, fakeProvider, fakeTaskQueue) }) Describe("FindContainer", func() { @@ -345,6 +348,11 @@ var _ = Describe("Client", func() { BeforeEach(func() { fakeStrategy.ModifiesActiveTasksReturns(true) }) + It("joins and leaves the tasks queue", func() { + Expect(fakeTaskQueue.FindOrAppendCallCount()).To(Equal(1)) + Expect(fakeTaskQueue.DequeueCallCount()).To(Equal(1)) + }) + Context("when a worker is found", func() { BeforeEach(func() { fakeWorker.NameReturns("some-worker") diff --git a/atc/worker/pool.go b/atc/worker/pool.go index 5919353a86b..0949f2d86b8 100644 --- a/atc/worker/pool.go +++ b/atc/worker/pool.go @@ -79,6 +79,11 @@ type Pool interface { WorkerSpec, ContainerPlacementStrategy, ) (Worker, error) + + TeamDedicatedWorkers( + lager.Logger, + WorkerSpec, + ) (bool, error) } type pool struct { @@ -209,3 +214,21 @@ func (pool *pool) FindOrChooseWorker( return workers[rand.Intn(len(workers))], nil } + +func (pool *pool) TeamDedicatedWorkers(logger lager.Logger, workerSpec WorkerSpec) (bool, error) { + workers, err := pool.provider.RunningWorkers(logger) + if err != nil { + return false, err + } + + if len(workers) == 0 { + return false, ErrNoWorkers + } + + for _, worker := range workers { + if worker.OwnedByTeam() == workerSpec.TeamID { + return true, nil + } + } + return false, nil +} diff --git a/atc/worker/worker.go b/atc/worker/worker.go index dbda8ac7e77..85a1ab50c3e 100644 --- a/atc/worker/worker.go +++ b/atc/worker/worker.go @@ -36,6 +36,7 @@ type Worker interface { Tags() atc.Tags Uptime() time.Duration IsOwnedByTeam() bool + OwnedByTeam() int Ephemeral() bool IsVersionCompatible(lager.Logger, version.Version) bool Satisfies(lager.Logger, WorkerSpec) bool @@ -721,6 +722,10 @@ func (worker *gardenWorker) IsOwnedByTeam() bool { return worker.dbWorker.TeamID() != 0 } +func (worker *gardenWorker) OwnedByTeam() int { + return worker.dbWorker.TeamID() +} + func (worker *gardenWorker) Uptime() time.Duration { return time.Since(worker.dbWorker.StartTime()) } diff --git a/atc/worker/workerfakes/fake_pool.go b/atc/worker/workerfakes/fake_pool.go index a64ab6d03dd..3d443f98ce7 100644 --- a/atc/worker/workerfakes/fake_pool.go +++ b/atc/worker/workerfakes/fake_pool.go @@ -59,6 +59,20 @@ type FakePool struct { result1 worker.Worker result2 error } + TeamDedicatedWorkersStub func(lager.Logger, worker.WorkerSpec) (bool, error) + teamDedicatedWorkersMutex sync.RWMutex + teamDedicatedWorkersArgsForCall []struct { + arg1 lager.Logger + arg2 worker.WorkerSpec + } + teamDedicatedWorkersReturns struct { + result1 bool + result2 error + } + teamDedicatedWorkersReturnsOnCall map[int]struct { + result1 bool + result2 error + } invocations map[string][][]interface{} invocationsMutex sync.RWMutex } @@ -261,6 +275,70 @@ func (fake *FakePool) FindOrChooseWorkerForContainerReturnsOnCall(i int, result1 }{result1, 
result2} } +func (fake *FakePool) TeamDedicatedWorkers(arg1 lager.Logger, arg2 worker.WorkerSpec) (bool, error) { + fake.teamDedicatedWorkersMutex.Lock() + ret, specificReturn := fake.teamDedicatedWorkersReturnsOnCall[len(fake.teamDedicatedWorkersArgsForCall)] + fake.teamDedicatedWorkersArgsForCall = append(fake.teamDedicatedWorkersArgsForCall, struct { + arg1 lager.Logger + arg2 worker.WorkerSpec + }{arg1, arg2}) + fake.recordInvocation("TeamDedicatedWorkers", []interface{}{arg1, arg2}) + fake.teamDedicatedWorkersMutex.Unlock() + if fake.TeamDedicatedWorkersStub != nil { + return fake.TeamDedicatedWorkersStub(arg1, arg2) + } + if specificReturn { + return ret.result1, ret.result2 + } + fakeReturns := fake.teamDedicatedWorkersReturns + return fakeReturns.result1, fakeReturns.result2 +} + +func (fake *FakePool) TeamDedicatedWorkersCallCount() int { + fake.teamDedicatedWorkersMutex.RLock() + defer fake.teamDedicatedWorkersMutex.RUnlock() + return len(fake.teamDedicatedWorkersArgsForCall) +} + +func (fake *FakePool) TeamDedicatedWorkersCalls(stub func(lager.Logger, worker.WorkerSpec) (bool, error)) { + fake.teamDedicatedWorkersMutex.Lock() + defer fake.teamDedicatedWorkersMutex.Unlock() + fake.TeamDedicatedWorkersStub = stub +} + +func (fake *FakePool) TeamDedicatedWorkersArgsForCall(i int) (lager.Logger, worker.WorkerSpec) { + fake.teamDedicatedWorkersMutex.RLock() + defer fake.teamDedicatedWorkersMutex.RUnlock() + argsForCall := fake.teamDedicatedWorkersArgsForCall[i] + return argsForCall.arg1, argsForCall.arg2 +} + +func (fake *FakePool) TeamDedicatedWorkersReturns(result1 bool, result2 error) { + fake.teamDedicatedWorkersMutex.Lock() + defer fake.teamDedicatedWorkersMutex.Unlock() + fake.TeamDedicatedWorkersStub = nil + fake.teamDedicatedWorkersReturns = struct { + result1 bool + result2 error + }{result1, result2} +} + +func (fake *FakePool) TeamDedicatedWorkersReturnsOnCall(i int, result1 bool, result2 error) { + fake.teamDedicatedWorkersMutex.Lock() + defer fake.teamDedicatedWorkersMutex.Unlock() + fake.TeamDedicatedWorkersStub = nil + if fake.teamDedicatedWorkersReturnsOnCall == nil { + fake.teamDedicatedWorkersReturnsOnCall = make(map[int]struct { + result1 bool + result2 error + }) + } + fake.teamDedicatedWorkersReturnsOnCall[i] = struct { + result1 bool + result2 error + }{result1, result2} +} + func (fake *FakePool) Invocations() map[string][][]interface{} { fake.invocationsMutex.RLock() defer fake.invocationsMutex.RUnlock() @@ -270,6 +348,8 @@ func (fake *FakePool) Invocations() map[string][][]interface{} { defer fake.findOrChooseWorkerMutex.RUnlock() fake.findOrChooseWorkerForContainerMutex.RLock() defer fake.findOrChooseWorkerForContainerMutex.RUnlock() + fake.teamDedicatedWorkersMutex.RLock() + defer fake.teamDedicatedWorkersMutex.RUnlock() copiedInvocations := map[string][][]interface{}{} for key, value := range fake.invocations { copiedInvocations[key] = value diff --git a/atc/worker/workerfakes/fake_worker.go b/atc/worker/workerfakes/fake_worker.go index c062fdb7cb9..729f7215139 100644 --- a/atc/worker/workerfakes/fake_worker.go +++ b/atc/worker/workerfakes/fake_worker.go @@ -237,6 +237,16 @@ type FakeWorker struct { nameReturnsOnCall map[int]struct { result1 string } + OwnedByTeamStub func() int + ownedByTeamMutex sync.RWMutex + ownedByTeamArgsForCall []struct { + } + ownedByTeamReturns struct { + result1 int + } + ownedByTeamReturnsOnCall map[int]struct { + result1 int + } ResourceTypesStub func() []atc.WorkerResourceType resourceTypesMutex sync.RWMutex 
resourceTypesArgsForCall []struct { @@ -1288,6 +1298,58 @@ func (fake *FakeWorker) NameReturnsOnCall(i int, result1 string) { }{result1} } +func (fake *FakeWorker) OwnedByTeam() int { + fake.ownedByTeamMutex.Lock() + ret, specificReturn := fake.ownedByTeamReturnsOnCall[len(fake.ownedByTeamArgsForCall)] + fake.ownedByTeamArgsForCall = append(fake.ownedByTeamArgsForCall, struct { + }{}) + fake.recordInvocation("OwnedByTeam", []interface{}{}) + fake.ownedByTeamMutex.Unlock() + if fake.OwnedByTeamStub != nil { + return fake.OwnedByTeamStub() + } + if specificReturn { + return ret.result1 + } + fakeReturns := fake.ownedByTeamReturns + return fakeReturns.result1 +} + +func (fake *FakeWorker) OwnedByTeamCallCount() int { + fake.ownedByTeamMutex.RLock() + defer fake.ownedByTeamMutex.RUnlock() + return len(fake.ownedByTeamArgsForCall) +} + +func (fake *FakeWorker) OwnedByTeamCalls(stub func() int) { + fake.ownedByTeamMutex.Lock() + defer fake.ownedByTeamMutex.Unlock() + fake.OwnedByTeamStub = stub +} + +func (fake *FakeWorker) OwnedByTeamReturns(result1 int) { + fake.ownedByTeamMutex.Lock() + defer fake.ownedByTeamMutex.Unlock() + fake.OwnedByTeamStub = nil + fake.ownedByTeamReturns = struct { + result1 int + }{result1} +} + +func (fake *FakeWorker) OwnedByTeamReturnsOnCall(i int, result1 int) { + fake.ownedByTeamMutex.Lock() + defer fake.ownedByTeamMutex.Unlock() + fake.OwnedByTeamStub = nil + if fake.ownedByTeamReturnsOnCall == nil { + fake.ownedByTeamReturnsOnCall = make(map[int]struct { + result1 int + }) + } + fake.ownedByTeamReturnsOnCall[i] = struct { + result1 int + }{result1} +} + func (fake *FakeWorker) ResourceTypes() []atc.WorkerResourceType { fake.resourceTypesMutex.Lock() ret, specificReturn := fake.resourceTypesReturnsOnCall[len(fake.resourceTypesArgsForCall)] @@ -1542,6 +1604,8 @@ func (fake *FakeWorker) Invocations() map[string][][]interface{} { defer fake.lookupVolumeMutex.RUnlock() fake.nameMutex.RLock() defer fake.nameMutex.RUnlock() + fake.ownedByTeamMutex.RLock() + defer fake.ownedByTeamMutex.RUnlock() fake.resourceTypesMutex.RLock() defer fake.resourceTypesMutex.RUnlock() fake.satisfiesMutex.RLock() diff --git a/go.mod b/go.mod index 19f3bebf2dd..52ed22860b6 100644 --- a/go.mod +++ b/go.mod @@ -9,13 +9,13 @@ require ( code.cloudfoundry.org/localip v0.0.0-20170223024724-b88ad0dea95c code.cloudfoundry.org/urljoiner v0.0.0-20170223060717-5cabba6c0a50 contrib.go.opencensus.io/exporter/ocagent v0.5.1 // indirect - github.com/Azure/azure-sdk-for-go v24.0.0+incompatible // indirect + github.com/Azure/azure-sdk-for-go v24.0.0+incompatible github.com/Azure/go-ansiterm v0.0.0-20170929234023-d6e3b3328b78 // indirect github.com/Azure/go-autorest v11.2.8+incompatible // indirect github.com/DataDog/datadog-go v0.0.0-20180702141236-ef3a9daf849d github.com/DataDog/zstd v1.4.0 github.com/Jeffail/gabs v1.1.0 // indirect - github.com/Masterminds/squirrel v0.0.0-20190107164353-fa735ea14f09 + github.com/Masterminds/squirrel v1.1.1-0.20191003161918-b924fed91bf3 github.com/Microsoft/go-winio v0.4.11 // indirect github.com/NYTimes/gziphandler v1.1.1 github.com/Nvveen/Gotty v0.0.0-20120604004816-cd527374f1e5 // indirect diff --git a/go.sum b/go.sum index da8129b0171..aaeea3c2242 100644 --- a/go.sum +++ b/go.sum @@ -32,8 +32,8 @@ github.com/DataDog/zstd v1.4.0 h1:vhoV+DUHnRZdKW1i5UMjAk2G4JY8wN4ayRfYDNdEhwo= github.com/DataDog/zstd v1.4.0/go.mod h1:1jcaCB/ufaK+sKp1NBhlGmpz41jOoPQ35bpF36t7BBo= github.com/Jeffail/gabs v1.1.0 h1:kw5zCcl9tlJNHTDme7qbi21fDHZmXrnjMoXos3Jw/NI= github.com/Jeffail/gabs 
v1.1.0/go.mod h1:6xMvQMK4k33lb7GUUpaAPh6nKMmemQeg5d4gn7/bOXc= -github.com/Masterminds/squirrel v0.0.0-20190107164353-fa735ea14f09 h1:enWVS77aJkLWVIUExiqF6A8eWTVzCXUKUvkST3/wyKI= -github.com/Masterminds/squirrel v0.0.0-20190107164353-fa735ea14f09/go.mod h1:yaPeOnPG5ZRwL9oKdTsO/prlkPbXWZlRVMQ/gGlzIuA= +github.com/Masterminds/squirrel v1.1.1-0.20191003161918-b924fed91bf3 h1:3nNmKf3U8BcywSxIFO8oDR68IryaK8RbbuLDzWpHmyo= +github.com/Masterminds/squirrel v1.1.1-0.20191003161918-b924fed91bf3/go.mod h1:yaPeOnPG5ZRwL9oKdTsO/prlkPbXWZlRVMQ/gGlzIuA= github.com/Microsoft/go-winio v0.4.11 h1:zoIOcVf0xPN1tnMVbTtEdI+P8OofVk3NObnwOQ6nK2Q= github.com/Microsoft/go-winio v0.4.11/go.mod h1:VhR8bwka0BXejwEJY73c50VrPtXAaKcyvVC4A4RozmA= github.com/NYTimes/gziphandler v0.0.0-20170623195520-56545f4a5d46/go.mod h1:3wb06e3pkSAbeQ52E9H9iFoQsEEwGN64994WTCIhntQ= @@ -107,8 +107,6 @@ github.com/cloudfoundry/go-socks5 v0.0.0-20180221174514-54f73bdb8a8e/go.mod h1:P github.com/cloudfoundry/socks5-proxy v0.0.0-20180530211953-3659db090cb2 h1:9j2KbUEQn5E7MEV3enSrkJTrBC0iDbosW5gXX+Z+dLE= github.com/cloudfoundry/socks5-proxy v0.0.0-20180530211953-3659db090cb2/go.mod h1:0a+Ghg38uB86Dx+de84dFSkILTnBHzCpFMRnjHgSzi4= github.com/cockroachdb/cmux v0.0.0-20170110192607-30d10be49292/go.mod h1:qRiX68mZX1lGBkTWyp3CLcenw9I94W2dLeRvMzcn9N4= -github.com/concourse/baggageclaim v1.6.3 h1:k5FINjHbcZ0NGnAHqm3J8Pw6pgAXjREyTll1BSvaRgA= -github.com/concourse/baggageclaim v1.6.3/go.mod h1:gKOIecb6eIQ1k2Wh0iyKvP1vqI8XN08qwtijwr3Iroc= github.com/concourse/baggageclaim v1.6.5 h1:nqUNImvNPagFWLpjmYM3K/iUtXcgjWCXGUVEcQtIHUc= github.com/concourse/baggageclaim v1.6.5/go.mod h1:XEX/nfW5iJQwJVLUh1AxyqtFRLeyM+iZLeKMUEloiKc= github.com/concourse/dex v0.0.0-20190417202333-2202f4ef4172 h1:lYBXqY+XJmyMD3uthg734qIxdHU+lAF8mnVk72gdFCA= @@ -463,7 +461,6 @@ github.com/opencontainers/image-spec v1.0.1 h1:JMemWkRwHx4Zj+fVxWoMCFm/8sYGGrUVo github.com/opencontainers/image-spec v1.0.1/go.mod h1:BtxoFyWECRxE4U/7sNtV5W15zMzWCbyJoFRP3s7yZA0= github.com/opencontainers/runc v0.1.1 h1:GlxAyO6x8rfZYN9Tt0Kti5a/cP41iuiO2yYT0IJGY8Y= github.com/opencontainers/runc v0.1.1/go.mod h1:qT5XzbpPznkRYVz/mWwUaVBUv2rmF59PVA73FjuZG0U= -github.com/opencontainers/runtime-spec v1.0.1/go.mod h1:jwyrGlmzljRJv/Fgzds9SsS/C5hL+LL3ko9hs6T5lQ0= github.com/opentracing/opentracing-go v1.0.2 h1:3jA2P6O1F9UOrWVpwrIo17pu01KWvNWg4X946/Y5Zwg= github.com/opentracing/opentracing-go v1.0.2/go.mod h1:UkNAQd3GIcIGf0SeVgPpRdFStlNbqXla1AfSYxPUl2o= github.com/ory-am/common v0.4.0 h1:edGPoxYX4hno0IJHXh9TCMUPR6ZcJp+y6aClFYxeuUE=