Stop considering running nodes as 'incoming capacity', add mutex to nodeState.UpdateStatus(), minor improvements.
gnutix committed Nov 8, 2023
1 parent 6ad4293 commit fc1dd44
Showing 4 changed files with 64 additions and 56 deletions.
scheduler/internal/calculator.go (7 changes: 3 additions & 4 deletions)
@@ -3,10 +3,9 @@ package internal
import "math"

func NbNodesToCreate(maxNodes, tasksPerNode, tasksQueue, existingNodes, incomingNodes int) int {
-	consideredNodes := existingNodes + incomingNodes
-	incomingCapacity := consideredNodes * tasksPerNode
+	incomingCapacity := incomingNodes * tasksPerNode
requiredNodes := math.Ceil(float64(tasksQueue-incomingCapacity) / float64(tasksPerNode))
-	maximumMoreNodes := float64(maxNodes - consideredNodes)
+	maximumMoreNodes := float64(maxNodes - existingNodes - incomingNodes)

-	return int(math.Max(0, math.Min(requiredNodes, maximumMoreNodes)))
+	return int(math.Min(requiredNodes, maximumMoreNodes))
}
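
To see what changed in behavior: with the math.Max(0, ...) clamp gone, the function can now return a negative count when incoming nodes already exceed what the queue needs, and running nodes no longer contribute "incoming capacity" (existingNodes now only counts against maxNodes, matching the commit title). A standalone sketch of the committed function, with made-up sample inputs:

package main

import (
	"fmt"
	"math"
)

// NbNodesToCreate, as committed above. A negative result now means
// "this many incoming nodes too many" instead of being clamped to zero.
func NbNodesToCreate(maxNodes, tasksPerNode, tasksQueue, existingNodes, incomingNodes int) int {
	incomingCapacity := incomingNodes * tasksPerNode
	requiredNodes := math.Ceil(float64(tasksQueue-incomingCapacity) / float64(tasksPerNode))
	maximumMoreNodes := float64(maxNodes - existingNodes - incomingNodes)

	return int(math.Min(requiredNodes, maximumMoreNodes))
}

func main() {
	fmt.Println(NbNodesToCreate(4, 4, 20, 0, 0)) // 4: five nodes needed, capped at maxNodes
	fmt.Println(NbNodesToCreate(4, 4, 8, 1, 1))  // 1: one more node covers the remaining queue
	fmt.Println(NbNodesToCreate(4, 4, 8, 1, 3))  // -1: one incoming node too many
}
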
scheduler/internal/calculator_test.go (50 changes: 25 additions & 25 deletions)
@@ -25,105 +25,105 @@ var flagtests = map[int]struct {
}{
0: {
1, 1, // max nodes, tasks per node
-		0, 0, // tasks, running nodes
+		0, 0, // queued tasks, running nodes
0, 0, 0, // provisioning, queue ready, queue future
map[int]int{step1: 0, step2: 0, step3: 0},
},
1: {
1, 1, // max nodes, tasks per node
-		1, 0, // tasks, running nodes
+		1, 0, // queued tasks, running nodes
0, 0, 0, // provisioning, queue ready, queue future
map[int]int{step1: 1, step2: 1, step3: 1},
},
2: {
1, 1, // max nodes, tasks per node
-		5, 0, // tasks, running nodes
+		5, 0, // queued tasks, running nodes
0, 0, 0, // provisioning, queue ready, queue future
map[int]int{step1: 1, step2: 1, step3: 1},
},
3: {
3, 1, // max nodes, tasks per node
-		5, 0, // tasks, running nodes
+		5, 0, // queued tasks, running nodes
0, 0, 0, // provisioning, queue ready, queue future
map[int]int{step1: 3, step2: 3, step3: 3},
},
4: {
5, 1, // max nodes, tasks per node
-		3, 0, // tasks, running nodes
+		3, 0, // queued tasks, running nodes
0, 0, 0, // provisioning, queue ready, queue future
map[int]int{step1: 3, step2: 3, step3: 3},
},
5: {
5, 1, // max nodes, tasks per node
-		3, 1, // tasks, running nodes
+		3, 1, // queued tasks, running nodes
		0, 0, 0, // provisioning, queue ready, queue future
-		map[int]int{step1: 2, step2: 2, step3: 2},
+		map[int]int{step1: 3, step2: 3, step3: 3},
},
6: {
4, 4, // max nodes, tasks per node
-		20, 0, // tasks, running nodes
+		20, 0, // queued tasks, running nodes
0, 0, 0, // provisioning, queue ready, queue future
map[int]int{step1: 4, step2: 4, step3: 4},
},
7: {
4, 4, // max nodes, tasks per node
-		16, 1, // tasks, running nodes
+		16, 1, // queued tasks, running nodes
0, 0, 3, // provisioning, queue ready, queue future
map[int]int{step1: 3, step2: 3, step3: 0},
},
8: {
4, 4, // max nodes, tasks per node
-		16, 1, // tasks, running nodes
+		16, 1, // queued tasks, running nodes
0, 1, 2, // provisioning, queue ready, queue future
map[int]int{step1: 3, step2: 2, step3: 0},
},
9: {
4, 4, // max nodes, tasks per node
-		16, 1, // tasks, running nodes
+		16, 1, // queued tasks, running nodes
0, 1, 2, // provisioning, queue ready, queue future
map[int]int{step1: 3, step2: 2, step3: 0},
},
10: {
4, 4, // max nodes, tasks per node
-		12, 1, // tasks, running nodes
+		12, 1, // queued tasks, running nodes
		0, 0, 3, // provisioning, queue ready, queue future
-		map[int]int{step1: 2, step2: 2, step3: 0},
+		map[int]int{step1: 3, step2: 3, step3: 0},
},
11: {
4, 4, // max nodes, tasks per node
-		12, 1, // tasks, running nodes
+		12, 1, // queued tasks, running nodes
		0, 1, 2, // provisioning, queue ready, queue future
-		map[int]int{step1: 2, step2: 1, step3: 0},
+		map[int]int{step1: 3, step2: 2, step3: 0},
},
12: {
4, 4, // max nodes, tasks per node
-		8, 1, // tasks, running nodes
+		8, 1, // queued tasks, running nodes
		0, 1, 2, // provisioning, queue ready, queue future
-		map[int]int{step1: 1, step2: 0, step3: 0},
+		map[int]int{step1: 2, step2: 1, step3: -1},
},
13: {
4, 4, // max nodes, tasks per node
-		8, 1, // tasks, running nodes
+		8, 1, // queued tasks, running nodes
		1, 1, 1, // provisioning, queue ready, queue future
-		map[int]int{step1: 0, step2: 0, step3: 0},
+		map[int]int{step1: 1, step2: 0, step3: -1},
},
14: {
4, 4, // max nodes, tasks per node
-		8, 2, // tasks, running nodes
+		8, 2, // queued tasks, running nodes
		1, 0, 1, // provisioning, queue ready, queue future
-		map[int]int{step1: 0, step2: 0, step3: 0},
+		map[int]int{step1: 1, step2: 1, step3: 0},
},
15: {
4, 4, // max nodes, tasks per node
-		4, 2, // tasks, running nodes
+		4, 2, // queued tasks, running nodes
		1, 0, 1, // provisioning, queue ready, queue future
-		map[int]int{step1: 0, step2: 0, step3: 0},
+		map[int]int{step1: 0, step2: 0, step3: -1},
},
16: {
4, 4, // max nodes, tasks per node
-		0, 2, // tasks, running nodes
+		0, 2, // queued tasks, running nodes
		1, 1, 0, // provisioning, queue ready, queue future
-		map[int]int{step1: 0, step2: 0, step3: 0},
+		map[int]int{step1: -1, step2: -2, step3: -2},
},
}

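To see where the updated expectations come from, take case 12 (max 4 nodes, 4 tasks per node, 8 queued tasks, 1 running node; 0 provisioning, 1 queue ready, 2 queue future), reading step1/step2/step3 as the scheduler's three checks with incomingNodes = provisioning, provisioning + ready, and provisioning + ready + future: step1 = min(ceil(8/4), 4-1-0) = 2; step2 = min(ceil((8-4)/4), 4-1-1) = 1; step3 = min(ceil((8-12)/4), 4-1-3) = -1. The negative step3 is the new "one node too many" signal that lets the scheduler shrink its queue.
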
scheduler/node.go (6 changes: 6 additions & 0 deletions)
@@ -2,6 +2,7 @@ package scheduler

import (
"log/slog"
"sync"
"time"

"github.com/gammadia/alfred/proto"
@@ -45,10 +46,15 @@ type nodeState struct {

nodeName string
	earliestStart time.Time
+
+	mutex sync.Mutex
}

func (ns *nodeState) UpdateStatus(status NodeStatus) {
if ns.status != status {
+		ns.mutex.Lock()
+		defer ns.mutex.Unlock()
+
ns.status = status
ns.scheduler.broadcast(EventNodeStatusUpdated{Node: ns.nodeName, Status: status})
}
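A minimal standalone sketch of the new locking, assuming a stripped-down nodeState (NodeStatus is reduced to a string and the scheduler broadcast is stubbed out with a print):

package main

import (
	"fmt"
	"sync"
)

type NodeStatus string

// Stripped-down stand-in for the scheduler's nodeState: only the
// fields UpdateStatus touches are kept.
type nodeState struct {
	status NodeStatus
	mutex  sync.Mutex
}

func (ns *nodeState) UpdateStatus(status NodeStatus) {
	if ns.status != status {
		ns.mutex.Lock()
		defer ns.mutex.Unlock()

		ns.status = status
		fmt.Println("broadcast:", status) // stand-in for ns.scheduler.broadcast(...)
	}
}

func main() {
	ns := &nodeState{status: "queued"}
	var wg sync.WaitGroup
	for i := 0; i < 10; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			ns.UpdateStatus("provisioning")
		}()
	}
	wg.Wait()
}

Note that the comparison still happens before the lock is taken, so two goroutines can both observe a stale status and broadcast once each; locking before the if would make the check-and-set fully atomic.
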
scheduler/scheduler.go (57 changes: 30 additions & 27 deletions)
@@ -238,6 +238,7 @@ func (s *Scheduler) resizePool() {

// Make sure we need more nodes
if len(s.tasksQueue) < 1 || len(s.nodes) >= s.config.MaxNodes {
+		s.emptyNodesQueue()
return
}
s.log.Debug("Need more nodes",
@@ -247,15 +248,14 @@
"nodes", len(s.nodes),
)
nodesToCreate := 0
-	incomingNodes := 0

// First, we check if provisioning nodes cover our needs
provisioningNodes := lo.Reduce(s.nodes, func(acc int, nodeState *nodeState, _ int) int {
return lo.Ternary(nodeState.status == NodeStatusProvisioning, acc+1, acc)
}, 0)
-	incomingNodes = provisioningNodes
-	if nodesToCreate = s.nbNodesToCreate(incomingNodes); nodesToCreate < 1 {
+	if nodesToCreate = s.nbNodesToCreate(provisioningNodes); nodesToCreate < 1 {
s.log.Debug("Provisioning nodes cover our needs", "needed", nodesToCreate, "provisioningNodes", provisioningNodes)
+		// TODO: terminate/discard extra provisioning nodes if nodesToCreate < 0 ?
s.emptyNodesQueue()
return
}
@@ -265,45 +265,48 @@
queuedNodes := 0
provisioningQueueNodes := 0
for _, nodeState := range s.nodesQueue {
-		if s.config.MaxNodes-len(s.nodes) > 0 && time.Now().After(nodeState.earliestStart) {
-			nodeState.log.Info("Provisioning queued node")
+		if time.Now().After(nodeState.earliestStart) {
+			nodeState.log.Info("Provisioning node from the queue")
nodeState.UpdateStatus(NodeStatusProvisioning)

s.nodesQueue = lo.Without(s.nodesQueue, nodeState)
s.nodes = append(s.nodes, nodeState)
go s.watchNodeProvisioning(nodeState)

			provisioningQueueNodes += 1
+
+			if nodesToCreate = s.nbNodesToCreate(provisioningNodes + provisioningQueueNodes); nodesToCreate < 1 {
+				s.log.Debug("Provisioning nodes from the queue cover our needs",
+					"needed", nodesToCreate,
+					"provisioningNodes", provisioningNodes,
+					"provisioningQueueNodes", provisioningQueueNodes,
+				)
+				s.emptyNodesQueue()
+				return
+			} else {
+				s.log.Debug("Provisioning nodes from the queue are not enough",
+					"needed", nodesToCreate,
+					"provisioningNodes", provisioningNodes,
+					"provisioningQueueNodes", provisioningQueueNodes,
+				)
+			}
} else {
queuedNodes += 1
		}
-
-		incomingNodes = provisioningNodes + provisioningQueueNodes
-		if nodesToCreate = s.nbNodesToCreate(incomingNodes); nodesToCreate < 1 {
-			s.log.Debug("Provisioning nodes from the queue cover our needs",
-				"needed", nodesToCreate,
-				"provisioningNodes", provisioningNodes,
-				"provisioningQueueNodes", provisioningQueueNodes,
-			)
-			s.emptyNodesQueue()
-			return
-		}
}
s.log.Debug("Provisioning nodes from the queue are not enough",
"needed", nodesToCreate,
"provisioningNodes", provisioningNodes,
"provisioningQueueNodes", provisioningQueueNodes,
)

// Finally, we check if nodes queued in the future cover our needs
-	incomingNodes = provisioningNodes + provisioningQueueNodes + queuedNodes
-	if nodesToCreate = s.nbNodesToCreate(incomingNodes); nodesToCreate < 1 {
+	if nodesToCreate = s.nbNodesToCreate(provisioningNodes + provisioningQueueNodes + queuedNodes); nodesToCreate < 1 {
s.log.Debug("Provisioning nodes and queued nodes cover our needs",
"needed", nodesToCreate,
"provisioningNodes", provisioningNodes,
"provisioningQueueNodes", provisioningQueueNodes,
"queuedNodes", queuedNodes,
)
+		if nodesToCreate < 0 {
+			s.log.Debug("Remove extra nodes from the queue", "nodesToRemove", -nodesToCreate)
+			s.nodesQueue = s.nodesQueue[:len(s.nodesQueue)+nodesToCreate]
+		}
return
}
s.log.Debug("Provisioning nodes and queued nodes are not enough",
@@ -322,10 +325,10 @@
delay = s.config.ProvisioningDelay
}

-	now = time.Now()
-	s.earliestNextNodeProvisioning = lo.Must(lo.Coalesce(s.earliestNextNodeProvisioning, now)).Add(delay)
+	now = time.Now().Truncate(time.Second)
+	s.earliestNextNodeProvisioning = lo.Must(lo.Coalesce(s.earliestNextNodeProvisioning, now)).Truncate(time.Second).Add(delay)
queueNode := now.Before(s.earliestNextNodeProvisioning)
-	wait = s.earliestNextNodeProvisioning.Sub(now)
+	wait = s.earliestNextNodeProvisioning.Sub(now) + (2 * time.Second)

nodeName := namegen.Get()
nodeState := &nodeState{
@@ -340,7 +343,7 @@
earliestStart: s.earliestNextNodeProvisioning,
}

nodeState.log.Debug("Creating node", "wait", wait, "earliestStart", s.earliestNextNodeProvisioning, "queued", queueNode)
nodeState.log.Debug("Creating node", "wait", wait, "earliestStart", s.earliestNextNodeProvisioning, "status", nodeState.status)
s.broadcast(EventNodeCreated{Node: nodeName, Status: nodeState.status})

if queueNode {
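
The throttling arithmetic above can be checked in isolation. A sketch with a made-up delay, unrolling lo.Must(lo.Coalesce(...)) into an explicit zero-value check; the 2-second padding presumably ensures that time.Now().After(nodeState.earliestStart) already holds when the queued node is revisited after wait:

package main

import (
	"fmt"
	"time"
)

func main() {
	delay := 30 * time.Second // made-up stand-in for s.config.ProvisioningDelay

	// Both timestamps are truncated to whole seconds, as in the commit.
	now := time.Now().Truncate(time.Second)

	var earliest time.Time // zero value: nothing provisioned recently
	if earliest.IsZero() { // lo.Coalesce picks the first non-zero value
		earliest = now
	}
	earliest = earliest.Truncate(time.Second).Add(delay)

	queueNode := now.Before(earliest)
	wait := earliest.Sub(now) + (2 * time.Second)

	fmt.Println(queueNode, wait) // true 32s
}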
