From 2453125222b2fe19ee35d3edaa18f4dadd1962ff Mon Sep 17 00:00:00 2001 From: Naiming Shen Date: Wed, 30 Oct 2024 11:38:26 -0700 Subject: [PATCH] Implement Clustering and App related functions - change VMI to VMI ReplicaSet for kubernetes - change Pod to Pod RelicaSet for containers - change functions handling replicaset names in services - subscribe EdgeNodeInfo in domainmgr, zedmanager to get node-name for cluster - add Designated Node ID to several structs for App - not to delete domain from kubernetes if not a Designated App node - parse config for EdgeNodeClusterConfig in zedagent - handle ENClusterAppStatus publication in zedmanger in multi-node clustering case - zedmanager handling effective-activation include ENClusterAppStatus - kubevirt hypervisor changes to handle VMI/Pod ReplicaSets Signed-off-by: Naiming Shen --- pkg/pillar/base/kubevirt.go | 28 +- pkg/pillar/cipher/cipher.go | 1 + pkg/pillar/cmd/domainmgr/domainmgr.go | 133 ++- pkg/pillar/cmd/zedagent/parseconfig.go | 58 ++ pkg/pillar/cmd/zedagent/zedagent.go | 13 + pkg/pillar/cmd/zedmanager/handleclusterapp.go | 43 + pkg/pillar/cmd/zedmanager/handledomainmgr.go | 9 +- pkg/pillar/cmd/zedmanager/handlezedrouter.go | 2 +- pkg/pillar/cmd/zedmanager/updatestatus.go | 30 +- pkg/pillar/cmd/zedmanager/zedmanager.go | 139 ++- pkg/pillar/cmd/zedrouter/cni.go | 4 + pkg/pillar/hypervisor/kubevirt.go | 803 ++++++++++++------ pkg/pillar/kubeapi/kubeapi.go | 4 +- pkg/pillar/types/clustertypes.go | 2 + pkg/pillar/types/domainmgrtypes.go | 5 + pkg/pillar/types/zedmanagertypes.go | 3 +- 16 files changed, 1015 insertions(+), 262 deletions(-) create mode 100644 pkg/pillar/cmd/zedmanager/handleclusterapp.go diff --git a/pkg/pillar/base/kubevirt.go b/pkg/pillar/base/kubevirt.go index 3c75eb95ae..5d250fc10b 100644 --- a/pkg/pillar/base/kubevirt.go +++ b/pkg/pillar/base/kubevirt.go @@ -67,15 +67,37 @@ func GetAppKubeName(displayName string, uuid uuid.UUID) string { } // GetVMINameFromVirtLauncher : get VMI name from the corresponding Kubevirt -// launcher pod name. +// launcher pod name for replicaset generated VMI. func GetVMINameFromVirtLauncher(podName string) (vmiName string, isVirtLauncher bool) { if !strings.HasPrefix(podName, VMIPodNamePrefix) { return "", false } vmiName = strings.TrimPrefix(podName, VMIPodNamePrefix) lastSep := strings.LastIndex(vmiName, "-") - if lastSep != -1 { - vmiName = vmiName[:lastSep] + if lastSep == -1 || lastSep < 5 { + return "", false } + + // Check if the last part is 5 bytes long + if len(vmiName[lastSep+1:]) != 5 { + return "", false + } + + // Use the index minus 5 bytes to get the VMI name to remove added + // replicaset suffix + vmiName = vmiName[:lastSep-5] return vmiName, true } + +// GetReplicaPodName : get the app name from the pod name for replica pods. 
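+// A replica pod name is expected to be the app kube name returned by GetAppKubeName,
+// followed by "-" and a 5-character replicaset suffix; any other form returns ("", false).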
+func GetReplicaPodName(displayName, podName string, uuid uuid.UUID) (kubeName string, isReplicaPod bool) { + kubeName = GetAppKubeName(displayName, uuid) + if !strings.HasPrefix(podName, kubeName) { + return "", false + } + suffix := strings.TrimPrefix(podName, kubeName) + if strings.HasPrefix(suffix, "-") && len(suffix[1:]) == 5 { + return kubeName, true + } + return "", false +} diff --git a/pkg/pillar/cipher/cipher.go b/pkg/pillar/cipher/cipher.go index 42b486e5e1..db7ff4a3e8 100644 --- a/pkg/pillar/cipher/cipher.go +++ b/pkg/pillar/cipher/cipher.go @@ -27,6 +27,7 @@ func getEncryptionBlock( decBlock.CellularNetUsername = zconfigDecBlockPtr.CellularNetUsername decBlock.CellularNetPassword = zconfigDecBlockPtr.CellularNetPassword decBlock.ProtectedUserData = zconfigDecBlockPtr.ProtectedUserData + decBlock.ClusterToken = zconfigDecBlockPtr.ClusterToken return decBlock } diff --git a/pkg/pillar/cmd/domainmgr/domainmgr.go b/pkg/pillar/cmd/domainmgr/domainmgr.go index 906db44751..f5a8c1e7cf 100644 --- a/pkg/pillar/cmd/domainmgr/domainmgr.go +++ b/pkg/pillar/cmd/domainmgr/domainmgr.go @@ -91,6 +91,7 @@ type domainContext struct { pubDomainStatus pubsub.Publication subGlobalConfig pubsub.Subscription subZFSPoolStatus pubsub.Subscription + subEdgeNodeInfo pubsub.Subscription pubAssignableAdapters pubsub.Publication pubDomainMetric pubsub.Publication pubHostMemory pubsub.Publication @@ -126,6 +127,7 @@ type domainContext struct { cpuPinningSupported bool // Is it kubevirt eve hvTypeKube bool + nodeName string } // AddAgentSpecificCLIFlags adds CLI options @@ -414,9 +416,24 @@ func Run(ps *pubsub.PubSub, loggerArg *logrus.Logger, logArg *base.LogObject, ar domainCtx.subZFSPoolStatus = subZFSPoolStatus subZFSPoolStatus.Activate() + // Look for edge node info + subEdgeNodeInfo, err := ps.NewSubscription(pubsub.SubscriptionOptions{ + AgentName: "zedagent", + MyAgentName: agentName, + TopicImpl: types.EdgeNodeInfo{}, + Persistent: true, + Activate: false, + }) + if err != nil { + log.Fatal(err) + } + domainCtx.subEdgeNodeInfo = subEdgeNodeInfo + _ = subEdgeNodeInfo.Activate() + // Parse any existing ConfigIntemValueMap but continue if there // is none - for !domainCtx.GCComplete { + waitEdgeNodeInfo := true + for !domainCtx.GCComplete || (domainCtx.hvTypeKube && waitEdgeNodeInfo) { log.Noticef("waiting for GCComplete") select { case change := <-subGlobalConfig.MsgChan(): @@ -425,6 +442,10 @@ func Run(ps *pubsub.PubSub, loggerArg *logrus.Logger, logArg *base.LogObject, ar case <-domainCtx.publishTicker.C: publishProcessesHandler(&domainCtx) + case change := <-subEdgeNodeInfo.MsgChan(): + subEdgeNodeInfo.ProcessChange(change) + waitEdgeNodeInfo = false + case <-stillRunning.C: } ps.StillRunning(agentName, warningTime, errorTime) @@ -513,6 +534,9 @@ func Run(ps *pubsub.PubSub, loggerArg *logrus.Logger, logArg *base.LogObject, ar case change := <-subZFSPoolStatus.MsgChan(): subZFSPoolStatus.ProcessChange(change) + case change := <-subEdgeNodeInfo.MsgChan(): + subEdgeNodeInfo.ProcessChange(change) + case <-domainCtx.publishTicker.C: publishProcessesHandler(&domainCtx) @@ -651,6 +675,9 @@ func Run(ps *pubsub.PubSub, loggerArg *logrus.Logger, logArg *base.LogObject, ar case change := <-subPhysicalIOAdapter.MsgChan(): subPhysicalIOAdapter.ProcessChange(change) + case change := <-subEdgeNodeInfo.MsgChan(): + subEdgeNodeInfo.ProcessChange(change) + case <-domainCtx.publishTicker.C: start := time.Now() err = domainCtx.cipherMetrics.Publish(log, cipherMetricsPub, "global") @@ -977,12 +1004,15 @@ func 
verifyStatus(ctx *domainContext, status *types.DomainStatus) { status.SetErrorDescription(errDescription) } - //cleanup app instance tasks - if err := hyper.Task(status).Delete(status.DomainName); err != nil { - log.Errorf("failed to delete domain: %s (%v)", status.DomainName, err) - } - if err := hyper.Task(status).Cleanup(status.DomainName); err != nil { - log.Errorf("failed to cleanup domain: %s (%v)", status.DomainName, err) + // in cluster mode, we can not delete the pod due to failing to get app info + if !ctx.hvTypeKube { + //cleanup app instance tasks + if err := hyper.Task(status).Delete(status.DomainName); err != nil { + log.Errorf("failed to delete domain: %s (%v)", status.DomainName, err) + } + if err := hyper.Task(status).Cleanup(status.DomainName); err != nil { + log.Errorf("failed to cleanup domain: %s (%v)", status.DomainName, err) + } } } status.DomainId = 0 @@ -1071,6 +1101,14 @@ func maybeRetryBoot(ctx *domainContext, status *types.DomainStatus) { if !status.BootFailed { return } + + err := getnodeNameAndUUID(ctx) + if err != nil { + log.Errorf("maybeRetryBoot(%s) getnodeNameAndUUID failed: %s", + status.Key(), err) + return + } + if status.Activated && status.BootFailed { log.Functionf("maybeRetryBoot(%s) clearing bootFailed since Activated", status.Key()) @@ -1138,6 +1176,11 @@ func maybeRetryBoot(ctx *domainContext, status *types.DomainStatus) { log.Errorf("Failed to setup vTPM for %s: %s", status.DomainName, err) } + // pass nodeName to hypervisor call Setup + if status.NodeName == "" { + status.NodeName = ctx.nodeName + } + if err := hyper.Task(status).Setup(*status, *config, ctx.assignableAdapters, nil, file); err != nil { //it is retry, so omit error log.Errorf("Failed to create DomainStatus from %+v: %s", @@ -1322,6 +1365,7 @@ func handleCreate(ctx *domainContext, key string, config *types.DomainConfig) { State: types.INSTALLED, VmConfig: config.VmConfig, Service: config.Service, + IsDNidNode: config.IsDNidNode, } status.VmConfig.CPUs = make([]int, 0) @@ -1531,6 +1575,13 @@ func doActivate(ctx *domainContext, config types.DomainConfig, log.Functionf("doActivate(%v) for %s", config.UUIDandVersion, config.DisplayName) + err := getnodeNameAndUUID(ctx) + if err != nil { + log.Errorf("doActivate(%s) getnodeNameAndUUID failed: %s", + status.Key(), err) + return + } + if ctx.cpuPinningSupported { if err := assignCPUs(ctx, &config, status); err != nil { log.Warnf("failed to assign CPUs for %s", config.DisplayName) @@ -1684,6 +1735,11 @@ func doActivate(ctx *domainContext, config types.DomainConfig, log.Errorf("Failed to setup vTPM for %s: %s", status.DomainName, err) } + // pass nodeName to hypervisor call Setup + if status.NodeName == "" { + status.NodeName = ctx.nodeName + } + globalConfig := agentlog.GetGlobalConfig(log, ctx.subGlobalConfig) if err := hyper.Task(status).Setup(*status, config, ctx.assignableAdapters, globalConfig, file); err != nil { log.Errorf("Failed to create DomainStatus from %+v: %s", @@ -1751,6 +1807,17 @@ func doActivateTail(ctx *domainContext, status *types.DomainStatus, log.Errorf("domain start for %s: %s", status.DomainName, err) status.SetErrorNow(err.Error()) + // HvKube case + if ctx.hvTypeKube && !status.IsDNidNode { + log.Noticef("doActivateTail(%v) we are not DNiD, skip delete app", status.DomainName) + return + } + + // Only send delete if DomainConfig is not deleted + if ctx.hvTypeKube && !status.DomainConfigDeleted { + log.Noticef("doActivateTail(%v) DomainConfig exists, skip delete app", status.DomainName) + return + } // Delete if 
err := hyper.Task(status).Delete(status.DomainName); err != nil { log.Errorf("failed to delete domain: %s (%v)", status.DomainName, err) @@ -1780,6 +1847,17 @@ func doActivateTail(ctx *domainContext, status *types.DomainStatus, status.SetErrorNow(err.Error()) log.Errorf("doActivateTail(%v) failed for %s: %s", status.UUIDandVersion, status.DisplayName, err) + + if ctx.hvTypeKube && !status.IsDNidNode { + log.Noticef("doActivateTail(%v) we are not DNiD, skip delete app", status.DomainName) + return + } + // Only send delete if DomainConfig is not deleted + if ctx.hvTypeKube && !status.DomainConfigDeleted { + log.Noticef("doActivateTail(%v) DomainConfig exists, skip delete app", status.DomainName) + return + } + // Delete if err := hyper.Task(status).Delete(status.DomainName); err != nil { log.Errorf("failed to delete domain: %s (%v)", status.DomainName, err) @@ -1844,7 +1922,7 @@ func doInactivate(ctx *domainContext, status *types.DomainStatus, impatient bool if doShutdown { // If the Shutdown fails we don't wait; assume failure // was due to no PV tools - if err := DomainShutdown(*status, false); err != nil { + if err := DomainShutdown(ctx, *status, false); err != nil { log.Errorf("DomainShutdown %s failed: %s", status.DomainName, err) } else { @@ -1864,7 +1942,7 @@ func doInactivate(ctx *domainContext, status *types.DomainStatus, impatient bool // the domain is already on the way down. // In case of errors we proceed directly to deleting the task, // and after that we waitForDomainGone - if err := DomainShutdown(*status, true); err != nil { + if err := DomainShutdown(ctx, *status, true); err != nil { log.Warnf("DomainShutdown -F %s failed: %s", status.DomainName, err) } else { @@ -1881,6 +1959,16 @@ func doInactivate(ctx *domainContext, status *types.DomainStatus, impatient bool } if status.DomainId != 0 { + if ctx.hvTypeKube && !status.IsDNidNode { + log.Noticef("doInactivate(%v) we are not DNiD, skip delete app", status.DomainName) + return + } + // Only send delete if DomainConfig is not deleted + if ctx.hvTypeKube && !status.DomainConfigDeleted { + log.Noticef("doInactivate(%v) DomainConfig exists, skip delete app", status.DomainName) + return + } + if err := hyper.Task(status).Delete(status.DomainName); err != nil { log.Errorf("Failed to delete domain %s (%v)", status.DomainName, err) } else { @@ -2468,6 +2556,13 @@ func handleDelete(ctx *domainContext, key string, status *types.DomainStatus) { // No point in publishing metrics any more ctx.pubDomainMetric.Unpublish(status.Key()) + if ctx.hvTypeKube && !status.IsDNidNode { + log.Noticef("handleDelete(%v) we are not DNiD, skip delete app", status.DomainName) + return + } + status.DomainConfigDeleted = true + log.Noticef("handleDelete(%v) DomainConfigDeleted", status.DomainName) + err := hyper.Task(status).Delete(status.DomainName) if err != nil { log.Errorln(err) @@ -2508,13 +2603,18 @@ func DomainCreate(ctx *domainContext, status types.DomainStatus) (int, error) { } // DomainShutdown is a wrapper for domain shutdown -func DomainShutdown(status types.DomainStatus, force bool) error { +func DomainShutdown(ctx *domainContext, status types.DomainStatus, force bool) error { var err error log.Functionf("DomainShutdown force-%v %s %d", force, status.DomainName, status.DomainId) // Stop the domain log.Functionf("Stopping domain - %s", status.DomainName) + + if ctx.hvTypeKube && !status.IsDNidNode { + log.Noticef("DomainShutdown(%v) we are not DNiD, skip delete app", status.DomainName) + return nil + } err = 
hyper.Task(&status).Stop(status.DomainName, force) return err @@ -3603,3 +3703,16 @@ func lookupCapabilities(ctx *domainContext) (*types.Capabilities, error) { } return &capabilities, nil } + +func getnodeNameAndUUID(ctx *domainContext) error { + if ctx.nodeName == "" { + NodeInfo, err := ctx.subEdgeNodeInfo.Get("global") + if err != nil { + log.Errorf("getnodeNameAndUUID: can't get edgeNodeInfo %v", err) + return err + } + enInfo := NodeInfo.(types.EdgeNodeInfo) + ctx.nodeName = strings.ToLower(enInfo.DeviceName) + } + return nil +} diff --git a/pkg/pillar/cmd/zedagent/parseconfig.go b/pkg/pillar/cmd/zedagent/parseconfig.go index be8ccad25e..2c7d5e21a3 100644 --- a/pkg/pillar/cmd/zedagent/parseconfig.go +++ b/pkg/pillar/cmd/zedagent/parseconfig.go @@ -151,6 +151,10 @@ func parseConfig(getconfigCtx *getconfigContext, config *zconfig.EdgeDevConfig, if source != fromBootstrap { activateNewBaseOS := parseBaseOS(getconfigCtx, config) + parseEdgeNodeClusterConfig(getconfigCtx, config) + + // Parse EdgeNode Cluster configuration + parseNetworkInstanceConfig(getconfigCtx, config) parseContentInfoConfig(getconfigCtx, config) parseVolumeConfig(getconfigCtx, config) @@ -764,6 +768,10 @@ func parseAppInstanceConfig(getconfigCtx *getconfigContext, // Add config submitted via local profile server. addLocalAppConfig(getconfigCtx, &appInstance) + // XXX add Designated ID to the appInstance + // XXX Keep this here for now to allow the kubevirt single-node working, the later PR to EVE main will remove this + appInstance.DesignatedNodeID = devUUID + // Verify that it fits and if not publish with error checkAndPublishAppInstanceConfig(getconfigCtx, appInstance) } @@ -3199,3 +3207,53 @@ func handleDeviceOperation(ctxPtr *zedagentContext, op types.DeviceOperation) { shutdownAppsGlobal(ctxPtr) // nothing else to be done } + +func parseEdgeNodeClusterConfig(getconfigCtx *getconfigContext, + config *zconfig.EdgeDevConfig) { + + ctx := getconfigCtx.zedagentCtx + zcfgCluster := config.GetCluster() + if zcfgCluster == nil { + log.Functionf("parseEdgeNodeClusterConfig: No EdgeNodeClusterConfig, Unpublishing") + pub := ctx.pubEdgeNodeClusterConfig + items := pub.GetAll() + if len(items) > 0 { + log.Functionf("parseEdgeNodeClusterConfig: Unpublishing EdgeNodeClusterConfig") + ctx.pubEdgeNodeClusterConfig.Unpublish("global") + } + return + } + ipAddr, ipNet, err := net.ParseCIDR(zcfgCluster.GetClusterIpPrefix()) + if err != nil { + log.Errorf("parseEdgeNodeClusterConfig: ParseCIDR failed %s", err) + return + } + ipNet.IP = ipAddr + + joinServerIP := net.ParseIP(zcfgCluster.GetJoinServerIp()) + var isJoinNode bool + // deduce the bootstrap node status from clusterIPPrefix and joinServerIP + if ipAddr.Equal(joinServerIP) { // deduce the bootstrap node status from + isJoinNode = true + } + + id, err := uuid.FromString(zcfgCluster.GetClusterId()) + if err != nil { + log.Errorf("parseEdgeNodeClusterConfig: failed to parse UUID: %v", err) + return + } + enClusterConfig := types.EdgeNodeClusterConfig{ + ClusterName: zcfgCluster.GetClusterName(), + ClusterID: types.UUIDandVersion{UUID: id}, + ClusterInterface: zcfgCluster.GetClusterInterface(), + ClusterIPPrefix: ipNet, + IsWorkerNode: zcfgCluster.GetIsWorkerNode(), + JoinServerIP: joinServerIP, + BootstrapNode: isJoinNode, + // XXX EncryptedClusterToken is only for gcp config + } + enClusterConfig.CipherToken = parseCipherBlock(getconfigCtx, + enClusterConfig.Key(), zcfgCluster.GetEncryptedClusterToken()) + log.Functionf("parseEdgeNodeClusterConfig: ENCluster API, Config %+v, 
%v", zcfgCluster, enClusterConfig) + ctx.pubEdgeNodeClusterConfig.Publish("global", enClusterConfig) +} diff --git a/pkg/pillar/cmd/zedagent/zedagent.go b/pkg/pillar/cmd/zedagent/zedagent.go index 70e88b7a70..62d4fc2df1 100644 --- a/pkg/pillar/cmd/zedagent/zedagent.go +++ b/pkg/pillar/cmd/zedagent/zedagent.go @@ -229,6 +229,9 @@ type zedagentContext struct { // Is Kubevirt eve hvTypeKube bool + // EN cluster config + pubEdgeNodeClusterConfig pubsub.Publication + // Netdump netDumper *netdump.NetDumper // nil if netdump is disabled netdumpInterval time.Duration @@ -1103,6 +1106,16 @@ func initPublications(zedagentCtx *zedagentContext) { } getconfigCtx.pubZedAgentStatus.ClearRestarted() + zedagentCtx.pubEdgeNodeClusterConfig, err = ps.NewPublication(pubsub.PublicationOptions{ + AgentName: agentName, + Persistent: true, + TopicType: types.EdgeNodeClusterConfig{}, + }) + if err != nil { + log.Fatal(err) + } + zedagentCtx.pubEdgeNodeClusterConfig.ClearRestarted() + getconfigCtx.pubPhysicalIOAdapters, err = ps.NewPublication(pubsub.PublicationOptions{ AgentName: agentName, TopicType: types.PhysicalIOAdapterList{}, diff --git a/pkg/pillar/cmd/zedmanager/handleclusterapp.go b/pkg/pillar/cmd/zedmanager/handleclusterapp.go new file mode 100644 index 0000000000..83a94dfa67 --- /dev/null +++ b/pkg/pillar/cmd/zedmanager/handleclusterapp.go @@ -0,0 +1,43 @@ +// Copyright (c) 2024 Zededa, Inc. +// SPDX-License-Identifier: Apache-2.0 + +package zedmanager + +import "github.com/lf-edge/eve/pkg/pillar/types" + +func handleENClusterAppStatusCreate(ctxArg interface{}, key string, configArg interface{}) { + log.Functionf("handleENClusterAppStatusCreate(%s)", key) + ctx := ctxArg.(*zedmanagerContext) + status := configArg.(types.ENClusterAppStatus) + handleENClusterAppStatusImpl(ctx, key, &status) +} + +func handleENClusterAppStatusModify(ctxArg interface{}, key string, configArg interface{}, oldConfigArg interface{}) { + log.Functionf("handleENClusterAppStatusModify(%s)", key) + ctx := ctxArg.(*zedmanagerContext) + status := configArg.(types.ENClusterAppStatus) + handleENClusterAppStatusImpl(ctx, key, &status) +} + +func handleENClusterAppStatusDelete(ctxArg interface{}, key string, configArg interface{}) { + log.Functionf("handleENClusterAppStatusDelete(%s)", key) + ctx := ctxArg.(*zedmanagerContext) + //status := configArg.(types.ENClusterAppStatus) + handleENClusterAppStatusImpl(ctx, key, nil) +} + +func handleENClusterAppStatusImpl(ctx *zedmanagerContext, key string, status *types.ENClusterAppStatus) { + + log.Functionf("handleENClusterAppStatusImpl(%s) for app-status %v", key, status) + pub := ctx.pubAppInstanceStatus + items := pub.GetAll() + for _, st := range items { + aiStatus := st.(types.AppInstanceStatus) + if aiStatus.UUIDandVersion.UUID.String() == key { + log.Functionf("handleENClusterAppStatusImpl(%s) found ai status, update", key) + + updateAIStatusUUID(ctx, aiStatus.UUIDandVersion.UUID.String()) + break + } + } +} diff --git a/pkg/pillar/cmd/zedmanager/handledomainmgr.go b/pkg/pillar/cmd/zedmanager/handledomainmgr.go index 35c3e53e69..33ccb70b37 100644 --- a/pkg/pillar/cmd/zedmanager/handledomainmgr.go +++ b/pkg/pillar/cmd/zedmanager/handledomainmgr.go @@ -11,6 +11,7 @@ import ( "strings" "github.com/lf-edge/eve/pkg/pillar/types" + uuid "github.com/satori/go.uuid" ) const ( @@ -46,7 +47,12 @@ func MaybeAddDomainConfig(ctx *zedmanagerContext, if ns != nil { AppNum = ns.AppNum } - effectiveActivate := effectiveActivateCurrentProfile(aiConfig, ctx.currentProfile) + + isDNiDnode := false + if 
aiConfig.DesignatedNodeID != uuid.Nil && aiConfig.DesignatedNodeID == ctx.nodeUUID { + isDNiDnode = true + } + effectiveActivate := effectiveActivateCombined(aiConfig, ctx) dc := types.DomainConfig{ UUIDandVersion: aiConfig.UUIDandVersion, DisplayName: aiConfig.DisplayName, @@ -60,6 +66,7 @@ func MaybeAddDomainConfig(ctx *zedmanagerContext, MetaDataType: aiConfig.MetaDataType, Service: aiConfig.Service, CloudInitVersion: aiConfig.CloudInitVersion, + IsDNidNode: isDNiDnode, } dc.DiskConfigList = make([]types.DiskConfig, 0, len(aiStatus.VolumeRefStatusList)) diff --git a/pkg/pillar/cmd/zedmanager/handlezedrouter.go b/pkg/pillar/cmd/zedmanager/handlezedrouter.go index 176e54688c..9a04791779 100644 --- a/pkg/pillar/cmd/zedmanager/handlezedrouter.go +++ b/pkg/pillar/cmd/zedmanager/handlezedrouter.go @@ -19,7 +19,7 @@ func MaybeAddAppNetworkConfig(ctx *zedmanagerContext, log.Functionf("MaybeAddAppNetworkConfig for %s displayName %s", key, displayName) - effectiveActivate := effectiveActivateCurrentProfile(aiConfig, ctx.currentProfile) + effectiveActivate := effectiveActivateCombined(aiConfig, ctx) changed := false m := lookupAppNetworkConfig(ctx, key) diff --git a/pkg/pillar/cmd/zedmanager/updatestatus.go b/pkg/pillar/cmd/zedmanager/updatestatus.go index 3c2a265b87..0a3da0c25c 100644 --- a/pkg/pillar/cmd/zedmanager/updatestatus.go +++ b/pkg/pillar/cmd/zedmanager/updatestatus.go @@ -202,7 +202,7 @@ func doUpdate(ctx *zedmanagerContext, return changed } - effectiveActivate := effectiveActivateCurrentProfile(config, ctx.currentProfile) + effectiveActivate := effectiveActivateCombined(config, ctx) if !effectiveActivate { if status.Activated || status.ActivateInprogress { @@ -729,6 +729,34 @@ func doActivate(ctx *zedmanagerContext, uuidStr string, // if the VM already active or in restarting/purging state - continue with the doActivate logic } + // delay this if referencename is not set + if ctx.hvTypeKube && config.FixedResources.VirtualizationMode == types.NOHYPER { + var findcontainer bool + for _, vrc := range config.VolumeRefConfigList { + vrs := lookupVolumeRefStatus(ctx, vrc.Key()) + if vrs == nil || !vrs.IsContainer() { + continue + } + findcontainer = true + if vrs.ReferenceName == "" { + log.Noticef("doActivate: waiting for referencename ") + if status.State != types.START_DELAYED { + status.State = types.START_DELAYED + return true + } + return changed + } + } + if !findcontainer { + log.Noticef("doActivate: no container found, wait") + if status.State != types.START_DELAYED { + status.State = types.START_DELAYED + return true + } + return changed + } + } + // Make sure we have a DomainConfig // We modify it below and then publish it dc, err := MaybeAddDomainConfig(ctx, config, *status, ns) diff --git a/pkg/pillar/cmd/zedmanager/zedmanager.go b/pkg/pillar/cmd/zedmanager/zedmanager.go index 1fc2e6b661..abe4872800 100644 --- a/pkg/pillar/cmd/zedmanager/zedmanager.go +++ b/pkg/pillar/cmd/zedmanager/zedmanager.go @@ -26,6 +26,7 @@ import ( "github.com/lf-edge/eve/pkg/pillar/types" fileutils "github.com/lf-edge/eve/pkg/pillar/utils/file" "github.com/lf-edge/eve/pkg/pillar/utils/wait" + uuid "github.com/satori/go.uuid" "github.com/sirupsen/logrus" ) @@ -51,9 +52,11 @@ type zedmanagerContext struct { subAppNetworkStatus pubsub.Subscription pubDomainConfig pubsub.Publication subDomainStatus pubsub.Subscription + subENClusterAppStatus pubsub.Subscription subGlobalConfig pubsub.Subscription subHostMemory pubsub.Subscription subZedAgentStatus pubsub.Subscription + subEdgeNodeInfo pubsub.Subscription 
pubVolumesSnapConfig pubsub.Publication subVolumesSnapStatus pubsub.Subscription subAssignableAdapters pubsub.Subscription @@ -71,6 +74,7 @@ type zedmanagerContext struct { assignableAdapters *types.AssignableAdapters // Is it kubevirt eve hvTypeKube bool + nodeUUID uuid.UUID } // AddAgentSpecificCLIFlags adds CLI options @@ -375,6 +379,24 @@ func Run(ps *pubsub.PubSub, loggerArg *logrus.Logger, logArg *base.LogObject, ar ctx.subVolumesSnapStatus = subVolumesSnapshotStatus _ = subVolumesSnapshotStatus.Activate() + subENClusterAppStatus, err := ps.NewSubscription(pubsub.SubscriptionOptions{ + AgentName: "zedkube", + MyAgentName: agentName, + TopicImpl: types.ENClusterAppStatus{}, + Activate: false, + Ctx: &ctx, + CreateHandler: handleENClusterAppStatusCreate, + ModifyHandler: handleENClusterAppStatusModify, + DeleteHandler: handleENClusterAppStatusDelete, + WarningTime: warningTime, + ErrorTime: errorTime, + }) + if err != nil { + log.Fatal(err) + } + ctx.subENClusterAppStatus = subENClusterAppStatus + _ = subENClusterAppStatus.Activate() + ctx.subAssignableAdapters, err = ps.NewSubscription(pubsub.SubscriptionOptions{ AgentName: "domainmgr", MyAgentName: agentName, @@ -391,6 +413,20 @@ func Run(ps *pubsub.PubSub, loggerArg *logrus.Logger, logArg *base.LogObject, ar log.Fatal(err) } + // Look for edge node info + subEdgeNodeInfo, err := ps.NewSubscription(pubsub.SubscriptionOptions{ + AgentName: "zedagent", + MyAgentName: agentName, + TopicImpl: types.EdgeNodeInfo{}, + Persistent: true, + Activate: false, + }) + if err != nil { + log.Fatal(err) + } + ctx.subEdgeNodeInfo = subEdgeNodeInfo + _ = subEdgeNodeInfo.Activate() + // Pick up debug aka log level before we start real work for !ctx.GCInitialized { log.Functionf("waiting for GCInitialized") @@ -441,6 +477,9 @@ func Run(ps *pubsub.PubSub, loggerArg *logrus.Logger, logArg *base.LogObject, ar case change := <-subVolumesSnapshotStatus.MsgChan(): subVolumesSnapshotStatus.ProcessChange(change) + case change := <-subENClusterAppStatus.MsgChan(): + subENClusterAppStatus.ProcessChange(change) + case change := <-ctx.subAssignableAdapters.MsgChan(): ctx.subAssignableAdapters.ProcessChange(change) @@ -455,6 +494,9 @@ func Run(ps *pubsub.PubSub, loggerArg *logrus.Logger, logArg *base.LogObject, ar ctx.checkFreedResources = false } + case change := <-subEdgeNodeInfo.MsgChan(): + subEdgeNodeInfo.ProcessChange(change) + case <-delayedStartTicker.C: checkDelayedStartApps(&ctx) @@ -659,7 +701,7 @@ func publishAppInstanceSummary(ctxPtr *zedmanagerContext) { effectiveActivate := false config := lookupAppInstanceConfig(ctxPtr, status.Key(), true) if config != nil { - effectiveActivate = effectiveActivateCurrentProfile(*config, ctxPtr.currentProfile) + effectiveActivate = effectiveActivateCombined(*config, ctxPtr) } // Only condition we did not count is EffectiveActive = true and Activated = false. // That means customer either halted his app or did not activate it yet. 
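The subENClusterAppStatus subscription above consumes per-app scheduling state from zedkube; the publisher side is not part of this patch. For orientation only, a minimal sketch of what such a publisher might look like, assuming zedkube already knows each replica's placement. The function name and its parameters are placeholders; the pubsub calls and the ENClusterAppStatus fields match those used elsewhere in this change.

	// Sketch only, not code from this patch: report per-app scheduling state
	// so that zedmanager's subENClusterAppStatus handlers can react to it.
	func publishClusterAppStatus(ps *pubsub.PubSub, appUUID uuid.UUID,
		isDNidNode, scheduledHere, running bool) error {
		pub, err := ps.NewPublication(pubsub.PublicationOptions{
			AgentName: "zedkube",
			TopicType: types.ENClusterAppStatus{},
		})
		if err != nil {
			return err
		}
		st := types.ENClusterAppStatus{
			AppUUID:             appUUID,       // app instance UUID (placeholder)
			IsDNidNode:          isDNidNode,    // this node is the app's designated node (placeholder)
			ScheduledOnThisNode: scheduledHere, // kubernetes placed the replica on this node (placeholder)
			StatusRunning:       running,       // pod/VMI reported Running (placeholder)
		}
		// Key by the app UUID string so zedmanager's sub.Get(key) and
		// handleENClusterAppStatusImpl can match it against AppInstanceStatus.
		return pub.Publish(appUUID.String(), st)
	}
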
@@ -689,6 +731,17 @@ func publishAppInstanceStatus(ctx *zedmanagerContext, key := status.Key() log.Tracef("publishAppInstanceStatus(%s)", key) pub := ctx.pubAppInstanceStatus + if ctx.hvTypeKube { + sub := ctx.subENClusterAppStatus + st, _ := sub.Get(key) + if st != nil { + clusterStatus := st.(types.ENClusterAppStatus) + if !clusterStatus.ScheduledOnThisNode { + log.Functionf("publishAppInstanceStatus(%s) not scheduled on this node, skip", key) + return + } + } + } pub.Publish(key, *status) } @@ -1177,7 +1230,7 @@ func handleModify(ctxArg interface{}, key string, updateSnapshotsInAIStatus(status, config) - effectiveActivate := effectiveActivateCurrentProfile(config, ctx.currentProfile) + effectiveActivate := effectiveActivateCombined(config, ctx) publishAppInstanceStatus(ctx, status) @@ -1545,8 +1598,9 @@ func updateBasedOnProfile(ctx *zedmanagerContext, oldProfile string) { if localConfig := lookupLocalAppInstanceConfig(ctx, config.Key()); localConfig != nil { config = *localConfig } - effectiveActivate := effectiveActivateCurrentProfile(config, ctx.currentProfile) - effectiveActivateOld := effectiveActivateCurrentProfile(config, oldProfile) + effectiveActivate := effectiveActivateCombined(config, ctx) + effectiveActivateOldTemp := effectiveActivateCurrentProfile(config, oldProfile) + effectiveActivateOld := getKubeAppActivateStatus(ctx, config, effectiveActivateOldTemp) if effectiveActivateOld == effectiveActivate { // no changes in effective activate continue @@ -1563,6 +1617,14 @@ func updateBasedOnProfile(ctx *zedmanagerContext, oldProfile string) { } // returns effective Activate status based on Activate from app instance config and current profile +func effectiveActivateCombined(config types.AppInstanceConfig, ctx *zedmanagerContext) bool { + effectiveActivate := effectiveActivateCurrentProfile(config, ctx.currentProfile) + // Add cluster login in the activate state + combined := getKubeAppActivateStatus(ctx, config, effectiveActivate) + log.Functionf("effectiveActivateCombined: effectiveActivate %t, combined %t", effectiveActivate, combined) + return combined +} + func effectiveActivateCurrentProfile(config types.AppInstanceConfig, currentProfile string) bool { if currentProfile == "" { log.Functionf("effectiveActivateCurrentProfile(%s): empty current", config.Key()) @@ -1586,3 +1648,72 @@ func effectiveActivateCurrentProfile(config types.AppInstanceConfig, currentProf config.Key(), currentProfile) return false } + +func getKubeAppActivateStatus(ctx *zedmanagerContext, aiConfig types.AppInstanceConfig, effectiveActivate bool) bool { + + if !ctx.hvTypeKube || aiConfig.DesignatedNodeID == uuid.Nil { + return effectiveActivate + } + + if ctx.nodeUUID == uuid.Nil { + err := getnodeNameAndUUID(ctx) + if err != nil { + log.Errorf("getKubeAppActivateStatus: can't get nodeUUID %v", err) + return false + } + } + sub := ctx.subENClusterAppStatus + items := sub.GetAll() + + // 1) if the dnid is on this node + // a) if the pod is not on this node, and the pod is running, return false + // b) otherwise, return true + // 2) if the dnid is not on this node + // a) if the pod is on this node, and status is running, return true + // b) otherwise, return false + var onTheDevice bool + var statusRunning bool + for _, item := range items { + status := item.(types.ENClusterAppStatus) + if status.AppUUID == aiConfig.UUIDandVersion.UUID { + statusRunning = status.StatusRunning + if status.IsDNidNode { + onTheDevice = true + break + } else if status.ScheduledOnThisNode { + onTheDevice = true + break + } 
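+			// matched app entry that is neither designated for nor scheduled on
+			// this node: statusRunning is recorded, onTheDevice stays false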
+ } + } + + log.Functionf("getKubeAppActivateStatus: ai %s, node %s, onTheDevice %v, statusRunning %v", + aiConfig.DesignatedNodeID.String(), ctx.nodeUUID, onTheDevice, statusRunning) + if aiConfig.DesignatedNodeID == ctx.nodeUUID { + if statusRunning && !onTheDevice { + return false + } + return effectiveActivate + } else { + // the pod is on this node, but it will not be in running state, unless + // zedmanager make this app activate and zedrouter CNI has the network status + // for this App. So, not in running state is ok. + if onTheDevice { + return effectiveActivate + } + return false + } +} + +func getnodeNameAndUUID(ctx *zedmanagerContext) error { + if ctx.nodeUUID == uuid.Nil { + NodeInfo, err := ctx.subEdgeNodeInfo.Get("global") + if err != nil { + log.Errorf("getnodeNameAndUUID: can't get edgeNodeInfo %v", err) + return err + } + enInfo := NodeInfo.(types.EdgeNodeInfo) + ctx.nodeUUID = enInfo.DeviceID + } + return nil +} diff --git a/pkg/pillar/cmd/zedrouter/cni.go b/pkg/pillar/cmd/zedrouter/cni.go index ec069cbc4a..56bc3c2a32 100644 --- a/pkg/pillar/cmd/zedrouter/cni.go +++ b/pkg/pillar/cmd/zedrouter/cni.go @@ -333,6 +333,10 @@ func (z *zedrouter) getAppByPodName( for _, item := range z.pubAppNetworkStatus.GetAll() { appStatus := item.(types.AppNetworkStatus) appUUID := appStatus.UUIDandVersion.UUID + repPodName, isReplicaPod := base.GetReplicaPodName(appStatus.DisplayName, podName, appUUID) + if isReplicaPod { + appKubeName = repPodName + } if base.GetAppKubeName(appStatus.DisplayName, appUUID) == appKubeName { appConfig := z.lookupAppNetworkConfig(appStatus.Key()) if appConfig == nil { diff --git a/pkg/pillar/hypervisor/kubevirt.go b/pkg/pillar/hypervisor/kubevirt.go index 378584edbc..8af10bc9e1 100644 --- a/pkg/pillar/hypervisor/kubevirt.go +++ b/pkg/pillar/hypervisor/kubevirt.go @@ -27,6 +27,7 @@ import ( netattdefv1 "github.com/k8snetworkplumbingwg/network-attachment-definition-client/pkg/apis/k8s.cni.cncf.io/v1" "github.com/lf-edge/eve/pkg/pillar/kubeapi" "github.com/sirupsen/logrus" + appsv1 "k8s.io/api/apps/v1" k8sv1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/api/resource" @@ -34,6 +35,7 @@ import ( "k8s.io/client-go/kubernetes" "k8s.io/client-go/rest" metricsv "k8s.io/metrics/pkg/client/clientset/versioned" + "k8s.io/utils/pointer" v1 "kubevirt.io/api/core/v1" "kubevirt.io/client-go/kubecli" ) @@ -45,17 +47,30 @@ const ( eveLabelKey = "App-Domain-Name" waitForPodCheckCounter = 5 // Check 5 times waitForPodCheckTime = 15 // Check every 15 seconds, don't wait for too long to cause watchdog + tolerateSec = 30 // Pod/VMI reschedule delay after node unreachable seconds +) + +// MetaDataType is a type for different Domain types +// We only support ReplicaSet for VMI and Pod for now. +type MetaDataType int + +// Constants representing different resource types. +const ( + IsMetaVmi MetaDataType = iota + IsMetaPod + IsMetaReplicaVMI + IsMetaReplicaPod ) // VM instance meta data structure. 
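+// Each vmiList entry now tracks either a Pod ReplicaSet (containers) or a VMI
+// ReplicaSet (VMs), distinguished by mtype and keyed by the EVE domain name.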
type vmiMetaData struct { - vmi *v1.VirtualMachineInstance // Handle to the VM instance - pod *k8sv1.Pod // Handle to the pod container - domainID int // DomainID understood by domainmgr in EVE - isPod bool // switch on is Pod or is VMI - name string // Display-Name(all lower case) + first 5 bytes of domainName - cputotal uint64 // total CPU in NS so far - maxmem uint32 // total Max memory usage in bytes so far + repPod *appsv1.ReplicaSet // Handle to the replicaSetof pod + repVMI *v1.VirtualMachineInstanceReplicaSet // Handle to the replicaSet of VMI + domainID int // DomainID understood by domainmgr in EVE + mtype MetaDataType // switch on is ReplicaSet, Pod or is VMI + name string // Display-Name(all lower case) + first 5 bytes of domainName + cputotal uint64 // total CPU in NS so far + maxmem uint32 // total Max memory usage in bytes so far } type kubevirtContext struct { @@ -66,12 +81,14 @@ type kubevirtContext struct { virthandlerIPAddr string prevDomainMetric map[string]types.DomainMetric kubeConfig *rest.Config + nodeNameMap map[string]string // to pass nodeName between methods without pointer receiver } // Use few states for now var stateMap = map[string]types.SwState{ "Paused": types.PAUSED, "Running": types.RUNNING, + "NonLocal": types.RUNNING, "shutdown": types.HALTING, "suspended": types.PAUSED, "Pending": types.PENDING, @@ -131,6 +148,7 @@ func newKubevirt() Hypervisor { devicemodel: "virt", vmiList: make(map[string]*vmiMetaData), prevDomainMetric: make(map[string]types.DomainMetric), + nodeNameMap: make(map[string]string), } case "amd64": return kubevirtContext{ @@ -138,6 +156,7 @@ func newKubevirt() Hypervisor { devicemodel: "pc-q35-3.1", vmiList: make(map[string]*vmiMetaData), prevDomainMetric: make(map[string]types.DomainMetric), + nodeNameMap: make(map[string]string), } } return nil @@ -197,13 +216,15 @@ func (ctx kubevirtContext) Setup(status types.DomainStatus, config types.DomainC "for app %s", config.DisplayName) } + getMyNodeUUID(&ctx, status.NodeName) + if config.VirtualizationMode == types.NOHYPER { - if err := ctx.CreatePodConfig(domainName, config, status, diskStatusList, aa, file); err != nil { - return logError("failed to build kube pod config: %v", err) + if err := ctx.CreateReplicaPodConfig(domainName, config, status, diskStatusList, aa, file); err != nil { + return logError("failed to build kube replicaset config: %v", err) } } else { - // Take eve domain config and convert to VMI config - if err := ctx.CreateVMIConfig(domainName, config, status, diskStatusList, aa, file); err != nil { + // Take eve domain config and convert to VMI Replicaset config + if err := ctx.CreateReplicaVMIConfig(domainName, config, status, diskStatusList, aa, file); err != nil { return logError("failed to build kube config: %v", err) } } @@ -212,12 +233,12 @@ func (ctx kubevirtContext) Setup(status types.DomainStatus, config types.DomainC } -// Kubevirt VMI config spec is updated with the domain config/status of the app. +// Kubevirt VMI ReplicaSet config spec is updated with the domain config/status of the app. 
// The details and the struct of the spec can be found at: // https://kubevirt.io/api-reference/v1.0.0/definitions.html -func (ctx kubevirtContext) CreateVMIConfig(domainName string, config types.DomainConfig, status types.DomainStatus, +func (ctx kubevirtContext) CreateReplicaVMIConfig(domainName string, config types.DomainConfig, status types.DomainStatus, diskStatusList []types.DiskStatus, aa *types.AssignableAdapters, file *os.File) error { - logrus.Debugf("CreateVMIConfig called for Domain: %s", domainName) + logrus.Debugf("CreateReplicaVMIConfig called for Domain: %s", domainName) err := getConfig(&ctx) if err != nil { @@ -225,12 +246,15 @@ func (ctx kubevirtContext) CreateVMIConfig(domainName string, config types.Domai } kvClient, err := kubecli.GetKubevirtClientFromRESTConfig(ctx.kubeConfig) - if err != nil { logrus.Errorf("couldn't get the kubernetes client API config: %v", err) return err } + nodeName, ok := ctx.nodeNameMap["nodename"] + if !ok { + return logError("Failed to get nodeName") + } kubeName := base.GetAppKubeName(config.DisplayName, config.UUIDandVersion.UUID) // Get a VirtualMachineInstance object and populate the values from DomainConfig vmi := v1.NewVMIReferenceFromNameWithNS(kubeapi.EVEKubeNameSpace, kubeName) @@ -242,7 +266,7 @@ func (ctx kubevirtContext) CreateVMIConfig(domainName string, config types.Domai // Set memory mem := v1.Memory{} - m, err := resource.ParseQuantity(convertToKubernetesFormat(config.Memory * 1024)) // To bytes from KB + m, err := resource.ParseQuantity(convertToKubernetesFormat(config.Memory * 1024)) if err != nil { logrus.Errorf("Could not parse the memory value %v", err) return err @@ -290,7 +314,6 @@ func (ctx kubevirtContext) CreateVMIConfig(domainName string, config types.Domai vols := make([]v1.Volume, len(diskStatusList)) ndisks := len(diskStatusList) for i, ds := range diskStatusList { - diskName := "disk" + strconv.Itoa(i+1) // Domainmgr sets devtype 9P for container images. 
Though in kubevirt container image is @@ -336,7 +359,6 @@ func (ctx kubevirtContext) CreateVMIConfig(domainName string, config types.Domai }, } } else { - pvcName, err := ds.GetPVCNameFromVolumeKey() if err != nil { return logError("Failed to fetch PVC Name from volumekey %v", ds.VolumeKey) @@ -361,7 +383,6 @@ func (ctx kubevirtContext) CreateVMIConfig(domainName string, config types.Domai }, } } - } vmi.Spec.Domain.Devices.Disks = disks[0:ndisks] vmi.Spec.Volumes = vols[0:ndisks] @@ -398,7 +419,6 @@ func (ctx kubevirtContext) CreateVMIConfig(domainName string, config types.Domai tap := pciDevice{pciLong: ib.PciLong, ioType: ib.Type} pciAssignments = addNoDuplicatePCI(pciAssignments, tap) } - } } @@ -414,23 +434,58 @@ func (ctx kubevirtContext) CreateVMIConfig(domainName string, config types.Domai } } + // Set the affinity to this node the VMI is preferred to run on + affinity := setKubeAffinity(nodeName) + + // Set tolerations to handle node conditions + tolerations := setKubeToleration(int64(tolerateSec)) + + vmi.Spec.Affinity = affinity + vmi.Spec.Tolerations = tolerations vmi.Labels = make(map[string]string) vmi.Labels[eveLabelKey] = domainName + // Create a VirtualMachineInstanceReplicaSet + replicaSet := &v1.VirtualMachineInstanceReplicaSet{ + ObjectMeta: metav1.ObjectMeta{ + Name: kubeName, + Namespace: kubeapi.EVEKubeNameSpace, + }, + Spec: v1.VirtualMachineInstanceReplicaSetSpec{ + Replicas: pointer.Int32Ptr(1), + Selector: &metav1.LabelSelector{ + MatchLabels: map[string]string{ + eveLabelKey: domainName, + }, + }, + Template: &v1.VirtualMachineInstanceTemplateSpec{ + ObjectMeta: metav1.ObjectMeta{ + Labels: map[string]string{ + eveLabelKey: domainName, + }, + }, + Spec: vmi.Spec, + }, + }, + } + + logrus.Infof("CreateReplicaVMIConfig: VirtualMachineInstanceReplicaSet: %+v", replicaSet) + // Now we have VirtualMachine Instance object, save it to config file for debug purposes // and save it in context which will be used to start VM in Start() call // dispName is for vmi name/handle on kubernetes meta := vmiMetaData{ - vmi: vmi, + repVMI: replicaSet, name: kubeName, + mtype: IsMetaReplicaVMI, domainID: int(rand.Uint32()), } ctx.vmiList[domainName] = &meta - vmiStr := fmt.Sprintf("%+v", vmi) + repvmiStr := fmt.Sprintf("%+v", replicaSet) // write to config file - file.WriteString(vmiStr) + file.WriteString(repvmiStr) return nil } @@ -438,56 +493,64 @@ func (ctx kubevirtContext) CreateVMIConfig(domainName string, config types.Domai func (ctx kubevirtContext) Start(domainName string) error { logrus.Debugf("Starting Kubevirt domain %s", domainName) + nodeName, ok := ctx.nodeNameMap["nodename"] + if !ok { + return logError("Failed to get nodeName") + } + err := getConfig(&ctx) if err != nil { return err } kubeconfig := ctx.kubeConfig + logrus.Infof("Starting Kubevirt domain %s, devicename nodename %d", domainName, len(ctx.nodeNameMap)) vmis, ok := ctx.vmiList[domainName] if !ok { return logError("start domain %s failed to get vmlist", domainName) } - if vmis.isPod { - err := StartPodContainer(kubeconfig, ctx.vmiList[domainName].pod) + + // Start the Pod ReplicaSet + if vmis.mtype == IsMetaReplicaPod { + err := StartReplicaPodContiner(ctx, ctx.vmiList[domainName].repPod) return err + } else if vmis.mtype != IsMetaReplicaVMI { + return logError("Start domain %s wrong type", domainName) } - vmi := vmis.vmi + // Start the VMI ReplicaSet + repvmi := vmis.repVMI virtClient, err := kubecli.GetKubevirtClientFromRESTConfig(kubeconfig) - if err != nil { logrus.Errorf("couldn't get the kubernetes 
client API config: %v", err) return err } - // Create the VM + // Create the VMI ReplicaSet i := 5 for { - _, err = virtClient.VirtualMachineInstance(kubeapi.EVEKubeNameSpace).Create(context.Background(), vmi) + _, err = virtClient.ReplicaSet(kubeapi.EVEKubeNameSpace).Create(repvmi) if err != nil { if strings.Contains(err.Error(), "dial tcp 127.0.0.1:6443") && i <= 0 { - logrus.Infof("Start VM failed %v\n", err) + logrus.Infof("Start VMI replicaset failed %v\n", err) return err } time.Sleep(10 * time.Second) - logrus.Infof("Start VM failed, retry (%d) err %v", i, err) + logrus.Infof("Start VMI replicaset failed, retry (%d) err %v", i, err) } else { break } i = i - 1 } + logrus.Infof("Started Kubevirt domain replicaset %s, VMI replicaset %s", domainName, vmis.name) - logrus.Infof("Started Kubevirt domain %s", domainName) - - err = waitForVMI(vmis.name, true) + err = waitForVMI(vmis.name, nodeName, true) if err != nil { logrus.Errorf("couldn't start VMI %v", err) return err } return nil - } // Create is no-op for kubevirt, just return the domainID we already have. @@ -508,22 +571,16 @@ func (ctx kubevirtContext) Stop(domainName string, force bool) error { if !ok { return logError("domain %s failed to get vmlist", domainName) } - if vmis.isPod { - err := StopPodContainer(kubeconfig, vmis.name) - return err + if vmis.mtype == IsMetaReplicaPod { + err = StopReplicaPodContainer(kubeconfig, vmis.name) + } else if vmis.mtype == IsMetaReplicaVMI { + err = StopReplicaVMI(kubeconfig, vmis.name) } else { - virtClient, err := kubecli.GetKubevirtClientFromRESTConfig(kubeconfig) - if err != nil { - logrus.Errorf("couldn't get the kubernetes client API config: %v", err) - return err - } + return logError("Stop domain %s wrong type", domainName) + } - // Stop the VM - err = virtClient.VirtualMachineInstance(kubeapi.EVEKubeNameSpace).Delete(context.Background(), vmis.name, &metav1.DeleteOptions{}) - if err != nil { - logrus.Errorf("Stop error %v\n", err) - return err - } + if err != nil { + return err } delete(ctx.vmiList, domainName) @@ -545,27 +602,16 @@ func (ctx kubevirtContext) Delete(domainName string) (result error) { if !ok { return logError("delete domain %s failed to get vmlist", domainName) } - if vmis.isPod { - err := StopPodContainer(kubeconfig, vmis.name) - return err + if vmis.mtype == IsMetaReplicaPod { + err = StopReplicaPodContainer(kubeconfig, vmis.name) + } else if vmis.mtype == IsMetaReplicaVMI { + err = StopReplicaVMI(kubeconfig, vmis.name) } else { - virtClient, err := kubecli.GetKubevirtClientFromRESTConfig(kubeconfig) - - if err != nil { - logrus.Errorf("couldn't get the kubernetes client API config: %v", err) - return err - } - - // Stop the VM - err = virtClient.VirtualMachineInstance(kubeapi.EVEKubeNameSpace).Delete(context.Background(), vmis.name, &metav1.DeleteOptions{}) + return logError("delete domain %s wrong type", domainName) + } - // May be already deleted during Stop action, so its not an error if does not exist - if errors.IsNotFound(err) { - logrus.Infof("Domain already deleted: %v", domainName) - } else { - fmt.Printf("Delete error %v\n", err) - return err - } + if err != nil { + return err } // Delete the state dir @@ -584,9 +630,33 @@ func (ctx kubevirtContext) Delete(domainName string) (result error) { return nil } +// StopReplicaVMI stops the VMI ReplicaSet +func StopReplicaVMI(kubeconfig *rest.Config, repVmiName string) error { + virtClient, err := kubecli.GetKubevirtClientFromRESTConfig(kubeconfig) + if err != nil { + logrus.Errorf("couldn't get the kubernetes 
client API config: %v", err) + return err + } + + // Stop the VMI ReplicaSet + err = virtClient.ReplicaSet(kubeapi.EVEKubeNameSpace).Delete(repVmiName, &metav1.DeleteOptions{}) + if errors.IsNotFound(err) { + logrus.Infof("Stop VMI Replicaset, Domain already deleted: %v", repVmiName) + } else { + logrus.Errorf("Stop VMI Replicaset error %v\n", err) + return err + } + + return nil +} + func (ctx kubevirtContext) Info(domainName string) (int, types.SwState, error) { logrus.Debugf("Info called for Domain: %s", domainName) + nodeName, ok := ctx.nodeNameMap["nodename"] + if !ok { + return 0, types.BROKEN, logError("Failed to get nodeName") + } var res string var err error @@ -598,10 +668,10 @@ func (ctx kubevirtContext) Info(domainName string) (int, types.SwState, error) { if !ok { return 0, types.HALTED, logError("info domain %s failed to get vmlist", domainName) } - if vmis.isPod { - res, err = InfoPodContainer(ctx.kubeConfig, vmis.name) + if vmis.mtype == IsMetaReplicaPod { + res, err = InfoReplicaSetContainer(ctx, vmis.name) } else { - res, err = getVMIStatus(vmis.name) + res, err = getVMIStatus(vmis.name, nodeName) } if err != nil { return 0, types.BROKEN, logError("domain %s failed to get info: %v", domainName, err) @@ -622,15 +692,25 @@ func (ctx kubevirtContext) Cleanup(domainName string) error { if err := ctx.ctrdContext.Cleanup(domainName); err != nil { return fmt.Errorf("couldn't cleanup task %s: %v", domainName, err) } + nodeName, ok := ctx.nodeNameMap["nodename"] + if !ok { + return logError("Cleanup: Failed to get nodeName") + } var err error vmis, ok := ctx.vmiList[domainName] if !ok { return logError("cleanup domain %s failed to get vmlist", domainName) } - if vmis.isPod { + if vmis.mtype == IsMetaReplicaPod { + _, err = InfoReplicaSetContainer(ctx, vmis.name) + if err == nil { + err = ctx.Delete(domainName) + } + } else if vmis.mtype == IsMetaReplicaVMI { + err = waitForVMI(vmis.name, nodeName, false) } else { - err = waitForVMI(vmis.name, false) + err = logError("cleanup domain %s wrong type", domainName) } if err != nil { return fmt.Errorf("waitforvmi failed %s: %v", domainName, err) @@ -655,7 +735,7 @@ func convertToKubernetesFormat(b int) string { return fmt.Sprintf("%.1fYi", bf) } -func getVMIStatus(vmiName string) (string, error) { +func getVMIStatus(repVmiName, nodeName string) (string, error) { kubeconfig, err := kubeapi.GetKubeConfig() if err != nil { @@ -668,20 +748,42 @@ func getVMIStatus(vmiName string) (string, error) { return "", logError("couldn't get the Kube client Config: %v", err) } - // Get the VMI info - vmi, err := virtClient.VirtualMachineInstance(kubeapi.EVEKubeNameSpace).Get(context.Background(), vmiName, &metav1.GetOptions{}) - + // List VMIs with a label selector that matches the replicaset name + vmiList, err := virtClient.VirtualMachineInstance(kubeapi.EVEKubeNameSpace).List(context.Background(), &metav1.ListOptions{}) if err != nil { - return "", logError("domain %s failed to get VMI info %s", vmiName, err) + return "", logError("getVMIStatus: domain %s failed to get VMI info %s", repVmiName, err) + } + if len(vmiList.Items) == 0 { + return "", logError("getVMIStatus: No VMI found with the given replicaset name %s", repVmiName) } - res := fmt.Sprintf("%v", vmi.Status.Phase) - + // Use the first VMI in the list + var foundNonlocal bool + var targetVMI *v1.VirtualMachineInstance + for _, vmi := range vmiList.Items { + if vmi.Status.NodeName == nodeName { + if vmi.GenerateName == repVmiName { + targetVMI = &vmi + break + } + } else { + if 
vmi.GenerateName == repVmiName { + foundNonlocal = true + } + } + } + if targetVMI == nil { + if foundNonlocal { + return "NonLocal", nil + } + return "", logError("getVMIStatus: No VMI %s found with the given nodeName %s", repVmiName, nodeName) + } + res := fmt.Sprintf("%v", targetVMI.Status.Phase) return res, nil } // Inspired from kvm.go -func waitForVMI(vmiName string, available bool) error { +func waitForVMI(vmiName, nodeName string, available bool) error { maxDelay := time.Minute * 5 // 5mins ?? lets keep it for now delay := time.Second var waited time.Duration @@ -693,8 +795,7 @@ func waitForVMI(vmiName string, available bool) error { waited += delay } - state, err := getVMIStatus(vmiName) - + state, err := getVMIStatus(vmiName, nodeName) if err != nil { if available { @@ -728,17 +829,22 @@ func waitForVMI(vmiName string, available bool) error { func (ctx kubevirtContext) GetDomsCPUMem() (map[string]types.DomainMetric, error) { logrus.Debugf("GetDomsCPUMem: enter") + nodeName, ok := ctx.nodeNameMap["nodename"] + if !ok { + return nil, nil + } res := make(kubevirtMetrics, len(ctx.vmiList)) - virtIP, err := getVirtHandlerIPAddr(&ctx) + virtIP, err := getVirtHandlerIPAddr(&ctx, nodeName) if err != nil { - logrus.Errorf("GetDomsCPUMem get virthandler ip error %v", err) + logrus.Debugf("GetDomsCPUMem get virthandler ip error %v", err) return nil, err } url := "https://" + virtIP + ":8443/metrics" httpClient := &http.Client{ Transport: &http.Transport{ - TLSClientConfig: &tls.Config{InsecureSkipVerify: true}, + TLSClientConfig: &tls.Config{InsecureSkipVerify: true}, + DisableKeepAlives: true, }, } @@ -765,6 +871,7 @@ func (ctx kubevirtContext) GetDomsCPUMem() (map[string]types.DomainMetric, error return nil, err } + // It seems the virt-handler metrics only container the VMIs running on this node scanner := bufio.NewScanner(strings.NewReader(string(body))) for scanner.Scan() { line := scanner.Text() @@ -809,7 +916,7 @@ func (ctx kubevirtContext) GetDomsCPUMem() (map[string]types.DomainMetric, error var domainName string for n, vmis := range ctx.vmiList { - if vmis.name == vmiName { + if strings.HasPrefix(vmiName, vmis.name) { // handle the VMI ReplicaSet domainName = n if _, ok := res[domainName]; !ok { res[domainName] = types.DomainMetric{ @@ -852,15 +959,13 @@ func (ctx kubevirtContext) GetDomsCPUMem() (map[string]types.DomainMetric, error } hasEmptyRes := len(ctx.vmiList) - len(res) - if hasEmptyRes > 0 { - // check and get the kubernetes pod's metrics - checkPodMetrics(ctx, res, hasEmptyRes) - } + checkReplicaPodMetrics(ctx, res, hasEmptyRes) + logrus.Debugf("GetDomsCPUMem: %d VMs: %+v, podnum %d", len(ctx.vmiList), res, hasEmptyRes) return res, nil } -func getVirtHandlerIPAddr(ctx *kubevirtContext) (string, error) { +func getVirtHandlerIPAddr(ctx *kubevirtContext, nodeName string) (string, error) { if ctx.virthandlerIPAddr != "" { return ctx.virthandlerIPAddr, nil } @@ -879,6 +984,9 @@ func getVirtHandlerIPAddr(ctx *kubevirtContext) (string, error) { var vmiPod *k8sv1.Pod for _, pod := range pods.Items { + if nodeName != pod.Spec.NodeName { + continue + } if strings.HasPrefix(pod.ObjectMeta.Name, "virt-handler-") { vmiPod = &pod break @@ -886,7 +994,7 @@ func getVirtHandlerIPAddr(ctx *kubevirtContext) (string, error) { } if vmiPod == nil { - return "", fmt.Errorf("can not find virt-handler pod") + return "", fmt.Errorf("getVirtHandlerIPAddr: can not find virt-handler pod") } ctx.virthandlerIPAddr = vmiPod.Status.PodIP return ctx.virthandlerIPAddr, nil @@ -919,17 +1027,23 @@ func 
assignToInt64(parsedValue interface{}) int64 { return intValue } -func (ctx kubevirtContext) CreatePodConfig(domainName string, config types.DomainConfig, status types.DomainStatus, +func (ctx kubevirtContext) CreateReplicaPodConfig(domainName string, config types.DomainConfig, status types.DomainStatus, diskStatusList []types.DiskStatus, aa *types.AssignableAdapters, file *os.File) error { kubeName := base.GetAppKubeName(config.DisplayName, config.UUIDandVersion.UUID) if config.KubeImageName == "" { err := fmt.Errorf("domain config kube image name empty") - logrus.Errorf("CreatePodConfig: %v", err) + logrus.Errorf("CreateReplicaPodConfig: %v", err) return err } ociName := config.KubeImageName + logrus.Infof("CreateReplicaPodConfig: domainName %s, kubeName %s, nodeName %d", domainName, kubeName, len(ctx.nodeNameMap)) + nodeName, ok := ctx.nodeNameMap["nodename"] + if !ok { + return logError("Failed to get nodeName") + } + var netSelections []netattdefv1.NetworkSelectionElement for _, vif := range config.VifList { netSelections = append(netSelections, netattdefv1.NetworkSelectionElement{ @@ -950,7 +1064,7 @@ func (ctx kubevirtContext) CreatePodConfig(domainName string, config types.Domai // Check if the NAD is created in the cluster, return error if not err := kubeapi.CheckEtherPassThroughNAD(nadName) if err != nil { - logrus.Errorf("CreatePodConfig: check ether NAD failed, %v", err) + logrus.Errorf("CreateReplicaPodConfig: check ether NAD failed, %v", err) return err } } @@ -961,131 +1075,219 @@ func (ctx kubevirtContext) CreatePodConfig(domainName string, config types.Domai annotations = map[string]string{ "k8s.v1.cni.cncf.io/networks": encodeSelections(netSelections), } - logrus.Infof("CreatePodConfig: annotations %+v", annotations) + logrus.Infof("CreateReplicaPodConfig: annotations %+v", annotations) } else { - err := fmt.Errorf("CreatePodConfig: no network selections, exit") + err := fmt.Errorf("CreateReplicaPodConfig: no network selections, exit") return err } - vcpus := strconv.Itoa(config.VCpus*1000) + "m" + //vcpus := strconv.Itoa(config.VCpus*1000) + "m" // FixedResources.Memory is in Kbytes - memoryLimit := strconv.Itoa(config.Memory * 1000) - memoryRequest := strconv.Itoa(config.Memory * 1000) + //memoryLimit := "100Mi" // convertToKubernetesFormat(config.Memory * 1000) + //memoryRequest := memoryLimit - pod := &k8sv1.Pod{ + var replicaNum int32 + replicaNum = 1 + repNum := &replicaNum + replicaSet := &appsv1.ReplicaSet{ ObjectMeta: metav1.ObjectMeta{ - Name: kubeName, - Namespace: kubeapi.EVEKubeNameSpace, - Annotations: annotations, + Name: kubeName, + Namespace: kubeapi.EVEKubeNameSpace, }, - Spec: k8sv1.PodSpec{ - Containers: []k8sv1.Container{ - { - Name: kubeName, - Image: ociName, - ImagePullPolicy: k8sv1.PullNever, - SecurityContext: &k8sv1.SecurityContext{ - Privileged: &[]bool{true}[0], + Spec: appsv1.ReplicaSetSpec{ + Replicas: repNum, + Selector: &metav1.LabelSelector{ + MatchLabels: map[string]string{ + "app": kubeName, + }, + }, + Template: k8sv1.PodTemplateSpec{ + ObjectMeta: metav1.ObjectMeta{ + Labels: map[string]string{ + "app": kubeName, }, - Resources: k8sv1.ResourceRequirements{ - Limits: k8sv1.ResourceList{ - k8sv1.ResourceCPU: resource.MustParse(vcpus), - k8sv1.ResourceMemory: resource.MustParse(memoryLimit), - }, - Requests: k8sv1.ResourceList{ - k8sv1.ResourceCPU: resource.MustParse(vcpus), - k8sv1.ResourceMemory: resource.MustParse(memoryRequest), + Annotations: annotations, + }, + Spec: k8sv1.PodSpec{ + Tolerations: 
setKubeToleration(int64(tolerateSec)),
+				Affinity:      setKubeAffinity(nodeName),
+				Containers: []k8sv1.Container{
+					{
+						Name:            kubeName,
+						Image:           ociName,
+						ImagePullPolicy: k8sv1.PullNever,
+						SecurityContext: &k8sv1.SecurityContext{
+							Privileged: &[]bool{true}[0],
+						},
 					},
 				},
+				RestartPolicy: k8sv1.RestartPolicyAlways,
+				DNSConfig: &k8sv1.PodDNSConfig{
+					Nameservers: []string{"8.8.8.8", "1.1.1.1"}, // XXX, temp, Add your desired DNS servers here
+				},
 			},
 		},
-		DNSConfig: &k8sv1.PodDNSConfig{
-			Nameservers: []string{"8.8.8.8", "1.1.1.1"}, // XXX, temp, Add your desired DNS servers here
-		},
 	},
 }
 
-	pod.Labels = make(map[string]string)
-	pod.Labels[eveLabelKey] = domainName
-	logrus.Infof("CreatePodConfig: pod setup %+v", pod)
+	logrus.Infof("CreateReplicaPodConfig: replicaset %+v", replicaSet)
+
+	// Add pod non-image volume disks
+	if len(diskStatusList) > 1 {
+		leng := len(diskStatusList) - 1
+		for _, ds := range diskStatusList[1:] {
+			if ds.Devtype == "9P" { // skip 9P volume type
+				if leng > 0 {
+					leng--
+				} else {
+					break
+				}
+			}
+		}
+		if leng > 0 {
+			volumes := make([]k8sv1.Volume, leng)
+			mounts := make([]k8sv1.VolumeMount, leng)
+
+			i := 0
+			for _, ds := range diskStatusList[1:] {
+				if ds.Devtype == "9P" {
+					continue
+				}
+				voldispName := strings.ToLower(ds.DisplayName)
+				//voldevs[i] = k8sv1.VolumeDevice{
+				//	Name:       voldispName,
+				//	DevicePath: ds.MountDir,
+				//}
+				mounts[i] = k8sv1.VolumeMount{
+					Name:      voldispName,
+					MountPath: ds.MountDir,
+				}
+
+				volumes[i].Name = voldispName
+				volumes[i].VolumeSource = k8sv1.VolumeSource{
+					PersistentVolumeClaim: &k8sv1.PersistentVolumeClaimVolumeSource{
+						ClaimName: strings.ToLower(ds.DisplayName),
+						//ClaimName: ds.VolumeKey,
+					},
+				}
+				logrus.Infof("CreateReplicaPodConfig:(%d) mount[i] %+v, volumes[i] %+v", i, mounts[i], volumes[i])
+				i++
+			}
+			replicaSet.Spec.Template.Spec.Containers[0].VolumeMounts = mounts
+			replicaSet.Spec.Template.Spec.Volumes = volumes
+		}
+	}
+	logrus.Infof("CreateReplicaPodConfig: replicaset setup %+v", replicaSet)
 
 	// Now we have VirtualMachine Instance object, save it to config file for debug purposes
 	// and save it in context which will be used to start VM in Start() call
 	meta := vmiMetaData{
-		pod:      pod,
-		isPod:    true,
+		repPod:   replicaSet,
+		mtype:    IsMetaReplicaPod,
 		name:     kubeName,
 		domainID: int(rand.Uint32()),
 	}
 	ctx.vmiList[domainName] = &meta
 
-	podStr := fmt.Sprintf("%+v", pod)
+	repStr := fmt.Sprintf("%+v", replicaSet)
 
 	// write to config file
-	file.WriteString(podStr)
+	file.WriteString(repStr)
 
 	return nil
 }
 
-func encodeSelections(selections []netattdefv1.NetworkSelectionElement) string {
-	bytes, err := json.Marshal(selections)
-	if err != nil {
-		logrus.Errorf("encodeSelections %v", err)
-		return ""
+func setKubeAffinity(nodeName string) *k8sv1.Affinity {
+	affinity := &k8sv1.Affinity{
+		NodeAffinity: &k8sv1.NodeAffinity{
+			PreferredDuringSchedulingIgnoredDuringExecution: []k8sv1.PreferredSchedulingTerm{
+				{
+					Preference: k8sv1.NodeSelectorTerm{
+						MatchExpressions: []k8sv1.NodeSelectorRequirement{
+							{
+								Key:      "kubernetes.io/hostname",
+								Operator: "In",
+								Values:   []string{nodeName},
+							},
+						},
+					},
+					Weight: 100,
+				},
+			},
+		},
 	}
-	return string(bytes)
+	return affinity
 }
 
-// StartPodContainer : Starts container as kubernetes pod
-func StartPodContainer(kubeconfig *rest.Config, pod *k8sv1.Pod) error {
+func setKubeToleration(timeOutSec int64) []k8sv1.Toleration {
+	tolerations := []k8sv1.Toleration{
+		{
+			Key:               "node.kubernetes.io/unreachable",
+			Operator:          k8sv1.TolerationOpExists,
+			Effect:            k8sv1.TaintEffectNoExecute,
+			TolerationSeconds: pointer.Int64Ptr(timeOutSec),
+		},
+		{
+			Key:               "node.kubernetes.io/not-ready",
+			Operator:          k8sv1.TolerationOpExists,
+			Effect:            k8sv1.TaintEffectNoExecute,
+			TolerationSeconds: pointer.Int64Ptr(timeOutSec),
+		},
+	}
+	return tolerations
+}
 
-	clientset, err := kubernetes.NewForConfig(kubeconfig)
+// StartReplicaPodContiner starts the ReplicaSet pod
+func StartReplicaPodContiner(ctx kubevirtContext, rep *appsv1.ReplicaSet) error {
+	err := getConfig(&ctx)
+	if err != nil {
+		return err
+	}
+	clientset, err := kubernetes.NewForConfig(ctx.kubeConfig)
 	if err != nil {
-		logrus.Errorf("StartPodContainer: can't get clientset %v", err)
+		logrus.Errorf("StartReplicaPodContiner: can't get clientset %v", err)
 		return err
 	}
 
 	opStr := "created"
-	_, err = clientset.CoreV1().Pods(kubeapi.EVEKubeNameSpace).Create(context.TODO(), pod, metav1.CreateOptions{})
+	result, err := clientset.AppsV1().ReplicaSets(kubeapi.EVEKubeNameSpace).Create(context.TODO(), rep, metav1.CreateOptions{})
 	if err != nil {
 		if !errors.IsAlreadyExists(err) {
-			// TODO: update
-			logrus.Errorf("StartPodContainer: pod create filed: %v", err)
+			logrus.Errorf("StartReplicaPodContiner: replicaset create failed: %v", err)
 			return err
 		} else {
 			opStr = "already exists"
 		}
 	}
-	logrus.Infof("StartPodContainer: Pod %s %s with nad %+v", pod.ObjectMeta.Name, opStr, pod.Annotations)
+	logrus.Infof("StartReplicaPodContiner: Rep %s %s, result %v", rep.ObjectMeta.Name, opStr, result)
 
-	err = checkForPod(kubeconfig, pod.ObjectMeta.Name)
+	err = checkForReplicaPod(ctx, rep.ObjectMeta.Name)
 	if err != nil {
-		logrus.Errorf("StartPodContainer: check for pod status error %v", err)
+		logrus.Errorf("StartReplicaPodContiner: check for pod status error %v", err)
 		return err
 	}
-	logrus.Infof("StartPodContainer: Pod %s running", pod.ObjectMeta.Name)
+	logrus.Infof("StartReplicaPodContiner: Pod %s running", rep.ObjectMeta.Name)
 	return nil
 }
 
-func checkForPod(kubeconfig *rest.Config, podName string) error {
+func checkForReplicaPod(ctx kubevirtContext, repName string) error {
 	var i int
 	var status string
 	var err error
-	// wait for pod to be in running state, sometimes can take long, but we only wait for
-	// about a minute in order not to cause watchdog action
 	for {
 		i++
-		logrus.Infof("checkForPod: check(%d) wait 15 sec, %v", i, podName)
-		time.Sleep(waitForPodCheckTime * time.Second)
+		logrus.Infof("checkForReplicaPod: check(%d) wait 15 sec, %v", i, repName)
+		time.Sleep(15 * time.Second)
 
-		status, err = InfoPodContainer(kubeconfig, podName)
+		status, err = InfoReplicaSetContainer(ctx, repName)
 		if err != nil {
-			logrus.Infof("checkForPod: podName %s, %v", podName, err)
+			logrus.Infof("checkForReplicaPod: repName %s, %v", repName, err)
 		} else {
-			if status == "Running" {
+			if status == "Running" || status == "NonLocal" {
+				logrus.Infof("checkForReplicaPod: (%d) status %s, good", i, status)
 				return nil
 			} else {
-				logrus.Errorf("checkForPod: get podName info status %v (not running)", status)
+				logrus.Errorf("checkForReplicaPod(%d): get podName info status %v (not running)", i, status)
 			}
 		}
 		if i > waitForPodCheckCounter {
@@ -1093,64 +1295,70 @@ func checkForPod(kubeconfig *rest.Config, podName string) error {
 		}
 	}
 
-	return fmt.Errorf("checkForPod: timed out, statuus %s, err %v", status, err)
+	return fmt.Errorf("checkForReplicaPod: timed out, status %s, err %v", status, err)
 }
 
-// StopPodContainer : Stops the running kubernetes pod
-func StopPodContainer(kubeconfig *rest.Config, podName string) error {
+// InfoReplicaSetContainer gets the status of the ReplicaSet pod
+func InfoReplicaSetContainer(ctx kubevirtContext, repName string) (string, error) {
-	clientset, err := kubernetes.NewForConfig(kubeconfig)
+	err := getConfig(&ctx)
 	if err != nil {
-		logrus.Errorf("StopPodContainer: can't get clientset %v", err)
-		return err
+		return "", err
 	}
-
-	err = clientset.CoreV1().Pods(kubeapi.EVEKubeNameSpace).Delete(context.TODO(), podName, metav1.DeleteOptions{})
+	podclientset, err := kubernetes.NewForConfig(ctx.kubeConfig)
 	if err != nil {
-		// Handle error
-		logrus.Errorf("StopPodContainer: deleting pod: %v", err)
-		return err
+		return "", logError("InfoReplicaSetContainer: couldn't get the pod Config: %v", err)
 	}
-	logrus.Infof("StopPodContainer: Pod %s deleted", podName)
-	return nil
-}
-
-// InfoPodContainer : Get the pod information
-func InfoPodContainer(kubeconfig *rest.Config, podName string) (string, error) {
-
-	podclientset, err := kubernetes.NewForConfig(kubeconfig)
-	if err != nil {
-		return "", logError("InfoPodContainer: couldn't get the pod Config: %v", err)
+	nodeName, ok := ctx.nodeNameMap["nodename"]
+	if !ok {
+		return "", logError("Failed to get nodeName")
 	}
-
-	pod, err := podclientset.CoreV1().Pods(kubeapi.EVEKubeNameSpace).Get(context.TODO(), podName, metav1.GetOptions{})
+	pods, err := podclientset.CoreV1().Pods(kubeapi.EVEKubeNameSpace).List(context.TODO(), metav1.ListOptions{
+		LabelSelector: fmt.Sprintf("app=%s", repName),
+	})
 	if err != nil {
-		return "", logError("InfoPodContainer: couldn't get the pod: %v", err)
+		return "", logError("InfoReplicaSetContainer: couldn't get the pods: %v", err)
 	}
-	var res string
-	// https://kubernetes.io/docs/concepts/workloads/pods/pod-lifecycle/
-	switch pod.Status.Phase {
-	case k8sv1.PodPending:
-		res = "Pending"
-	case k8sv1.PodRunning:
-		res = "Running"
-	case k8sv1.PodSucceeded:
-		res = "Running"
-	case k8sv1.PodFailed:
-		res = "Failed"
-	case k8sv1.PodUnknown:
-		res = "Scheduling"
-	default:
-		res = "Scheduling"
-	}
-	logrus.Infof("InfoPodContainer: pod %s, status %s", podName, res)
+	var foundNonlocal bool
+	for _, pod := range pods.Items {
+		if nodeName != pod.Spec.NodeName {
+			foundNonlocal = true
+			logrus.Infof("InfoReplicaSetContainer: rep %s, nodeName %v differ w/ hostname", repName, pod.Spec.NodeName)
+			continue
+		}
 
-	return res, nil
+		var res string
+		// https://kubernetes.io/docs/concepts/workloads/pods/pod-lifecycle/
+		switch pod.Status.Phase {
+		case k8sv1.PodPending:
+			res = "Pending"
+		case k8sv1.PodRunning:
+			res = "Running"
+		case k8sv1.PodSucceeded:
+			res = "Running"
+		case k8sv1.PodFailed:
+			res = "Failed"
+		case k8sv1.PodUnknown:
+			res = "Scheduling"
+		default:
+			res = "Scheduling"
+		}
+		logrus.Infof("InfoReplicaSetContainer: rep %s, nodeName %v, status %s", pod.ObjectMeta.Name, pod.Spec.NodeName, res)
+		if pod.Status.Phase != k8sv1.PodRunning {
+			continue
+		}
+
+		return res, nil
+	}
+	if foundNonlocal {
+		return "NonLocal", nil
+	}
+	return "", logError("InfoReplicaSetContainer: pod not ready")
 }
 
-func checkPodMetrics(ctx kubevirtContext, res map[string]types.DomainMetric, emptySlot int) {
+func checkReplicaPodMetrics(ctx kubevirtContext, res map[string]types.DomainMetric, emptySlot int) {
 
 	err := getConfig(&ctx)
 	if err != nil {
@@ -1159,74 +1367,187 @@ func checkPodMetrics(ctx kubevirtContext, res map[string]types.DomainMetric, emp
 	kubeconfig := ctx.kubeConfig
 	podclientset, err := kubernetes.NewForConfig(kubeconfig)
 	if err != nil {
-		logrus.Errorf("checkPodMetrics: can not get pod client %v", err)
+		logrus.Errorf("checkReplicaPodMetrics: can not get pod client %v", err)
 		return
 	}
 	clientset, err := metricsv.NewForConfig(kubeconfig)
 	if err != nil {
-		logrus.Errorf("checkPodMetrics: can't get clientset %v", err)
+		logrus.Errorf("checkReplicaPodMetrics: can't get clientset %v", err)
+		return
+	}
+
+	nodeName, ok := ctx.nodeNameMap["nodename"]
+	if !ok {
+		logrus.Errorf("checkReplicaPodMetrics: can't get node name") // XXX may remove
 		return
 	}
 
 	count := 0
 	for n, vmis := range ctx.vmiList {
-		if !vmis.isPod {
+		if vmis.mtype != IsMetaReplicaPod {
 			continue
 		}
 		count++
-		podName := vmis.name
-		pod, err := podclientset.CoreV1().Pods(kubeapi.EVEKubeNameSpace).Get(context.TODO(), podName, metav1.GetOptions{})
+		repName := vmis.name
+		pods, err := podclientset.CoreV1().Pods(kubeapi.EVEKubeNameSpace).List(context.TODO(), metav1.ListOptions{
+			LabelSelector: fmt.Sprintf("app=%s", repName),
+		})
 		if err != nil {
-			logrus.Errorf("checkPodMetrics: can't get pod %v", err)
+			logrus.Errorf("checkReplicaPodMetrics: can't get pod %v", err)
 			continue
 		}
-		memoryLimits := pod.Spec.Containers[0].Resources.Limits.Memory()
-		metrics, err := clientset.MetricsV1beta1().PodMetricses(kubeapi.EVEKubeNameSpace).Get(context.TODO(), podName, metav1.GetOptions{})
-		if err != nil {
-			logrus.Errorf("checkPodMetrics: get pod metrics error %v", err)
-			continue
+		for _, pod := range pods.Items {
+			dm := getPodMetrics(clientset, pod, vmis, nodeName, res)
+			if dm != nil {
+				if count <= emptySlot {
+					res[n] = *dm
+				}
+				logrus.Infof("checkReplicaPodMetrics: dm %+v, res %v", dm, res)
+
+				ctx.vmiList[n] = vmis // update for the last seen metrics
+			}
 		}
+	}
 
-		cpuTotalNs := metrics.Containers[0].Usage[k8sv1.ResourceCPU]
-		cpuTotalNsAsFloat64 := cpuTotalNs.AsApproximateFloat64() * float64(time.Second) // get nanoseconds
-		totalCPU := uint64(cpuTotalNsAsFloat64)
+	logrus.Infof("checkReplicaPodMetrics: done with vmiList")
+}
+
+func getPodMetrics(clientset *metricsv.Clientset, pod k8sv1.Pod, vmis *vmiMetaData,
+	nodeName string, res map[string]types.DomainMetric) *types.DomainMetric {
+	if pod.Status.Phase != k8sv1.PodRunning {
+		return nil
+	}
+	if nodeName != pod.Spec.NodeName { // cluster, pod from other nodes
+		return nil
+	}
+	podName := pod.ObjectMeta.Name
+	memoryLimits := pod.Spec.Containers[0].Resources.Limits.Memory()
 
-		//allocatedMemory := metrics.Containers[0].Usage[k8sv1.ResourceMemory]
-		usedMemory := metrics.Containers[0].Usage[k8sv1.ResourceMemory]
-		maxMemory := uint32(usedMemory.Value())
+	metrics, err := clientset.MetricsV1beta1().PodMetricses(kubeapi.EVEKubeNameSpace).Get(context.TODO(), podName, metav1.GetOptions{})
+	if err != nil {
+		logrus.Errorf("getPodMetrics: get pod metrics error %v", err)
+		return nil
+	}
+
+	cpuTotalNs := metrics.Containers[0].Usage[k8sv1.ResourceCPU]
+	cpuTotalNsAsFloat64 := cpuTotalNs.AsApproximateFloat64() * float64(time.Second) // get nanoseconds
+	totalCPU := uint64(cpuTotalNsAsFloat64)
+
+	//allocatedMemory := metrics.Containers[0].Usage[k8sv1.ResourceMemory]
+	usedMemory := metrics.Containers[0].Usage[k8sv1.ResourceMemory]
+	maxMemory := uint32(usedMemory.Value())
+	if vmis != nil {
 		if vmis.maxmem < maxMemory {
 			vmis.maxmem = maxMemory
 		} else {
 			maxMemory = vmis.maxmem
 		}
+	}
 
-		available := uint32(memoryLimits.Value())
-		if uint32(usedMemory.Value()) < available {
-			available = available - uint32(usedMemory.Value())
-		}
-		usedMemoryPercent := calculateMemoryUsagePercent(usedMemory.Value(), memoryLimits.Value())
-		BytesInMegabyte := uint32(1024 * 1024)
+	available := uint32(memoryLimits.Value())
+	if uint32(usedMemory.Value()) < available {
+		available = available - uint32(usedMemory.Value())
+	}
+	usedMemoryPercent := calculateMemoryUsagePercent(usedMemory.Value(), memoryLimits.Value())
+	BytesInMegabyte := uint32(1024 * 1024)
 
-		realCPUTotal := vmis.cputotal + totalCPU
+	var realCPUTotal uint64
+	if vmis != nil {
+		realCPUTotal = vmis.cputotal + totalCPU
 		vmis.cputotal = realCPUTotal
-		dm := types.DomainMetric{
-			CPUTotalNs:        realCPUTotal,
-			CPUScaled:         1,
-			AllocatedMB:       uint32(memoryLimits.Value()) / BytesInMegabyte,
-			UsedMemory:        uint32(usedMemory.Value()) / BytesInMegabyte,
-			MaxUsedMemory:     maxMemory / BytesInMegabyte,
-			AvailableMemory:   available / BytesInMegabyte,
-			UsedMemoryPercent: usedMemoryPercent,
-		}
-		if count <= emptySlot {
-			res[n] = dm
-		}
-		logrus.Infof("checkPodMetrics: dm %+v, res %v", dm, res)
+	}
+	dm := &types.DomainMetric{
+		CPUTotalNs:        realCPUTotal,
+		CPUScaled:         1,
+		AllocatedMB:       uint32(memoryLimits.Value()) / BytesInMegabyte,
+		UsedMemory:        uint32(usedMemory.Value()) / BytesInMegabyte,
+		MaxUsedMemory:     maxMemory / BytesInMegabyte,
+		AvailableMemory:   available / BytesInMegabyte,
+		UsedMemoryPercent: usedMemoryPercent,
+		NodeName:          pod.Spec.NodeName,
+	}
+	logrus.Infof("getPodMetrics: dm %+v, res %v", dm, res)
+	return dm
+}
+
+// StopReplicaPodContainer stops the ReplicaSet pod
+func StopReplicaPodContainer(kubeconfig *rest.Config, repName string) error {
+
+	clientset, err := kubernetes.NewForConfig(kubeconfig)
+	if err != nil {
+		logrus.Errorf("StopReplicaPodContainer: can't get clientset %v", err)
+		return err
+	}
+
+	err = clientset.AppsV1().ReplicaSets(kubeapi.EVEKubeNameSpace).Delete(context.TODO(), repName, metav1.DeleteOptions{})
+	if err != nil {
+		// Handle error
+		logrus.Errorf("StopReplicaPodContainer: deleting pod: %v", err)
+		return err
+	}
+
+	logrus.Infof("StopReplicaPodContainer: Pod %s deleted", repName)
+	return nil
+}
 
-		ctx.vmiList[n] = vmis // update for the last seen metrics
+func encodeSelections(selections []netattdefv1.NetworkSelectionElement) string {
+	bytes, err := json.Marshal(selections)
+	if err != nil {
+		logrus.Errorf("encodeSelections %v", err)
+		return ""
 	}
+	return string(bytes)
+}
+
+// InfoPodContainer : Get the pod information
+func InfoPodContainer(ctx kubevirtContext, podName string) (string, error) {
+	err := getConfig(&ctx)
+	if err != nil {
+		return "", err
+	}
+	podclientset, err := kubernetes.NewForConfig(ctx.kubeConfig)
+	if err != nil {
+		return "", logError("InfoPodContainer: couldn't get the pod Config: %v", err)
+	}
+
+	nodeName, ok := ctx.nodeNameMap["nodename"]
+	if !ok {
+		return "", logError("Failed to get nodeName")
+	}
+
+	pod, err := podclientset.CoreV1().Pods(kubeapi.EVEKubeNameSpace).Get(context.TODO(), podName, metav1.GetOptions{})
+	if err != nil {
+		return "", logError("InfoPodContainer: couldn't get the pod: %v", err)
+	}
+
+	if nodeName != pod.Spec.NodeName {
+		logrus.Infof("InfoPodContainer: pod %s, nodeName %v differ w/ hostname", podName, pod.Spec.NodeName)
+		return "", nil
+	} else {
+		logrus.Infof("InfoPodContainer: pod %s, nodeName %v, matches the hostname uuid", podName, pod.Spec.NodeName)
+	}
+
+	var res string
+	// https://kubernetes.io/docs/concepts/workloads/pods/pod-lifecycle/
+	switch pod.Status.Phase {
+	case k8sv1.PodPending:
+		res = "Pending"
+	case k8sv1.PodRunning:
+		res = "Running"
+	case k8sv1.PodSucceeded:
+		res = "Running"
+	case k8sv1.PodFailed:
+		res = "Failed"
+	case k8sv1.PodUnknown:
+		res = "Scheduling"
+	default:
+		res = "Scheduling"
+	}
+	logrus.Infof("InfoPodContainer: pod %s, nodeName %v, status %s", podName, pod.Spec.NodeName, res)
+
+	return res, nil
 }
 
 // Helper function to calculate the memory usage percentage
@@ -1378,3 +1699,9 @@ func (ctx kubevirtContext) VirtualTPMTerminate(domainName string, wp *types.Watc
 func (ctx kubevirtContext) VirtualTPMTeardown(domainName string, wp *types.WatchdogParam) error {
 	return fmt.Errorf("not implemented")
 }
+
+func getMyNodeUUID(ctx *kubevirtContext, nodeName string) {
+	if len(ctx.nodeNameMap) == 0 {
+		ctx.nodeNameMap["nodename"] = nodeName
+	}
+}
diff --git a/pkg/pillar/kubeapi/kubeapi.go b/pkg/pillar/kubeapi/kubeapi.go
index fecfb10031..3fea0de1c3 100644
--- a/pkg/pillar/kubeapi/kubeapi.go
+++ b/pkg/pillar/kubeapi/kubeapi.go
@@ -282,8 +282,8 @@ func waitForNodeReady(client *kubernetes.Clientset, readyCh chan bool, devUUID s
 	if err != nil {
 		return err
 	}
-	if len(pods.Items) < 6 {
-		return fmt.Errorf("kubevirt running pods less than 6")
+	if len(pods.Items) < 4 { // need at least 4 pods to be running
+		return fmt.Errorf("kubevirt running pods less than 4")
 	}
 
 	err = waitForLonghornReady(client, hostname)
diff --git a/pkg/pillar/types/clustertypes.go b/pkg/pillar/types/clustertypes.go
index f3a00af8e7..31794adc19 100644
--- a/pkg/pillar/types/clustertypes.go
+++ b/pkg/pillar/types/clustertypes.go
@@ -49,6 +49,8 @@ type ENClusterAppStatus struct {
 	ScheduledOnThisNode bool // App is running on this device
 	StatusRunning       bool // Status of the app in "Running" state
 	IsVolumeDetached    bool // Are volumes detached after failover ?
+	AppIsVMI            bool   // Is this a VMI app, vs a Pod app
+	AppKubeName         string // Kube name of the app, either VMI or Pod
 }
 
 // Key - returns the key for the config of EdgeNodeClusterConfig
diff --git a/pkg/pillar/types/domainmgrtypes.go b/pkg/pillar/types/domainmgrtypes.go
index cdcb5c3d9f..a4f0dfb146 100644
--- a/pkg/pillar/types/domainmgrtypes.go
+++ b/pkg/pillar/types/domainmgrtypes.go
@@ -327,6 +327,11 @@ type DomainStatus struct {
 	FmlCustomResolution string
 	// if this node is the DNiD of the App
 	IsDNidNode bool
+	// handle DomainConfig Delete
+	// for kubevirt EVE, App is configured into the kubernetes database,
+	// there is no need to delete the domain if the status check fails.
+	// But this flag is used to handle if the domain config is deleted.
+	DomainConfigDeleted bool
 	// the device name is used for kube node name
 	// Need to pass in from domainmgr to hypervisor context commands
 	NodeName string
diff --git a/pkg/pillar/types/zedmanagertypes.go b/pkg/pillar/types/zedmanagertypes.go
index fb46b5aacb..5901f2a5ed 100644
--- a/pkg/pillar/types/zedmanagertypes.go
+++ b/pkg/pillar/types/zedmanagertypes.go
@@ -144,8 +144,7 @@ type AppInstanceConfig struct {
 	// allow AppInstance to discover other AppInstances attached to its network instances
 	AllowToDiscover bool
 
-	// XXX Cluster Designated Node Id
-	// temp, this will be changed to bool in later PR
+	// Cluster Designated Node Id
 	DesignatedNodeID uuid.UUID
 }
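
A note on how the ReplicaSet pieces above fit together. The status and metrics
lookups in this patch select pods with a "app=<kubeName>" label selector, and the
replicas are steered to the designated node via the soft node affinity and NoExecute
tolerations built by setKubeAffinity/setKubeToleration. The sketch below is
illustrative only, assuming the pod template carries that app label (the exact
wiring lives in CreateReplicaPodConfig, not shown in full here); it is not the
patch's own construction code.

    import (
        appsv1 "k8s.io/api/apps/v1"
        k8sv1 "k8s.io/api/core/v1"
        metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    )

    // buildReplicaSet is a minimal sketch (same package as the helpers above):
    // one replica, labeled app=<kubeName> so the label-based lookups find it,
    // preferring the designated node but tolerating node loss only for a
    // bounded time so the replica can fail over.
    func buildReplicaSet(kubeName, nodeName, ociName string, tolerateSec int64) *appsv1.ReplicaSet {
        replicas := int32(1)
        labels := map[string]string{"app": kubeName}
        return &appsv1.ReplicaSet{
            ObjectMeta: metav1.ObjectMeta{Name: kubeName, Labels: labels},
            Spec: appsv1.ReplicaSetSpec{
                Replicas: &replicas,
                Selector: &metav1.LabelSelector{MatchLabels: labels},
                Template: k8sv1.PodTemplateSpec{
                    ObjectMeta: metav1.ObjectMeta{Labels: labels},
                    Spec: k8sv1.PodSpec{
                        Affinity:    setKubeAffinity(nodeName),      // prefer the designated node
                        Tolerations: setKubeToleration(tolerateSec), // bound the stay on an unreachable node
                        Containers: []k8sv1.Container{{
                            Name:            kubeName,
                            Image:           ociName,
                            ImagePullPolicy: k8sv1.PullNever,
                        }},
                        RestartPolicy: k8sv1.RestartPolicyAlways,
                    },
                },
            },
        }
    }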
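
The readiness contract in checkForReplicaPod is: keep polling until
InfoReplicaSetContainer reports either "Running" (a pod of this ReplicaSet runs on
the local node) or "NonLocal" (a peer node in the cluster runs it), and give up
after the check counter expires. The sketch below expresses the same contract with
the apimachinery wait helpers instead of a hand-rolled sleep loop; it is an
alternative illustration, not code from this patch, and waitForReplicaPod is a
hypothetical name.

    import (
        "time"

        "k8s.io/apimachinery/pkg/util/wait"
    )

    // waitForReplicaPod polls the ReplicaSet status until a replica is running
    // locally or on a peer node, or the overall timeout is reached.
    func waitForReplicaPod(ctx kubevirtContext, repName string, interval, timeout time.Duration) error {
        return wait.PollImmediate(interval, timeout, func() (bool, error) {
            status, err := InfoReplicaSetContainer(ctx, repName)
            if err != nil {
                // Not fatal: the pod may simply not be scheduled yet, keep polling.
                return false, nil
            }
            return status == "Running" || status == "NonLocal", nil
        })
    }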
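
On the unit handling in getPodMetrics: metrics-server returns CPU usage as a
resource.Quantity in cores, which the patch scales by time.Second to report a
nanosecond figure, and memory is reported in MB plus a percentage of the container
limit. The standalone snippet below just demonstrates those conversions with made-up
sample values; the exact formula of calculateMemoryUsagePercent is not shown in this
hunk, so used*100/limit is an assumption.

    package main

    import (
        "fmt"
        "time"

        "k8s.io/apimachinery/pkg/api/resource"
    )

    func main() {
        // CPU quantity in cores -> approximate "nanoseconds" figure, as in getPodMetrics.
        cpu := resource.MustParse("250m") // 0.25 core, hypothetical sample
        cpuNs := uint64(cpu.AsApproximateFloat64() * float64(time.Second))

        // Memory: used vs. container limit, in MB and as a percentage (assumed formula).
        used := resource.MustParse("300Mi")
        limit := resource.MustParse("1Gi")
        const bytesInMegabyte = 1024 * 1024
        usedPct := float64(used.Value()) * 100.0 / float64(limit.Value())

        fmt.Printf("cpuNs=%d usedMB=%d allocatedMB=%d usedPct=%.1f\n",
            cpuNs, used.Value()/bytesInMegabyte, limit.Value()/bytesInMegabyte, usedPct)
    }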