diff --git a/pkg/apis/scheduler/api.go b/pkg/apis/scheduler/api.go
index 5baf89003d8..6f9855b6b6b 100644
--- a/pkg/apis/scheduler/api.go
+++ b/pkg/apis/scheduler/api.go
@@ -90,6 +90,9 @@ type ScheduleInput struct {
 	// For Migrate
 	CpuNumaPin []SCpuNumaPin `json:"cpu_numa_pin"`
 
+	// GuestIds of the guests being scheduled, in the same order as the returned candidates
+	GuestIds []string `json:"guest_ids"`
+
 	HostMemPageSizeKB int    `json:"host_mem_page_size"`
 	SkipKernelCheck   *bool  `json:"skip_kernel_check"`
 	TargetHostKernel  string `json:"target_host_kernel"`
diff --git a/pkg/compute/tasks/schedule.go b/pkg/compute/tasks/schedule.go
index eb78431ab37..c4d6c122581 100644
--- a/pkg/compute/tasks/schedule.go
+++ b/pkg/compute/tasks/schedule.go
@@ -141,6 +141,12 @@ func doScheduleObjects(
 		return
 	}
 
+	sort.Sort(sortedIScheduleModelList(objs))
+	schedInput.GuestIds = make([]string, len(objs))
+	for i := range objs {
+		schedInput.GuestIds[i] = objs[i].GetId()
+	}
+
 	output, err := doScheduleWithInput(ctx, task, schedInput, len(objs))
 	if err != nil {
 		onSchedulerRequestFail(ctx, task, objs, jsonutils.NewString(err.Error()))
@@ -202,7 +208,6 @@ func onSchedulerResults(
 		task.SaveScheduleResult(ctx, nil, results[0], 0)
 		return
 	}
-	sort.Sort(sortedIScheduleModelList(objs))
 	succCount := 0
 	for idx := 0; idx < len(objs); idx += 1 {
 		obj := objs[idx]
diff --git a/pkg/scheduler/cache/candidate/hosts.go b/pkg/scheduler/cache/candidate/hosts.go
index d5969bd3652..65ccc98e7f0 100644
--- a/pkg/scheduler/cache/candidate/hosts.go
+++ b/pkg/scheduler/cache/candidate/hosts.go
@@ -1343,6 +1343,7 @@ func (b *HostBuilder) fillGuestsResourceInfo(desc *HostDesc, host *computemodels
 		guestsOnHost = append(guestsOnHost, backupGuestsOnHost...)
 	}
 
+	pendingUsage := desc.GetPendingUsage()
 	desc.Tenants = make(map[string]int64)
 	for _, gst := range guestsOnHost {
 		guest := gst.(computemodels.SGuest)
@@ -1352,25 +1353,33 @@ func (b *HostBuilder) fillGuestsResourceInfo(desc *HostDesc, host *computemodels
 		} else {
 			desc.Tenants[projectId] = 1
 		}
-		if IsGuestRunning(guest) {
-			runningCount++
-			memSize += int64(guest.VmemSize)
-			cpuCount += int64(guest.VcpuCount)
-			if guest.CpuNumaPin != nil {
-				cpuNumaPin := make([]scheduler.SCpuNumaPin, 0)
-				if err := guest.CpuNumaPin.Unmarshal(&cpuNumaPin); err != nil {
-					return errors.Wrap(err, "unmarshal cpu numa pin")
-				}
-				guestsCpuNumaPin = append(guestsCpuNumaPin, cpuNumaPin...)
-			}
-		} else if IsGuestCreating(guest) {
-			creatingGuestCount++
-			creatingMemSize += int64(guest.VmemSize)
-			creatingCPUCount += int64(guest.VcpuCount)
-		} else if IsGuestPendingDelete(guest) {
+
+		if IsGuestPendingDelete(guest) {
 			memFakeDeletedSize += int64(guest.VmemSize)
 			cpuFakeDeletedCount += int64(guest.VcpuCount)
+		} else {
+			if _, ok := pendingUsage.PendingGuestIds[guest.Id]; ok {
+				log.Infof("fillGuestsResourceInfo guest %s in pending usage", guest.Id)
+				continue
+			}
+			if IsGuestRunning(guest) {
+				runningCount++
+				memSize += int64(guest.VmemSize)
+				cpuCount += int64(guest.VcpuCount)
+				if guest.CpuNumaPin != nil {
+					cpuNumaPin := make([]scheduler.SCpuNumaPin, 0)
+					if err := guest.CpuNumaPin.Unmarshal(&cpuNumaPin); err != nil {
+						return errors.Wrap(err, "unmarshal cpu numa pin")
+					}
+					guestsCpuNumaPin = append(guestsCpuNumaPin, cpuNumaPin...)
+				}
+			} else if IsGuestCreating(guest) {
+				creatingGuestCount++
+				creatingMemSize += int64(guest.VmemSize)
+				creatingCPUCount += int64(guest.VcpuCount)
+			}
 		}
+
 		guestCount++
 		cpuReqCount += int64(guest.VcpuCount)
 		memReqSize += int64(guest.VmemSize)
diff --git a/pkg/scheduler/manager/task_queue.go b/pkg/scheduler/manager/task_queue.go
index e975854cd3a..f4ccb31a772 100644
--- a/pkg/scheduler/manager/task_queue.go
+++ b/pkg/scheduler/manager/task_queue.go
@@ -132,8 +132,16 @@ func setSchedPendingUsage(driver computemodels.IGuestDriver, req *api.SchedInfo,
 	if req.IsSuggestion || IsDriverSkipScheduleDirtyMark(driver) {
 		return nil
 	}
-	for _, item := range resp.Candidates {
-		schedmodels.HostPendingUsageManager.AddPendingUsage(req, item)
+	for i, item := range resp.Candidates {
+		if item.Error != "" {
+			// schedule failed, skip adding pending usage
+			continue
+		}
+		var guestId string
+		if len(req.GuestIds) > i {
+			guestId = req.GuestIds[i]
+		}
+		schedmodels.HostPendingUsageManager.AddPendingUsage(guestId, req, item)
 	}
 	return nil
 }
diff --git a/pkg/scheduler/models/pending_usage.go b/pkg/scheduler/models/pending_usage.go
index 1f2a88920b6..8fedfd1e743 100644
--- a/pkg/scheduler/models/pending_usage.go
+++ b/pkg/scheduler/models/pending_usage.go
@@ -50,15 +50,11 @@ func (m *SHostPendingUsageManager) Keyword() string {
 }
 
 func (m *SHostPendingUsageManager) newSessionUsage(req *api.SchedInfo, hostId string, candidate *schedapi.CandidateResource) *SessionPendingUsage {
-	su := NewSessionUsage(req.SessionId, hostId)
-	su.Usage = NewPendingUsageBySchedInfo(hostId, req, candidate)
+	usage := NewPendingUsageBySchedInfo(hostId, req, candidate)
+	su := NewSessionUsage(req.SessionId, hostId, usage)
 	return su
 }
 
-func (m *SHostPendingUsageManager) newPendingUsage(hostId string) *SPendingUsage {
-	return NewPendingUsageBySchedInfo(hostId, nil, nil)
-}
-
 func (m *SHostPendingUsageManager) GetPendingUsage(hostId string) (*SPendingUsage, error) {
 	return m.getPendingUsage(hostId)
 }
@@ -75,7 +71,7 @@ func (m *SHostPendingUsageManager) GetSessionUsage(sessionId, hostId string) (*S
 	return m.store.GetSessionUsage(sessionId, hostId)
 }
 
-func (m *SHostPendingUsageManager) AddPendingUsage(req *api.SchedInfo, candidate *schedapi.CandidateResource) {
+func (m *SHostPendingUsageManager) AddPendingUsage(guestId string, req *api.SchedInfo, candidate *schedapi.CandidateResource) {
 	hostId := candidate.HostId
 	sessionUsage, _ := m.GetSessionUsage(req.SessionId, hostId)
 
@@ -83,25 +79,25 @@ func (m *SHostPendingUsageManager) AddPendingUsage(req *api.SchedInfo, candidate
 		sessionUsage = m.newSessionUsage(req, hostId, candidate)
 		sessionUsage.StartTimer()
 	}
-	m.addSessionUsage(candidate.HostId, sessionUsage)
+	m.addSessionUsage(candidate.HostId, guestId, sessionUsage)
 	if candidate.BackupCandidate != nil {
-		m.AddPendingUsage(req, candidate.BackupCandidate)
+		m.AddPendingUsage(guestId, req, candidate.BackupCandidate)
 	}
 }
 
 // addSessionUsage add pending usage and session usage
-func (m *SHostPendingUsageManager) addSessionUsage(hostId string, usage *SessionPendingUsage) {
+func (m *SHostPendingUsageManager) addSessionUsage(hostId, guestId string, usage *SessionPendingUsage) {
 	ctx := context.Background()
 	lockman.LockClass(ctx, m, hostId)
 	defer lockman.ReleaseClass(ctx, m, hostId)
 	pendingUsage, _ := m.getPendingUsage(hostId)
 	if pendingUsage == nil {
-		pendingUsage = m.newPendingUsage(hostId)
+		pendingUsage = NewPendingUsageBySchedInfo(hostId, nil, nil)
 	}
 	// add pending usage
-	pendingUsage.Add(usage.Usage)
-	usage.AddCount()
+	pendingUsage.Add(usage.Usage, guestId)
+	usage.AddCount(guestId)
 	m.store.SetSessionUsage(usage.SessionId, hostId, usage)
 	m.store.SetPendingUsage(hostId, pendingUsage)
 }
@@ -186,11 +182,11 @@ type SessionPendingUsage struct {
 	cancelCh  chan string
 }
 
-func NewSessionUsage(sid, hostId string) *SessionPendingUsage {
+func NewSessionUsage(sid, hostId string, usage *SPendingUsage) *SessionPendingUsage {
 	su := &SessionPendingUsage{
 		HostId:    hostId,
 		SessionId: sid,
-		Usage:     NewPendingUsageBySchedInfo(hostId, nil, nil),
+		Usage:     usage,
 		count:     0,
 		countLock: new(sync.Mutex),
 		cancelCh:  make(chan string),
@@ -202,16 +198,21 @@ func (su *SessionPendingUsage) GetHostId() string {
 	return su.Usage.HostId
 }
 
-func (su *SessionPendingUsage) AddCount() {
+func (su *SessionPendingUsage) AddCount(guestId string) {
 	su.countLock.Lock()
 	defer su.countLock.Unlock()
 	su.count++
+	su.Usage.PendingGuestIds[guestId] = struct{}{}
 }
 
 func (su *SessionPendingUsage) SubCount() {
 	su.countLock.Lock()
 	defer su.countLock.Unlock()
 	su.count--
+	for guestId, _ := range su.Usage.PendingGuestIds {
+		delete(su.Usage.PendingGuestIds, guestId)
+		break
+	}
 }
 
 type SResourcePendingUsage struct {
@@ -295,6 +296,8 @@ type SPendingUsage struct {
 	CpuPin map[int]int
 	Memory int
 
+	PendingGuestIds map[string]struct{}
+
 	// nodeId: memSizeMB
 	NumaMemPin map[int]int
 	IsolatedDevice int
@@ -306,9 +309,10 @@ func NewPendingUsageBySchedInfo(hostId string, req *api.SchedInfo, candidate *schedapi.CandidateResource) *SPendingUsage {
 	u := &SPendingUsage{
-		HostId:    hostId,
-		DiskUsage: NewResourcePendingUsage(nil),
-		NetUsage:  NewResourcePendingUsage(nil),
+		HostId:          hostId,
+		DiskUsage:       NewResourcePendingUsage(nil),
+		NetUsage:        NewResourcePendingUsage(nil),
+		PendingGuestIds: make(map[string]struct{}),
 	}
 
 	// group init
@@ -385,7 +389,7 @@ func (self *SPendingUsage) ToMap() map[string]interface{} {
 	}
 }
 
-func (self *SPendingUsage) Add(sUsage *SPendingUsage) {
+func (self *SPendingUsage) Add(sUsage *SPendingUsage, addGuestId string) {
 	self.Cpu = self.Cpu + sUsage.Cpu
 	for k, v1 := range sUsage.CpuPin {
 		if v2, ok := self.CpuPin[k]; ok {
@@ -395,6 +399,17 @@ func (self *SPendingUsage) Add(sUsage *SPendingUsage) {
 		}
 	}
 
+	for guestId := range sUsage.PendingGuestIds {
+		if _, ok := self.PendingGuestIds[guestId]; !ok {
+			log.Infof("add guest %s in pending usage", guestId)
+			self.PendingGuestIds[guestId] = struct{}{}
+		}
+	}
+	if addGuestId != "" {
+		log.Infof("add guest %s in pending usage", addGuestId)
+		self.PendingGuestIds[addGuestId] = struct{}{}
+	}
+
 	self.Memory = self.Memory + sUsage.Memory
 	for k, v1 := range sUsage.NumaMemPin {
 		if v2, ok := self.NumaMemPin[k]; ok {
@@ -423,6 +438,11 @@ func (self *SPendingUsage) Sub(sUsage *SPendingUsage) {
 		}
 	}
 
+	for guestId := range sUsage.PendingGuestIds {
+		log.Infof("delete guest %s in pending usage", guestId)
+		delete(self.PendingGuestIds, guestId)
+	}
+
 	self.Memory = quotas.NonNegative(self.Memory - sUsage.Memory)
 	for k, v1 := range sUsage.NumaMemPin {
 		if v2, ok := self.NumaMemPin[k]; ok {
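
The hosts.go hunk is the core of the fix: once a guest id is recorded in `PendingGuestIds`, `fillGuestsResourceInfo` skips that guest when summing real usage, so a guest whose database record appears while its schedule session still holds pending usage is not counted twice. Below is a minimal sketch of that accounting rule under simplified assumptions; `PendingUsage`, `Guest`, and the `usedMemory` helper are hypothetical stand-ins for `SPendingUsage`, `SGuest`, and the real accounting loop, for illustration only.

```go
package main

import "fmt"

// PendingUsage is a simplified stand-in for SPendingUsage: resources the
// scheduler has reserved ahead of time, keyed by the guest ids they belong to.
type PendingUsage struct {
	Memory          int // MB reserved by in-flight schedule sessions
	PendingGuestIds map[string]struct{}
}

// Guest is a simplified stand-in for an SGuest database record.
type Guest struct {
	Id       string
	VmemSize int // MB
}

// usedMemory mirrors the fixed rule in fillGuestsResourceInfo: a guest that
// is still tracked in pending usage is skipped, because its memory is
// already reserved there.
func usedMemory(guests []Guest, pending *PendingUsage) int {
	used := pending.Memory
	for _, g := range guests {
		if _, ok := pending.PendingGuestIds[g.Id]; ok {
			continue // already counted via pending usage
		}
		used += g.VmemSize
	}
	return used
}

func main() {
	pending := &PendingUsage{
		Memory:          4096,
		PendingGuestIds: map[string]struct{}{"vm-1": {}},
	}
	// vm-1 already exists in the DB while its pending usage is still alive;
	// without the PendingGuestIds check it would be counted twice.
	guests := []Guest{{Id: "vm-1", VmemSize: 4096}, {Id: "vm-2", VmemSize: 2048}}
	fmt.Println(usedMemory(guests, pending)) // 6144, not 10240
}
```

Keying the reservation set by guest id makes the skip robust to timing: it no longer matters whether the host description is rebuilt before or after the guest row lands in the database.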
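On the request side, `doScheduleObjects` now sorts `objs` before filling `schedInput.GuestIds` (and `onSchedulerResults` no longer re-sorts), so the i-th candidate in the scheduler's reply lines up with the i-th guest id in the request. The loop in `setSchedPendingUsage` depends on that positional pairing; the sketch below isolates it, with hypothetical `schedInfo`/`candidate` types standing in for `api.SchedInfo` and `schedapi.CandidateResource`.

```go
package main

import "fmt"

// Hypothetical, trimmed-down request/response shapes.
type schedInfo struct {
	GuestIds []string
}

type candidate struct {
	HostId string
	Error  string // non-empty means scheduling this guest failed
}

type pendingEntry struct{ GuestId, HostId string }

// pairCandidates mirrors the fixed loop in setSchedPendingUsage: candidates
// are matched to guest ids by index, failed candidates reserve nothing, and
// a missing id (a caller that did not fill GuestIds) degrades to "".
func pairCandidates(req schedInfo, candidates []candidate) []pendingEntry {
	entries := make([]pendingEntry, 0, len(candidates))
	for i, c := range candidates {
		if c.Error != "" {
			continue // schedule failed, skip adding pending usage
		}
		var guestId string
		if i < len(req.GuestIds) {
			guestId = req.GuestIds[i]
		}
		entries = append(entries, pendingEntry{GuestId: guestId, HostId: c.HostId})
	}
	return entries
}

func main() {
	req := schedInfo{GuestIds: []string{"vm-1", "vm-2"}}
	resp := []candidate{{HostId: "host-a"}, {Error: "no resource"}}
	fmt.Println(pairCandidates(req, resp)) // [{vm-1 host-a}]
}
```

Note that `SPendingUsage.Add` guards with `addGuestId != ""`, so a caller that leaves `GuestIds` empty still reserves capacity; its guests are simply not tracked by id and therefore not skipped later.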
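One loose end worth flagging: `SubCount` deletes an arbitrary entry from `PendingGuestIds` rather than the id whose usage is being released, presumably because the usages inside one session are interchangeable at release time. If that assumption ever stops holding, `SubCount` would need a guest-id parameter symmetric to `AddCount`.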