From c3e2fca7c2cb76964ea619eeda17f52a61326292 Mon Sep 17 00:00:00 2001 From: Monokaix <2536818783@qq.com> Date: Sun, 7 Apr 2024 17:16:18 +0800 Subject: [PATCH] Revert "fix: exec job validate after init ssn.Tiers." This reverts commit 76ad29472f2c70109f81fa45e0cec5b0509a381a. Signed-off-by: Monokaix <2536818783@qq.com> --- pkg/scheduler/actions/allocate/allocate.go | 5 ++++ pkg/scheduler/actions/backfill/backfill.go | 5 ++++ pkg/scheduler/actions/preempt/preempt.go | 5 ++++ pkg/scheduler/actions/reclaim/reclaim.go | 5 ++++ pkg/scheduler/framework/framework.go | 29 ---------------------- pkg/scheduler/framework/session.go | 29 ++++++++++++++++++++-- 6 files changed, 47 insertions(+), 31 deletions(-) diff --git a/pkg/scheduler/actions/allocate/allocate.go b/pkg/scheduler/actions/allocate/allocate.go index 1aa77cda74..888c10dc9a 100644 --- a/pkg/scheduler/actions/allocate/allocate.go +++ b/pkg/scheduler/actions/allocate/allocate.go @@ -81,6 +81,11 @@ func (alloc *Action) pickUpQueuesAndJobs(queues *util.PriorityQueue, jobsMap map job.PodGroup.Status.Phase = scheduling.PodGroupInqueue } + if vr := ssn.JobValid(job); vr != nil && !vr.Pass { + klog.V(4).Infof("Job <%s/%s> Queue <%s> skip allocate, reason: %v, message %v", job.Namespace, job.Name, job.Queue, vr.Reason, vr.Message) + continue + } + if _, found := ssn.Queues[job.Queue]; !found { klog.Warningf("Skip adding Job <%s/%s> because its queue %s is not found", job.Namespace, job.Name, job.Queue) diff --git a/pkg/scheduler/actions/backfill/backfill.go b/pkg/scheduler/actions/backfill/backfill.go index 10603059a7..561ed4a52f 100644 --- a/pkg/scheduler/actions/backfill/backfill.go +++ b/pkg/scheduler/actions/backfill/backfill.go @@ -121,6 +121,11 @@ func (backfill *Action) pickUpPendingTasks(ssn *framework.Session) []*api.TaskIn continue } + if vr := ssn.JobValid(job); vr != nil && !vr.Pass { + klog.V(4).Infof("Job <%s/%s> Queue <%s> skip backfill, reason: %v, message %v", job.Namespace, job.Name, job.Queue, vr.Reason, vr.Message) + continue + } + queue, found := ssn.Queues[job.Queue] if !found { continue diff --git a/pkg/scheduler/actions/preempt/preempt.go b/pkg/scheduler/actions/preempt/preempt.go index 1a7c561a5c..0ff3032b16 100644 --- a/pkg/scheduler/actions/preempt/preempt.go +++ b/pkg/scheduler/actions/preempt/preempt.go @@ -54,6 +54,11 @@ func (pmpt *Action) Execute(ssn *framework.Session) { continue } + if vr := ssn.JobValid(job); vr != nil && !vr.Pass { + klog.V(4).Infof("Job <%s/%s> Queue <%s> skip preemption, reason: %v, message %v", job.Namespace, job.Name, job.Queue, vr.Reason, vr.Message) + continue + } + if queue, found := ssn.Queues[job.Queue]; !found { continue } else if _, existed := queues[queue.UID]; !existed { diff --git a/pkg/scheduler/actions/reclaim/reclaim.go b/pkg/scheduler/actions/reclaim/reclaim.go index a773f41edb..430ec99a98 100644 --- a/pkg/scheduler/actions/reclaim/reclaim.go +++ b/pkg/scheduler/actions/reclaim/reclaim.go @@ -54,6 +54,11 @@ func (ra *Action) Execute(ssn *framework.Session) { continue } + if vr := ssn.JobValid(job); vr != nil && !vr.Pass { + klog.V(4).Infof("Job <%s/%s> Queue <%s> skip reclaim, reason: %v, message %v", job.Namespace, job.Name, job.Queue, vr.Reason, vr.Message) + continue + } + if queue, found := ssn.Queues[job.Queue]; !found { klog.Errorf("Failed to find Queue <%s> for Job <%s/%s>", job.Queue, job.Namespace, job.Name) diff --git a/pkg/scheduler/framework/framework.go b/pkg/scheduler/framework/framework.go index c0f6400976..c3b9396bac 100644 --- a/pkg/scheduler/framework/framework.go +++ b/pkg/scheduler/framework/framework.go @@ -19,11 +19,8 @@ package framework import ( "time" - v1 "k8s.io/api/core/v1" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/klog/v2" - "volcano.sh/apis/pkg/apis/scheduling" "volcano.sh/volcano/pkg/scheduler/cache" "volcano.sh/volcano/pkg/scheduler/conf" "volcano.sh/volcano/pkg/scheduler/metrics" @@ -50,32 +47,6 @@ func OpenSession(cache cache.Cache, tiers []conf.Tier, configurations []conf.Con } } } - for _, job := range ssn.Jobs { - if job.PodGroup != nil { - ssn.podGroupStatus[job.UID] = *job.PodGroup.Status.DeepCopy() - } - - if vjr := ssn.JobValid(job); vjr != nil { - if !vjr.Pass { - jc := &scheduling.PodGroupCondition{ - Type: scheduling.PodGroupUnschedulableType, - Status: v1.ConditionTrue, - LastTransitionTime: metav1.Now(), - TransitionID: string(ssn.UID), - Reason: vjr.Reason, - Message: vjr.Message, - } - - if err := ssn.UpdatePodGroupCondition(job, jc); err != nil { - klog.Errorf("Failed to update job condition: %v", err) - } - } - - delete(ssn.Jobs, job.UID) - } - } - klog.V(3).Infof("Session %v with <%d> Job after valid", - ssn.UID, len(ssn.Jobs)) return ssn } diff --git a/pkg/scheduler/framework/session.go b/pkg/scheduler/framework/session.go index 7b44cbece2..10feb5657e 100644 --- a/pkg/scheduler/framework/session.go +++ b/pkg/scheduler/framework/session.go @@ -21,6 +21,7 @@ import ( "reflect" v1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/uuid" "k8s.io/client-go/informers" @@ -147,6 +148,30 @@ func openSession(cache cache.Cache) *Session { snapshot := cache.Snapshot() ssn.Jobs = snapshot.Jobs + for _, job := range ssn.Jobs { + if job.PodGroup != nil { + ssn.podGroupStatus[job.UID] = *job.PodGroup.Status.DeepCopy() + } + + if vjr := ssn.JobValid(job); vjr != nil { + if !vjr.Pass { + jc := &scheduling.PodGroupCondition{ + Type: scheduling.PodGroupUnschedulableType, + Status: v1.ConditionTrue, + LastTransitionTime: metav1.Now(), + TransitionID: string(ssn.UID), + Reason: vjr.Reason, + Message: vjr.Message, + } + + if err := ssn.UpdatePodGroupCondition(job, jc); err != nil { + klog.Errorf("Failed to update job condition: %v", err) + } + } + + delete(ssn.Jobs, job.UID) + } + } ssn.NodeList = util.GetNodeList(snapshot.Nodes, snapshot.NodeList) ssn.Nodes = snapshot.Nodes ssn.CSINodesStatus = snapshot.CSINodesStatus @@ -167,7 +192,7 @@ func openSession(cache cache.Cache) *Session { // updateQueueStatus updates allocated field in queue status on session close. func updateQueueStatus(ssn *Session) { // calculate allocated resources on each queue - allocatedResources := make(map[api.QueueID]*api.Resource, len(ssn.Queues)) + var allocatedResources = make(map[api.QueueID]*api.Resource, len(ssn.Queues)) for queueID := range ssn.Queues { allocatedResources[queueID] = &api.Resource{} } @@ -180,7 +205,7 @@ func updateQueueStatus(ssn *Session) { // update queue status for queueID := range ssn.Queues { // convert api.Resource to v1.ResourceList - queueStatus := util.ConvertRes2ResList(allocatedResources[queueID]).DeepCopy() + var queueStatus = util.ConvertRes2ResList(allocatedResources[queueID]).DeepCopy() if reflect.DeepEqual(ssn.Queues[queueID].Queue.Status.Allocated, queueStatus) { klog.V(5).Infof("Queue <%s> allocated resource keeps equal, no need to update queue status <%v>.", queueID, ssn.Queues[queueID].Queue.Status.Allocated)