Skip to content

Commit

Permalink
feat(host): start and stop container in worker (#21038)
Browse files Browse the repository at this point in the history
  • Loading branch information
zexi authored Aug 15, 2024
1 parent addbcbe commit 52f13e3
Show file tree
Hide file tree
Showing 5 changed files with 92 additions and 18 deletions.
7 changes: 2 additions & 5 deletions pkg/hostman/container/status/status_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,12 @@ package status
import (
"context"

"yunion.io/x/jsonutils"
"yunion.io/x/log"

"yunion.io/x/onecloud/pkg/apis"
computeapi "yunion.io/x/onecloud/pkg/apis/compute"
"yunion.io/x/onecloud/pkg/hostman/container/prober/results"
"yunion.io/x/onecloud/pkg/hostman/hostutils"
computemodules "yunion.io/x/onecloud/pkg/mcclient/modules/compute"
)

type Manager interface {
Expand All @@ -40,16 +38,15 @@ func NewManager() Manager {
}

func (m *manager) SetContainerStartup(podId string, containerId string, started bool, result results.ProbeResult) {
s := hostutils.GetComputeSession(context.Background())
status := computeapi.CONTAINER_STATUS_PROBE_FAILED
if started {
status = computeapi.CONTAINER_STATUS_RUNNING
}
input := apis.PerformStatusInput{
input := &apis.PerformStatusInput{
Status: status,
Reason: result.Reason,
}
if _, err := computemodules.Containers.PerformAction(s, containerId, "status", jsonutils.Marshal(input)); err != nil {
if _, err := hostutils.UpdateContainerStatus(context.Background(), containerId, input); err != nil {
log.Errorf("set container(%s/%s) status failed: %s", podId, containerId, err)
}
}
58 changes: 56 additions & 2 deletions pkg/hostman/guestman/pod.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,18 @@ func (s *sPodGuestInstance) ImportServer(pendingDelete bool) {
// TODO: 参考SKVMGuestInstance,可以做更多的事,比如同步状态
s.manager.SaveServer(s.Id, s)
s.manager.RemoveCandidateServer(s)
/*if s.IsDaemon() {
ctx := context.Background()
if !s.IsRunning() {
if err := s.StartPod(ctx, hostutils.GetComputeSession(ctx).GetToken()); err != nil {
log.Errorf("start pod (%s/%s) error: %v", s.GetId(), s.GetName(), err)
}
// TODO: start related containers and sync status
}
} else {
s.SyncStatus("sync status after host started")
s.getProbeManager().AddPod(s.Desc)
}*/
s.SyncStatus("sync status after host started")
s.getProbeManager().AddPod(s.Desc)
}
Expand Down Expand Up @@ -463,11 +475,53 @@ func (s *sPodGuestInstance) getCgroupParent() string {
return "/cloudpods"
}

func Int64Ptr(i int64) *int64 {
return &i
type podStartTask struct {
ctx context.Context
userCred mcclient.TokenCredential
pod *sPodGuestInstance
}

func newPodStartTask(ctx context.Context, userCred mcclient.TokenCredential, pod *sPodGuestInstance) *podStartTask {
return &podStartTask{
ctx: ctx,
userCred: userCred,
pod: pod,
}
}

func (t *podStartTask) Run() {
if _, err := t.pod.startPod(t.ctx, t.userCred); err != nil {
log.Errorf("start pod(%s/%s) err: %s", t.pod.GetId(), t.pod.GetName(), err.Error())
}
t.pod.SyncStatus("sync status after pod start")
}

func (t *podStartTask) Dump() string {
return fmt.Sprintf("pod start task %s/%s", t.pod.GetId(), t.pod.GetName())
}

func (s *sPodGuestInstance) StartPod(ctx context.Context, userCred mcclient.TokenCredential) error {
s.manager.GuestStartWorker.Run(newPodStartTask(ctx, userCred, s), nil, nil)
return nil
}

func (s *sPodGuestInstance) startPod(ctx context.Context, userCred mcclient.TokenCredential) (*computeapi.PodStartResponse, error) {
retries := 3
sec := 5 * time.Second
var err error
var resp *computeapi.PodStartResponse
for i := 1; i <= retries; i++ {
resp, err = s._startPod(ctx, userCred)
if err == nil {
return resp, nil
}
log.Errorf("start pod %s/%s error with %d times: %v", s.GetId(), s.GetName(), i, err)
time.Sleep(sec * time.Duration(i))
}
return resp, err
}

func (s *sPodGuestInstance) _startPod(ctx context.Context, userCred mcclient.TokenCredential) (*computeapi.PodStartResponse, error) {
podInput, err := s.getPodCreateParams()
if err != nil {
return nil, errors.Wrap(err, "getPodCreateParams")
Expand Down
37 changes: 30 additions & 7 deletions pkg/hostman/guestman/podhandlers/podhandlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,14 +52,18 @@ type containerDelayActionParams struct {
}

func containerSyncActionHandler(cf containerActionFunc) appsrv.FilterHandler {
return _containerActionHandler(cf, true)
return _containerActionHandler(cf, true, nil)
}

func containerActionHandler(cf containerActionFunc) appsrv.FilterHandler {
return _containerActionHandler(cf, false)
return _containerActionHandler(cf, false, nil)
}

func _containerActionHandler(cf containerActionFunc, isSync bool) appsrv.FilterHandler {
func containerActionHandlerWithWorker(cf containerActionFunc, workerMan *appsrv.SWorkerManager) appsrv.FilterHandler {
return _containerActionHandler(cf, false, workerMan)
}

func _containerActionHandler(cf containerActionFunc, isSync bool, workerMan *appsrv.SWorkerManager) appsrv.FilterHandler {
return auth.Authenticate(func(ctx context.Context, w http.ResponseWriter, r *http.Request) {
params, _, body := appsrv.FetchEnv(ctx, w, r)
podId := params[POD_ID]
Expand Down Expand Up @@ -92,10 +96,15 @@ func _containerActionHandler(cf containerActionFunc, isSync bool) appsrv.FilterH
hostutils.Response(ctx, w, data)
return
} else {
hostutils.DelayTask(ctx, func(ctx context.Context, params interface{}) (jsonutils.JSONObject, error) {
delayFunc := func(ctx context.Context, params interface{}) (jsonutils.JSONObject, error) {
dp := params.(*containerDelayActionParams)
return cf(ctx, userCred, dp.pod, dp.containerId, dp.body)
}, delayParams)
}
if workerMan != nil {
hostutils.DelayTaskWithWorker(ctx, delayFunc, delayParams, workerMan)
} else {
hostutils.DelayTask(ctx, delayFunc, delayParams)
}
hostutils.ResponseOk(ctx, w)
}
})
Expand All @@ -104,8 +113,6 @@ func _containerActionHandler(cf containerActionFunc, isSync bool) appsrv.FilterH
func AddPodHandlers(prefix string, app *appsrv.Application) {
ctrHandlers := map[string]containerActionFunc{
"create": createContainer,
"start": startContainer,
"stop": stopContainer,
"delete": deleteContainer,
"sync-status": syncContainerStatus,
"pull-image": pullImage,
Expand All @@ -117,6 +124,22 @@ func AddPodHandlers(prefix string, app *appsrv.Application) {
containerActionHandler(f))
}

startWorker := appsrv.NewWorkerManager("container-start-worker", 2, appsrv.DEFAULT_BACKLOG, false)
stopWorker := appsrv.NewWorkerManager("container-stop-worker", 4, appsrv.DEFAULT_BACKLOG, false)

ctrWorkerHanders := map[string]struct {
workerMan *appsrv.SWorkerManager
f containerActionFunc
}{
"start": {startWorker, startContainer},
"stop": {stopWorker, stopContainer},
}
for action, fw := range ctrWorkerHanders {
app.AddHandler("POST",
fmt.Sprintf("%s/pods/%s/containers/%s/%s", prefix, POD_ID, CONTAINER_ID, action),
containerActionHandlerWithWorker(fw.f, fw.workerMan))
}

execWorker := appsrv.NewWorkerManager("container-exec-worker", 16, appsrv.DEFAULT_BACKLOG, false)
app.AddHandler3(newContainerWorkerHandler("POST", fmt.Sprintf("%s/pods/%s/containers/%s/exec-sync", prefix, POD_ID, CONTAINER_ID), execWorker, containerSyncActionHandler(containerExecSync)))
app.AddHandler3(newContainerWorkerHandler("POST", fmt.Sprintf("%s/pods/%s/containers/%s/exec", prefix, POD_ID, CONTAINER_ID), execWorker, execContainer()))
Expand Down
4 changes: 0 additions & 4 deletions pkg/hostman/guestman/qemu-kvm.go
Original file line number Diff line number Diff line change
Expand Up @@ -644,10 +644,6 @@ func (s *SKVMGuestInstance) IsDirtyShotdown() bool {
return s.GetPid() == -2
}

func (s *SKVMGuestInstance) IsDaemon() bool {
return s.Desc.IsDaemon
}

func (s *SKVMGuestInstance) DirtyServerRequestStart() {
var body = jsonutils.NewDict()
body.Set("guest_id", jsonutils.NewString(s.Id))
Expand Down
4 changes: 4 additions & 0 deletions pkg/hostman/guestman/runtime.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,10 @@ func (s *sBaseGuestInstance) IsLoaded() bool {
return s.Desc != nil
}

func (s *sBaseGuestInstance) IsDaemon() bool {
return s.Desc.IsDaemon
}

func (s *sBaseGuestInstance) GetNicDescMatch(mac, ip, port, bridge string) *desc.SGuestNetwork {
nics := s.Desc.Nics
for _, nic := range nics {
Expand Down

0 comments on commit 52f13e3

Please sign in to comment.