Skip to content

Commit

Permalink
fix(scheduler,region): fill resource info check guest is in pending u…
Browse files Browse the repository at this point in the history
…sage
  • Loading branch information
wanyaoqi committed Nov 27, 2024
1 parent c7f76c7 commit b3f6570
Show file tree
Hide file tree
Showing 5 changed files with 95 additions and 53 deletions.
3 changes: 3 additions & 0 deletions pkg/apis/scheduler/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,9 @@ type ScheduleInput struct {
CpuNumaPin []SCpuNumaPin `json:"cpu_numa_pin"`
PreferNumaNodes []int `json:"prefer_numa_nodes"`

// GuestIds
GuestIds []string `json:"guest_ids"`

HostMemPageSizeKB int `json:"host_mem_page_size"`
SkipKernelCheck *bool `json:"skip_kernel_check"`
TargetHostKernel string `json:"target_host_kernel"`
Expand Down
7 changes: 6 additions & 1 deletion pkg/compute/tasks/schedule.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,12 @@ func doScheduleObjects(
return
}

sort.Sort(sortedIScheduleModelList(objs))
schedInput.GuestIds = make([]string, len(objs))
for i := range objs {
schedInput.GuestIds[i] = objs[i].GetId()
}

output, err := doScheduleWithInput(ctx, task, schedInput, len(objs))
if err != nil {
onSchedulerRequestFail(ctx, task, objs, jsonutils.NewString(err.Error()))
Expand Down Expand Up @@ -202,7 +208,6 @@ func onSchedulerResults(
task.SaveScheduleResult(ctx, nil, results[0], 0)
return
}
sort.Sort(sortedIScheduleModelList(objs))
succCount := 0
for idx := 0; idx < len(objs); idx += 1 {
obj := objs[idx]
Expand Down
66 changes: 36 additions & 30 deletions pkg/scheduler/cache/candidate/hosts.go
Original file line number Diff line number Diff line change
Expand Up @@ -1514,7 +1514,7 @@ func (b *HostBuilder) fillGuestsResourceInfo(desc *HostDesc, host *computemodels
guestsOnHost = append(guestsOnHost, backupGuestsOnHost...)
}

//pendingUsage := desc.GetPendingUsage()
pendingUsage := desc.GetPendingUsage()
desc.Tenants = make(map[string]int64)
for _, gst := range guestsOnHost {
guest := gst.(computemodels.SGuest)
Expand All @@ -1527,41 +1527,46 @@ func (b *HostBuilder) fillGuestsResourceInfo(desc *HostDesc, host *computemodels
if IsGuestPendingDelete(guest) {
memFakeDeletedSize += int64(guest.VmemSize)
cpuFakeDeletedCount += int64(guest.VcpuCount)
} else if IsGuestCreating(guest) {
creatingGuestCount++
creatingMemSize += int64(guest.VmemSize)
creatingCPUCount += int64(guest.VcpuCount)
if host.EnableNumaAllocate && guest.CpuNumaPin != nil {
cpuNumaPin := make([]scheduler.SCpuNumaPin, 0)
if err := guest.CpuNumaPin.Unmarshal(&cpuNumaPin); err != nil {
return errors.Wrap(err, "unmarshal cpu numa pin")
}
for i := range cpuNumaPin {
if cpuNumaPin[i].ExtraCpuCount > 0 {
creatingCPUCount += int64(cpuNumaPin[i].ExtraCpuCount)
}
}
guestsCpuNumaPin = append(guestsCpuNumaPin, cpuNumaPin...)
} else {
if _, ok := pendingUsage.PendingGuestIds[guest.Id]; ok {
log.Infof("fillGuestsResourceInfo guest %s in pending usage", guest.Id)
continue
}
} else if !IsGuestStoppedStatus(guest) {
// running status
runningCount++
memSize += int64(guest.VmemSize)
cpuCount += int64(guest.VcpuCount)
if host.EnableNumaAllocate && guest.CpuNumaPin != nil {
cpuNumaPin := make([]scheduler.SCpuNumaPin, 0)
if err := guest.CpuNumaPin.Unmarshal(&cpuNumaPin); err != nil {
return errors.Wrap(err, "unmarshal cpu numa pin")
if IsGuestCreating(guest) {
creatingGuestCount++
creatingMemSize += int64(guest.VmemSize)
creatingCPUCount += int64(guest.VcpuCount)
if host.EnableNumaAllocate && guest.CpuNumaPin != nil {
cpuNumaPin := make([]scheduler.SCpuNumaPin, 0)
if err := guest.CpuNumaPin.Unmarshal(&cpuNumaPin); err != nil {
return errors.Wrap(err, "unmarshal cpu numa pin")
}
for i := range cpuNumaPin {
if cpuNumaPin[i].ExtraCpuCount > 0 {
creatingCPUCount += int64(cpuNumaPin[i].ExtraCpuCount)
}
}
guestsCpuNumaPin = append(guestsCpuNumaPin, cpuNumaPin...)
}
for i := range cpuNumaPin {
if cpuNumaPin[i].ExtraCpuCount > 0 {
creatingCPUCount += int64(cpuNumaPin[i].ExtraCpuCount)
} else if !IsGuestStoppedStatus(guest) {
// running status
runningCount++
memSize += int64(guest.VmemSize)
cpuCount += int64(guest.VcpuCount)
if host.EnableNumaAllocate && guest.CpuNumaPin != nil {
cpuNumaPin := make([]scheduler.SCpuNumaPin, 0)
if err := guest.CpuNumaPin.Unmarshal(&cpuNumaPin); err != nil {
return errors.Wrap(err, "unmarshal cpu numa pin")
}
for i := range cpuNumaPin {
if cpuNumaPin[i].ExtraCpuCount > 0 {
creatingCPUCount += int64(cpuNumaPin[i].ExtraCpuCount)
}
}
guestsCpuNumaPin = append(guestsCpuNumaPin, cpuNumaPin...)
}
guestsCpuNumaPin = append(guestsCpuNumaPin, cpuNumaPin...)
}
}

//if IsGuestRunning(guest) {
// runningCount++
// memSize += int64(guest.VmemSize)
Expand All @@ -1588,6 +1593,7 @@ func (b *HostBuilder) fillGuestsResourceInfo(desc *HostDesc, host *computemodels
// memFakeDeletedSize += int64(guest.VmemSize)
// cpuFakeDeletedCount += int64(guest.VcpuCount)
//}

guestCount++
cpuReqCount += int64(guest.VcpuCount)
memReqSize += int64(guest.VmemSize)
Expand Down
12 changes: 10 additions & 2 deletions pkg/scheduler/manager/task_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,8 +132,16 @@ func setSchedPendingUsage(driver computemodels.IGuestDriver, req *api.SchedInfo,
if req.IsSuggestion || IsDriverSkipScheduleDirtyMark(driver) {
return nil
}
for _, item := range resp.Candidates {
schedmodels.HostPendingUsageManager.AddPendingUsage(req, item)
for i, item := range resp.Candidates {
if item.Error != "" {
// schedule failed skip add pending usage
continue
}
var guestId string
if len(req.GuestIds) > i {
guestId = req.GuestIds[i]
}
schedmodels.HostPendingUsageManager.AddPendingUsage(guestId, req, item)
}
return nil
}
Expand Down
60 changes: 40 additions & 20 deletions pkg/scheduler/models/pending_usage.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,15 +50,11 @@ func (m *SHostPendingUsageManager) Keyword() string {
}

func (m *SHostPendingUsageManager) newSessionUsage(req *api.SchedInfo, hostId string, candidate *schedapi.CandidateResource) *SessionPendingUsage {
su := NewSessionUsage(req.SessionId, hostId)
su.Usage = NewPendingUsageBySchedInfo(hostId, req, candidate)
usage := NewPendingUsageBySchedInfo(hostId, req, candidate)
su := NewSessionUsage(req.SessionId, hostId, usage)
return su
}

func (m *SHostPendingUsageManager) newPendingUsage(hostId string) *SPendingUsage {
return NewPendingUsageBySchedInfo(hostId, nil, nil)
}

// GetPendingUsage returns the accumulated pending resource usage recorded
// for the given host, or an error if none could be loaded.
func (m *SHostPendingUsageManager) GetPendingUsage(hostId string) (*SPendingUsage, error) {
	usage, err := m.getPendingUsage(hostId)
	return usage, err
}
Expand All @@ -75,33 +71,33 @@ func (m *SHostPendingUsageManager) GetSessionUsage(sessionId, hostId string) (*S
return m.store.GetSessionUsage(sessionId, hostId)
}

func (m *SHostPendingUsageManager) AddPendingUsage(req *api.SchedInfo, candidate *schedapi.CandidateResource) {
func (m *SHostPendingUsageManager) AddPendingUsage(guestId string, req *api.SchedInfo, candidate *schedapi.CandidateResource) {
hostId := candidate.HostId

sessionUsage, _ := m.GetSessionUsage(req.SessionId, hostId)
if sessionUsage == nil {
sessionUsage = m.newSessionUsage(req, hostId, candidate)
sessionUsage.StartTimer()
}
m.addSessionUsage(candidate.HostId, sessionUsage)
m.addSessionUsage(candidate.HostId, guestId, sessionUsage)
if candidate.BackupCandidate != nil {
m.AddPendingUsage(req, candidate.BackupCandidate)
m.AddPendingUsage(guestId, req, candidate.BackupCandidate)
}
}

// addSessionUsage add pending usage and session usage
func (m *SHostPendingUsageManager) addSessionUsage(hostId string, usage *SessionPendingUsage) {
func (m *SHostPendingUsageManager) addSessionUsage(hostId, guestId string, usage *SessionPendingUsage) {
ctx := context.Background()
lockman.LockClass(ctx, m, hostId)
defer lockman.ReleaseClass(ctx, m, hostId)

pendingUsage, _ := m.getPendingUsage(hostId)
if pendingUsage == nil {
pendingUsage = m.newPendingUsage(hostId)
pendingUsage = NewPendingUsageBySchedInfo(hostId, nil, nil)
}
// add pending usage
pendingUsage.Add(usage.Usage)
usage.AddCount()
pendingUsage.Add(usage.Usage, guestId)
usage.AddCount(guestId)
m.store.SetSessionUsage(usage.SessionId, hostId, usage)
m.store.SetPendingUsage(hostId, pendingUsage)
}
Expand Down Expand Up @@ -186,11 +182,11 @@ type SessionPendingUsage struct {
cancelCh chan string
}

func NewSessionUsage(sid, hostId string) *SessionPendingUsage {
func NewSessionUsage(sid, hostId string, usage *SPendingUsage) *SessionPendingUsage {
su := &SessionPendingUsage{
HostId: hostId,
SessionId: sid,
Usage: NewPendingUsageBySchedInfo(hostId, nil, nil),
Usage: usage,
count: 0,
countLock: new(sync.Mutex),
cancelCh: make(chan string),
Expand All @@ -202,16 +198,21 @@ func (su *SessionPendingUsage) GetHostId() string {
return su.Usage.HostId
}

func (su *SessionPendingUsage) AddCount() {
func (su *SessionPendingUsage) AddCount(guestId string) {
su.countLock.Lock()
defer su.countLock.Unlock()
su.count++
su.Usage.PendingGuestIds[guestId] = struct{}{}
}

// SubCount decrements the session's pending-usage reference count and
// removes one guest id from the usage's pending-guest set.
//
// Map iteration order in Go is unspecified, so the entry removed is
// arbitrary. NOTE(review): this assumes callers do not care which guest id
// is released when the count drops — confirm against the AddCount callers.
func (su *SessionPendingUsage) SubCount() {
	su.countLock.Lock()
	defer su.countLock.Unlock()
	su.count--
	// Evict a single (arbitrary) entry; `for k := range` is the idiomatic
	// form — the original `for guestId, _ := range` blank identifier is
	// redundant and flagged by gofmt/staticcheck.
	for guestId := range su.Usage.PendingGuestIds {
		delete(su.Usage.PendingGuestIds, guestId)
		break
	}
}

type SResourcePendingUsage struct {
Expand Down Expand Up @@ -295,6 +296,8 @@ type SPendingUsage struct {
CpuPin map[int]int
Memory int

PendingGuestIds map[string]struct{}

// nodeId: memSizeMB
NumaMemPin map[int]int
IsolatedDevice int
Expand All @@ -306,9 +309,10 @@ type SPendingUsage struct {

func NewPendingUsageBySchedInfo(hostId string, req *api.SchedInfo, candidate *schedapi.CandidateResource) *SPendingUsage {
u := &SPendingUsage{
HostId: hostId,
DiskUsage: NewResourcePendingUsage(nil),
NetUsage: NewResourcePendingUsage(nil),
HostId: hostId,
DiskUsage: NewResourcePendingUsage(nil),
NetUsage: NewResourcePendingUsage(nil),
PendingGuestIds: make(map[string]struct{}),
}

// group init
Expand Down Expand Up @@ -385,7 +389,7 @@ func (self *SPendingUsage) ToMap() map[string]interface{} {
}
}

func (self *SPendingUsage) Add(sUsage *SPendingUsage) {
func (self *SPendingUsage) Add(sUsage *SPendingUsage, addGuestId string) {
self.Cpu = self.Cpu + sUsage.Cpu
for k, v1 := range sUsage.CpuPin {
if v2, ok := self.CpuPin[k]; ok {
Expand All @@ -395,6 +399,17 @@ func (self *SPendingUsage) Add(sUsage *SPendingUsage) {
}
}

for guestId := range sUsage.PendingGuestIds {
if _, ok := self.PendingGuestIds[guestId]; !ok {
log.Infof("add guest %s in pending usage", guestId)
self.PendingGuestIds[guestId] = struct{}{}
}
}
if addGuestId != "" {
log.Infof("add guest %s in pending usage", addGuestId)
self.PendingGuestIds[addGuestId] = struct{}{}
}

self.Memory = self.Memory + sUsage.Memory
for k, v1 := range sUsage.NumaMemPin {
if v2, ok := self.NumaMemPin[k]; ok {
Expand Down Expand Up @@ -423,6 +438,11 @@ func (self *SPendingUsage) Sub(sUsage *SPendingUsage) {
}
}

for guestId := range sUsage.PendingGuestIds {
log.Infof("delete guest %s in pending usage", guestId)
delete(self.PendingGuestIds, guestId)
}

self.Memory = quotas.NonNegative(self.Memory - sUsage.Memory)
for k, v1 := range sUsage.NumaMemPin {
if v2, ok := self.NumaMemPin[k]; ok {
Expand Down

0 comments on commit b3f6570

Please sign in to comment.