From fc1dd44a10b765d325a53744c0f2c406a497ee5a Mon Sep 17 00:00:00 2001
From: Dorian Villet
Date: Wed, 8 Nov 2023 20:48:24 +0100
Subject: [PATCH] Stop considering running nodes as 'incoming capacity', add
 mutex to nodeState.UpdateStatus(), minor improvements.

Running nodes are already busy with the tasks they pulled, so only
incoming (provisioning or queued) nodes count as capacity for tasks
still in the queue. For example (test case 5): with maxNodes=5,
tasksPerNode=1, 3 queued tasks and 1 running node, the old formula
requested 2 new nodes; the new one requests 3. Dropping the
math.Max(0, ...) clamp also lets NbNodesToCreate return a negative
surplus, which resizePool() now uses to trim extra nodes from the
queue. UpdateStatus() takes the mutex before comparing statuses, so
concurrent callers cannot race between the check and the broadcast.
---
 scheduler/internal/calculator.go      |  7 ++--
 scheduler/internal/calculator_test.go | 50 +++++++++++------------
 scheduler/node.go                     |  6 +++
 scheduler/scheduler.go                | 57 ++++++++++++++-------------
 4 files changed, 64 insertions(+), 56 deletions(-)

diff --git a/scheduler/internal/calculator.go b/scheduler/internal/calculator.go
index b114742..5ee0434 100644
--- a/scheduler/internal/calculator.go
+++ b/scheduler/internal/calculator.go
@@ -3,10 +3,9 @@ package internal
 import "math"
 
 func NbNodesToCreate(maxNodes, tasksPerNode, tasksQueue, existingNodes, incomingNodes int) int {
-	consideredNodes := existingNodes + incomingNodes
-	incomingCapacity := consideredNodes * tasksPerNode
+	incomingCapacity := incomingNodes * tasksPerNode
 	requiredNodes := math.Ceil(float64(tasksQueue-incomingCapacity) / float64(tasksPerNode))
-	maximumMoreNodes := float64(maxNodes - consideredNodes)
+	maximumMoreNodes := float64(maxNodes - existingNodes - incomingNodes)
 
-	return int(math.Max(0, math.Min(requiredNodes, maximumMoreNodes)))
+	return int(math.Min(requiredNodes, maximumMoreNodes))
 }
diff --git a/scheduler/internal/calculator_test.go b/scheduler/internal/calculator_test.go
index 2d2c823..94efcdd 100644
--- a/scheduler/internal/calculator_test.go
+++ b/scheduler/internal/calculator_test.go
@@ -25,105 +25,105 @@ var flagtests = map[int]struct {
 }{
 	0: {
 		1, 1, // max nodes, tasks per node
-		0, 0, // tasks, running nodes
+		0, 0, // queued tasks, running nodes
 		0, 0, 0, // provisioning, queue ready, queue future
 		map[int]int{step1: 0, step2: 0, step3: 0},
 	},
 	1: {
 		1, 1, // max nodes, tasks per node
-		1, 0, // tasks, running nodes
+		1, 0, // queued tasks, running nodes
 		0, 0, 0, // provisioning, queue ready, queue future
 		map[int]int{step1: 1, step2: 1, step3: 1},
 	},
 	2: {
 		1, 1, // max nodes, tasks per node
-		5, 0, // tasks, running nodes
+		5, 0, // queued tasks, running nodes
 		0, 0, 0, // provisioning, queue ready, queue future
 		map[int]int{step1: 1, step2: 1, step3: 1},
 	},
 	3: {
 		3, 1, // max nodes, tasks per node
-		5, 0, // tasks, running nodes
+		5, 0, // queued tasks, running nodes
 		0, 0, 0, // provisioning, queue ready, queue future
 		map[int]int{step1: 3, step2: 3, step3: 3},
 	},
 	4: {
 		5, 1, // max nodes, tasks per node
-		3, 0, // tasks, running nodes
+		3, 0, // queued tasks, running nodes
 		0, 0, 0, // provisioning, queue ready, queue future
 		map[int]int{step1: 3, step2: 3, step3: 3},
 	},
 	5: {
 		5, 1, // max nodes, tasks per node
-		3, 1, // tasks, running nodes
+		3, 1, // queued tasks, running nodes
 		0, 0, 0, // provisioning, queue ready, queue future
-		map[int]int{step1: 2, step2: 2, step3: 2},
+		map[int]int{step1: 3, step2: 3, step3: 3},
 	},
 	6: {
 		4, 4, // max nodes, tasks per node
-		20, 0, // tasks, running nodes
+		20, 0, // queued tasks, running nodes
 		0, 0, 0, // provisioning, queue ready, queue future
 		map[int]int{step1: 4, step2: 4, step3: 4},
 	},
 	7: {
 		4, 4, // max nodes, tasks per node
-		16, 1, // tasks, running nodes
+		16, 1, // queued tasks, running nodes
 		0, 0, 3, // provisioning, queue ready, queue future
 		map[int]int{step1: 3, step2: 3, step3: 0},
 	},
 	8: {
 		4, 4, // max nodes, tasks per node
-		16, 1, // tasks, running nodes
+		16, 1, // queued tasks, running nodes
 		0, 1, 2, // provisioning, queue ready, queue future
 		map[int]int{step1: 3, step2: 2, step3: 0},
 	},
 	9: {
 		4, 4, // max nodes, tasks per node
-		16, 1, // tasks, running nodes
+		16, 1, // queued tasks, running nodes
 		0, 1, 2, // provisioning, queue ready, queue future
 		map[int]int{step1: 3, step2: 2, step3: 0},
 	},
 	10: {
 		4, 4, // max nodes, tasks per node
-		12, 1, // tasks, running nodes
+		12, 1, // queued tasks, running nodes
 		0, 0, 3, // provisioning, queue ready, queue future
-		map[int]int{step1: 2, step2: 2, step3: 0},
+		map[int]int{step1: 3, step2: 3, step3: 0},
 	},
 	11: {
 		4, 4, // max nodes, tasks per node
-		12, 1, // tasks, running nodes
+		12, 1, // queued tasks, running nodes
 		0, 1, 2, // provisioning, queue ready, queue future
-		map[int]int{step1: 2, step2: 1, step3: 0},
+		map[int]int{step1: 3, step2: 2, step3: 0},
 	},
 	12: {
 		4, 4, // max nodes, tasks per node
-		8, 1, // tasks, running nodes
+		8, 1, // queued tasks, running nodes
 		0, 1, 2, // provisioning, queue ready, queue future
-		map[int]int{step1: 1, step2: 0, step3: 0},
+		map[int]int{step1: 2, step2: 1, step3: -1},
 	},
 	13: {
 		4, 4, // max nodes, tasks per node
-		8, 1, // tasks, running nodes
+		8, 1, // queued tasks, running nodes
 		1, 1, 1, // provisioning, queue ready, queue future
-		map[int]int{step1: 0, step2: 0, step3: 0},
+		map[int]int{step1: 1, step2: 0, step3: -1},
 	},
 	14: {
 		4, 4, // max nodes, tasks per node
-		8, 2, // tasks, running nodes
+		8, 2, // queued tasks, running nodes
 		1, 0, 1, // provisioning, queue ready, queue future
-		map[int]int{step1: 0, step2: 0, step3: 0},
+		map[int]int{step1: 1, step2: 1, step3: 0},
 	},
 	15: {
 		4, 4, // max nodes, tasks per node
-		4, 2, // tasks, running nodes
+		4, 2, // queued tasks, running nodes
 		1, 0, 1, // provisioning, queue ready, queue future
-		map[int]int{step1: 0, step2: 0, step3: 0},
+		map[int]int{step1: 0, step2: 0, step3: -1},
 	},
 	16: {
 		4, 4, // max nodes, tasks per node
-		0, 2, // tasks, running nodes
+		0, 2, // queued tasks, running nodes
 		1, 1, 0, // provisioning, queue ready, queue future
-		map[int]int{step1: 0, step2: 0, step3: 0},
+		map[int]int{step1: -1, step2: -2, step3: -2},
 	},
 }
 
diff --git a/scheduler/node.go b/scheduler/node.go
index 5fc3906..d1d1eb3 100644
--- a/scheduler/node.go
+++ b/scheduler/node.go
@@ -2,6 +2,7 @@ package scheduler
 
 import (
 	"log/slog"
+	"sync"
 	"time"
 
 	"github.com/gammadia/alfred/proto"
@@ -45,10 +46,15 @@ type nodeState struct {
 	nodeName string
 
 	earliestStart time.Time
+
+	mutex sync.Mutex
 }
 
 func (ns *nodeState) UpdateStatus(status NodeStatus) {
+	ns.mutex.Lock() // lock before reading ns.status, so concurrent callers cannot both pass the check below
+	defer ns.mutex.Unlock()
+
 	if ns.status != status {
 		ns.status = status
 		ns.scheduler.broadcast(EventNodeStatusUpdated{Node: ns.nodeName, Status: status})
 	}
diff --git a/scheduler/scheduler.go b/scheduler/scheduler.go
index 22d3df7..5fe7db5 100644
--- a/scheduler/scheduler.go
+++ b/scheduler/scheduler.go
@@ -238,6 +238,7 @@ func (s *Scheduler) resizePool() {
 
 	// Make sure we need more nodes
 	if len(s.tasksQueue) < 1 || len(s.nodes) >= s.config.MaxNodes {
+		s.emptyNodesQueue()
 		return
 	}
 	s.log.Debug("Need more nodes",
@@ -247,15 +248,14 @@ func (s *Scheduler) resizePool() {
 		"tasksQueue", len(s.tasksQueue),
 		"maxNodes", s.config.MaxNodes,
 		"nodes", len(s.nodes),
 	)
 	nodesToCreate := 0
-	incomingNodes := 0
 
 	// First, we check if provisioning nodes cover our needs
 	provisioningNodes := lo.Reduce(s.nodes, func(acc int, nodeState *nodeState, _ int) int {
 		return lo.Ternary(nodeState.status == NodeStatusProvisioning, acc+1, acc)
 	}, 0)
-	incomingNodes = provisioningNodes
-	if nodesToCreate = s.nbNodesToCreate(incomingNodes); nodesToCreate < 1 {
+	if nodesToCreate = s.nbNodesToCreate(provisioningNodes); nodesToCreate < 1 {
 		s.log.Debug("Provisioning nodes cover our needs",
 			"needed", nodesToCreate,
"provisioningNodes", provisioningNodes) + // TODO: terminate/discard extra provisioning nodes if nodesToCreate < 0 ? s.emptyNodesQueue() return } @@ -265,8 +265,8 @@ func (s *Scheduler) resizePool() { queuedNodes := 0 provisioningQueueNodes := 0 for _, nodeState := range s.nodesQueue { - if s.config.MaxNodes-len(s.nodes) > 0 && time.Now().After(nodeState.earliestStart) { - nodeState.log.Info("Provisioning queued node") + if time.Now().After(nodeState.earliestStart) { + nodeState.log.Info("Provisioning node from the queue") nodeState.UpdateStatus(NodeStatusProvisioning) s.nodesQueue = lo.Without(s.nodesQueue, nodeState) @@ -274,36 +274,39 @@ func (s *Scheduler) resizePool() { go s.watchNodeProvisioning(nodeState) provisioningQueueNodes += 1 + + if nodesToCreate = s.nbNodesToCreate(provisioningNodes + provisioningQueueNodes); nodesToCreate < 1 { + s.log.Debug("Provisioning nodes from the queue cover our needs", + "needed", nodesToCreate, + "provisioningNodes", provisioningNodes, + "provisioningQueueNodes", provisioningQueueNodes, + ) + s.emptyNodesQueue() + return + } else { + s.log.Debug("Provisioning nodes from the queue are not enough", + "needed", nodesToCreate, + "provisioningNodes", provisioningNodes, + "provisioningQueueNodes", provisioningQueueNodes, + ) + } } else { queuedNodes += 1 } - - incomingNodes = provisioningNodes + provisioningQueueNodes - if nodesToCreate = s.nbNodesToCreate(incomingNodes); nodesToCreate < 1 { - s.log.Debug("Provisioning nodes from the queue cover our needs", - "needed", nodesToCreate, - "provisioningNodes", provisioningNodes, - "provisioningQueueNodes", provisioningQueueNodes, - ) - s.emptyNodesQueue() - return - } } - s.log.Debug("Provisioning nodes from the queue are not enough", - "needed", nodesToCreate, - "provisioningNodes", provisioningNodes, - "provisioningQueueNodes", provisioningQueueNodes, - ) // Finally, we check if nodes queued in the future cover our needs - incomingNodes = provisioningNodes + provisioningQueueNodes + queuedNodes - if nodesToCreate = s.nbNodesToCreate(incomingNodes); nodesToCreate < 1 { + if nodesToCreate = s.nbNodesToCreate(provisioningNodes + provisioningQueueNodes + queuedNodes); nodesToCreate < 1 { s.log.Debug("Provisioning nodes and queued nodes cover our needs", "needed", nodesToCreate, "provisioningNodes", provisioningNodes, "provisioningQueueNodes", provisioningQueueNodes, "queuedNodes", queuedNodes, ) + if nodesToCreate < 0 { + s.log.Debug("Remove extra nodes from the queue", "nodesToRemove", -nodesToCreate) + s.nodesQueue = s.nodesQueue[:nodesToCreate] + } return } s.log.Debug("Provisioning nodes and queued nodes are not enough", @@ -322,10 +325,10 @@ func (s *Scheduler) resizePool() { delay = s.config.ProvisioningDelay } - now = time.Now() - s.earliestNextNodeProvisioning = lo.Must(lo.Coalesce(s.earliestNextNodeProvisioning, now)).Add(delay) + now = time.Now().Truncate(time.Second) + s.earliestNextNodeProvisioning = lo.Must(lo.Coalesce(s.earliestNextNodeProvisioning, now)).Truncate(time.Second).Add(delay) queueNode := now.Before(s.earliestNextNodeProvisioning) - wait = s.earliestNextNodeProvisioning.Sub(now) + wait = s.earliestNextNodeProvisioning.Sub(now) + (2 * time.Second) nodeName := namegen.Get() nodeState := &nodeState{ @@ -340,7 +343,7 @@ func (s *Scheduler) resizePool() { earliestStart: s.earliestNextNodeProvisioning, } - nodeState.log.Debug("Creating node", "wait", wait, "earliestStart", s.earliestNextNodeProvisioning, "queued", queueNode) + nodeState.log.Debug("Creating node", "wait", wait, 
"earliestStart", s.earliestNextNodeProvisioning, "status", nodeState.status) s.broadcast(EventNodeCreated{Node: nodeName, Status: nodeState.status}) if queueNode {