Implement Clustering and App related functions #4407

Open · wants to merge 2 commits into base: master
28 changes: 25 additions & 3 deletions pkg/pillar/base/kubevirt.go
@@ -67,15 +67,37 @@ func GetAppKubeName(displayName string, uuid uuid.UUID) string {
}

// GetVMINameFromVirtLauncher : get VMI name from the corresponding Kubevirt
// launcher pod name.
// launcher pod name for replicaset generated VMI.
func GetVMINameFromVirtLauncher(podName string) (vmiName string, isVirtLauncher bool) {
if !strings.HasPrefix(podName, VMIPodNamePrefix) {
return "", false
}
vmiName = strings.TrimPrefix(podName, VMIPodNamePrefix)
lastSep := strings.LastIndex(vmiName, "-")
if lastSep != -1 {
vmiName = vmiName[:lastSep]
if lastSep == -1 || lastSep < 5 {
return "", false
}

// Check if the last part is 5 bytes long
if len(vmiName[lastSep+1:]) != 5 {
return "", false
}

// Use the index minus 5 bytes to get the VMI name to remove added
// replicaset suffix
vmiName = vmiName[:lastSep-5]
return vmiName, true
}

// GetReplicaPodName : get the app name from the pod name for replica pods.
func GetReplicaPodName(displayName, podName string, uuid uuid.UUID) (kubeName string, isReplicaPod bool) {
kubeName = GetAppKubeName(displayName, uuid)
if !strings.HasPrefix(podName, kubeName) {
return "", false
}
suffix := strings.TrimPrefix(podName, kubeName)
if strings.HasPrefix(suffix, "-") && len(suffix[1:]) == 5 {
return kubeName, true
}
return "", false
}
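For illustration, here is a minimal standalone sketch (not part of this diff; the names are hypothetical) of the naming convention these helpers assume: a replica pod is named as the app's kube name plus "-" and a 5-character ReplicaSet suffix.

package main

import (
	"fmt"
	"strings"
)

// isReplicaPodOf mirrors the suffix check in GetReplicaPodName above:
// the pod name must be kubeName + "-" + a 5-character suffix.
func isReplicaPodOf(kubeName, podName string) bool {
	if !strings.HasPrefix(podName, kubeName) {
		return false
	}
	suffix := strings.TrimPrefix(podName, kubeName)
	return strings.HasPrefix(suffix, "-") && len(suffix[1:]) == 5
}

func main() {
	fmt.Println(isReplicaPodOf("myapp-8a5f2", "myapp-8a5f2-zx9qk")) // true
	fmt.Println(isReplicaPodOf("myapp-8a5f2", "myapp-8a5f2"))       // false: no ReplicaSet suffix
}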
1 change: 1 addition & 0 deletions pkg/pillar/cipher/cipher.go
@@ -27,6 +27,7 @@ func getEncryptionBlock(
decBlock.CellularNetUsername = zconfigDecBlockPtr.CellularNetUsername
decBlock.CellularNetPassword = zconfigDecBlockPtr.CellularNetPassword
decBlock.ProtectedUserData = zconfigDecBlockPtr.ProtectedUserData
decBlock.ClusterToken = zconfigDecBlockPtr.ClusterToken
return decBlock
}

81 changes: 71 additions & 10 deletions pkg/pillar/cmd/domainmgr/domainmgr.go
@@ -91,6 +91,7 @@ type domainContext struct {
pubDomainStatus pubsub.Publication
subGlobalConfig pubsub.Subscription
subZFSPoolStatus pubsub.Subscription
subEdgeNodeInfo pubsub.Subscription
pubAssignableAdapters pubsub.Publication
pubDomainMetric pubsub.Publication
pubHostMemory pubsub.Publication
@@ -126,6 +127,7 @@ type domainContext struct {
cpuPinningSupported bool
// Is it kubevirt eve
hvTypeKube bool
nodeName string
}

// AddAgentSpecificCLIFlags adds CLI options
@@ -414,9 +416,24 @@ func Run(ps *pubsub.PubSub, loggerArg *logrus.Logger, logArg *base.LogObject, ar
domainCtx.subZFSPoolStatus = subZFSPoolStatus
subZFSPoolStatus.Activate()

// Look for edge node info
subEdgeNodeInfo, err := ps.NewSubscription(pubsub.SubscriptionOptions{
AgentName: "zedagent",
MyAgentName: agentName,
TopicImpl: types.EdgeNodeInfo{},
Persistent: true,
Activate: false,
})
if err != nil {
log.Fatal(err)
}
domainCtx.subEdgeNodeInfo = subEdgeNodeInfo
_ = subEdgeNodeInfo.Activate()

// Parse any existing ConfigIntemValueMap but continue if there
// is none
for !domainCtx.GCComplete {
waitEdgeNodeInfo := true
for !domainCtx.GCComplete || (domainCtx.hvTypeKube && waitEdgeNodeInfo) {
log.Noticef("waiting for GCComplete")
select {
case change := <-subGlobalConfig.MsgChan():
@@ -425,12 +442,22 @@ func Run(ps *pubsub.PubSub, loggerArg *logrus.Logger, logArg *base.LogObject, ar
case <-domainCtx.publishTicker.C:
publishProcessesHandler(&domainCtx)

case change := <-subEdgeNodeInfo.MsgChan():
subEdgeNodeInfo.ProcessChange(change)
waitEdgeNodeInfo = false

case <-stillRunning.C:
}
ps.StillRunning(agentName, warningTime, errorTime)
}
log.Noticef("processed GCComplete")

// Get the EdgeNode info, needed for kubevirt clustering
err = domainCtx.retrieveDeviceNodeName()
if err != nil {
log.Fatal(err)
Contributor:
You can't fatal here since you only wait if hvTypeKube.
How about making Eden/Adam send the EdgeNodeInfo and wait even if not hvTypeKube?
That would remove some special cases like this issue.

}

if !domainCtx.setInitialUsbAccess {
log.Functionf("GCComplete but not setInitialUsbAccess => first boot")
// Enable USB keyboard and storage
@@ -513,6 +540,9 @@ func Run(ps *pubsub.PubSub, loggerArg *logrus.Logger, logArg *base.LogObject, ar
case change := <-subZFSPoolStatus.MsgChan():
subZFSPoolStatus.ProcessChange(change)

case change := <-subEdgeNodeInfo.MsgChan():
subEdgeNodeInfo.ProcessChange(change)

case <-domainCtx.publishTicker.C:
publishProcessesHandler(&domainCtx)

@@ -651,6 +681,9 @@ func Run(ps *pubsub.PubSub, loggerArg *logrus.Logger, logArg *base.LogObject, ar
case change := <-subPhysicalIOAdapter.MsgChan():
subPhysicalIOAdapter.ProcessChange(change)

case change := <-subEdgeNodeInfo.MsgChan():
subEdgeNodeInfo.ProcessChange(change)

case <-domainCtx.publishTicker.C:
start := time.Now()
err = domainCtx.cipherMetrics.Publish(log, cipherMetricsPub, "global")
@@ -977,12 +1010,15 @@ func verifyStatus(ctx *domainContext, status *types.DomainStatus) {
status.SetErrorDescription(errDescription)
}

//cleanup app instance tasks
if err := hyper.Task(status).Delete(status.DomainName); err != nil {
log.Errorf("failed to delete domain: %s (%v)", status.DomainName, err)
}
if err := hyper.Task(status).Cleanup(status.DomainName); err != nil {
log.Errorf("failed to cleanup domain: %s (%v)", status.DomainName, err)
// in cluster mode, we can not delete the pod due to failing to get app info
Contributor:
Was the issue this is fixing something appearing when there is a failover/takeover and another node in the cluster starts running the app instance?
Or is it something which could happen when an app instance is first provisioned on the cluster?

Contributor Author:
This can happen even on the first node of the app deployment. Sometimes we cannot get the status from the k3s cluster, or it takes time for the app to come up into the running state, but we should not remove this Kubernetes configuration: the cluster has the config stored in its database and has its own scheduling and control process to eventually bring the app to the intended state. If we delete the config from the cluster, then we need to wait another 10 minutes to retry, etc., and it will cause confusion.

Contributor Author (@naiming-zededa, Nov 7, 2024):
So a new boolean, DomainConfigDeleted, is introduced in the DomainStatus. It allows the designated node, once it knows for sure the app instance has been removed from the device, to go ahead and delete the app/domain from the cluster.

Contributor:
It would be good to capture the above explanation either in a comment here or in pkg/pillar/docs/zedkube.md

Contributor Author:
Sure. I documented this in zedkube.md and referenced it from domainmgr.go.

Contributor:
Hmm - re-reading it and it still looks odd.
Does the kubevirt Info() return an error or does it return HALTED when the issue is merely that it can't (yet) fetch the status from k3s?
Can we not fix that Info() to return something more accurate?

If it returns an error or HALTED then this function will set an error, and that error might be propagated to the controller, which would be confusing if the task is slow at starting, or is starting on some other node.

So I think this check is in the wrong place.

Contributor Author:
The current logic in kubevirt.go is:

  1. if the app is not found, it returns status "" and an error: logError("getVMIStatus: No VMI %s found with the given nodeName %s", repVmiName, nodeName)
  2. if the app is found on another node in the cluster, it returns status "nolocal" and no error
  3. if the app is found on this node, it returns whatever the Kubernetes running status is

With the above conditions, if an error is returned, the status is set to types.Broken.
We further map the status string with this mapping:

// Use few states  for now
var stateMap = map[string]types.SwState{
    "Paused":     types.PAUSED,
    "Running":    types.RUNNING,
    "NonLocal":   types.RUNNING,
    "shutdown":   types.HALTING,
    "suspended":  types.PAUSED,
    "Pending":    types.PENDING,
    "Scheduling": types.SCHEDULING,
    "Failed":     types.FAILED,
}

This is a good point; the error condition when the app is not yet running would be confusing. I can change condition 1) above to return 'Scheduling', which is a currently defined state and roughly reflects the Kubernetes app status.
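As a rough sketch of that proposal (hypothetical helper, not the PR's actual getVMIStatus code), the per-node decision could look like:

// vmiStatusSketch illustrates the three conditions above, with the proposed
// "Scheduling" fallback instead of an error when no VMI is found yet.
// The returned string would then be mapped through stateMap, e.g.
// "Scheduling" -> types.SCHEDULING and "NonLocal" -> types.RUNNING.
func vmiStatusSketch(found, onThisNode bool, k8sPhase string) (string, error) {
	switch {
	case !found:
		return "Scheduling", nil // proposed: previously "" plus an error, which mapped to types.Broken
	case !onThisNode:
		return "NonLocal", nil // app instance is running on another cluster node
	default:
		return k8sPhase, nil // e.g. "Running", "Paused", "Failed"
	}
}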

if !ctx.hvTypeKube {
Contributor:
And I get the feeling that most or all of these hvTypeKube checks should not be in domainmgr but rather inside the kubevirt hypervisor package.

If for instance zedmanager is telling domainmgr to shut down or delete a task and if for kubevirt this isn't needed except on some designated node, can't that check whether designated or not be done inside the hypervisor/kubevirt code?

Contributor Author:
I can move this into the hypervisor functions. Mainly we need to pass in whether this domain config is being deleted or not (DomainStatus.DomainConfigDeleted), so we need to change the API from '.Delete(domainName string)' to '.Delete(status *types.DomainStatus)' (which would also change for the other hypervisors).
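A minimal sketch of the signature change being discussed (hypothetical; the interface and method names here are illustrative, not code from this PR):

// Passing the full DomainStatus lets the kubevirt hypervisor check
// DomainStatus.DomainConfigDeleted (and whether this node is the designated
// one) itself, instead of domainmgr doing hvTypeKube checks.
type taskDeleter interface {
	// was: Delete(domainName string) error
	Delete(status *types.DomainStatus) error
}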

Contributor:
Did you move it there? If so, this change and the preceding comment can be removed from verifyStatus(), right?

//cleanup app instance tasks
if err := hyper.Task(status).Delete(status.DomainName); err != nil {
log.Errorf("failed to delete domain: %s (%v)", status.DomainName, err)
}
if err := hyper.Task(status).Cleanup(status.DomainName); err != nil {
log.Errorf("failed to cleanup domain: %s (%v)", status.DomainName, err)
}
}
}
status.DomainId = 0
@@ -1071,6 +1107,7 @@ func maybeRetryBoot(ctx *domainContext, status *types.DomainStatus) {
if !status.BootFailed {
return
}

if status.Activated && status.BootFailed {
log.Functionf("maybeRetryBoot(%s) clearing bootFailed since Activated",
status.Key())
@@ -1138,6 +1175,11 @@ func maybeRetryBoot(ctx *domainContext, status *types.DomainStatus) {
log.Errorf("Failed to setup vTPM for %s: %s", status.DomainName, err)
}

// pass nodeName to hypervisor call Setup
if status.NodeName == "" {
Contributor:
Can't NodeName be set when DomainStatus is created in handleCreate?

status.NodeName = ctx.nodeName
}

if err := hyper.Task(status).Setup(*status, *config, ctx.assignableAdapters, nil, file); err != nil {
//it is retry, so omit error
log.Errorf("Failed to create DomainStatus from %+v: %s",
@@ -1684,6 +1726,11 @@ func doActivate(ctx *domainContext, config types.DomainConfig,
log.Errorf("Failed to setup vTPM for %s: %s", status.DomainName, err)
}

// pass nodeName to hypervisor call Setup
if status.NodeName == "" {
Contributor:
Ditto

status.NodeName = ctx.nodeName
}

globalConfig := agentlog.GetGlobalConfig(log, ctx.subGlobalConfig)
if err := hyper.Task(status).Setup(*status, config, ctx.assignableAdapters, globalConfig, file); err != nil {
log.Errorf("Failed to create DomainStatus from %+v: %s",
@@ -1780,6 +1827,7 @@ func doActivateTail(ctx *domainContext, status *types.DomainStatus,
status.SetErrorNow(err.Error())
log.Errorf("doActivateTail(%v) failed for %s: %s",
status.UUIDandVersion, status.DisplayName, err)

// Delete
if err := hyper.Task(status).Delete(status.DomainName); err != nil {
log.Errorf("failed to delete domain: %s (%v)", status.DomainName, err)
@@ -1844,7 +1892,7 @@ func doInactivate(ctx *domainContext, status *types.DomainStatus, impatient bool
if doShutdown {
// If the Shutdown fails we don't wait; assume failure
// was due to no PV tools
if err := DomainShutdown(*status, false); err != nil {
if err := DomainShutdown(ctx, *status, false); err != nil {
log.Errorf("DomainShutdown %s failed: %s",
status.DomainName, err)
} else {
@@ -1864,7 +1912,7 @@
// the domain is already on the way down.
// In case of errors we proceed directly to deleting the task,
// and after that we waitForDomainGone
if err := DomainShutdown(*status, true); err != nil {
if err := DomainShutdown(ctx, *status, true); err != nil {
log.Warnf("DomainShutdown -F %s failed: %s",
status.DomainName, err)
} else {
@@ -2508,13 +2556,14 @@ func DomainCreate(ctx *domainContext, status types.DomainStatus) (int, error) {
}

// DomainShutdown is a wrapper for domain shutdown
func DomainShutdown(status types.DomainStatus, force bool) error {
func DomainShutdown(ctx *domainContext, status types.DomainStatus, force bool) error {

var err error
log.Functionf("DomainShutdown force-%v %s %d", force, status.DomainName, status.DomainId)

// Stop the domain
log.Functionf("Stopping domain - %s", status.DomainName)

err = hyper.Task(&status).Stop(status.DomainName, force)

return err
@@ -3603,3 +3652,15 @@ func lookupCapabilities(ctx *domainContext) (*types.Capabilities, error) {
}
return &capabilities, nil
}

func (ctx *domainContext) retrieveDeviceNodeName() error {
NodeInfo, err := ctx.subEdgeNodeInfo.Get("global")
if err != nil {
log.Errorf("retrieveDeviceNodeName: can't get edgeNodeInfo %v", err)
return err
}
enInfo := NodeInfo.(types.EdgeNodeInfo)
ctx.nodeName = strings.ReplaceAll(strings.ToLower(enInfo.DeviceName), "_", "-")
log.Noticef("retrieveDeviceNodeName: devicename, NodeInfo %v", NodeInfo) // XXX
Contributor:
Why the XXX comment?

return nil
}
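A quick illustration of the normalization above (hypothetical device name); presumably the lowercasing and underscore-to-hyphen replacement is there because Kubernetes node names must be valid lowercase RFC 1123 DNS labels:

package main

import (
	"fmt"
	"strings"
)

func main() {
	deviceName := "My_Edge_Node" // hypothetical EdgeNodeInfo.DeviceName
	nodeName := strings.ReplaceAll(strings.ToLower(deviceName), "_", "-")
	fmt.Println(nodeName) // my-edge-node
}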
57 changes: 57 additions & 0 deletions pkg/pillar/cmd/zedagent/parseconfig.go
@@ -151,6 +151,10 @@ func parseConfig(getconfigCtx *getconfigContext, config *zconfig.EdgeDevConfig,

if source != fromBootstrap {
activateNewBaseOS := parseBaseOS(getconfigCtx, config)
// Parse EdgeNode Cluster configuration
parseEdgeNodeClusterConfig(getconfigCtx, config)

parseNetworkInstanceConfig(getconfigCtx, config)
parseContentInfoConfig(getconfigCtx, config)
parseVolumeConfig(getconfigCtx, config)
@@ -764,6 +768,10 @@ func parseAppInstanceConfig(getconfigCtx *getconfigContext,
// Add config submitted via local profile server.
addLocalAppConfig(getconfigCtx, &appInstance)

// XXX add Designated ID to the appInstance
// XXX Keep this here for now to allow the kubevirt single-node working, the later PR to EVE main will remove this
appInstance.DesignatedNodeID = devUUID

// Verify that it fits and if not publish with error
checkAndPublishAppInstanceConfig(getconfigCtx, appInstance)
}
@@ -3199,3 +3207,52 @@ func handleDeviceOperation(ctxPtr *zedagentContext, op types.DeviceOperation) {
shutdownAppsGlobal(ctxPtr)
// nothing else to be done
}

func parseEdgeNodeClusterConfig(getconfigCtx *getconfigContext,
config *zconfig.EdgeDevConfig) {

ctx := getconfigCtx.zedagentCtx
zcfgCluster := config.GetCluster()
if zcfgCluster == nil {
log.Functionf("parseEdgeNodeClusterConfig: Unpublishing EdgeNodeClusterConfig")
ctx.pubEdgeNodeClusterConfig.Unpublish("global")
return
}
ipAddr, ipNet, err := net.ParseCIDR(zcfgCluster.GetClusterIpPrefix())
if err != nil {
log.Errorf("parseEdgeNodeClusterConfig: ParseCIDR failed %s", err)
return
}
ipNet.IP = ipAddr

joinServerIP := net.ParseIP(zcfgCluster.GetJoinServerIp())
if joinServerIP == nil {
log.Errorf("handleEdgeNodeConfigItem: parse JoinServerIP failed")
return
}
var isJoinNode bool
// deduce the bootstrap node status from clusterIPPrefix and joinServerIP
if ipAddr.Equal(joinServerIP) {
isJoinNode = true
}

id, err := uuid.FromString(zcfgCluster.GetClusterId())
if err != nil {
log.Errorf("parseEdgeNodeClusterConfig: failed to parse UUID: %v", err)
return
}
enClusterConfig := types.EdgeNodeClusterConfig{
ClusterName: zcfgCluster.GetClusterName(),
ClusterID: types.UUIDandVersion{UUID: id},
ClusterInterface: zcfgCluster.GetClusterInterface(),
ClusterIPPrefix: ipNet,
IsWorkerNode: zcfgCluster.GetIsWorkerNode(),
JoinServerIP: joinServerIP,
BootstrapNode: isJoinNode,
// XXX EncryptedClusterToken is only for gcp config
}
enClusterConfig.CipherToken = parseCipherBlock(getconfigCtx,
enClusterConfig.Key(), zcfgCluster.GetEncryptedClusterToken())
log.Functionf("parseEdgeNodeClusterConfig: ENCluster API, Config %+v, %v", zcfgCluster, enClusterConfig)
ctx.pubEdgeNodeClusterConfig.Publish("global", enClusterConfig)
}
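One subtlety in the ClusterIPPrefix handling above: net.ParseCIDR masks the host bits in the returned *IPNet, so re-assigning ipNet.IP = ipAddr keeps the node's own address together with the prefix length. A small sketch with a hypothetical prefix:

package main

import (
	"fmt"
	"net"
)

func main() {
	ipAddr, ipNet, _ := net.ParseCIDR("10.244.1.7/24") // hypothetical cluster prefix
	fmt.Println(ipNet) // 10.244.1.0/24 - ParseCIDR returns the network address
	ipNet.IP = ipAddr
	fmt.Println(ipNet) // 10.244.1.7/24 - host address preserved with the prefix length
}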
13 changes: 13 additions & 0 deletions pkg/pillar/cmd/zedagent/zedagent.go
@@ -229,6 +229,9 @@ type zedagentContext struct {
// Is Kubevirt eve
hvTypeKube bool

// EN cluster config
pubEdgeNodeClusterConfig pubsub.Publication

// Netdump
netDumper *netdump.NetDumper // nil if netdump is disabled
netdumpInterval time.Duration
@@ -1103,6 +1106,16 @@ func initPublications(zedagentCtx *zedagentContext) {
}
getconfigCtx.pubZedAgentStatus.ClearRestarted()

zedagentCtx.pubEdgeNodeClusterConfig, err = ps.NewPublication(pubsub.PublicationOptions{
AgentName: agentName,
Persistent: true,
Contributor:
Why does this need to be persistent? That makes it much more painful to add fields to the EdgeNodeClusterConfig in future EVE releases.

TopicType: types.EdgeNodeClusterConfig{},
})
if err != nil {
log.Fatal(err)
}
zedagentCtx.pubEdgeNodeClusterConfig.ClearRestarted()

getconfigCtx.pubPhysicalIOAdapters, err = ps.NewPublication(pubsub.PublicationOptions{
AgentName: agentName,
TopicType: types.PhysicalIOAdapterList{},
6 changes: 3 additions & 3 deletions pkg/pillar/cmd/zedkube/applogs.go
@@ -154,7 +154,7 @@ func (z *zedkube) checkAppsStatus() {
}

pub := z.pubENClusterAppStatus
stItmes := pub.GetAll()
stItems := pub.GetAll()

Please update this function from the POC branch, it changed a lot.

Contributor Author:
@zedi-pramodh as discussed, since some of the type booleans changed, updating the POC code in applogs.go would require updating many other files. I'll leave it to your later PR to add those changes.

ok no problem, I will take care of those and submit my PR once this one is merged.

var oldStatus *types.ENClusterAppStatus
for _, item := range items {
aiconfig := item.(types.AppInstanceConfig)
@@ -179,7 +179,7 @@
}
}

for _, st := range stItmes {
for _, st := range stItems {
aiStatus := st.(types.ENClusterAppStatus)
if aiStatus.AppUUID == aiconfig.UUIDandVersion.UUID {
oldStatus = &aiStatus
@@ -204,7 +204,7 @@ func (z *zedkube) getnodeNameAndUUID() error {
return err
}
enInfo := NodeInfo.(types.EdgeNodeInfo)
z.nodeName = strings.ToLower(enInfo.DeviceName)
z.nodeName = strings.ReplaceAll(strings.ToLower(enInfo.DeviceName), "_", "-")
z.nodeuuid = enInfo.DeviceID.String()
}
return nil