
Commit

Tim's comments
pkazmierczak committed Dec 16, 2024
1 parent 26d7cd5 commit d3bece0
Showing 4 changed files with 28 additions and 26 deletions.
22 changes: 10 additions & 12 deletions scheduler/feasible.go
@@ -144,10 +144,11 @@ type HostVolumeChecker struct {
 	namespace string
 }

-// allocVolumeRequest associates allocation ID with the volume request
+// allocVolumeRequest associates allocation volume IDs with the volume request
 type allocVolumeRequest struct {
-	allocID   string
-	volumeReq *structs.VolumeRequest
+	hostVolumeIDs []string
+	cniVolumeIDs  []string
+	volumeReq     *structs.VolumeRequest
 }

 // NewHostVolumeChecker creates a HostVolumeChecker from a set of volumes
@@ -159,7 +160,9 @@ func NewHostVolumeChecker(ctx Context) *HostVolumeChecker {
 }

 // SetVolumes takes the volumes required by a task group and updates the checker.
-func (h *HostVolumeChecker) SetVolumes(allocName, allocID string, ns string, volumes map[string]*structs.VolumeRequest) {
+func (h *HostVolumeChecker) SetVolumes(
+	allocName, ns string, volumes map[string]*structs.VolumeRequest, allocHostVolumeIDs []string,
+) {
 	h.namespace = ns
 	h.volumeReqs = []*allocVolumeRequest{}
 	for _, req := range volumes {
@@ -171,10 +174,10 @@ func (h *HostVolumeChecker) SetVolumes(allocName, allocID string, ns string, vol
 			// provide a unique volume source per allocation
 			copied := req.Copy()
 			copied.Source = copied.Source + structs.AllocSuffix(allocName)
-			h.volumeReqs = append(h.volumeReqs, &allocVolumeRequest{allocID, copied})
+			h.volumeReqs = append(h.volumeReqs, &allocVolumeRequest{volumeReq: copied})

 		} else {
-			h.volumeReqs = append(h.volumeReqs, &allocVolumeRequest{allocID, req})
+			h.volumeReqs = append(h.volumeReqs, &allocVolumeRequest{hostVolumeIDs: allocHostVolumeIDs, volumeReq: req})
 		}
 	}
 }
@@ -195,7 +198,6 @@ func (h *HostVolumeChecker) hasVolumes(n *structs.Node) (bool, string) {
 		return true, ""
 	}

-	ws := memdb.NewWatchSet()
 	for _, req := range h.volumeReqs {
 		volCfg, ok := n.HostVolumes[req.volumeReq.Source]
 		if !ok {
@@ -227,11 +229,7 @@ func (h *HostVolumeChecker) hasVolumes(n *structs.Node) (bool, string) {
 		}

 		if req.volumeReq.Sticky {
-			allocation, err := h.ctx.State().AllocByID(ws, req.allocID)
-			if err != nil {
-				return false, FilterConstraintHostVolumesAllocLookupFailed
-			}
-			if slices.Contains(allocation.HostVolumeIDs, vol.ID) || len(allocation.HostVolumeIDs) == 0 {
+			if slices.Contains(req.hostVolumeIDs, vol.ID) || len(req.hostVolumeIDs) == 0 {
 				return true, ""
 			}

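The upshot of the feasible.go changes: the sticky-volume check no longer calls back into the state store (AllocByID) once per candidate node; the volume IDs an allocation already claims now travel with each allocVolumeRequest. A minimal, self-contained sketch of the resulting predicate — the types below are hypothetical stand-ins, not the real scheduler structs:

package main

import (
	"fmt"
	"slices"
)

// allocVolumeRequest mirrors the new shape: claimed host volume IDs
// ride along with the request instead of being looked up from state.
type allocVolumeRequest struct {
	hostVolumeIDs []string
	sticky        bool
}

// stickyFeasible reproduces the new condition: a node volume passes if
// the allocation has no prior claim, or if it claims this very volume.
func stickyFeasible(req *allocVolumeRequest, nodeVolID string) bool {
	return len(req.hostVolumeIDs) == 0 || slices.Contains(req.hostVolumeIDs, nodeVolID)
}

func main() {
	fresh := &allocVolumeRequest{sticky: true}
	pinned := &allocVolumeRequest{hostVolumeIDs: []string{"vol-abc"}, sticky: true}

	fmt.Println(stickyFeasible(fresh, "vol-xyz"))  // true: no prior claim yet
	fmt.Println(stickyFeasible(pinned, "vol-xyz")) // false: pinned to a different volume
	fmt.Println(stickyFeasible(pinned, "vol-abc")) // true: same volume as before
}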
6 changes: 3 additions & 3 deletions scheduler/feasible_test.go
@@ -177,7 +177,7 @@ func TestHostVolumeChecker(t *testing.T) {
 	alloc.NodeID = nodes[2].ID

 	for i, c := range cases {
-		checker.SetVolumes(alloc.Name, alloc.ID, structs.DefaultNamespace, c.RequestedVolumes)
+		checker.SetVolumes(alloc.Name, structs.DefaultNamespace, c.RequestedVolumes, alloc.HostVolumeIDs)
 		if act := checker.Feasible(c.Node); act != c.Result {
 			t.Fatalf("case(%d) failed: got %v; want %v", i, act, c.Result)
 		}
@@ -359,7 +359,7 @@ func TestHostVolumeChecker_ReadOnly(t *testing.T) {

 	for _, tc := range cases {
 		t.Run(tc.name, func(t *testing.T) {
-			checker.SetVolumes(alloc.Name, alloc.ID, structs.DefaultNamespace, tc.requestedVolumes)
+			checker.SetVolumes(alloc.Name, structs.DefaultNamespace, tc.requestedVolumes, alloc.HostVolumeIDs)
 			actual := checker.Feasible(tc.node)
 			must.Eq(t, tc.expect, actual)
 		})
@@ -468,7 +468,7 @@ func TestHostVolumeChecker_Sticky(t *testing.T) {

 	for _, tc := range cases {
 		t.Run(tc.name, func(t *testing.T) {
-			checker.SetVolumes(tc.alloc.Name, tc.alloc.ID, structs.DefaultNamespace, stickyRequest)
+			checker.SetVolumes(tc.alloc.Name, structs.DefaultNamespace, stickyRequest, tc.alloc.HostVolumeIDs)
 			actual := checker.Feasible(tc.node)
 			must.Eq(t, tc.expect, actual)
 		})
11 changes: 7 additions & 4 deletions scheduler/generic_sched.go
@@ -666,10 +666,7 @@ func (s *GenericScheduler) computePlacements(destructive, place []placementResul
 				if missing.PreviousAllocation() != nil && len(missing.PreviousAllocation().HostVolumeIDs) > 0 {
 					continue
 				}
-				vol, ok := option.Node.HostVolumes[v.Source]
-				if ok {
-					newHostVolumeIDs = append(newHostVolumeIDs, vol.ID)
-				}
+				newHostVolumeIDs = append(newHostVolumeIDs, option.Node.HostVolumes[v.Source].ID)
 			}
 		}

Expand Down Expand Up @@ -699,6 +696,8 @@ func (s *GenericScheduler) computePlacements(destructive, place []placementResul

if len(newHostVolumeIDs) > 0 {
alloc.HostVolumeIDs = newHostVolumeIDs
} else {
alloc.HostVolumeIDs = prevAllocation.HostVolumeIDs
}

// If the new allocation is replacing an older allocation then we
@@ -858,6 +857,10 @@ func getSelectOptions(prevAllocation *structs.Allocation, preferredNode *structs
 			}
 		}
 		selectOptions.PenaltyNodeIDs = penaltyNodes
+
+		if prevAllocation.HostVolumeIDs != nil {
+			selectOptions.AllocationHostVolumeIDs = prevAllocation.HostVolumeIDs
+		}
 	}
 	if preferredNode != nil {
 		selectOptions.PreferredNodes = []*structs.Node{preferredNode}
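In computePlacements the new fallback branch keeps sticky claims stable across replacements: freshly collected volume IDs from the selected node win, otherwise the new allocation inherits whatever the previous allocation held. A sketch of just that inheritance rule, with hypothetical trimmed-down types in place of structs.Allocation:

package main

import "fmt"

type allocation struct {
	ID            string
	HostVolumeIDs []string
}

// inheritVolumeIDs mirrors the new assignment in computePlacements:
// prefer IDs collected from the selected node, else carry over the
// previous allocation's claims (assumes prev is non-nil, as in the
// replacement path).
func inheritVolumeIDs(newIDs []string, prev *allocation) []string {
	if len(newIDs) > 0 {
		return newIDs
	}
	return prev.HostVolumeIDs
}

func main() {
	prev := &allocation{ID: "old-alloc", HostVolumeIDs: []string{"vol-abc"}}
	fmt.Println(inheritVolumeIDs(nil, prev))                 // [vol-abc]: inherited claim
	fmt.Println(inheritVolumeIDs([]string{"vol-new"}, prev)) // [vol-new]: fresh claim wins
}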
15 changes: 8 additions & 7 deletions scheduler/stack.go
@@ -35,11 +35,12 @@ type Stack interface {
 }

 type SelectOptions struct {
-	PenaltyNodeIDs map[string]struct{}
-	PreferredNodes []*structs.Node
-	Preempt        bool
-	AllocName      string
-	AllocID        string
+	PenaltyNodeIDs          map[string]struct{}
+	PreferredNodes          []*structs.Node
+	Preempt                 bool
+	AllocName               string
+	AllocID                 string
+	AllocationHostVolumeIDs []string
 }

 // GenericStack is the Stack used for the Generic scheduler. It is
Expand Down Expand Up @@ -157,7 +158,7 @@ func (s *GenericStack) Select(tg *structs.TaskGroup, options *SelectOptions) *Ra
s.taskGroupDrivers.SetDrivers(tgConstr.drivers)
s.taskGroupConstraint.SetConstraints(tgConstr.constraints)
s.taskGroupDevices.SetTaskGroup(tg)
s.taskGroupHostVolumes.SetVolumes(options.AllocName, options.AllocID, s.jobNamespace, tg.Volumes)
s.taskGroupHostVolumes.SetVolumes(options.AllocName, s.jobNamespace, tg.Volumes, options.AllocationHostVolumeIDs)
s.taskGroupCSIVolumes.SetVolumes(options.AllocName, tg.Volumes)
if len(tg.Networks) > 0 {
s.taskGroupNetwork.SetNetwork(tg.Networks[0])
@@ -350,7 +351,7 @@ func (s *SystemStack) Select(tg *structs.TaskGroup, options *SelectOptions) *Ran
 	s.taskGroupDrivers.SetDrivers(tgConstr.drivers)
 	s.taskGroupConstraint.SetConstraints(tgConstr.constraints)
 	s.taskGroupDevices.SetTaskGroup(tg)
-	s.taskGroupHostVolumes.SetVolumes(options.AllocName, options.AllocID, s.jobNamespace, tg.Volumes)
+	s.taskGroupHostVolumes.SetVolumes(options.AllocName, s.jobNamespace, tg.Volumes, options.AllocationHostVolumeIDs)
 	s.taskGroupCSIVolumes.SetVolumes(options.AllocName, tg.Volumes)
 	if len(tg.Networks) > 0 {
 		s.taskGroupNetwork.SetNetwork(tg.Networks[0])
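With AllocationHostVolumeIDs on SelectOptions, both the generic and system stacks hand the previous allocation's claims straight to the host-volume checker instead of an alloc ID. A compact sketch of that plumbing, using hypothetical stand-ins for the scheduler types:

package main

import "fmt"

type selectOptions struct {
	AllocName               string
	AllocationHostVolumeIDs []string
}

type hostVolumeChecker struct {
	volumeIDs []string
}

// SetVolumes mirrors the gist of the new signature (the volume-request
// map is omitted here): the checker receives the claimed volume IDs
// directly rather than resolving them via an allocation ID.
func (h *hostVolumeChecker) SetVolumes(allocName, ns string, ids []string) {
	h.volumeIDs = ids
}

func main() {
	opts := &selectOptions{
		AllocName:               "web.task[0]",
		AllocationHostVolumeIDs: []string{"vol-abc"},
	}
	checker := &hostVolumeChecker{}
	checker.SetVolumes(opts.AllocName, "default", opts.AllocationHostVolumeIDs)
	fmt.Println(checker.volumeIDs) // [vol-abc]
}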
