Skip to content

Commit

Permalink
feat(host): start dirty shutdown pod and container (#21045)
Browse files Browse the repository at this point in the history
* feat(host): start dirty shutdown pod and container

* feat(host): control dirty guests recovery and auto started feature
  • Loading branch information
zexi authored Aug 23, 2024
1 parent 7072126 commit 4d82d59
Show file tree
Hide file tree
Showing 3 changed files with 214 additions and 21 deletions.
50 changes: 49 additions & 1 deletion pkg/hostman/guestman/guestman.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,8 @@ type SGuestManager struct {
pythonPath string

// container related members
containerProbeManager prober.Manager
containerProbeManager prober.Manager
enableDirtyRecoveryFeature bool
}

func NewGuestManager(host hostutils.IHost, serversPath string) (*SGuestManager, error) {
Expand Down Expand Up @@ -281,12 +282,30 @@ func (m *SGuestManager) Bootstrap() (chan struct{}, error) {
} else {
m.isLoaded = true
log.Infof("Loading existing guests ...")
if m.needDirtyRecovery() {
if err := m.createDisableDirtyRecoveryFile(); err != nil {
log.Errorf("create disable dirty recovery file: %s", err)
} else {
log.Infof("[%s created] enable dirty recovery feature", m.disableDirtyRecoveryFilePath())
m.enableDirtyRecoveryFeature = true
}
} else {
log.Infof("[%s existed] disable dirty recovery feature", m.disableDirtyRecoveryFilePath())
m.enableDirtyRecoveryFeature = false
}
if len(m.CandidateServers) > 0 {
m.VerifyExistingGuests(false)
} else {
m.OnLoadExistingGuestsComplete()
}
}
timeutils2.AddTimeout(time.Second*time.Duration(options.HostOptions.EnableDirtyRecoverySeconds), func() {
if err := m.removeDisableDirtyRecoveryFile(); err != nil {
log.Errorf("remove disable dirty recovery file %s: %s", m.disableDirtyRecoveryFilePath(), err)
} else {
log.Infof("[%s removed] enable dirty recovery feature at next bootstrap", m.disableDirtyRecoveryFilePath())
}
})
return m.dirtyServersChan, nil
}

Expand Down Expand Up @@ -319,6 +338,35 @@ func (m *SGuestManager) OnVerifyExistingGuestsFail(err error, pendingDelete bool
timeutils2.AddTimeout(30*time.Second, func() { m.VerifyExistingGuests(false) })
}

func (m *SGuestManager) disableDirtyRecoveryFilePath() string {
return path.Join(options.HostOptions.ServersPath, "disable-guests-dirty-recovery")
}

func (m *SGuestManager) removeDisableDirtyRecoveryFile() error {
if !fileutils2.Exists(m.disableDirtyRecoveryFilePath()) {
return nil
}
return os.RemoveAll(m.disableDirtyRecoveryFilePath())
}

func (m *SGuestManager) createDisableDirtyRecoveryFile() error {
if fileutils2.Exists(m.disableDirtyRecoveryFilePath()) {
return nil
}
return fileutils2.FilePutContents(m.disableDirtyRecoveryFilePath(), "", false)
}

func (m *SGuestManager) needDirtyRecovery() bool {
if fileutils2.Exists(m.disableDirtyRecoveryFilePath()) {
return false
}
return true
}

func (m *SGuestManager) EnableDirtyRecoveryFeature() bool {
return m.enableDirtyRecoveryFeature
}

func (m *SGuestManager) OnVerifyExistingGuestsSucc(servers []jsonutils.JSONObject, pendingDelete bool) {
for _, v := range servers {
id, _ := v.GetString("id")
Expand Down
183 changes: 163 additions & 20 deletions pkg/hostman/guestman/pod.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,16 +132,83 @@ func newContainer(id string) *sContainer {
}
}

type startStatHelper struct {
podId string
homeDir string
}

func newStartStatHelper(podId string, homeDir string) *startStatHelper {
return &startStatHelper{
podId: podId,
homeDir: homeDir,
}
}

func (h startStatHelper) getPodFile() string {
return filepath.Join(h.homeDir, "pod-start.stat")
}

func (h startStatHelper) IsPodFileExists() bool {
return fileutils2.Exists(h.getPodFile())
}

func (h startStatHelper) createStatFile(fp string) error {
if fileutils2.Exists(fp) {
return nil
}
if err := pod.EnsureFile(fp, "", "755"); err != nil {
return errors.Wrapf(err, "ensure file %s", fp)
}
return nil
}

func (h startStatHelper) removeStatFile(fp string) error {
if !fileutils2.Exists(fp) {
return nil
}
if err := os.Remove(fp); err != nil {
return errors.Wrapf(err, "remove file %s", fp)
}
return nil
}

func (h startStatHelper) CreatePodFile() error {
return h.createStatFile(h.getPodFile())
}

func (h startStatHelper) RemovePodFile() error {
return h.removeStatFile(h.getPodFile())
}

func (h startStatHelper) getContainerFile(ctrId string) string {
return filepath.Join(h.homeDir, fmt.Sprintf("container-start-%s.stat", ctrId))
}

func (h startStatHelper) IsContainerFileExists(ctrId string) bool {
return fileutils2.Exists(h.getContainerFile(ctrId))
}

func (h startStatHelper) CreateContainerFile(ctrId string) error {
return h.createStatFile(h.getContainerFile(ctrId))
}

func (h startStatHelper) RemoveContainerFile(ctrId string) error {
return h.removeStatFile(h.getContainerFile(ctrId))
}

type sPodGuestInstance struct {
*sBaseGuestInstance
containers map[string]*sContainer
startStat *startStatHelper
}

func newPodGuestInstance(id string, man *SGuestManager) PodInstance {
return &sPodGuestInstance{
p := &sPodGuestInstance{
sBaseGuestInstance: newBaseGuestInstance(id, man, computeapi.HYPERVISOR_POD),
containers: make(map[string]*sContainer),
}
p.startStat = newStartStatHelper(id, p.HomeDir())
return p
}

func (s *sPodGuestInstance) CleanGuest(ctx context.Context, params interface{}) (jsonutils.JSONObject, error) {
Expand All @@ -166,20 +233,45 @@ func (s *sPodGuestInstance) ImportServer(pendingDelete bool) {
// TODO: 参考SKVMGuestInstance,可以做更多的事,比如同步状态
s.manager.SaveServer(s.Id, s)
s.manager.RemoveCandidateServer(s)
/*if s.IsDaemon() {
if s.IsDaemon() || s.IsDirtyShutdown() {
ctx := context.Background()
if !s.IsRunning() {
if err := s.StartPod(ctx, hostutils.GetComputeSession(ctx).GetToken()); err != nil {
log.Errorf("start pod (%s/%s) error: %v", s.GetId(), s.GetName(), err)
}
// TODO: start related containers and sync status
cred := hostutils.GetComputeSession(ctx).GetToken()
if err := s.StartLocalPod(ctx, cred); err != nil {
log.Errorf("start local pod err %s", err.Error())
}
} else {
s.SyncStatus("sync status after host started")
s.getProbeManager().AddPod(s.Desc)
}*/
s.SyncStatus("sync status after host started")
s.getProbeManager().AddPod(s.Desc)
}
}

func (s *sPodGuestInstance) isPodDirtyShutdown() bool {
if !s.IsRunning() && s.startStat.IsPodFileExists() {
return true
}
return false
}

func (s *sPodGuestInstance) isContainerDirtyShutdown(ctrId string) bool {
if !s.IsContainerRunning(context.Background(), ctrId) && s.startStat.IsContainerFileExists(ctrId) {
return true
}
return false
}

func (s *sPodGuestInstance) IsDirtyShutdown() bool {
if !s.manager.EnableDirtyRecoveryFeature() {
return false
}
if s.isPodDirtyShutdown() {
return true
}
for _, ctr := range s.GetContainers() {
if s.isContainerDirtyShutdown(ctr.Id) {
return true
}
}
return false
}

func (s *sPodGuestInstance) SyncStatus(reason string) {
Expand Down Expand Up @@ -286,6 +378,17 @@ func (s *sPodGuestInstance) IsRunning() bool {
return false
}

func (s *sPodGuestInstance) IsContainerRunning(ctx context.Context, ctrId string) bool {
status, err := s.getContainerStatus(ctx, ctrId)
if err != nil {
return false
}
if sets.NewString(computeapi.CONTAINER_STATUS_RUNNING, computeapi.CONTAINER_STATUS_PROBING).Has(status) {
return true
}
return false
}

func (s *sPodGuestInstance) HandleGuestStatus(ctx context.Context, status string, body *jsonutils.JSONDict) (jsonutils.JSONObject, error) {
body.Set("status", jsonutils.NewString(status))
hostutils.TaskComplete(ctx, body)
Expand Down Expand Up @@ -475,33 +578,44 @@ func (s *sPodGuestInstance) getCgroupParent() string {
return "/cloudpods"
}

type podStartTask struct {
type localPodStartTask struct {
ctx context.Context
userCred mcclient.TokenCredential
pod *sPodGuestInstance
}

func newPodStartTask(ctx context.Context, userCred mcclient.TokenCredential, pod *sPodGuestInstance) *podStartTask {
return &podStartTask{
func newLocalPodStartTask(ctx context.Context, userCred mcclient.TokenCredential, pod *sPodGuestInstance) *localPodStartTask {
return &localPodStartTask{
ctx: ctx,
userCred: userCred,
pod: pod,
}
}

func (t *podStartTask) Run() {
if _, err := t.pod.startPod(t.ctx, t.userCred); err != nil {
log.Errorf("start pod(%s/%s) err: %s", t.pod.GetId(), t.pod.GetName(), err.Error())
func (t *localPodStartTask) Run() {
if t.pod.isPodDirtyShutdown() {
log.Infof("start pod locally (%s/%s)", t.pod.Id, t.pod.GetName())
if _, err := t.pod.startPod(t.ctx, t.userCred); err != nil {
log.Errorf("start pod(%s/%s) err: %s", t.pod.GetId(), t.pod.GetName(), err.Error())
}
}
t.pod.SyncStatus("sync status after pod start")
for _, ctr := range t.pod.GetContainers() {
if t.pod.isContainerDirtyShutdown(ctr.Id) {
log.Infof("start container locally (%s/%s/%s/%s)", t.pod.Id, t.pod.GetName(), ctr.Id, ctr.Name)
if _, err := t.pod.StartLocalContainer(t.ctx, t.userCred, ctr.Id); err != nil {
log.Errorf("start container %s err: %s", ctr.Id, err.Error())
}
}
}
t.pod.SyncStatus("sync status after pod start locally")
}

func (t *podStartTask) Dump() string {
func (t *localPodStartTask) Dump() string {
return fmt.Sprintf("pod start task %s/%s", t.pod.GetId(), t.pod.GetName())
}

func (s *sPodGuestInstance) StartPod(ctx context.Context, userCred mcclient.TokenCredential) error {
s.manager.GuestStartWorker.Run(newPodStartTask(ctx, userCred, s), nil, nil)
func (s *sPodGuestInstance) StartLocalPod(ctx context.Context, userCred mcclient.TokenCredential) error {
s.manager.GuestStartWorker.Run(newLocalPodStartTask(ctx, userCred, s), nil, nil)
return nil
}

Expand Down Expand Up @@ -624,6 +738,9 @@ func (s *sPodGuestInstance) _startPod(ctx context.Context, userCred mcclient.Tok
}

s.getProbeManager().AddPod(s.Desc)
if err := s.startStat.CreatePodFile(); err != nil {
return nil, errors.Wrap(err, "startStat.CreatePodFile")
}
return &computeapi.PodStartResponse{
CRIId: criId,
IsRunning: false,
Expand Down Expand Up @@ -666,6 +783,9 @@ func (s *sPodGuestInstance) ensurePodRemoved(ctx context.Context, timeout int64)
}

s.getProbeManager().RemovePod(s.Desc)
if err := s.startStat.RemovePodFile(); err != nil {
return errors.Wrap(err, "startStat.RemovePodFile")
}
return nil
}

Expand Down Expand Up @@ -740,6 +860,23 @@ func (s *sPodGuestInstance) getContainerCRIId(ctrId string) (string, error) {
return ctr.CRIId, nil
}

func (s *sPodGuestInstance) StartLocalContainer(ctx context.Context, userCred mcclient.TokenCredential, ctrId string) (jsonutils.JSONObject, error) {
ctr := s.GetContainerById(ctrId)
if ctr == nil {
return nil, errors.Wrapf(errors.ErrNotFound, "Not found container %s", ctrId)
}
input := &hostapi.ContainerCreateInput{
Name: ctr.Name,
GuestId: s.GetId(),
Spec: ctr.Spec,
}
ret, err := s.StartContainer(ctx, userCred, ctrId, input)
if err != nil {
return nil, errors.Wrap(err, "start container")
}
return ret, nil
}

func (s *sPodGuestInstance) StartContainer(ctx context.Context, userCred mcclient.TokenCredential, ctrId string, input *hostapi.ContainerCreateInput) (jsonutils.JSONObject, error) {
_, hasCtr := s.containers[ctrId]
needRecreate := false
Expand Down Expand Up @@ -790,6 +927,9 @@ func (s *sPodGuestInstance) StartContainer(ctx context.Context, userCred mcclien
if err := s.doContainerStartPostLifecycle(ctx, criId, input); err != nil {
return nil, errors.Wrap(err, "do container lifecycle")
}
if err := s.startStat.CreateContainerFile(ctrId); err != nil {
return nil, errors.Wrapf(err, "create container startup stat file %s", ctrId)
}
return nil, nil
}

Expand Down Expand Up @@ -870,6 +1010,9 @@ func (s *sPodGuestInstance) StopContainer(ctx context.Context, userCred mcclient
if err := s.getCRI().StopContainer(ctx, criId, timeout); err != nil {
return nil, errors.Wrap(err, "CRI.StopContainer")
}
if err := s.startStat.RemoveContainerFile(ctrId); err != nil {
return nil, errors.Wrap(err, "startStat.RemoveContainerFile")
}
return nil, nil
}

Expand Down
2 changes: 2 additions & 0 deletions pkg/hostman/options/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -225,6 +225,8 @@ type SHostOptions struct {
CudaMPSReplicas int `help:"cuda mps replias" default:"10"`

EnableContainerAscendNPU bool `help:"enable container npu" default:"false"`

EnableDirtyRecoverySeconds int `help:"Seconds to delay enable dirty guests recovery feature, default 15 minutes" default:"900"`
}

var (
Expand Down

0 comments on commit 4d82d59

Please sign in to comment.