Skip to content

Commit

Permalink
Implement Clustering and App related functions
Browse files Browse the repository at this point in the history
- change VMI to VMI ReplicaSet for kubernetes
- change Pod to Pod ReplicaSet for containers
- change functions handling replicaset names in services
- subscribe EdgeNodeInfo in domainmgr, zedmanager to get
  node-name for cluster
- add Designated Node ID to several structs for App
- do not delete domain from kubernetes if not a Designated
  App node
- parse config for EdgeNodeClusterConfig in zedagent
- handle ENClusterAppStatus publication in zedmanager in
  multi-node clustering case
- zedmanager effective-activation handling now includes ENClusterAppStatus
- kubevirt hypervisor changes to handle VMI/Pod ReplicaSets

Signed-off-by: Naiming Shen <[email protected]>
  • Loading branch information
naiming-zededa committed Oct 30, 2024
1 parent 7534022 commit 631cc70
Show file tree
Hide file tree
Showing 14 changed files with 994 additions and 259 deletions.
28 changes: 25 additions & 3 deletions pkg/pillar/base/kubevirt.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,15 +67,37 @@ func GetAppKubeName(displayName string, uuid uuid.UUID) string {
}

// GetVMINameFromVirtLauncher : get VMI name from the corresponding Kubevirt
// launcher pod name.
// launcher pod name for replicaset generated VMI.
func GetVMINameFromVirtLauncher(podName string) (vmiName string, isVirtLauncher bool) {
if !strings.HasPrefix(podName, VMIPodNamePrefix) {
return "", false
}
vmiName = strings.TrimPrefix(podName, VMIPodNamePrefix)
lastSep := strings.LastIndex(vmiName, "-")
if lastSep != -1 {
vmiName = vmiName[:lastSep]
if lastSep == -1 || lastSep < 5 {
return "", false
}

// Check if the last part is 5 bytes long
if len(vmiName[lastSep+1:]) != 5 {
return "", false
}

// Use the index minus 5 bytes to get the VMI name to remove added
// replicaset suffix
vmiName = vmiName[:lastSep-5]
return vmiName, true
}

// GetReplicaPodName reports whether podName names a replica pod of the app
// identified by displayName and uuid. A match requires podName to consist of
// the app's kube name (as built by GetAppKubeName) followed by "-" and exactly
// five more bytes. On a match the kube name is returned together with true;
// otherwise "" and false are returned.
func GetReplicaPodName(displayName, podName string, uuid uuid.UUID) (kubeName string, isReplicaPod bool) {
	appName := GetAppKubeName(displayName, uuid)
	if !strings.HasPrefix(podName, appName) {
		return "", false
	}

	// Whatever follows the app name must be a dash plus a five-byte suffix.
	tail := podName[len(appName):]
	if len(tail) != 6 || tail[0] != '-' {
		return "", false
	}
	return appName, true
}
1 change: 1 addition & 0 deletions pkg/pillar/cipher/cipher.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ func getEncryptionBlock(
decBlock.CellularNetUsername = zconfigDecBlockPtr.CellularNetUsername
decBlock.CellularNetPassword = zconfigDecBlockPtr.CellularNetPassword
decBlock.ProtectedUserData = zconfigDecBlockPtr.ProtectedUserData
decBlock.ClusterToken = zconfigDecBlockPtr.ClusterToken
return decBlock
}

Expand Down
118 changes: 108 additions & 10 deletions pkg/pillar/cmd/domainmgr/domainmgr.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ type domainContext struct {
pubDomainStatus pubsub.Publication
subGlobalConfig pubsub.Subscription
subZFSPoolStatus pubsub.Subscription
subEdgeNodeInfo pubsub.Subscription
pubAssignableAdapters pubsub.Publication
pubDomainMetric pubsub.Publication
pubHostMemory pubsub.Publication
Expand Down Expand Up @@ -126,6 +127,7 @@ type domainContext struct {
cpuPinningSupported bool
// Is it kubevirt eve
hvTypeKube bool
nodeName string
}

// AddAgentSpecificCLIFlags adds CLI options
Expand Down Expand Up @@ -414,9 +416,25 @@ func Run(ps *pubsub.PubSub, loggerArg *logrus.Logger, logArg *base.LogObject, ar
domainCtx.subZFSPoolStatus = subZFSPoolStatus
subZFSPoolStatus.Activate()

// Look for edge node info
subEdgeNodeInfo, err := ps.NewSubscription(pubsub.SubscriptionOptions{
AgentName: "zedagent",
MyAgentName: agentName,
TopicImpl: types.EdgeNodeInfo{},
Persistent: true,
Activate: false,
})
if err != nil {
log.Fatal(err)
}
domainCtx.subEdgeNodeInfo = subEdgeNodeInfo
_ = subEdgeNodeInfo.Activate()

// Parse any existing ConfigItemValueMap but continue if there
// is none
for !domainCtx.GCComplete {
waitEdgeNodeInfo := true
wtTime := time.Now()
for !domainCtx.GCComplete || (domainCtx.hvTypeKube && waitEdgeNodeInfo) {
log.Noticef("waiting for GCComplete")
select {
case change := <-subGlobalConfig.MsgChan():
Expand All @@ -425,9 +443,16 @@ func Run(ps *pubsub.PubSub, loggerArg *logrus.Logger, logArg *base.LogObject, ar
case <-domainCtx.publishTicker.C:
publishProcessesHandler(&domainCtx)

case change := <-subEdgeNodeInfo.MsgChan():
subEdgeNodeInfo.ProcessChange(change)
waitEdgeNodeInfo = false

case <-stillRunning.C:
}
ps.StillRunning(agentName, warningTime, errorTime)
if time.Since(wtTime) > 5*time.Minute { // wait for max of 5 minutes
waitEdgeNodeInfo = false
}
}
log.Noticef("processed GCComplete")

Expand Down Expand Up @@ -513,6 +538,9 @@ func Run(ps *pubsub.PubSub, loggerArg *logrus.Logger, logArg *base.LogObject, ar
case change := <-subZFSPoolStatus.MsgChan():
subZFSPoolStatus.ProcessChange(change)

case change := <-subEdgeNodeInfo.MsgChan():
subEdgeNodeInfo.ProcessChange(change)

case <-domainCtx.publishTicker.C:
publishProcessesHandler(&domainCtx)

Expand Down Expand Up @@ -651,6 +679,9 @@ func Run(ps *pubsub.PubSub, loggerArg *logrus.Logger, logArg *base.LogObject, ar
case change := <-subPhysicalIOAdapter.MsgChan():
subPhysicalIOAdapter.ProcessChange(change)

case change := <-subEdgeNodeInfo.MsgChan():
subEdgeNodeInfo.ProcessChange(change)

case <-domainCtx.publishTicker.C:
start := time.Now()
err = domainCtx.cipherMetrics.Publish(log, cipherMetricsPub, "global")
Expand Down Expand Up @@ -977,12 +1008,15 @@ func verifyStatus(ctx *domainContext, status *types.DomainStatus) {
status.SetErrorDescription(errDescription)
}

//cleanup app instance tasks
if err := hyper.Task(status).Delete(status.DomainName); err != nil {
log.Errorf("failed to delete domain: %s (%v)", status.DomainName, err)
}
if err := hyper.Task(status).Cleanup(status.DomainName); err != nil {
log.Errorf("failed to cleanup domain: %s (%v)", status.DomainName, err)
// in cluster mode, we can not delete the pod due to failing to get app info
if !ctx.hvTypeKube {
//cleanup app instance tasks
if err := hyper.Task(status).Delete(status.DomainName); err != nil {
log.Errorf("failed to delete domain: %s (%v)", status.DomainName, err)
}
if err := hyper.Task(status).Cleanup(status.DomainName); err != nil {
log.Errorf("failed to cleanup domain: %s (%v)", status.DomainName, err)
}
}
}
status.DomainId = 0
Expand Down Expand Up @@ -1071,6 +1105,14 @@ func maybeRetryBoot(ctx *domainContext, status *types.DomainStatus) {
if !status.BootFailed {
return
}

err := getnodeNameAndUUID(ctx)
if err != nil {
log.Errorf("maybeRetryBoot(%s) getnodeNameAndUUID failed: %s",
status.Key(), err)
return
}

if status.Activated && status.BootFailed {
log.Functionf("maybeRetryBoot(%s) clearing bootFailed since Activated",
status.Key())
Expand Down Expand Up @@ -1126,6 +1168,11 @@ func maybeRetryBoot(ctx *domainContext, status *types.DomainStatus) {
status.VirtualTPM = true
}

// pass nodeName to hypervisor call Setup
if status.NodeName == "" {
status.NodeName = ctx.nodeName
}

if err := hyper.Task(status).Setup(*status, *config, ctx.assignableAdapters, nil, file); err != nil {
//it is retry, so omit error
log.Errorf("Failed to create DomainStatus from %+v: %s",
Expand Down Expand Up @@ -1328,6 +1375,7 @@ func handleCreate(ctx *domainContext, key string, config *types.DomainConfig) {
State: types.INSTALLED,
VmConfig: config.VmConfig,
Service: config.Service,
IsDNidNode: config.IsDNidNode,
}

status.VmConfig.CPUs = ""
Expand Down Expand Up @@ -1537,6 +1585,13 @@ func doActivate(ctx *domainContext, config types.DomainConfig,
log.Functionf("doActivate(%v) for %s",
config.UUIDandVersion, config.DisplayName)

err := getnodeNameAndUUID(ctx)
if err != nil {
log.Errorf("doActivate(%s) getnodeNameAndUUID failed: %s",
status.Key(), err)
return
}

if ctx.cpuPinningSupported {
if err := assignCPUs(ctx, &config, status); err != nil {
log.Warnf("failed to assign CPUs for %s", config.DisplayName)
Expand Down Expand Up @@ -1678,6 +1733,11 @@ func doActivate(ctx *domainContext, config types.DomainConfig,
status.VirtualTPM = true
}

// pass nodeName to hypervisor call Setup
if status.NodeName == "" {
status.NodeName = ctx.nodeName
}

globalConfig := agentlog.GetGlobalConfig(log, ctx.subGlobalConfig)
if err := hyper.Task(status).Setup(*status, config, ctx.assignableAdapters, globalConfig, file); err != nil {
log.Errorf("Failed to create DomainStatus from %+v: %s",
Expand Down Expand Up @@ -1750,6 +1810,11 @@ func doActivateTail(ctx *domainContext, status *types.DomainStatus,
log.Errorf("domain start for %s: %s", status.DomainName, err)
status.SetErrorNow(err.Error())

// HvKube case
if ctx.hvTypeKube && !status.IsDNidNode {
log.Noticef("doActivateTail(%v) we are not DNiD, skip delete app", status.DomainName)
return
}
// Delete
if err := hyper.Task(status).Delete(status.DomainName); err != nil {
log.Errorf("failed to delete domain: %s (%v)", status.DomainName, err)
Expand Down Expand Up @@ -1779,6 +1844,11 @@ func doActivateTail(ctx *domainContext, status *types.DomainStatus,
status.SetErrorNow(err.Error())
log.Errorf("doActivateTail(%v) failed for %s: %s",
status.UUIDandVersion, status.DisplayName, err)

if ctx.hvTypeKube && !status.IsDNidNode {
log.Noticef("doActivateTail(%v) we are not DNiD, skip delete app", status.DomainName)
return
}
// Delete
if err := hyper.Task(status).Delete(status.DomainName); err != nil {
log.Errorf("failed to delete domain: %s (%v)", status.DomainName, err)
Expand Down Expand Up @@ -1843,7 +1913,7 @@ func doInactivate(ctx *domainContext, status *types.DomainStatus, impatient bool
if doShutdown {
// If the Shutdown fails we don't wait; assume failure
// was due to no PV tools
if err := DomainShutdown(*status, false); err != nil {
if err := DomainShutdown(ctx, *status, false); err != nil {
log.Errorf("DomainShutdown %s failed: %s",
status.DomainName, err)
} else {
Expand All @@ -1863,7 +1933,7 @@ func doInactivate(ctx *domainContext, status *types.DomainStatus, impatient bool
// the domain is already on the way down.
// In case of errors we proceed directly to deleting the task,
// and after that we waitForDomainGone
if err := DomainShutdown(*status, true); err != nil {
if err := DomainShutdown(ctx, *status, true); err != nil {
log.Warnf("DomainShutdown -F %s failed: %s",
status.DomainName, err)
} else {
Expand All @@ -1880,6 +1950,11 @@ func doInactivate(ctx *domainContext, status *types.DomainStatus, impatient bool
}

if status.DomainId != 0 {
if ctx.hvTypeKube && !status.IsDNidNode {
log.Noticef("doInactivate(%v) we are not DNiD, skip delete app", status.DomainName)
return
}

if err := hyper.Task(status).Delete(status.DomainName); err != nil {
log.Errorf("Failed to delete domain %s (%v)", status.DomainName, err)
} else {
Expand Down Expand Up @@ -2467,6 +2542,11 @@ func handleDelete(ctx *domainContext, key string, status *types.DomainStatus) {
// No point in publishing metrics any more
ctx.pubDomainMetric.Unpublish(status.Key())

if ctx.hvTypeKube && !status.IsDNidNode {
log.Noticef("handleDelete(%v) we are not DNiD, skip delete app", status.DomainName)
return
}

err := hyper.Task(status).Delete(status.DomainName)
if err != nil {
log.Errorln(err)
Expand Down Expand Up @@ -2506,13 +2586,18 @@ func DomainCreate(ctx *domainContext, status types.DomainStatus) (int, error) {
}

// DomainShutdown is a wrapper for domain shutdown
func DomainShutdown(status types.DomainStatus, force bool) error {
func DomainShutdown(ctx *domainContext, status types.DomainStatus, force bool) error {

var err error
log.Functionf("DomainShutdown force-%v %s %d", force, status.DomainName, status.DomainId)

// Stop the domain
log.Functionf("Stopping domain - %s", status.DomainName)

if ctx.hvTypeKube && !status.IsDNidNode {
log.Noticef("DomainShutdown(%v) we are not DNiD, skip delete app", status.DomainName)
return nil
}
err = hyper.Task(&status).Stop(status.DomainName, force)

return err
Expand Down Expand Up @@ -3601,3 +3686,16 @@ func lookupCapabilities(ctx *domainContext) (*types.Capabilities, error) {
}
return &capabilities, nil
}

// getnodeNameAndUUID makes sure ctx.nodeName is populated. If it is still
// empty, the "global" EdgeNodeInfo item is fetched from ctx.subEdgeNodeInfo
// and its DeviceName, lower-cased, is stored as the node name. A node name
// that is already set is left untouched. The lookup error, if any, is logged
// and returned.
// NOTE(review): despite the name, no UUID is read or stored here.
func getnodeNameAndUUID(ctx *domainContext) error {
	if ctx.nodeName != "" {
		return nil
	}

	item, err := ctx.subEdgeNodeInfo.Get("global")
	if err != nil {
		log.Errorf("getnodeNameAndUUID: can't get edgeNodeInfo %v", err)
		return err
	}
	// NOTE(review): unchecked assertion; panics if the item is not a
	// types.EdgeNodeInfo value.
	info := item.(types.EdgeNodeInfo)
	ctx.nodeName = strings.ToLower(info.DeviceName)
	return nil
}
58 changes: 58 additions & 0 deletions pkg/pillar/cmd/zedagent/parseconfig.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,10 @@ func parseConfig(getconfigCtx *getconfigContext, config *zconfig.EdgeDevConfig,

if source != fromBootstrap {
activateNewBaseOS := parseBaseOS(getconfigCtx, config)
parseEdgeNodeClusterConfig(getconfigCtx, config)

// Parse EdgeNode Cluster configuration

parseNetworkInstanceConfig(getconfigCtx, config)
parseContentInfoConfig(getconfigCtx, config)
parseVolumeConfig(getconfigCtx, config)
Expand Down Expand Up @@ -759,6 +763,10 @@ func parseAppInstanceConfig(getconfigCtx *getconfigContext,
// Add config submitted via local profile server.
addLocalAppConfig(getconfigCtx, &appInstance)

// XXX add Designated ID to the appInstance
// XXX Keep this here for now to allow the kubevirt single-node working, the later PR to EVE main will remove this
appInstance.DesignatedNodeID = devUUID

// Verify that it fits and if not publish with error
checkAndPublishAppInstanceConfig(getconfigCtx, appInstance)
}
Expand Down Expand Up @@ -3102,3 +3110,53 @@ func handleDeviceOperation(ctxPtr *zedagentContext, op types.DeviceOperation) {
shutdownAppsGlobal(ctxPtr)
// nothing else to be done
}

// parseEdgeNodeClusterConfig converts the cluster section of the device
// config into a types.EdgeNodeClusterConfig and publishes it under the
// "global" key.
//
// If the device config carries no cluster section, any previously published
// item is unpublished. If the cluster IP prefix or the cluster ID cannot be
// parsed, the error is logged and the function returns without touching the
// publication, so whatever was published before stays in place.
func parseEdgeNodeClusterConfig(getconfigCtx *getconfigContext,
	config *zconfig.EdgeDevConfig) {

	ctx := getconfigCtx.zedagentCtx
	pub := ctx.pubEdgeNodeClusterConfig

	zcfgCluster := config.GetCluster()
	if zcfgCluster == nil {
		log.Functionf("parseEdgeNodeClusterConfig: No EdgeNodeClusterConfig, Unpublishing")
		if len(pub.GetAll()) > 0 {
			log.Functionf("parseEdgeNodeClusterConfig: Unpublishing EdgeNodeClusterConfig")
			pub.Unpublish("global")
		}
		return
	}

	ipAddr, ipNet, err := net.ParseCIDR(zcfgCluster.GetClusterIpPrefix())
	if err != nil {
		log.Errorf("parseEdgeNodeClusterConfig: ParseCIDR failed %s", err)
		return
	}
	// ParseCIDR returns the masked network address in ipNet.IP; keep the
	// address exactly as configured instead.
	ipNet.IP = ipAddr

	// Deduce the bootstrap node status from clusterIPPrefix and
	// joinServerIP: the node whose own cluster IP equals the join server IP
	// is the bootstrap node. An unparsable join server IP yields nil, which
	// never compares equal to ipAddr.
	joinServerIP := net.ParseIP(zcfgCluster.GetJoinServerIp())
	isBootstrapNode := ipAddr.Equal(joinServerIP)

	id, err := uuid.FromString(zcfgCluster.GetClusterId())
	if err != nil {
		log.Errorf("parseEdgeNodeClusterConfig: failed to parse UUID: %v", err)
		return
	}

	enClusterConfig := types.EdgeNodeClusterConfig{
		ClusterName:      zcfgCluster.GetClusterName(),
		ClusterID:        types.UUIDandVersion{UUID: id},
		ClusterInterface: zcfgCluster.GetClusterInterface(),
		ClusterIPPrefix:  ipNet,
		IsWorkerNode:     zcfgCluster.GetIsWorkerNode(),
		JoinServerIP:     joinServerIP,
		BootstrapNode:    isBootstrapNode,
		// XXX EncryptedClusterToken is only for gcp config
	}
	enClusterConfig.CipherToken = parseCipherBlock(getconfigCtx,
		enClusterConfig.Key(), zcfgCluster.GetEncryptedClusterToken())
	log.Functionf("parseEdgeNodeClusterConfig: ENCluster API, Config %+v, %v", zcfgCluster, enClusterConfig)
	pub.Publish("global", enClusterConfig)
}
Loading

0 comments on commit 631cc70

Please sign in to comment.