From 52f13e3ad98d04c4f3af02da2832e99f03856f6c Mon Sep 17 00:00:00 2001 From: Zexi Li Date: Thu, 15 Aug 2024 20:24:11 +0800 Subject: [PATCH] feat(host): start and stop container in worker (#21038) --- .../container/status/status_manager.go | 7 +-- pkg/hostman/guestman/pod.go | 58 ++++++++++++++++++- .../guestman/podhandlers/podhandlers.go | 37 +++++++++--- pkg/hostman/guestman/qemu-kvm.go | 4 -- pkg/hostman/guestman/runtime.go | 4 ++ 5 files changed, 92 insertions(+), 18 deletions(-) diff --git a/pkg/hostman/container/status/status_manager.go b/pkg/hostman/container/status/status_manager.go index bdc06cc8caf..797796fce67 100644 --- a/pkg/hostman/container/status/status_manager.go +++ b/pkg/hostman/container/status/status_manager.go @@ -17,14 +17,12 @@ package status import ( "context" - "yunion.io/x/jsonutils" "yunion.io/x/log" "yunion.io/x/onecloud/pkg/apis" computeapi "yunion.io/x/onecloud/pkg/apis/compute" "yunion.io/x/onecloud/pkg/hostman/container/prober/results" "yunion.io/x/onecloud/pkg/hostman/hostutils" - computemodules "yunion.io/x/onecloud/pkg/mcclient/modules/compute" ) type Manager interface { @@ -40,16 +38,15 @@ func NewManager() Manager { } func (m *manager) SetContainerStartup(podId string, containerId string, started bool, result results.ProbeResult) { - s := hostutils.GetComputeSession(context.Background()) status := computeapi.CONTAINER_STATUS_PROBE_FAILED if started { status = computeapi.CONTAINER_STATUS_RUNNING } - input := apis.PerformStatusInput{ + input := &apis.PerformStatusInput{ Status: status, Reason: result.Reason, } - if _, err := computemodules.Containers.PerformAction(s, containerId, "status", jsonutils.Marshal(input)); err != nil { + if _, err := hostutils.UpdateContainerStatus(context.Background(), containerId, input); err != nil { log.Errorf("set container(%s/%s) status failed: %s", podId, containerId, err) } } diff --git a/pkg/hostman/guestman/pod.go b/pkg/hostman/guestman/pod.go index b4f9d5a89a1..24c1b9b71d1 100644 --- a/pkg/hostman/guestman/pod.go +++ b/pkg/hostman/guestman/pod.go @@ -166,6 +166,18 @@ func (s *sPodGuestInstance) ImportServer(pendingDelete bool) { // TODO: 参考SKVMGuestInstance,可以做更多的事,比如同步状态 s.manager.SaveServer(s.Id, s) s.manager.RemoveCandidateServer(s) + /*if s.IsDaemon() { + ctx := context.Background() + if !s.IsRunning() { + if err := s.StartPod(ctx, hostutils.GetComputeSession(ctx).GetToken()); err != nil { + log.Errorf("start pod (%s/%s) error: %v", s.GetId(), s.GetName(), err) + } + // TODO: start related containers and sync status + } + } else { + s.SyncStatus("sync status after host started") + s.getProbeManager().AddPod(s.Desc) + }*/ s.SyncStatus("sync status after host started") s.getProbeManager().AddPod(s.Desc) } @@ -463,11 +475,53 @@ func (s *sPodGuestInstance) getCgroupParent() string { return "/cloudpods" } -func Int64Ptr(i int64) *int64 { - return &i +type podStartTask struct { + ctx context.Context + userCred mcclient.TokenCredential + pod *sPodGuestInstance +} + +func newPodStartTask(ctx context.Context, userCred mcclient.TokenCredential, pod *sPodGuestInstance) *podStartTask { + return &podStartTask{ + ctx: ctx, + userCred: userCred, + pod: pod, + } +} + +func (t *podStartTask) Run() { + if _, err := t.pod.startPod(t.ctx, t.userCred); err != nil { + log.Errorf("start pod(%s/%s) err: %s", t.pod.GetId(), t.pod.GetName(), err.Error()) + } + t.pod.SyncStatus("sync status after pod start") +} + +func (t *podStartTask) Dump() string { + return fmt.Sprintf("pod start task %s/%s", t.pod.GetId(), t.pod.GetName()) +} + +func (s *sPodGuestInstance) StartPod(ctx context.Context, userCred mcclient.TokenCredential) error { + s.manager.GuestStartWorker.Run(newPodStartTask(ctx, userCred, s), nil, nil) + return nil } func (s *sPodGuestInstance) startPod(ctx context.Context, userCred mcclient.TokenCredential) (*computeapi.PodStartResponse, error) { + retries := 3 + sec := 5 * time.Second + var err error + var resp *computeapi.PodStartResponse + for i := 1; i <= retries; i++ { + resp, err = s._startPod(ctx, userCred) + if err == nil { + return resp, nil + } + log.Errorf("start pod %s/%s error with %d times: %v", s.GetId(), s.GetName(), i, err) + time.Sleep(sec * time.Duration(i)) + } + return resp, err +} + +func (s *sPodGuestInstance) _startPod(ctx context.Context, userCred mcclient.TokenCredential) (*computeapi.PodStartResponse, error) { podInput, err := s.getPodCreateParams() if err != nil { return nil, errors.Wrap(err, "getPodCreateParams") diff --git a/pkg/hostman/guestman/podhandlers/podhandlers.go b/pkg/hostman/guestman/podhandlers/podhandlers.go index 74d6195e25c..c9fdf767f65 100644 --- a/pkg/hostman/guestman/podhandlers/podhandlers.go +++ b/pkg/hostman/guestman/podhandlers/podhandlers.go @@ -52,14 +52,18 @@ type containerDelayActionParams struct { } func containerSyncActionHandler(cf containerActionFunc) appsrv.FilterHandler { - return _containerActionHandler(cf, true) + return _containerActionHandler(cf, true, nil) } func containerActionHandler(cf containerActionFunc) appsrv.FilterHandler { - return _containerActionHandler(cf, false) + return _containerActionHandler(cf, false, nil) } -func _containerActionHandler(cf containerActionFunc, isSync bool) appsrv.FilterHandler { +func containerActionHandlerWithWorker(cf containerActionFunc, workerMan *appsrv.SWorkerManager) appsrv.FilterHandler { + return _containerActionHandler(cf, false, workerMan) +} + +func _containerActionHandler(cf containerActionFunc, isSync bool, workerMan *appsrv.SWorkerManager) appsrv.FilterHandler { return auth.Authenticate(func(ctx context.Context, w http.ResponseWriter, r *http.Request) { params, _, body := appsrv.FetchEnv(ctx, w, r) podId := params[POD_ID] @@ -92,10 +96,15 @@ func _containerActionHandler(cf containerActionFunc, isSync bool) appsrv.FilterH hostutils.Response(ctx, w, data) return } else { - hostutils.DelayTask(ctx, func(ctx context.Context, params interface{}) (jsonutils.JSONObject, error) { + delayFunc := func(ctx context.Context, params interface{}) (jsonutils.JSONObject, error) { dp := params.(*containerDelayActionParams) return cf(ctx, userCred, dp.pod, dp.containerId, dp.body) - }, delayParams) + } + if workerMan != nil { + hostutils.DelayTaskWithWorker(ctx, delayFunc, delayParams, workerMan) + } else { + hostutils.DelayTask(ctx, delayFunc, delayParams) + } hostutils.ResponseOk(ctx, w) } }) @@ -104,8 +113,6 @@ func _containerActionHandler(cf containerActionFunc, isSync bool) appsrv.FilterH func AddPodHandlers(prefix string, app *appsrv.Application) { ctrHandlers := map[string]containerActionFunc{ "create": createContainer, - "start": startContainer, - "stop": stopContainer, "delete": deleteContainer, "sync-status": syncContainerStatus, "pull-image": pullImage, @@ -117,6 +124,22 @@ func AddPodHandlers(prefix string, app *appsrv.Application) { containerActionHandler(f)) } + startWorker := appsrv.NewWorkerManager("container-start-worker", 2, appsrv.DEFAULT_BACKLOG, false) + stopWorker := appsrv.NewWorkerManager("container-stop-worker", 4, appsrv.DEFAULT_BACKLOG, false) + + ctrWorkerHanders := map[string]struct { + workerMan *appsrv.SWorkerManager + f containerActionFunc + }{ + "start": {startWorker, startContainer}, + "stop": {stopWorker, stopContainer}, + } + for action, fw := range ctrWorkerHanders { + app.AddHandler("POST", + fmt.Sprintf("%s/pods/%s/containers/%s/%s", prefix, POD_ID, CONTAINER_ID, action), + containerActionHandlerWithWorker(fw.f, fw.workerMan)) + } + execWorker := appsrv.NewWorkerManager("container-exec-worker", 16, appsrv.DEFAULT_BACKLOG, false) app.AddHandler3(newContainerWorkerHandler("POST", fmt.Sprintf("%s/pods/%s/containers/%s/exec-sync", prefix, POD_ID, CONTAINER_ID), execWorker, containerSyncActionHandler(containerExecSync))) app.AddHandler3(newContainerWorkerHandler("POST", fmt.Sprintf("%s/pods/%s/containers/%s/exec", prefix, POD_ID, CONTAINER_ID), execWorker, execContainer())) diff --git a/pkg/hostman/guestman/qemu-kvm.go b/pkg/hostman/guestman/qemu-kvm.go index 62f068a3cf6..c2f46ee90df 100644 --- a/pkg/hostman/guestman/qemu-kvm.go +++ b/pkg/hostman/guestman/qemu-kvm.go @@ -644,10 +644,6 @@ func (s *SKVMGuestInstance) IsDirtyShotdown() bool { return s.GetPid() == -2 } -func (s *SKVMGuestInstance) IsDaemon() bool { - return s.Desc.IsDaemon -} - func (s *SKVMGuestInstance) DirtyServerRequestStart() { var body = jsonutils.NewDict() body.Set("guest_id", jsonutils.NewString(s.Id)) diff --git a/pkg/hostman/guestman/runtime.go b/pkg/hostman/guestman/runtime.go index 824863cf45d..7a555e140ae 100644 --- a/pkg/hostman/guestman/runtime.go +++ b/pkg/hostman/guestman/runtime.go @@ -146,6 +146,10 @@ func (s *sBaseGuestInstance) IsLoaded() bool { return s.Desc != nil } +func (s *sBaseGuestInstance) IsDaemon() bool { + return s.Desc.IsDaemon +} + func (s *sBaseGuestInstance) GetNicDescMatch(mac, ip, port, bridge string) *desc.SGuestNetwork { nics := s.Desc.Nics for _, nic := range nics {