Skip to content

Commit

Permalink
Refactor flavorassigner to reduce number of passed arguments (#1718)
Browse files Browse the repository at this point in the history
* Refactor flavorassigner to reduce number of passed arguments

Change-Id: Ia35a9fb11c4c7b9d24ccf5ca3f5fcfc93b113a04

* review

Change-Id: I509d0bc9f79ad6554293ca7cd767b70380c2972c
  • Loading branch information
alculquicondor authored Feb 15, 2024
1 parent 81a99eb commit e10df68
Show file tree
Hide file tree
Showing 4 changed files with 95 additions and 70 deletions.
138 changes: 72 additions & 66 deletions pkg/scheduler/flavorassigner/flavorassigner.go
Original file line number Diff line number Diff line change
Expand Up @@ -227,60 +227,74 @@ type FlavorAssignment struct {
borrow bool
}

type FlavorAssigner struct {
wl *workload.Info
cq *cache.ClusterQueue
resourceFlavors map[kueue.ResourceFlavorReference]*kueue.ResourceFlavor
}

func New(wl *workload.Info, cq *cache.ClusterQueue, resourceFlavors map[kueue.ResourceFlavorReference]*kueue.ResourceFlavor) *FlavorAssigner {
return &FlavorAssigner{
wl: wl,
cq: cq,
resourceFlavors: resourceFlavors,
}
}

func lastAssignmentOutdated(wl *workload.Info, cq *cache.ClusterQueue) bool {
return cq.AllocatableResourceGeneration > wl.LastAssignment.ClusterQueueGeneration ||
(cq.Cohort != nil && cq.Cohort.AllocatableResourceGeneration > wl.LastAssignment.CohortGeneration)
}

// AssignFlavors assigns a flavor to each of the resources requested in each pod set.
// Assign assigns a flavor to each of the resources requested in each pod set.
// The result for each pod set is accompanied with reasons why the flavor can't
// be assigned immediately. Each assigned flavor is accompanied with a
// FlavorAssignmentMode.
func AssignFlavors(log logr.Logger, wl *workload.Info, resourceFlavors map[kueue.ResourceFlavorReference]*kueue.ResourceFlavor, cq *cache.ClusterQueue, counts []int32) Assignment {
if wl.LastAssignment != nil && lastAssignmentOutdated(wl, cq) {
func (a *FlavorAssigner) Assign(log logr.Logger, counts []int32) Assignment {
if a.wl.LastAssignment != nil && lastAssignmentOutdated(a.wl, a.cq) {
if logV := log.V(6); logV.Enabled() {
keysValues := []any{
"cq.AllocatableResourceGeneration", cq.AllocatableResourceGeneration,
"wl.LastAssignment.ClusterQueueGeneration", wl.LastAssignment.ClusterQueueGeneration,
"cq.AllocatableResourceGeneration", a.cq.AllocatableResourceGeneration,
"wl.LastAssignment.ClusterQueueGeneration", a.wl.LastAssignment.ClusterQueueGeneration,
}
if cq.Cohort != nil {
if a.cq.Cohort != nil {
keysValues = append(keysValues,
"cq.Cohort.AllocatableResourceGeneration", cq.Cohort.AllocatableResourceGeneration,
"wl.LastAssignment.CohortGeneration", wl.LastAssignment.CohortGeneration,
"cq.Cohort.AllocatableResourceGeneration", a.cq.Cohort.AllocatableResourceGeneration,
"wl.LastAssignment.CohortGeneration", a.wl.LastAssignment.CohortGeneration,
)
}
logV.Info("Clearing Workload's last assignment because it was outdated", keysValues...)
}
wl.LastAssignment = nil
a.wl.LastAssignment = nil
}

if len(counts) == 0 {
return assignFlavors(log, wl.TotalRequests, wl.Obj.Spec.PodSets, resourceFlavors, cq, wl.LastAssignment)
return a.assignFlavors(log, a.wl.TotalRequests)
}

currentResources := make([]workload.PodSetResources, len(wl.TotalRequests))
for i := range wl.TotalRequests {
currentResources[i] = *wl.TotalRequests[i].ScaledTo(counts[i])
currentResources := make([]workload.PodSetResources, len(a.wl.TotalRequests))
for i := range a.wl.TotalRequests {
currentResources[i] = *a.wl.TotalRequests[i].ScaledTo(counts[i])
}
return assignFlavors(log, currentResources, wl.Obj.Spec.PodSets, resourceFlavors, cq, wl.LastAssignment)
return a.assignFlavors(log, currentResources)
}

func assignFlavors(log logr.Logger, requests []workload.PodSetResources, podSets []kueue.PodSet, resourceFlavors map[kueue.ResourceFlavorReference]*kueue.ResourceFlavor, cq *cache.ClusterQueue, lastAssignment *workload.AssigmentClusterQueueState) Assignment {
func (a *FlavorAssigner) assignFlavors(log logr.Logger, requests []workload.PodSetResources) Assignment {
assignment := Assignment{
PodSets: make([]PodSetAssignment, 0, len(requests)),
Usage: make(cache.FlavorResourceQuantities),
LastState: workload.AssigmentClusterQueueState{
LastTriedFlavorIdx: make([]map[corev1.ResourceName]int, 0, len(podSets)),
LastTriedFlavorIdx: make([]map[corev1.ResourceName]int, 0, len(requests)),
CohortGeneration: 0,
ClusterQueueGeneration: cq.AllocatableResourceGeneration,
ClusterQueueGeneration: a.cq.AllocatableResourceGeneration,
},
}
if cq.Cohort != nil {
assignment.LastState.CohortGeneration = cq.Cohort.AllocatableResourceGeneration
if a.cq.Cohort != nil {
assignment.LastState.CohortGeneration = a.cq.Cohort.AllocatableResourceGeneration
}

for i, podSet := range requests {
if _, found := cq.RGByResource[corev1.ResourcePods]; found {
if _, found := a.cq.RGByResource[corev1.ResourcePods]; found {
podSet.Requests[corev1.ResourcePods] = int64(podSet.Count)
}

Expand All @@ -297,22 +311,7 @@ func assignFlavors(log logr.Logger, requests []workload.PodSetResources, podSets
// No need to compute again.
continue
}
rg, found := cq.RGByResource[resName]
if !found {
psAssignment.Flavors = nil
psAssignment.Status = &Status{
reasons: []string{fmt.Sprintf("resource %s unavailable in ClusterQueue", resName)},
}
break
}
lastFlavorAssignment := -1
if lastAssignment != nil && len(lastAssignment.LastTriedFlavorIdx) > i {
idx, ok := lastAssignment.LastTriedFlavorIdx[i][resName]
if ok {
lastFlavorAssignment = idx
}
}
flavors, status := assignment.findFlavorForResourceGroup(log, rg, podSet.Requests, resourceFlavors, cq, &podSets[i].Template.Spec, lastFlavorAssignment)
flavors, status := a.findFlavorForPodSetResource(log, i, podSet.Requests, resName, assignment.Usage)
if status.IsError() || len(flavors) == 0 {
psAssignment.Flavors = nil
psAssignment.Status = status
Expand Down Expand Up @@ -356,38 +355,45 @@ func (a *Assignment) append(requests workload.Requests, psAssignment *PodSetAssi
a.LastState.LastTriedFlavorIdx = append(a.LastState.LastTriedFlavorIdx, flavorIdx)
}

// findFlavorForResourceGroup finds the flavor which can satisfy the resource
// request, along with the information about resources that need to be borrowed.
// findFlavorForPodSetResource finds the flavor which can satisfy the podSet request
// for all resources in the same group as resName.
// Returns the chosen flavor, along with the information about resources that need to be borrowed.
// If the flavor cannot be immediately assigned, it returns a status with
// reasons or failure.
func (a *Assignment) findFlavorForResourceGroup(
func (a *FlavorAssigner) findFlavorForPodSetResource(
log logr.Logger,
rg *cache.ResourceGroup,
psId int,
requests workload.Requests,
resourceFlavors map[kueue.ResourceFlavorReference]*kueue.ResourceFlavor,
cq *cache.ClusterQueue,
spec *corev1.PodSpec,
lastAssignment int) (ResourceAssignment, *Status) {
resName corev1.ResourceName,
assignmentUsage cache.FlavorResourceQuantities,
) (ResourceAssignment, *Status) {
resourceGroup, found := a.cq.RGByResource[resName]
if !found {
return nil, &Status{
reasons: []string{fmt.Sprintf("resource %s unavailable in ClusterQueue", resName)},
}
}

status := &Status{}
requests = filterRequestedResources(requests, rg.CoveredResources)
requests = filterRequestedResources(requests, resourceGroup.CoveredResources)
podSpec := &a.wl.Obj.Spec.PodSets[psId].Template.Spec

var bestAssignment ResourceAssignment
bestAssignmentMode := NoFit

// We will only check against the flavors' labels for the resource.
selector := flavorSelector(spec, rg.LabelKeys)
flavorIdx := -1
for idx, flvQuotas := range rg.Flavors {
if features.Enabled(features.FlavorFungibility) && idx <= lastAssignment {
continue
}
flavor, exist := resourceFlavors[flvQuotas.Name]
selector := flavorSelector(podSpec, resourceGroup.LabelKeys)
assignedFlavorIdx := -1
idx := a.wl.LastAssignment.NextFlavorToTryForPodSetResource(psId, resName)
for ; idx < len(resourceGroup.Flavors); idx++ {
flvQuotas := resourceGroup.Flavors[idx]
flavor, exist := a.resourceFlavors[flvQuotas.Name]
if !exist {
log.Error(nil, "Flavor not found", "Flavor", flvQuotas.Name)
status.append(fmt.Sprintf("flavor %s not found", flvQuotas.Name))
continue
}
taint, untolerated := corev1helpers.FindMatchingUntoleratedTaint(flavor.Spec.NodeTaints, spec.Tolerations, func(t *corev1.Taint) bool {
taint, untolerated := corev1helpers.FindMatchingUntoleratedTaint(flavor.Spec.NodeTaints, podSpec.Tolerations, func(t *corev1.Taint) bool {
return t.Effect == corev1.TaintEffectNoSchedule || t.Effect == corev1.TaintEffectNoExecute
})
if untolerated {
Expand All @@ -403,15 +409,15 @@ func (a *Assignment) findFlavorForResourceGroup(
continue
}

flavorIdx = idx
assignedFlavorIdx = idx
needsBorrowing := false
assignments := make(ResourceAssignment, len(requests))
// Calculate representativeMode for this assignment as the worst mode among all requests.
representativeMode := Fit
for rName, val := range requests {
resQuota := flvQuotas.Resources[rName]
// Check considering the flavor usage by previous pod sets.
mode, borrow, s := fitsResourceQuota(flvQuotas.Name, rName, val+a.Usage[flvQuotas.Name][rName], cq, resQuota)
mode, borrow, s := a.fitsResourceQuota(flvQuotas.Name, rName, val+assignmentUsage[flvQuotas.Name][rName], resQuota)
if s != nil {
status.reasons = append(status.reasons, s.reasons...)
}
Expand All @@ -432,7 +438,7 @@ func (a *Assignment) findFlavorForResourceGroup(
}

if features.Enabled(features.FlavorFungibility) {
if !shouldTryNextFlavor(representativeMode, cq.FlavorFungibility, needsBorrowing) {
if !shouldTryNextFlavor(representativeMode, a.cq.FlavorFungibility, needsBorrowing) {
bestAssignment = assignments
bestAssignmentMode = representativeMode
break
Expand All @@ -455,11 +461,11 @@ func (a *Assignment) findFlavorForResourceGroup(

if features.Enabled(features.FlavorFungibility) {
for _, assignment := range bestAssignment {
if flavorIdx == len(rg.Flavors)-1 {
if assignedFlavorIdx == len(resourceGroup.Flavors)-1 {
// we have reach the last flavor, try from the first flavor next time
assignment.TriedFlavorIdx = -1
} else {
assignment.TriedFlavorIdx = flavorIdx
assignment.TriedFlavorIdx = assignedFlavorIdx
}
}
if bestAssignmentMode == Fit {
Expand Down Expand Up @@ -541,10 +547,10 @@ func flavorSelector(spec *corev1.PodSpec, allowedKeys sets.Set[string]) nodeaffi
// if borrowing is required when preempting.
// If the flavor doesn't satisfy limits immediately (when waiting or preemption
// could help), it returns a Status with reasons.
func fitsResourceQuota(fName kueue.ResourceFlavorReference, rName corev1.ResourceName, val int64, cq *cache.ClusterQueue, rQuota *cache.ResourceQuota) (FlavorAssignmentMode, bool, *Status) {
func (a *FlavorAssigner) fitsResourceQuota(fName kueue.ResourceFlavorReference, rName corev1.ResourceName, val int64, rQuota *cache.ResourceQuota) (FlavorAssignmentMode, bool, *Status) {
var status Status
var borrow bool
used := cq.Usage[fName][rName]
used := a.cq.Usage[fName][rName]
mode := NoFit
if val <= rQuota.Nominal {
// The request can be satisfied by the nominal quota, assuming quota is
Expand All @@ -553,11 +559,11 @@ func fitsResourceQuota(fName kueue.ResourceFlavorReference, rName corev1.Resourc
mode = Preempt
}
cohortAvailable := rQuota.Nominal
if cq.Cohort != nil {
cohortAvailable = cq.RequestableCohortQuota(fName, rName)
if a.cq.Cohort != nil {
cohortAvailable = a.cq.RequestableCohortQuota(fName, rName)
}

if cq.Preemption.BorrowWithinCohort != nil && cq.Preemption.BorrowWithinCohort.Policy != kueue.BorrowWithinCohortPolicyNever {
if a.cq.Preemption.BorrowWithinCohort != nil && a.cq.Preemption.BorrowWithinCohort.Policy != kueue.BorrowWithinCohortPolicyNever {
// when preemption with borrowing is enabled, we can succeed to admit the
// workload if preemption is used.
if (rQuota.BorrowingLimit == nil || val <= rQuota.Nominal+*rQuota.BorrowingLimit) && val <= cohortAvailable {
Expand All @@ -571,8 +577,8 @@ func fitsResourceQuota(fName kueue.ResourceFlavorReference, rName corev1.Resourc
}

cohortUsed := used
if cq.Cohort != nil {
cohortUsed = cq.UsedCohortQuota(fName, rName)
if a.cq.Cohort != nil {
cohortUsed = a.cq.UsedCohortQuota(fName, rName)
}

lack := cohortUsed + val - cohortAvailable
Expand All @@ -582,7 +588,7 @@ func fitsResourceQuota(fName kueue.ResourceFlavorReference, rName corev1.Resourc

lackQuantity := workload.ResourceQuantity(rName, lack)
msg := fmt.Sprintf("insufficient unused quota in cohort for %s in flavor %s, %s more needed", rName, fName, &lackQuantity)
if cq.Cohort == nil {
if a.cq.Cohort == nil {
if mode == NoFit {
msg = fmt.Sprintf("insufficient quota for %s in flavor %s in ClusterQueue", rName, fName)
} else {
Expand Down
3 changes: 2 additions & 1 deletion pkg/scheduler/flavorassigner/flavorassigner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2286,7 +2286,8 @@ func TestAssignFlavors(t *testing.T) {
}
tc.clusterQueue.UpdateWithFlavors(resourceFlavors)
tc.clusterQueue.UpdateRGByResource()
assignment := AssignFlavors(log, wlInfo, resourceFlavors, &tc.clusterQueue, nil)
flvAssigner := New(wlInfo, &tc.clusterQueue, resourceFlavors)
assignment := flvAssigner.Assign(log, nil)
if repMode := assignment.RepresentativeMode(); repMode != tc.wantRepMode {
t.Errorf("e.assignFlavors(_).RepresentativeMode()=%s, want %s", repMode, tc.wantRepMode)
}
Expand Down
5 changes: 3 additions & 2 deletions pkg/scheduler/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -379,7 +379,8 @@ type partialAssignment struct {

func (s *Scheduler) getAssignments(log logr.Logger, wl *workload.Info, snap *cache.Snapshot) (flavorassigner.Assignment, []*workload.Info) {
cq := snap.ClusterQueues[wl.ClusterQueue]
fullAssignment := flavorassigner.AssignFlavors(log, wl, snap.ResourceFlavors, cq, nil)
flvAssigner := flavorassigner.New(wl, cq, snap.ResourceFlavors)
fullAssignment := flvAssigner.Assign(log, nil)
var faPreemtionTargets []*workload.Info

arm := fullAssignment.RepresentativeMode()
Expand All @@ -398,7 +399,7 @@ func (s *Scheduler) getAssignments(log logr.Logger, wl *workload.Info, snap *cac

if wl.CanBePartiallyAdmitted() {
reducer := flavorassigner.NewPodSetReducer(wl.Obj.Spec.PodSets, func(nextCounts []int32) (*partialAssignment, bool) {
assignment := flavorassigner.AssignFlavors(log, wl, snap.ResourceFlavors, cq, nextCounts)
assignment := flvAssigner.Assign(log, nextCounts)
if assignment.RepresentativeMode() == flavorassigner.Fit {
return &partialAssignment{assignment: assignment}, true
}
Expand Down
19 changes: 18 additions & 1 deletion pkg/workload/workload.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (
config "sigs.k8s.io/kueue/apis/config/v1beta1"
kueue "sigs.k8s.io/kueue/apis/kueue/v1beta1"
"sigs.k8s.io/kueue/pkg/constants"
"sigs.k8s.io/kueue/pkg/features"
"sigs.k8s.io/kueue/pkg/util/api"
"sigs.k8s.io/kueue/pkg/util/limitrange"
)
Expand Down Expand Up @@ -76,6 +77,20 @@ func (s *AssigmentClusterQueueState) PendingFlavors() bool {
return false
}

func (s *AssigmentClusterQueueState) NextFlavorToTryForPodSetResource(ps int, res corev1.ResourceName) int {
if !features.Enabled(features.FlavorFungibility) {
return 0
}
if s == nil || ps >= len(s.LastTriedFlavorIdx) {
return 0
}
idx, ok := s.LastTriedFlavorIdx[ps][res]
if !ok {
return 0
}
return idx + 1
}

// Info holds a Workload object and some pre-processing.
type Info struct {
Obj *kueue.Workload
Expand All @@ -91,7 +106,9 @@ type PodSetResources struct {
Name string
Requests Requests
Count int32
Flavors map[corev1.ResourceName]kueue.ResourceFlavorReference

// Flavors are populated when the Workload is assigned.
Flavors map[corev1.ResourceName]kueue.ResourceFlavorReference
}

func (psr *PodSetResources) ScaledTo(newCount int32) *PodSetResources {
Expand Down

0 comments on commit e10df68

Please sign in to comment.