Skip to content

Commit

Permalink
Implement Clustering and App related functions
Browse files Browse the repository at this point in the history
- change VMI to VMI ReplicaSet for kubernetes
- change Pod to Pod RelicaSet for containers
- change functions handling replicaset names in services
- subscribe EdgeNodeInfo in domainmgr, zedmanager to get
  node-name for cluster
- add Designated Node ID to several structs for App
- not to delete domain from kubernetes if not a Designated
  App node
- parse config for EdgeNodeClusterConfig in zedagent
- handle ENClusterAppStatus publication in zedmanger in
  multi-node clustering case
- zedmanager handling effective-activation include ENClusterAppStatus
- kubevirt hypervisor changes to handle VMI/Pod ReplicaSets

Signed-off-by: Naiming Shen <[email protected]>
  • Loading branch information
naiming-zededa committed Dec 10, 2024
1 parent e872c81 commit 2453125
Show file tree
Hide file tree
Showing 16 changed files with 1,015 additions and 262 deletions.
28 changes: 25 additions & 3 deletions pkg/pillar/base/kubevirt.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,15 +67,37 @@ func GetAppKubeName(displayName string, uuid uuid.UUID) string {
}

// GetVMINameFromVirtLauncher : get VMI name from the corresponding Kubevirt
// launcher pod name.
// launcher pod name for replicaset generated VMI.
func GetVMINameFromVirtLauncher(podName string) (vmiName string, isVirtLauncher bool) {
if !strings.HasPrefix(podName, VMIPodNamePrefix) {
return "", false
}
vmiName = strings.TrimPrefix(podName, VMIPodNamePrefix)
lastSep := strings.LastIndex(vmiName, "-")
if lastSep != -1 {
vmiName = vmiName[:lastSep]
if lastSep == -1 || lastSep < 5 {
return "", false
}

// Check if the last part is 5 bytes long
if len(vmiName[lastSep+1:]) != 5 {
return "", false
}

// Use the index minus 5 bytes to get the VMI name to remove added
// replicaset suffix
vmiName = vmiName[:lastSep-5]
return vmiName, true
}

// GetReplicaPodName : get the app name from the pod name for replica pods.
func GetReplicaPodName(displayName, podName string, uuid uuid.UUID) (kubeName string, isReplicaPod bool) {
kubeName = GetAppKubeName(displayName, uuid)
if !strings.HasPrefix(podName, kubeName) {
return "", false
}
suffix := strings.TrimPrefix(podName, kubeName)
if strings.HasPrefix(suffix, "-") && len(suffix[1:]) == 5 {
return kubeName, true
}
return "", false
}
1 change: 1 addition & 0 deletions pkg/pillar/cipher/cipher.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ func getEncryptionBlock(
decBlock.CellularNetUsername = zconfigDecBlockPtr.CellularNetUsername
decBlock.CellularNetPassword = zconfigDecBlockPtr.CellularNetPassword
decBlock.ProtectedUserData = zconfigDecBlockPtr.ProtectedUserData
decBlock.ClusterToken = zconfigDecBlockPtr.ClusterToken
return decBlock
}

Expand Down
133 changes: 123 additions & 10 deletions pkg/pillar/cmd/domainmgr/domainmgr.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ type domainContext struct {
pubDomainStatus pubsub.Publication
subGlobalConfig pubsub.Subscription
subZFSPoolStatus pubsub.Subscription
subEdgeNodeInfo pubsub.Subscription
pubAssignableAdapters pubsub.Publication
pubDomainMetric pubsub.Publication
pubHostMemory pubsub.Publication
Expand Down Expand Up @@ -126,6 +127,7 @@ type domainContext struct {
cpuPinningSupported bool
// Is it kubevirt eve
hvTypeKube bool
nodeName string
}

// AddAgentSpecificCLIFlags adds CLI options
Expand Down Expand Up @@ -414,9 +416,24 @@ func Run(ps *pubsub.PubSub, loggerArg *logrus.Logger, logArg *base.LogObject, ar
domainCtx.subZFSPoolStatus = subZFSPoolStatus
subZFSPoolStatus.Activate()

// Look for edge node info
subEdgeNodeInfo, err := ps.NewSubscription(pubsub.SubscriptionOptions{
AgentName: "zedagent",
MyAgentName: agentName,
TopicImpl: types.EdgeNodeInfo{},
Persistent: true,
Activate: false,
})
if err != nil {
log.Fatal(err)
}
domainCtx.subEdgeNodeInfo = subEdgeNodeInfo
_ = subEdgeNodeInfo.Activate()

// Parse any existing ConfigIntemValueMap but continue if there
// is none
for !domainCtx.GCComplete {
waitEdgeNodeInfo := true
for !domainCtx.GCComplete || (domainCtx.hvTypeKube && waitEdgeNodeInfo) {
log.Noticef("waiting for GCComplete")
select {
case change := <-subGlobalConfig.MsgChan():
Expand All @@ -425,6 +442,10 @@ func Run(ps *pubsub.PubSub, loggerArg *logrus.Logger, logArg *base.LogObject, ar
case <-domainCtx.publishTicker.C:
publishProcessesHandler(&domainCtx)

case change := <-subEdgeNodeInfo.MsgChan():
subEdgeNodeInfo.ProcessChange(change)
waitEdgeNodeInfo = false

case <-stillRunning.C:
}
ps.StillRunning(agentName, warningTime, errorTime)
Expand Down Expand Up @@ -513,6 +534,9 @@ func Run(ps *pubsub.PubSub, loggerArg *logrus.Logger, logArg *base.LogObject, ar
case change := <-subZFSPoolStatus.MsgChan():
subZFSPoolStatus.ProcessChange(change)

case change := <-subEdgeNodeInfo.MsgChan():
subEdgeNodeInfo.ProcessChange(change)

case <-domainCtx.publishTicker.C:
publishProcessesHandler(&domainCtx)

Expand Down Expand Up @@ -651,6 +675,9 @@ func Run(ps *pubsub.PubSub, loggerArg *logrus.Logger, logArg *base.LogObject, ar
case change := <-subPhysicalIOAdapter.MsgChan():
subPhysicalIOAdapter.ProcessChange(change)

case change := <-subEdgeNodeInfo.MsgChan():
subEdgeNodeInfo.ProcessChange(change)

case <-domainCtx.publishTicker.C:
start := time.Now()
err = domainCtx.cipherMetrics.Publish(log, cipherMetricsPub, "global")
Expand Down Expand Up @@ -977,12 +1004,15 @@ func verifyStatus(ctx *domainContext, status *types.DomainStatus) {
status.SetErrorDescription(errDescription)
}

//cleanup app instance tasks
if err := hyper.Task(status).Delete(status.DomainName); err != nil {
log.Errorf("failed to delete domain: %s (%v)", status.DomainName, err)
}
if err := hyper.Task(status).Cleanup(status.DomainName); err != nil {
log.Errorf("failed to cleanup domain: %s (%v)", status.DomainName, err)
// in cluster mode, we can not delete the pod due to failing to get app info
if !ctx.hvTypeKube {
//cleanup app instance tasks
if err := hyper.Task(status).Delete(status.DomainName); err != nil {
log.Errorf("failed to delete domain: %s (%v)", status.DomainName, err)
}
if err := hyper.Task(status).Cleanup(status.DomainName); err != nil {
log.Errorf("failed to cleanup domain: %s (%v)", status.DomainName, err)
}
}
}
status.DomainId = 0
Expand Down Expand Up @@ -1071,6 +1101,14 @@ func maybeRetryBoot(ctx *domainContext, status *types.DomainStatus) {
if !status.BootFailed {
return
}

err := getnodeNameAndUUID(ctx)
if err != nil {
log.Errorf("maybeRetryBoot(%s) getnodeNameAndUUID failed: %s",
status.Key(), err)
return
}

if status.Activated && status.BootFailed {
log.Functionf("maybeRetryBoot(%s) clearing bootFailed since Activated",
status.Key())
Expand Down Expand Up @@ -1138,6 +1176,11 @@ func maybeRetryBoot(ctx *domainContext, status *types.DomainStatus) {
log.Errorf("Failed to setup vTPM for %s: %s", status.DomainName, err)
}

// pass nodeName to hypervisor call Setup
if status.NodeName == "" {
status.NodeName = ctx.nodeName
}

if err := hyper.Task(status).Setup(*status, *config, ctx.assignableAdapters, nil, file); err != nil {
//it is retry, so omit error
log.Errorf("Failed to create DomainStatus from %+v: %s",
Expand Down Expand Up @@ -1322,6 +1365,7 @@ func handleCreate(ctx *domainContext, key string, config *types.DomainConfig) {
State: types.INSTALLED,
VmConfig: config.VmConfig,
Service: config.Service,
IsDNidNode: config.IsDNidNode,
}

status.VmConfig.CPUs = make([]int, 0)
Expand Down Expand Up @@ -1531,6 +1575,13 @@ func doActivate(ctx *domainContext, config types.DomainConfig,
log.Functionf("doActivate(%v) for %s",
config.UUIDandVersion, config.DisplayName)

err := getnodeNameAndUUID(ctx)
if err != nil {
log.Errorf("doActivate(%s) getnodeNameAndUUID failed: %s",
status.Key(), err)
return
}

if ctx.cpuPinningSupported {
if err := assignCPUs(ctx, &config, status); err != nil {
log.Warnf("failed to assign CPUs for %s", config.DisplayName)
Expand Down Expand Up @@ -1684,6 +1735,11 @@ func doActivate(ctx *domainContext, config types.DomainConfig,
log.Errorf("Failed to setup vTPM for %s: %s", status.DomainName, err)
}

// pass nodeName to hypervisor call Setup
if status.NodeName == "" {
status.NodeName = ctx.nodeName
}

globalConfig := agentlog.GetGlobalConfig(log, ctx.subGlobalConfig)
if err := hyper.Task(status).Setup(*status, config, ctx.assignableAdapters, globalConfig, file); err != nil {
log.Errorf("Failed to create DomainStatus from %+v: %s",
Expand Down Expand Up @@ -1751,6 +1807,17 @@ func doActivateTail(ctx *domainContext, status *types.DomainStatus,
log.Errorf("domain start for %s: %s", status.DomainName, err)
status.SetErrorNow(err.Error())

// HvKube case
if ctx.hvTypeKube && !status.IsDNidNode {
log.Noticef("doActivateTail(%v) we are not DNiD, skip delete app", status.DomainName)
return
}

// Only send delete if DomainConfig is not deleted
if ctx.hvTypeKube && !status.DomainConfigDeleted {
log.Noticef("doActivateTail(%v) DomainConfig exists, skip delete app", status.DomainName)
return
}
// Delete
if err := hyper.Task(status).Delete(status.DomainName); err != nil {
log.Errorf("failed to delete domain: %s (%v)", status.DomainName, err)
Expand Down Expand Up @@ -1780,6 +1847,17 @@ func doActivateTail(ctx *domainContext, status *types.DomainStatus,
status.SetErrorNow(err.Error())
log.Errorf("doActivateTail(%v) failed for %s: %s",
status.UUIDandVersion, status.DisplayName, err)

if ctx.hvTypeKube && !status.IsDNidNode {
log.Noticef("doActivateTail(%v) we are not DNiD, skip delete app", status.DomainName)
return
}
// Only send delete if DomainConfig is not deleted
if ctx.hvTypeKube && !status.DomainConfigDeleted {
log.Noticef("doActivateTail(%v) DomainConfig exists, skip delete app", status.DomainName)
return
}

// Delete
if err := hyper.Task(status).Delete(status.DomainName); err != nil {
log.Errorf("failed to delete domain: %s (%v)", status.DomainName, err)
Expand Down Expand Up @@ -1844,7 +1922,7 @@ func doInactivate(ctx *domainContext, status *types.DomainStatus, impatient bool
if doShutdown {
// If the Shutdown fails we don't wait; assume failure
// was due to no PV tools
if err := DomainShutdown(*status, false); err != nil {
if err := DomainShutdown(ctx, *status, false); err != nil {
log.Errorf("DomainShutdown %s failed: %s",
status.DomainName, err)
} else {
Expand All @@ -1864,7 +1942,7 @@ func doInactivate(ctx *domainContext, status *types.DomainStatus, impatient bool
// the domain is already on the way down.
// In case of errors we proceed directly to deleting the task,
// and after that we waitForDomainGone
if err := DomainShutdown(*status, true); err != nil {
if err := DomainShutdown(ctx, *status, true); err != nil {
log.Warnf("DomainShutdown -F %s failed: %s",
status.DomainName, err)
} else {
Expand All @@ -1881,6 +1959,16 @@ func doInactivate(ctx *domainContext, status *types.DomainStatus, impatient bool
}

if status.DomainId != 0 {
if ctx.hvTypeKube && !status.IsDNidNode {
log.Noticef("doInactivate(%v) we are not DNiD, skip delete app", status.DomainName)
return
}
// Only send delete if DomainConfig is not deleted
if ctx.hvTypeKube && !status.DomainConfigDeleted {
log.Noticef("doInactivate(%v) DomainConfig exists, skip delete app", status.DomainName)
return
}

if err := hyper.Task(status).Delete(status.DomainName); err != nil {
log.Errorf("Failed to delete domain %s (%v)", status.DomainName, err)
} else {
Expand Down Expand Up @@ -2468,6 +2556,13 @@ func handleDelete(ctx *domainContext, key string, status *types.DomainStatus) {
// No point in publishing metrics any more
ctx.pubDomainMetric.Unpublish(status.Key())

if ctx.hvTypeKube && !status.IsDNidNode {
log.Noticef("handleDelete(%v) we are not DNiD, skip delete app", status.DomainName)
return
}
status.DomainConfigDeleted = true
log.Noticef("handleDelete(%v) DomainConfigDeleted", status.DomainName)

err := hyper.Task(status).Delete(status.DomainName)
if err != nil {
log.Errorln(err)
Expand Down Expand Up @@ -2508,13 +2603,18 @@ func DomainCreate(ctx *domainContext, status types.DomainStatus) (int, error) {
}

// DomainShutdown is a wrapper for domain shutdown
func DomainShutdown(status types.DomainStatus, force bool) error {
func DomainShutdown(ctx *domainContext, status types.DomainStatus, force bool) error {

var err error
log.Functionf("DomainShutdown force-%v %s %d", force, status.DomainName, status.DomainId)

// Stop the domain
log.Functionf("Stopping domain - %s", status.DomainName)

if ctx.hvTypeKube && !status.IsDNidNode {
log.Noticef("DomainShutdown(%v) we are not DNiD, skip delete app", status.DomainName)
return nil
}
err = hyper.Task(&status).Stop(status.DomainName, force)

return err
Expand Down Expand Up @@ -3603,3 +3703,16 @@ func lookupCapabilities(ctx *domainContext) (*types.Capabilities, error) {
}
return &capabilities, nil
}

func getnodeNameAndUUID(ctx *domainContext) error {
if ctx.nodeName == "" {
NodeInfo, err := ctx.subEdgeNodeInfo.Get("global")
if err != nil {
log.Errorf("getnodeNameAndUUID: can't get edgeNodeInfo %v", err)
return err
}
enInfo := NodeInfo.(types.EdgeNodeInfo)
ctx.nodeName = strings.ToLower(enInfo.DeviceName)
}
return nil
}
Loading

0 comments on commit 2453125

Please sign in to comment.