Skip to content

Commit

Permalink
fix(region,scheduler,host): container numa aware (#21506)
Browse files Browse the repository at this point in the history
- optimize host register isolated device update
- container device base try detect numa node
- container alloc numa memory
- calc numa distance before select isolated device

Signed-off-by: wanyaoqi <[email protected]>
  • Loading branch information
wanyaoqi authored Nov 1, 2024
1 parent 8d45dcf commit dc88d51
Show file tree
Hide file tree
Showing 22 changed files with 521 additions and 102 deletions.
3 changes: 2 additions & 1 deletion pkg/apis/scheduler/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,8 @@ type ScheduleInput struct {
ResetCpuNumaPin bool `json:"reset_cpu_numa_pin"`

// For Migrate
CpuNumaPin []SCpuNumaPin `json:"cpu_numa_pin"`
CpuNumaPin []SCpuNumaPin `json:"cpu_numa_pin"`
PreferNumaNodes []int `json:"prefer_numa_nodes"`

HostMemPageSizeKB int `json:"host_mem_page_size"`
SkipKernelCheck *bool `json:"skip_kernel_check"`
Expand Down
2 changes: 1 addition & 1 deletion pkg/baremetal/tasks/baseprepare.go
Original file line number Diff line number Diff line change
Expand Up @@ -872,7 +872,7 @@ func (task *sBaremetalPrepareTask) sendIsolatedDevicesInfo(
}

for i := 0; i < len(gpuDevs); i++ {
if _, err := isolated_device.SyncDeviceInfo(session, task.baremetal.GetId(), gpuDevs[i]); err != nil {
if _, err := isolated_device.SyncDeviceInfo(session, task.baremetal.GetId(), gpuDevs[i], true); err != nil {
return errors.Wrap(err, "sync device info")
}
}
Expand Down
5 changes: 3 additions & 2 deletions pkg/compute/models/guest_actions.go
Original file line number Diff line number Diff line change
Expand Up @@ -1558,14 +1558,15 @@ func (self *SGuest) StartGueststartTask(
data *jsonutils.JSONDict, parentTaskId string,
) error {
schedStart := self.Hypervisor == api.HYPERVISOR_KVM && self.guestDisksStorageTypeIsShared()
startFromCreate := jsonutils.QueryBoolean(data, "start_from_create", false)
if options.Options.IgnoreNonrunningGuests {
host := HostManager.FetchHostById(self.HostId)
if host != nil && host.EnableNumaAllocate {
if !startFromCreate && host != nil && host.EnableNumaAllocate {
schedStart = true
}
}

if self.CpuNumaPin != nil {
if !startFromCreate && self.CpuNumaPin != nil {
// clean cpu numa pin
err := self.SetCpuNumaPin(ctx, userCred, nil, nil)
if err != nil {
Expand Down
1 change: 1 addition & 0 deletions pkg/compute/models/guest_queries.go
Original file line number Diff line number Diff line change
Expand Up @@ -824,6 +824,7 @@ func fetchGuestIsolatedDevices(guestIds []string) map[string][]api.SIsolatedDevi
dev.GuestId = devs[i].GuestId
dev.Addr = devs[i].Addr
dev.VendorDeviceId = devs[i].VendorDeviceId
dev.NumaNode = byte(devs[i].NumaNode)
gdevs, ok := ret[devs[i].GuestId]
if !ok {
gdevs = make([]api.SIsolatedDevice, 0)
Expand Down
21 changes: 16 additions & 5 deletions pkg/compute/models/guests.go
Original file line number Diff line number Diff line change
Expand Up @@ -4322,7 +4322,7 @@ func (self *SGuest) allocSriovNicDevice(
}
netConfig.SriovDevice.NetworkIndex = &gn.Index
netConfig.SriovDevice.WireId = net.WireId
err = self.createIsolatedDeviceOnHost(ctx, userCred, host, netConfig.SriovDevice, pendingUsageZone, nil)
err = self.createIsolatedDeviceOnHost(ctx, userCred, host, netConfig.SriovDevice, pendingUsageZone, nil, nil)
if err != nil {
return errors.Wrap(err, "self.createIsolatedDeviceOnHost")
}
Expand Down Expand Up @@ -4511,7 +4511,7 @@ func (self *SGuest) attachNVMEDevice(
) error {
gd := self.GetGuestDisk(disk.Id)
diskConfig.NVMEDevice.DiskIndex = &gd.Index
err := self.createIsolatedDeviceOnHost(ctx, userCred, host, diskConfig.NVMEDevice, pendingUsage, nil)
err := self.createIsolatedDeviceOnHost(ctx, userCred, host, diskConfig.NVMEDevice, pendingUsage, nil, nil)
if err != nil {
return errors.Wrap(err, "self.createIsolatedDeviceOnHost")
}
Expand Down Expand Up @@ -4664,24 +4664,35 @@ func (self *SGuest) createDiskOnHost(
}

func (self *SGuest) CreateIsolatedDeviceOnHost(ctx context.Context, userCred mcclient.TokenCredential, host *SHost, devs []*api.IsolatedDeviceConfig, pendingUsage quotas.IQuota) error {
var numaNodes []int
if self.CpuNumaPin != nil {
numaNodes = make([]int, 0)
cpuNumaPin := make([]schedapi.SCpuNumaPin, 0)
self.CpuNumaPin.Unmarshal(&cpuNumaPin)

for i := range cpuNumaPin {
numaNodes = append(numaNodes, cpuNumaPin[i].NodeId)
}
}

usedDeviceMap := map[string]*SIsolatedDevice{}
for _, devConfig := range devs {
if devConfig.DevType == api.NIC_TYPE || devConfig.DevType == api.NVME_PT_TYPE {
continue
}
err := self.createIsolatedDeviceOnHost(ctx, userCred, host, devConfig, pendingUsage, usedDeviceMap)
err := self.createIsolatedDeviceOnHost(ctx, userCred, host, devConfig, pendingUsage, usedDeviceMap, numaNodes)
if err != nil {
return err
}
}
return nil
}

func (self *SGuest) createIsolatedDeviceOnHost(ctx context.Context, userCred mcclient.TokenCredential, host *SHost, devConfig *api.IsolatedDeviceConfig, pendingUsage quotas.IQuota, usedDevMap map[string]*SIsolatedDevice) error {
func (self *SGuest) createIsolatedDeviceOnHost(ctx context.Context, userCred mcclient.TokenCredential, host *SHost, devConfig *api.IsolatedDeviceConfig, pendingUsage quotas.IQuota, usedDevMap map[string]*SIsolatedDevice, preferNumaNodes []int) error {
lockman.LockClass(ctx, QuotaManager, self.ProjectId)
defer lockman.ReleaseClass(ctx, QuotaManager, self.ProjectId)

err := IsolatedDeviceManager.attachHostDeviceToGuestByDesc(ctx, self, host, devConfig, userCred, usedDevMap)
err := IsolatedDeviceManager.attachHostDeviceToGuestByDesc(ctx, self, host, devConfig, userCred, usedDevMap, preferNumaNodes)
if err != nil {
return err
}
Expand Down
91 changes: 69 additions & 22 deletions pkg/compute/models/isolated_devices.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"context"
"database/sql"
"fmt"
"math"
"reflect"
"sort"
"strings"
Expand All @@ -33,6 +34,7 @@ import (
"yunion.io/x/sqlchemy"

api "yunion.io/x/onecloud/pkg/apis/compute"
hostapi "yunion.io/x/onecloud/pkg/apis/host"
"yunion.io/x/onecloud/pkg/apis/notify"
"yunion.io/x/onecloud/pkg/cloudcommon/consts"
"yunion.io/x/onecloud/pkg/cloudcommon/db"
Expand Down Expand Up @@ -573,13 +575,16 @@ func (manager *SIsolatedDeviceManager) _isValidDeviceInfo(config *api.IsolatedDe
return nil
}

func (manager *SIsolatedDeviceManager) attachHostDeviceToGuestByDesc(ctx context.Context, guest *SGuest, host *SHost, devConfig *api.IsolatedDeviceConfig, userCred mcclient.TokenCredential, usedDevMap map[string]*SIsolatedDevice) error {
func (manager *SIsolatedDeviceManager) attachHostDeviceToGuestByDesc(
ctx context.Context, guest *SGuest, host *SHost, devConfig *api.IsolatedDeviceConfig,
userCred mcclient.TokenCredential, usedDevMap map[string]*SIsolatedDevice, preferNumaNodes []int,
) error {
if len(devConfig.Id) > 0 {
return manager.attachSpecificDeviceToGuest(ctx, guest, devConfig, userCred)
} else if len(devConfig.DevicePath) > 0 {
return manager.attachHostDeviceToGuestByDevicePath(ctx, guest, host, devConfig, userCred, usedDevMap)
return manager.attachHostDeviceToGuestByDevicePath(ctx, guest, host, devConfig, userCred, usedDevMap, preferNumaNodes)
} else {
return manager.attachHostDeviceToGuestByModel(ctx, guest, host, devConfig, userCred, usedDevMap)
return manager.attachHostDeviceToGuestByModel(ctx, guest, host, devConfig, userCred, usedDevMap, preferNumaNodes)
}
}

Expand All @@ -595,7 +600,7 @@ func (manager *SIsolatedDeviceManager) attachSpecificDeviceToGuest(ctx context.C
return guest.attachIsolatedDevice(ctx, userCred, dev, devConfig.NetworkIndex, devConfig.DiskIndex)
}

func (manager *SIsolatedDeviceManager) attachHostDeviceToGuestByDevicePath(ctx context.Context, guest *SGuest, host *SHost, devConfig *api.IsolatedDeviceConfig, userCred mcclient.TokenCredential, usedDevMap map[string]*SIsolatedDevice) error {
func (manager *SIsolatedDeviceManager) attachHostDeviceToGuestByDevicePath(ctx context.Context, guest *SGuest, host *SHost, devConfig *api.IsolatedDeviceConfig, userCred mcclient.TokenCredential, usedDevMap map[string]*SIsolatedDevice, preferNumaNodes []int) error {
if len(devConfig.Model) == 0 || len(devConfig.DevicePath) == 0 {
return fmt.Errorf("Model or DevicePath is empty: %#v", devConfig)
}
Expand Down Expand Up @@ -647,7 +652,10 @@ func (pq *SorttedGroupDevs) Pop() interface{} {
return item
}

func (manager *SIsolatedDeviceManager) attachHostDeviceToGuestByModel(ctx context.Context, guest *SGuest, host *SHost, devConfig *api.IsolatedDeviceConfig, userCred mcclient.TokenCredential, usedDevMap map[string]*SIsolatedDevice) error {
func (manager *SIsolatedDeviceManager) attachHostDeviceToGuestByModel(
ctx context.Context, guest *SGuest, host *SHost, devConfig *api.IsolatedDeviceConfig,
userCred mcclient.TokenCredential, usedDevMap map[string]*SIsolatedDevice, preferNumaNodes []int,
) error {
if len(devConfig.Model) == 0 {
return fmt.Errorf("Not found model from info: %#v", devConfig)
}
Expand Down Expand Up @@ -680,27 +688,66 @@ func (manager *SIsolatedDeviceManager) attachHostDeviceToGuestByModel(ctx contex
}
sort.Sort(groupDevs)

var preferNumaNode int8 = -1
for _, dev := range usedDevMap {
if dev.NumaNode >= 0 {
preferNumaNode = dev.NumaNode
break
var selectedDev *SIsolatedDevice
if len(preferNumaNodes) > 0 {
topoObj, err := host.SysInfo.Get("topology")
if err != nil {
return errors.Wrap(err, "get topology from host sys_info")
}
hostTopo := new(hostapi.HostTopology)
if err := topoObj.Unmarshal(hostTopo); err != nil {
return errors.Wrap(err, "Unmarshal host topology struct")
}
}

var selectedDev *SIsolatedDevice
if preferNumaNode >= 0 {
for i := range groupDevs {
if groupDevs[i].DevPath == "" {
for j := range groupDevs[i].Devs {
if groupDevs[i].Devs[j].NumaNode == preferNumaNode {
selectedDev = &groupDevs[i].Devs[j]
break
if len(groupDevs) == 1 && groupDevs[0].DevPath == "" {
minDistancesDevIdx := -1
minDistances := math.MaxInt32
for i := range groupDevs[0].Devs {
if groupDevs[0].Devs[i].NumaNode < 0 {
continue
}
devNodeId := groupDevs[0].Devs[i].NumaNode
for j := range hostTopo.Nodes {
if hostTopo.Nodes[j].ID == int(devNodeId) {
devDistance := 0
for k := range preferNumaNodes {
devDistance += hostTopo.Nodes[j].Distances[preferNumaNodes[k]]
}
if devDistance < minDistances {
minDistances = devDistance
minDistancesDevIdx = i
}
}
}
}
if minDistancesDevIdx >= 0 {
selectedDev = &groupDevs[0].Devs[minDistancesDevIdx]
}
} else {
minDistancesGroupIdx := -1
minDistances := math.MaxInt32
log.Infof("devtype %s grouplength %d", groupDevs[0].Devs[0].DevType, len(groupDevs))

for i := range groupDevs {
if groupDevs[i].Devs[0].NumaNode < 0 {
continue
}
devNodeId := groupDevs[i].Devs[0].NumaNode
for j := range hostTopo.Nodes {
if hostTopo.Nodes[j].ID == int(devNodeId) {
devDistance := 0
for k := range preferNumaNodes {
devDistance += hostTopo.Nodes[j].Distances[preferNumaNodes[k]]
}
if devDistance < minDistances {
minDistances = devDistance
minDistancesGroupIdx = i
}
}
}
} else if groupDevs[i].Devs[0].NumaNode == preferNumaNode {
selectedDev = &groupDevs[i].Devs[0]
break
}
if minDistancesGroupIdx >= 0 {
selectedDev = &groupDevs[minDistancesGroupIdx].Devs[0]
}
}
}
Expand Down
4 changes: 3 additions & 1 deletion pkg/compute/tasks/guest_create_task.go
Original file line number Diff line number Diff line change
Expand Up @@ -279,7 +279,9 @@ func (self *GuestCreateTask) OnDeployEipComplete(ctx context.Context, obj db.ISt

if jsonutils.QueryBoolean(self.GetParams(), "auto_start", false) {
self.SetStage("OnAutoStartGuest", nil)
guest.StartGueststartTask(ctx, self.GetUserCred(), nil, self.GetTaskId())
params := jsonutils.NewDict()
params.Set("start_from_create", jsonutils.JSONTrue)
guest.StartGueststartTask(ctx, self.GetUserCred(), params, self.GetTaskId())
} else {
self.SetStage("OnSyncStatusComplete", nil)
guest.StartSyncstatus(ctx, self.GetUserCred(), self.GetTaskId())
Expand Down
14 changes: 13 additions & 1 deletion pkg/compute/tasks/guest_live_migrate_task.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
"yunion.io/x/onecloud/pkg/cloudcommon/db/taskman"
"yunion.io/x/onecloud/pkg/cloudcommon/notifyclient"
"yunion.io/x/onecloud/pkg/compute/models"
"yunion.io/x/onecloud/pkg/util/cgrouputils/cpuset"
"yunion.io/x/onecloud/pkg/util/logclient"
)

Expand Down Expand Up @@ -87,7 +88,18 @@ func (task *GuestMigrateTask) GetSchedParams() (*schedapi.ScheduleInput, error)
input.SkipCpuCheck = skipCpuCheck
input.SkipKernelCheck = skipKernelCheck
}
return guest.GetSchedMigrateParams(task.GetUserCred(), input), nil
res := guest.GetSchedMigrateParams(task.GetUserCred(), input)

if devs, _ := guest.GetIsolatedDevices(); len(devs) > 0 {
preferNumaNodesSet := cpuset.NewBuilder()
for i := range devs {
if devs[i].NumaNode >= 0 {
preferNumaNodesSet.Add(int(devs[i].NumaNode))
}
}
res.PreferNumaNodes = preferNumaNodesSet.Result().ToSlice()
}
return res, nil
}

func (task *GuestMigrateTask) OnStartSchedule(obj IScheduleModel) {
Expand Down
13 changes: 10 additions & 3 deletions pkg/hostman/guestman/pod.go
Original file line number Diff line number Diff line change
Expand Up @@ -1451,15 +1451,22 @@ func (s *sPodGuestInstance) createContainer(ctx context.Context, userCred mcclie
}

var cpuSetCpus string
var cpuSetMems string
{
cpuSets := sets.NewString()
cpuMemSets := sets.NewString()
if len(s.Desc.CpuNumaPin) > 0 {
for _, cpuNum := range s.GetDesc().CpuNumaPin {
for _, cpuPin := range cpuNum.VcpuPin {
for _, cpuNumaPin := range s.GetDesc().CpuNumaPin {
for _, cpuPin := range cpuNumaPin.VcpuPin {
cpuSets.Insert(fmt.Sprintf("%d", cpuPin.Pcpu))
}
if cpuNumaPin.NodeId != nil && cpuNumaPin.SizeMB > 0 {
cpuMemSets.Insert(fmt.Sprintf("%d", int(*cpuNumaPin.NodeId)))
}
}

cpuSetCpus = strings.Join(cpuSets.List(), ",")
cpuSetMems = strings.Join(cpuMemSets.List(), ",")
} else if len(s.Desc.VcpuPin) > 0 {
for _, vcpuPin := range s.Desc.VcpuPin {
cpuSets.Insert(vcpuPin.Pcpus)
Expand Down Expand Up @@ -1494,7 +1501,7 @@ func (s *sPodGuestInstance) createContainer(ctx context.Context, userCred mcclie
MemoryLimitInBytes: s.GetDesc().Mem * 1024 * 1024,
OomScoreAdj: 0,
CpusetCpus: cpuSetCpus,
CpusetMems: "",
CpusetMems: cpuSetMems,
HugepageLimits: nil,
Unified: nil,
MemorySwapLimitInBytes: 0,
Expand Down
3 changes: 2 additions & 1 deletion pkg/hostman/hosthandler/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,5 +103,6 @@ func hostRestart(ctx context.Context, hostId string, body jsonutils.JSONObject)
}

func hostProbeIsolatedDevices(ctx context.Context, hostId string, body jsonutils.JSONObject) (interface{}, error) {
return hostinfo.Instance().ProbeSyncIsolatedDevices(hostId, body)
_, err := hostinfo.Instance().ProbeSyncIsolatedDevices(hostId, body)
return nil, err
}
22 changes: 17 additions & 5 deletions pkg/hostman/hostinfo/hostinfo.go
Original file line number Diff line number Diff line change
Expand Up @@ -2228,6 +2228,8 @@ func (h *SHostInfo) probeSyncIsolatedDevices() (*jsonutils.JSONArray, error) {
return nil, errors.Wrap(err, "getRemoteIsolatedDevices")
}

// devs need update
var devsNeedUpdate = map[string]bool{}
for _, obj := range objs {
info := isolated_device.CloudDeviceInfo{}
if err := obj.Unmarshal(&info); err != nil {
Expand All @@ -2236,31 +2238,41 @@ func (h *SHostInfo) probeSyncIsolatedDevices() (*jsonutils.JSONArray, error) {
dev := h.IsolatedDeviceMan.GetDeviceByIdent(info.VendorDeviceId, info.Addr, info.MdevId)
if dev != nil {
dev.SetDeviceInfo(info)
devsNeedUpdate[dev.GetCloudId()] = h.IsolatedDeviceMan.CheckDevIsNeedUpdate(dev, &info)
} else {
// detach device
h.IsolatedDeviceMan.AppendDetachedDevice(&info)
}

}

h.IsolatedDeviceMan.StartDetachTask()
h.IsolatedDeviceMan.BatchCustomProbe()

// sync each isolated device found
eg := errgroup.Group{}
// limits the number of active goroutines in this group to at most
eg.SetLimit(16)
mtx := sync.Mutex{}
updateDevs := jsonutils.NewArray()

devs := h.IsolatedDeviceMan.GetDevices()
for i := range devs {
dev := devs[i]
eg.Go(func() error {
if obj, err := isolated_device.SyncDeviceInfo(h.GetSession(), h.HostId, dev); err != nil {
needUpdate := false
if need, ok := devsNeedUpdate[dev.GetCloudId()]; !ok || need {
needUpdate = true
}

if obj, err := isolated_device.SyncDeviceInfo(h.GetSession(), h.HostId, dev, needUpdate); err != nil {
log.Errorf("Sync deviceInfo %s error: %v", dev.String(), err)
return errors.Wrapf(err, "Sync device %s", dev.String())
} else {
mtx.Lock()
updateDevs.Add(obj)
mtx.Unlock()
if obj != nil {
mtx.Lock()
updateDevs.Add(obj)
mtx.Unlock()
}
return nil
}
})
Expand Down
Loading

0 comments on commit dc88d51

Please sign in to comment.