stateful deployments: find feasible node for sticky host volumes #24558

Open

wants to merge 28 commits into base: dynamic-host-volumes
Commits (28)
efc3f6a
host volume struct update
pkazmierczak Nov 27, 2024
f30f44e
Allocation update
pkazmierczak Nov 27, 2024
d321fc4
notes
pkazmierczak Dec 3, 2024
75b2527
VolumeRequest and VolumeMount update
pkazmierczak Dec 3, 2024
8927c5d
super hacky prototype
pkazmierczak Dec 4, 2024
b8b4639
wip findPreferredNode
pkazmierczak Dec 5, 2024
b610f5e
CSI vols can be sticky too
pkazmierczak Dec 5, 2024
ba82ddc
refactor hasVolumes
pkazmierczak Dec 5, 2024
c1a11ff
findPreferredNode
pkazmierczak Dec 5, 2024
3cfb7ce
separate CSI and host volumes
pkazmierczak Dec 9, 2024
0664e7c
accidental git snafu
pkazmierczak Dec 10, 2024
e0be27e
correct findPreferredNode
pkazmierczak Dec 10, 2024
d9dbecf
Tim's comment
pkazmierczak Dec 11, 2024
1857bbf
hasVolumes
pkazmierczak Dec 11, 2024
955ce28
simplify
pkazmierczak Dec 11, 2024
347287c
hasVolumes and tests
pkazmierczak Dec 11, 2024
f5d3eda
Update nomad/structs/structs.go
pkazmierczak Dec 11, 2024
4ae311f
don't return too early
pkazmierczak Dec 11, 2024
7db247e
make alloc ID available to the host volume checker
pkazmierczak Dec 12, 2024
a0ec4c8
fix returns
pkazmierczak Dec 12, 2024
26d7cd5
adjust computePlacements
pkazmierczak Dec 13, 2024
d3bece0
Tim's comments
pkazmierczak Dec 16, 2024
f9caa31
cleanup feasible.go
pkazmierczak Dec 16, 2024
cdff86b
clean up reconciler
pkazmierczak Dec 16, 2024
4dd3ada
test
pkazmierczak Dec 16, 2024
324e17a
don't need CNI here
pkazmierczak Dec 16, 2024
bfd9fbd
extra check
pkazmierczak Dec 16, 2024
24ab149
Tim's comments
pkazmierczak Dec 16, 2024
2 changes: 2 additions & 0 deletions api/allocations.go
@@ -278,6 +278,8 @@ type Allocation struct {
Resources *Resources
TaskResources map[string]*Resources
AllocatedResources *AllocatedResources
HostVolumeIDs []string
CSIVolumeIDs []string
Services map[string]string
Metrics *AllocationMetric
DesiredStatus string
7 changes: 7 additions & 0 deletions nomad/structs/structs.go
@@ -11114,6 +11114,13 @@ type Allocation struct {
// AllocatedResources is the total resources allocated for the task group.
AllocatedResources *AllocatedResources

// HostVolumeIDs is a list of host volume IDs that this allocation
// has claimed.
HostVolumeIDs []string

// CSIVolumeIDs is a list of CSI volume IDs that this allocation has claimed.
CSIVolumeIDs []string

// Metrics associated with this allocation
Metrics *AllocMetric

39 changes: 28 additions & 11 deletions scheduler/feasible.go
@@ -8,6 +8,7 @@ import (
"fmt"
"reflect"
"regexp"
"slices"
"strconv"
"strings"

@@ -138,22 +139,30 @@ func NewRandomIterator(ctx Context, nodes []*structs.Node) *StaticIterator {
// the host volumes necessary to schedule a task group.
type HostVolumeChecker struct {
ctx Context
volumeReqs []*structs.VolumeRequest
volumeReqs []*allocVolumeRequest
namespace string
}

// allocVolumeRequest associates allocation volume IDs with the volume request
type allocVolumeRequest struct {
hostVolumeIDs []string
volumeReq *structs.VolumeRequest
}

// NewHostVolumeChecker creates a HostVolumeChecker from a set of volumes
func NewHostVolumeChecker(ctx Context) *HostVolumeChecker {
return &HostVolumeChecker{
ctx: ctx,
volumeReqs: []*structs.VolumeRequest{},
volumeReqs: []*allocVolumeRequest{},
}
}

// SetVolumes takes the volumes required by a task group and updates the checker.
func (h *HostVolumeChecker) SetVolumes(allocName string, ns string, volumes map[string]*structs.VolumeRequest) {
func (h *HostVolumeChecker) SetVolumes(
allocName, ns string, volumes map[string]*structs.VolumeRequest, allocHostVolumeIDs []string,
) {
h.namespace = ns
h.volumeReqs = []*structs.VolumeRequest{}
h.volumeReqs = []*allocVolumeRequest{}
for _, req := range volumes {
if req.Type != structs.VolumeTypeHost {
continue // filter CSI volumes
@@ -163,10 +172,10 @@ func (h *HostVolumeChecker) SetVolumes(allocName string, ns string, volumes map[
// provide a unique volume source per allocation
copied := req.Copy()
copied.Source = copied.Source + structs.AllocSuffix(allocName)
h.volumeReqs = append(h.volumeReqs, copied)
h.volumeReqs = append(h.volumeReqs, &allocVolumeRequest{volumeReq: copied})

} else {
h.volumeReqs = append(h.volumeReqs, req)
h.volumeReqs = append(h.volumeReqs, &allocVolumeRequest{hostVolumeIDs: allocHostVolumeIDs, volumeReq: req})
Member:

If we're assigning the same []string to every request, why not have it on the HostVolumeChecker instead of on individual requests like this? It's only copying around the trio of pointers for a slice one or two extra times, but it grabs my attention as something I'm maybe missing.

Contributor Author:

My reasoning was that every call of SetVolumes could potentially get a different allocation, and thus the alloc data should be stored per method call. But I see that SetVolumes only ever gets called by Select, so effectively it's per task group.

I'll adjust.
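
As an illustration of the suggestion above, here is a minimal sketch (hypothetical, not the code that ultimately landed in this PR) of hoisting the claimed IDs onto the checker itself. It assumes the surrounding scheduler types shown in this diff (Context, structs.VolumeRequest, structs.VolumeTypeHost) and omits the per-allocation source handling:

// Sketch only: store the allocation's previously claimed host volume IDs once
// on the checker, since Select calls SetVolumes once per task group anyway.
type HostVolumeChecker struct {
	ctx           Context
	volumeReqs    []*structs.VolumeRequest
	hostVolumeIDs []string // shared by every request in this task group
	namespace     string
}

func (h *HostVolumeChecker) SetVolumes(
	allocName, ns string, volumes map[string]*structs.VolumeRequest, allocHostVolumeIDs []string,
) {
	h.namespace = ns
	h.hostVolumeIDs = allocHostVolumeIDs
	h.volumeReqs = []*structs.VolumeRequest{}
	for _, req := range volumes {
		if req.Type != structs.VolumeTypeHost {
			continue // filter CSI volumes
		}
		h.volumeReqs = append(h.volumeReqs, req)
	}
}

With that shape, hasVolumes could consult h.hostVolumeIDs directly when it reaches a sticky request, and the allocVolumeRequest wrapper type would no longer be needed.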

}
}
}
@@ -181,14 +190,13 @@ func (h *HostVolumeChecker) Feasible(candidate *structs.Node) bool {
}

func (h *HostVolumeChecker) hasVolumes(n *structs.Node) bool {

// Fast path: Requested no volumes. No need to check further.
if len(h.volumeReqs) == 0 {
return true
}

for _, req := range h.volumeReqs {
volCfg, ok := n.HostVolumes[req.Source]
volCfg, ok := n.HostVolumes[req.volumeReq.Source]
if !ok {
return false
}
@@ -207,16 +215,25 @@ func (h *HostVolumeChecker) hasVolumes(n *structs.Node) bool {
}
var capOk bool
for _, cap := range vol.RequestedCapabilities {
if req.AccessMode == structs.CSIVolumeAccessMode(cap.AccessMode) &&
req.AttachmentMode == structs.CSIVolumeAttachmentMode(cap.AttachmentMode) {
if req.volumeReq.AccessMode == structs.CSIVolumeAccessMode(cap.AccessMode) &&
req.volumeReq.AttachmentMode == structs.CSIVolumeAttachmentMode(cap.AttachmentMode) {
capOk = true
break
}
}
if !capOk {
return false
}
} else if !req.ReadOnly {

if req.volumeReq.Sticky {
if slices.Contains(req.hostVolumeIDs, vol.ID) || len(req.hostVolumeIDs) == 0 {
return true
}

return false
}

} else if !req.volumeReq.ReadOnly {
// this is a static host volume and can only be mounted ReadOnly,
// validate that no requests for it are ReadWrite.
if volCfg.ReadOnly {
113 changes: 111 additions & 2 deletions scheduler/feasible_test.go
@@ -177,7 +177,7 @@ func TestHostVolumeChecker(t *testing.T) {
alloc.NodeID = nodes[2].ID

for i, c := range cases {
checker.SetVolumes(alloc.Name, structs.DefaultNamespace, c.RequestedVolumes)
checker.SetVolumes(alloc.Name, structs.DefaultNamespace, c.RequestedVolumes, alloc.HostVolumeIDs)
if act := checker.Feasible(c.Node); act != c.Result {
t.Fatalf("case(%d) failed: got %v; want %v", i, act, c.Result)
}
@@ -359,7 +359,116 @@ func TestHostVolumeChecker_ReadOnly(t *testing.T) {

for _, tc := range cases {
t.Run(tc.name, func(t *testing.T) {
checker.SetVolumes(alloc.Name, structs.DefaultNamespace, tc.requestedVolumes)
checker.SetVolumes(alloc.Name, structs.DefaultNamespace, tc.requestedVolumes, alloc.HostVolumeIDs)
actual := checker.Feasible(tc.node)
must.Eq(t, tc.expect, actual)
})
}
}

func TestHostVolumeChecker_Sticky(t *testing.T) {
ci.Parallel(t)

store, ctx := testContext(t)

nodes := []*structs.Node{
mock.Node(),
mock.Node(),
}

hostVolCapsReadWrite := []*structs.HostVolumeCapability{
{
AttachmentMode: structs.HostVolumeAttachmentModeFilesystem,
AccessMode: structs.HostVolumeAccessModeSingleNodeReader,
},
{
AttachmentMode: structs.HostVolumeAttachmentModeFilesystem,
AccessMode: structs.HostVolumeAccessModeSingleNodeWriter,
},
}

dhv := &structs.HostVolume{
Namespace: structs.DefaultNamespace,
ID: uuid.Generate(),
Name: "foo",
NodeID: nodes[1].ID,
RequestedCapabilities: hostVolCapsReadWrite,
State: structs.HostVolumeStateReady,
}

nodes[0].HostVolumes = map[string]*structs.ClientHostVolumeConfig{}
nodes[1].HostVolumes = map[string]*structs.ClientHostVolumeConfig{"foo": {ID: dhv.ID}}

for _, node := range nodes {
must.NoError(t, store.UpsertNode(structs.MsgTypeTestSetup, 1000, node))
}
must.NoError(t, store.UpsertHostVolume(1000, dhv))

stickyRequest := map[string]*structs.VolumeRequest{
"foo": {
Type: "host",
Source: "foo",
Sticky: true,
AccessMode: structs.CSIVolumeAccessModeSingleNodeWriter,
AttachmentMode: structs.CSIVolumeAttachmentModeFilesystem,
},
}

checker := NewHostVolumeChecker(ctx)

// alloc0 wants a previously registered volume ID that's available on node1
alloc0 := mock.Alloc()
alloc0.NodeID = nodes[1].ID
alloc0.HostVolumeIDs = []string{dhv.ID}

// alloc1 wants a volume ID that's available on node1 but hasn't used it
// before
alloc1 := mock.Alloc()
alloc1.NodeID = nodes[1].ID

// alloc2 wants a volume ID that's unrelated
alloc2 := mock.Alloc()
alloc2.NodeID = nodes[1].ID
alloc2.HostVolumeIDs = []string{uuid.Generate()}

// insert all the allocs into the state
must.NoError(t, store.UpsertAllocs(structs.MsgTypeTestSetup, 1000, []*structs.Allocation{alloc0, alloc1, alloc2}))

cases := []struct {
name string
node *structs.Node
alloc *structs.Allocation
expect bool
}{
Member:

We should include a case for a sticky volume request that hasn't been previously claimed.

{
"alloc asking for a sticky volume on an infeasible node",
nodes[0],
alloc0,
false,
},
{
"alloc asking for a sticky volume on a feasible node",
nodes[1],
alloc0,
true,
},
{
"alloc asking for a sticky volume on a feasible node for the first time",
nodes[1],
alloc1,
true,
},
{
"alloc asking for an unrelated volume",
nodes[1],
alloc2,
false,
},
}

for _, tc := range cases {
t.Run(tc.name, func(t *testing.T) {
checker.SetVolumes(tc.alloc.Name, structs.DefaultNamespace, stickyRequest, tc.alloc.HostVolumeIDs)
actual := checker.Feasible(tc.node)
must.Eq(t, tc.expect, actual)
})
48 changes: 48 additions & 0 deletions scheduler/generic_sched.go
@@ -6,6 +6,7 @@ package scheduler
import (
"fmt"
"runtime/debug"
"slices"
"sort"
"time"

@@ -657,6 +658,18 @@ func (s *GenericScheduler) computePlacements(destructive, place []placementResul
"old_alloc_name", oldAllocName, "new_alloc_name", newAllocName)
}

// Are there sticky volumes requested by the task group for the first time? If
// yes, make sure the allocation stores their IDs for future reschedules.
var newHostVolumeIDs []string
for _, v := range tg.Volumes {
if v.Sticky {
if missing.PreviousAllocation() != nil && len(missing.PreviousAllocation().HostVolumeIDs) > 0 {
continue
}
newHostVolumeIDs = append(newHostVolumeIDs, option.Node.HostVolumes[v.Source].ID)
}
}

// Create an allocation for this
alloc := &structs.Allocation{
ID: uuid.Generate(),
@@ -681,6 +694,10 @@ func (s *GenericScheduler) computePlacements(destructive, place []placementResul
},
}

if len(newHostVolumeIDs) > 0 {
alloc.HostVolumeIDs = newHostVolumeIDs
}

// If the new allocation is replacing an older allocation then we
// set the record the older allocation id so that they are chained
if prevAllocation != nil {
Expand All @@ -689,6 +706,10 @@ func (s *GenericScheduler) computePlacements(destructive, place []placementResul
updateRescheduleTracker(alloc, prevAllocation, now)
}

if len(prevAllocation.HostVolumeIDs) > 0 {
alloc.HostVolumeIDs = prevAllocation.HostVolumeIDs
}

// If the allocation has task handles,
// copy them to the new allocation
propagateTaskState(alloc, prevAllocation, missing.PreviousLost())
@@ -838,6 +859,10 @@ func getSelectOptions(prevAllocation *structs.Allocation, preferredNode *structs
}
}
selectOptions.PenaltyNodeIDs = penaltyNodes

if prevAllocation.HostVolumeIDs != nil {
selectOptions.AllocationHostVolumeIDs = prevAllocation.HostVolumeIDs
}
}
if preferredNode != nil {
selectOptions.PreferredNodes = []*structs.Node{preferredNode}
@@ -910,6 +935,29 @@ func (s *GenericScheduler) findPreferredNode(place placementResult) (*structs.No
return preferredNode, nil
}
}

for _, vol := range place.TaskGroup().Volumes {
if !vol.Sticky {
continue
}

var preferredNode *structs.Node
preferredNode, err := s.state.NodeByID(nil, prev.NodeID)
if err != nil {
return nil, err
}

if preferredNode != nil && preferredNode.Ready() {
// if this node has at least one of the allocation volumes, it's a
// preferred one
for _, vol := range preferredNode.HostVolumes {
if slices.Contains(prev.HostVolumeIDs, vol.ID) {
return preferredNode, nil
}
}
}
}

return nil, nil
}
