From 34f31b7762eabf14414f27ff547922be038512a3 Mon Sep 17 00:00:00 2001 From: Chris Martin Date: Sun, 1 Dec 2024 09:50:07 +0000 Subject: [PATCH] lint --- internal/scheduler/nodedb/nodedb.go | 1 + internal/scheduler/scheduling/jobiteration.go | 30 ++++++++++++++----- ..._driven_preempting_queue_scheduler_test.go | 3 ++ .../scheduler/scheduling/queue_scheduler.go | 2 +- 4 files changed, 28 insertions(+), 8 deletions(-) diff --git a/internal/scheduler/nodedb/nodedb.go b/internal/scheduler/nodedb/nodedb.go index ba74a66fc8d..b0e3ad73bdb 100644 --- a/internal/scheduler/nodedb/nodedb.go +++ b/internal/scheduler/nodedb/nodedb.go @@ -701,6 +701,7 @@ func (nodeDb *NodeDb) selectNodeForJobWithFairPreemption(txn *memdb.Txn, jctx *c for obj := it.Next(); obj != nil && selectedNode == nil; obj = it.Next() { evictedJobSchedulingContext := obj.(*EvictedJobSchedulingContext) evictedJctx := evictedJobSchedulingContext.JobSchedulingContext + println(fmt.Sprintf("evicting job with ts %d", evictedJctx.Job.SubmitTime())) nodeId := evictedJctx.GetAssignedNodeId() if nodeId == "" { return nil, errors.Errorf("evicted job %s does not have an assigned nodeId", evictedJctx.JobId) diff --git a/internal/scheduler/scheduling/jobiteration.go b/internal/scheduler/scheduling/jobiteration.go index ccdd7909c56..1a4986e9857 100644 --- a/internal/scheduler/scheduling/jobiteration.go +++ b/internal/scheduler/scheduling/jobiteration.go @@ -174,6 +174,10 @@ func (it *MultiJobsIterator) Next() (*schedulercontext.JobSchedulingContext, err type MarketDrivenMultiJobsIterator struct { it1 JobContextIterator it2 JobContextIterator + + // TOOD: ideally we add peek() to JobContextIterator and remove these + it1Value *schedulercontext.JobSchedulingContext + it2Value *schedulercontext.JobSchedulingContext } func NewMarketDrivenMultiJobsIterator(it1, it2 JobContextIterator) *MarketDrivenMultiJobsIterator { @@ -184,32 +188,44 @@ func NewMarketDrivenMultiJobsIterator(it1, it2 JobContextIterator) *MarketDriven } func (it *MarketDrivenMultiJobsIterator) Next() (*schedulercontext.JobSchedulingContext, error) { - j1, err := it.it1.Next() - if err != nil { - return nil, err + if it.it1Value == nil { + j, err := it.it1.Next() + if err != nil { + return nil, err + } + it.it1Value = j } - j2, err := it.it2.Next() - if err != nil { - return nil, err + if it.it2Value == nil { + j, err := it.it2.Next() + if err != nil { + return nil, err + } + it.it2Value = j } + j1 := it.it1Value + j2 := it.it2Value // Both iterators active. - if j1 != nil && j2 != nil { + if it.it1Value != nil && j2 != nil { if (jobdb.MarketSchedulingOrderCompare(j1.Job, j2.Job)) < 0 { + it.it1Value = nil return j1, nil } else { + it.it2Value = nil return j2, nil } } // Only first iterator has job if j1 != nil { + it.it1Value = nil return j1, nil } // Only second iterator has job if j2 != nil { + it.it2Value = nil return j2, nil } diff --git a/internal/scheduler/scheduling/market_driven_preempting_queue_scheduler_test.go b/internal/scheduler/scheduling/market_driven_preempting_queue_scheduler_test.go index 35903f307c5..89c4ce3fe24 100644 --- a/internal/scheduler/scheduling/market_driven_preempting_queue_scheduler_test.go +++ b/internal/scheduler/scheduling/market_driven_preempting_queue_scheduler_test.go @@ -401,6 +401,9 @@ func TestMarketDrivenPreemptingQueueScheduler(t *testing.T) { result, err := sch.Schedule(ctx) require.NoError(t, err) + for _, j := range result.PreemptedJobs { + ctx.Infof("Preempted job with submittred time %d", j.Job.SubmitTime()) + } jobIdsByGangId = sch.jobIdsByGangId gangIdByJobId = sch.gangIdByJobId diff --git a/internal/scheduler/scheduling/queue_scheduler.go b/internal/scheduler/scheduling/queue_scheduler.go index fe0190d40a3..f01ff26dcaf 100644 --- a/internal/scheduler/scheduling/queue_scheduler.go +++ b/internal/scheduler/scheduling/queue_scheduler.go @@ -114,7 +114,7 @@ func (sch *QueueScheduler) Schedule(ctx *armadacontext.Context) (*SchedulerResul return nil, err default: } - + ctx.Infof("Scheduling job in queue %s with price %d and submit time %s", gctx.Queue, gctx.JobSchedulingContexts[0].Job.BidPrice(), gctx.JobSchedulingContexts[0].Job.SubmitTime()) start := time.Now() scheduledOk, unschedulableReason, err := sch.gangScheduler.Schedule(ctx, gctx) if err != nil {