Skip to content

Commit

Permalink
Merge pull request volcano-sh#3394 from Monokaix/revert
Browse files Browse the repository at this point in the history
Revert "fix: exec job validate after init ssn.Tiers."
  • Loading branch information
volcano-sh-bot authored Apr 7, 2024
2 parents 4d0216a + c3e2fca commit 54e0dda
Show file tree
Hide file tree
Showing 6 changed files with 47 additions and 31 deletions.
5 changes: 5 additions & 0 deletions pkg/scheduler/actions/allocate/allocate.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,11 @@ func (alloc *Action) pickUpQueuesAndJobs(queues *util.PriorityQueue, jobsMap map
job.PodGroup.Status.Phase = scheduling.PodGroupInqueue
}

if vr := ssn.JobValid(job); vr != nil && !vr.Pass {
klog.V(4).Infof("Job <%s/%s> Queue <%s> skip allocate, reason: %v, message %v", job.Namespace, job.Name, job.Queue, vr.Reason, vr.Message)
continue
}

if _, found := ssn.Queues[job.Queue]; !found {
klog.Warningf("Skip adding Job <%s/%s> because its queue %s is not found",
job.Namespace, job.Name, job.Queue)
Expand Down
5 changes: 5 additions & 0 deletions pkg/scheduler/actions/backfill/backfill.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,11 @@ func (backfill *Action) pickUpPendingTasks(ssn *framework.Session) []*api.TaskIn
continue
}

if vr := ssn.JobValid(job); vr != nil && !vr.Pass {
klog.V(4).Infof("Job <%s/%s> Queue <%s> skip backfill, reason: %v, message %v", job.Namespace, job.Name, job.Queue, vr.Reason, vr.Message)
continue
}

queue, found := ssn.Queues[job.Queue]
if !found {
continue
Expand Down
5 changes: 5 additions & 0 deletions pkg/scheduler/actions/preempt/preempt.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,11 @@ func (pmpt *Action) Execute(ssn *framework.Session) {
continue
}

if vr := ssn.JobValid(job); vr != nil && !vr.Pass {
klog.V(4).Infof("Job <%s/%s> Queue <%s> skip preemption, reason: %v, message %v", job.Namespace, job.Name, job.Queue, vr.Reason, vr.Message)
continue
}

if queue, found := ssn.Queues[job.Queue]; !found {
continue
} else if _, existed := queues[queue.UID]; !existed {
Expand Down
5 changes: 5 additions & 0 deletions pkg/scheduler/actions/reclaim/reclaim.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,11 @@ func (ra *Action) Execute(ssn *framework.Session) {
continue
}

if vr := ssn.JobValid(job); vr != nil && !vr.Pass {
klog.V(4).Infof("Job <%s/%s> Queue <%s> skip reclaim, reason: %v, message %v", job.Namespace, job.Name, job.Queue, vr.Reason, vr.Message)
continue
}

if queue, found := ssn.Queues[job.Queue]; !found {
klog.Errorf("Failed to find Queue <%s> for Job <%s/%s>",
job.Queue, job.Namespace, job.Name)
Expand Down
29 changes: 0 additions & 29 deletions pkg/scheduler/framework/framework.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,8 @@ package framework
import (
"time"

v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/klog/v2"

"volcano.sh/apis/pkg/apis/scheduling"
"volcano.sh/volcano/pkg/scheduler/cache"
"volcano.sh/volcano/pkg/scheduler/conf"
"volcano.sh/volcano/pkg/scheduler/metrics"
Expand All @@ -50,32 +47,6 @@ func OpenSession(cache cache.Cache, tiers []conf.Tier, configurations []conf.Con
}
}
}
for _, job := range ssn.Jobs {
if job.PodGroup != nil {
ssn.podGroupStatus[job.UID] = *job.PodGroup.Status.DeepCopy()
}

if vjr := ssn.JobValid(job); vjr != nil {
if !vjr.Pass {
jc := &scheduling.PodGroupCondition{
Type: scheduling.PodGroupUnschedulableType,
Status: v1.ConditionTrue,
LastTransitionTime: metav1.Now(),
TransitionID: string(ssn.UID),
Reason: vjr.Reason,
Message: vjr.Message,
}

if err := ssn.UpdatePodGroupCondition(job, jc); err != nil {
klog.Errorf("Failed to update job condition: %v", err)
}
}

delete(ssn.Jobs, job.UID)
}
}
klog.V(3).Infof("Session %v with <%d> Job after valid",
ssn.UID, len(ssn.Jobs))
return ssn
}

Expand Down
29 changes: 27 additions & 2 deletions pkg/scheduler/framework/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"reflect"

v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/uuid"
"k8s.io/client-go/informers"
Expand Down Expand Up @@ -147,6 +148,30 @@ func openSession(cache cache.Cache) *Session {
snapshot := cache.Snapshot()

ssn.Jobs = snapshot.Jobs
for _, job := range ssn.Jobs {
if job.PodGroup != nil {
ssn.podGroupStatus[job.UID] = *job.PodGroup.Status.DeepCopy()
}

if vjr := ssn.JobValid(job); vjr != nil {
if !vjr.Pass {
jc := &scheduling.PodGroupCondition{
Type: scheduling.PodGroupUnschedulableType,
Status: v1.ConditionTrue,
LastTransitionTime: metav1.Now(),
TransitionID: string(ssn.UID),
Reason: vjr.Reason,
Message: vjr.Message,
}

if err := ssn.UpdatePodGroupCondition(job, jc); err != nil {
klog.Errorf("Failed to update job condition: %v", err)
}
}

delete(ssn.Jobs, job.UID)
}
}
ssn.NodeList = util.GetNodeList(snapshot.Nodes, snapshot.NodeList)
ssn.Nodes = snapshot.Nodes
ssn.CSINodesStatus = snapshot.CSINodesStatus
Expand All @@ -167,7 +192,7 @@ func openSession(cache cache.Cache) *Session {
// updateQueueStatus updates allocated field in queue status on session close.
func updateQueueStatus(ssn *Session) {
// calculate allocated resources on each queue
allocatedResources := make(map[api.QueueID]*api.Resource, len(ssn.Queues))
var allocatedResources = make(map[api.QueueID]*api.Resource, len(ssn.Queues))
for queueID := range ssn.Queues {
allocatedResources[queueID] = &api.Resource{}
}
Expand All @@ -180,7 +205,7 @@ func updateQueueStatus(ssn *Session) {
// update queue status
for queueID := range ssn.Queues {
// convert api.Resource to v1.ResourceList
queueStatus := util.ConvertRes2ResList(allocatedResources[queueID]).DeepCopy()
var queueStatus = util.ConvertRes2ResList(allocatedResources[queueID]).DeepCopy()
if reflect.DeepEqual(ssn.Queues[queueID].Queue.Status.Allocated, queueStatus) {
klog.V(5).Infof("Queue <%s> allocated resource keeps equal, no need to update queue status <%v>.",
queueID, ssn.Queues[queueID].Queue.Status.Allocated)
Expand Down

0 comments on commit 54e0dda

Please sign in to comment.