Implement Clustering and App related functions #4407
base: master
@@ -91,6 +91,7 @@ type domainContext struct {
 	pubDomainStatus pubsub.Publication
 	subGlobalConfig pubsub.Subscription
 	subZFSPoolStatus pubsub.Subscription
+	subEdgeNodeInfo pubsub.Subscription
 	pubAssignableAdapters pubsub.Publication
 	pubDomainMetric pubsub.Publication
 	pubHostMemory pubsub.Publication
@@ -126,6 +127,7 @@ type domainContext struct {
 	cpuPinningSupported bool
 	// Is it kubevirt eve
 	hvTypeKube bool
+	nodeName string
 }

 // AddAgentSpecificCLIFlags adds CLI options
@@ -414,9 +416,24 @@ func Run(ps *pubsub.PubSub, loggerArg *logrus.Logger, logArg *base.LogObject, ar
 	domainCtx.subZFSPoolStatus = subZFSPoolStatus
 	subZFSPoolStatus.Activate()

+	// Look for edge node info
+	subEdgeNodeInfo, err := ps.NewSubscription(pubsub.SubscriptionOptions{
+		AgentName:   "zedagent",
+		MyAgentName: agentName,
+		TopicImpl:   types.EdgeNodeInfo{},
+		Persistent:  true,
+		Activate:    false,
+	})
+	if err != nil {
+		log.Fatal(err)
+	}
+	domainCtx.subEdgeNodeInfo = subEdgeNodeInfo
+	_ = subEdgeNodeInfo.Activate()
+
 	// Parse any existing ConfigIntemValueMap but continue if there
 	// is none
-	for !domainCtx.GCComplete {
+	waitEdgeNodeInfo := true
+	for !domainCtx.GCComplete || (domainCtx.hvTypeKube && waitEdgeNodeInfo) {
 		log.Noticef("waiting for GCComplete")
 		select {
 		case change := <-subGlobalConfig.MsgChan():
@@ -425,12 +442,22 @@ func Run(ps *pubsub.PubSub, loggerArg *logrus.Logger, logArg *base.LogObject, ar
 		case <-domainCtx.publishTicker.C:
 			publishProcessesHandler(&domainCtx)

+		case change := <-subEdgeNodeInfo.MsgChan():
+			subEdgeNodeInfo.ProcessChange(change)
+			waitEdgeNodeInfo = false
+
 		case <-stillRunning.C:
 		}
 		ps.StillRunning(agentName, warningTime, errorTime)
 	}
 	log.Noticef("processed GCComplete")

+	// Get the EdgeNode info, needed for kubevirt clustering
+	err = domainCtx.retrieveDeviceNodeName()
+	if err != nil {
+		log.Fatal(err)
+	}
+
 	if !domainCtx.setInitialUsbAccess {
 		log.Functionf("GCComplete but not setInitialUsbAccess => first boot")
 		// Enable USB keyboard and storage
@@ -513,6 +540,9 @@ func Run(ps *pubsub.PubSub, loggerArg *logrus.Logger, logArg *base.LogObject, ar
 		case change := <-subZFSPoolStatus.MsgChan():
 			subZFSPoolStatus.ProcessChange(change)

+		case change := <-subEdgeNodeInfo.MsgChan():
+			subEdgeNodeInfo.ProcessChange(change)
+
 		case <-domainCtx.publishTicker.C:
 			publishProcessesHandler(&domainCtx)
@@ -651,6 +681,9 @@ func Run(ps *pubsub.PubSub, loggerArg *logrus.Logger, logArg *base.LogObject, ar
 		case change := <-subPhysicalIOAdapter.MsgChan():
 			subPhysicalIOAdapter.ProcessChange(change)

+		case change := <-subEdgeNodeInfo.MsgChan():
+			subEdgeNodeInfo.ProcessChange(change)
+
 		case <-domainCtx.publishTicker.C:
 			start := time.Now()
 			err = domainCtx.cipherMetrics.Publish(log, cipherMetricsPub, "global")
@@ -977,12 +1010,15 @@ func verifyStatus(ctx *domainContext, status *types.DomainStatus) {
 		status.SetErrorDescription(errDescription)
 	}

-	//cleanup app instance tasks
-	if err := hyper.Task(status).Delete(status.DomainName); err != nil {
-		log.Errorf("failed to delete domain: %s (%v)", status.DomainName, err)
-	}
-	if err := hyper.Task(status).Cleanup(status.DomainName); err != nil {
-		log.Errorf("failed to cleanup domain: %s (%v)", status.DomainName, err)
-	}
+	// in cluster mode, we can not delete the pod due to failing to get app info

Review comment: Was the issue this is fixing something that appears when there is a failover/takeover and another node in the cluster starts running the app instance?

Reply: This can happen even on the first node of the app deployment. Sometimes we cannot get the status from the k3s cluster, or it takes time for the app to reach the running state, but we should not remove this Kubernetes configuration: the config is stored in the cluster database, and Kubernetes has its own scheduling and control process to eventually bring the app to the intended state. If we delete the config from the cluster, then we need to wait another 10 minutes to retry, and it causes confusion.

Reply: So a new boolean, DomainConfigDeleted, is introduced in the DomainStatus. It allows the designated node, if it knows for sure the app instance has been removed from the device, to go ahead and delete the app/domain from the cluster.

Reply: It would be good to capture the above explanation either in a comment here or in pkg/pillar/docs/zedkube.md.

Reply: Sure. I documented this in zedkube.md and referenced it from domainmgr.go.

Reply: Hmm, re-reading it, this still looks odd. If it returns an error or HALTED, then this function will set an error, and that error might be propagated to the controller, which would be confusing if the task is slow to start or is starting on some other node. So I think this check is in the wrong place.

Reply: In the current kubevirt.go code, with the above condition, if an error is returned, then the status is set to types.Broken. That is a good point: an error for an app that simply is not running yet would be confusing. I can change condition 1) above to 'Scheduling', which is a currently defined state and roughly reflects the Kubernetes app status.

+	if !ctx.hvTypeKube {

Review comment: And I get the feeling that most or all of these hvTypeKube checks should not be in domainmgr but inside the kubevirt hypervisor package. If, for instance, zedmanager tells domainmgr to shut down or delete a task, and for kubevirt this isn't needed except on some designated node, can't the designated-or-not check be done inside the hypervisor/kubevirt code?

Reply: I can move this into the hypervisor functions. Mainly we need to pass in whether this domain config is being deleted or not (DomainStatus.DomainConfigDeleted), so we need to change the API from '.Delete(domainName string)' to '.Delete(status *types.DomainStatus)' (and that changes it for the other hypervisors too).

Reply: Did you move it there? If so, then this change and the preceding comment can be removed from verifyStatus(), right?

+		//cleanup app instance tasks
+		if err := hyper.Task(status).Delete(status.DomainName); err != nil {
+			log.Errorf("failed to delete domain: %s (%v)", status.DomainName, err)
+		}
+		if err := hyper.Task(status).Cleanup(status.DomainName); err != nil {
+			log.Errorf("failed to cleanup domain: %s (%v)", status.DomainName, err)
+		}
+	}
 	}
 	status.DomainId = 0
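A rough sketch of the direction discussed in the two threads above: the kubevirt hypervisor takes the DomainStatus (rather than only the domain name) and decides itself whether the cluster object may be removed, keyed off the new DomainConfigDeleted flag and a designated-node check. The changed Delete signature, the helper parameters, and the function name are assumptions for illustration, not code from this PR.

package hypervisor

import (
	"github.com/lf-edge/eve/pkg/pillar/types"
	"github.com/sirupsen/logrus"
)

// kubevirtDelete sketches moving the hvTypeKube/designated-node logic out of
// domainmgr: keep the Kubernetes object unless the app instance is really
// being removed from the device, and let only the designated node tear it
// down cluster-wide. The actual cluster deletion is injected as a callback.
func kubevirtDelete(status *types.DomainStatus, isDesignatedNode bool,
	deleteClusterObject func(domainName string) error) error {
	if !status.DomainConfigDeleted {
		// k3s keeps the config in its own database and will reconcile a
		// slow-starting or rescheduled app; do not delete it here.
		logrus.Infof("kubevirtDelete(%s): config not deleted, keeping cluster object",
			status.DomainName)
		return nil
	}
	if !isDesignatedNode {
		// Another node owns the cluster-wide teardown.
		return nil
	}
	return deleteClusterObject(status.DomainName)
}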
@@ -1071,6 +1107,7 @@ func maybeRetryBoot(ctx *domainContext, status *types.DomainStatus) {
 	if !status.BootFailed {
 		return
 	}
+
 	if status.Activated && status.BootFailed {
 		log.Functionf("maybeRetryBoot(%s) clearing bootFailed since Activated",
 			status.Key())
@@ -1138,6 +1175,11 @@ func maybeRetryBoot(ctx *domainContext, status *types.DomainStatus) {
 		log.Errorf("Failed to setup vTPM for %s: %s", status.DomainName, err)
 	}

+	// pass nodeName to hypervisor call Setup
+	if status.NodeName == "" {
+		status.NodeName = ctx.nodeName
+	}
+

Review comment: Can't NodeName be set when DomainStatus is created in handleCreate?

 	if err := hyper.Task(status).Setup(*status, *config, ctx.assignableAdapters, nil, file); err != nil {
 		//it is retry, so omit error
 		log.Errorf("Failed to create DomainStatus from %+v: %s",
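On the review question above about setting NodeName at creation time: a minimal sketch of that alternative, assuming handleCreate (the existing domainmgr create handler) is where the initial DomainStatus is built from the DomainConfig. The helper name is hypothetical; only fields visible in this diff plus two well-known DomainStatus fields are shown.

package domainmgr

import "github.com/lf-edge/eve/pkg/pillar/types"

// initialDomainStatus is a hypothetical helper: fill NodeName once, at
// creation time in handleCreate, instead of defaulting it later in both
// maybeRetryBoot and doActivate.
func initialDomainStatus(ctx *domainContext, config types.DomainConfig) types.DomainStatus {
	return types.DomainStatus{
		UUIDandVersion: config.UUIDandVersion,
		DisplayName:    config.DisplayName,
		NodeName:       ctx.nodeName, // from retrieveDeviceNodeName()
	}
}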
@@ -1684,6 +1726,11 @@ func doActivate(ctx *domainContext, config types.DomainConfig,
 		log.Errorf("Failed to setup vTPM for %s: %s", status.DomainName, err)
 	}

+	// pass nodeName to hypervisor call Setup
+	if status.NodeName == "" {
+		status.NodeName = ctx.nodeName
+	}
+

Review comment: Ditto.

 	globalConfig := agentlog.GetGlobalConfig(log, ctx.subGlobalConfig)
 	if err := hyper.Task(status).Setup(*status, config, ctx.assignableAdapters, globalConfig, file); err != nil {
 		log.Errorf("Failed to create DomainStatus from %+v: %s",
@@ -1780,6 +1827,7 @@ func doActivateTail(ctx *domainContext, status *types.DomainStatus,
 		status.SetErrorNow(err.Error())
 		log.Errorf("doActivateTail(%v) failed for %s: %s",
 			status.UUIDandVersion, status.DisplayName, err)
+
 		// Delete
 		if err := hyper.Task(status).Delete(status.DomainName); err != nil {
 			log.Errorf("failed to delete domain: %s (%v)", status.DomainName, err)
@@ -1844,7 +1892,7 @@ func doInactivate(ctx *domainContext, status *types.DomainStatus, impatient bool
 	if doShutdown {
 		// If the Shutdown fails we don't wait; assume failure
 		// was due to no PV tools
-		if err := DomainShutdown(*status, false); err != nil {
+		if err := DomainShutdown(ctx, *status, false); err != nil {
 			log.Errorf("DomainShutdown %s failed: %s",
 				status.DomainName, err)
 		} else {
@@ -1864,7 +1912,7 @@ func doInactivate(ctx *domainContext, status *types.DomainStatus, impatient bool
 	// the domain is already on the way down.
 	// In case of errors we proceed directly to deleting the task,
 	// and after that we waitForDomainGone
-	if err := DomainShutdown(*status, true); err != nil {
+	if err := DomainShutdown(ctx, *status, true); err != nil {
 		log.Warnf("DomainShutdown -F %s failed: %s",
 			status.DomainName, err)
 	} else {
@@ -2508,13 +2556,14 @@ func DomainCreate(ctx *domainContext, status types.DomainStatus) (int, error) {
 }

 // DomainShutdown is a wrapper for domain shutdown
-func DomainShutdown(status types.DomainStatus, force bool) error {
+func DomainShutdown(ctx *domainContext, status types.DomainStatus, force bool) error {
 	var err error
 	log.Functionf("DomainShutdown force-%v %s %d", force, status.DomainName, status.DomainId)

 	// Stop the domain
 	log.Functionf("Stopping domain - %s", status.DomainName)

 	err = hyper.Task(&status).Stop(status.DomainName, force)

 	return err
@@ -3603,3 +3652,15 @@ func lookupCapabilities(ctx *domainContext) (*types.Capabilities, error) {
 	}
 	return &capabilities, nil
 }
+
+func (ctx *domainContext) retrieveDeviceNodeName() error {
+	NodeInfo, err := ctx.subEdgeNodeInfo.Get("global")
+	if err != nil {
+		log.Errorf("retrieveDeviceNodeName: can't get edgeNodeInfo %v", err)
+		return err
+	}
+	enInfo := NodeInfo.(types.EdgeNodeInfo)
+	ctx.nodeName = strings.ReplaceAll(strings.ToLower(enInfo.DeviceName), "_", "-")
+	log.Noticef("retrieveDeviceNodeName: devicename, NodeInfo %v", NodeInfo) // XXX
+	return nil
+}

Review comment: Why the XXX comment?
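The DeviceName-to-node-name conversion above also appears in zedkube's getnodeNameAndUUID() (see the last file in this diff): lower-case the name and replace '_' with '-' so it is acceptable as a Kubernetes node name. A shared helper would keep the two call sites in sync; the package and function name below are suggestions, not part of the PR.

package utils

import "strings"

// KubeNodeNameFromDeviceName converts an EVE device name into the form used
// as the k3s node name: lower-case, with underscores replaced by hyphens,
// since underscores are not valid in Kubernetes node names.
func KubeNodeNameFromDeviceName(deviceName string) string {
	return strings.ReplaceAll(strings.ToLower(deviceName), "_", "-")
}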
@@ -229,6 +229,9 @@ type zedagentContext struct {
 	// Is Kubevirt eve
 	hvTypeKube bool

+	// EN cluster config
+	pubEdgeNodeClusterConfig pubsub.Publication
+
 	// Netdump
 	netDumper *netdump.NetDumper // nil if netdump is disabled
 	netdumpInterval time.Duration
@@ -1103,6 +1106,16 @@ func initPublications(zedagentCtx *zedagentContext) {
 	}
 	getconfigCtx.pubZedAgentStatus.ClearRestarted()

+	zedagentCtx.pubEdgeNodeClusterConfig, err = ps.NewPublication(pubsub.PublicationOptions{
+		AgentName:  agentName,
+		Persistent: true,
+		TopicType:  types.EdgeNodeClusterConfig{},
+	})
+	if err != nil {
+		log.Fatal(err)
+	}
+	zedagentCtx.pubEdgeNodeClusterConfig.ClearRestarted()
+

Review comment: Why does this need to be persistent? That makes it much more painful to add fields to the EdgeNodeClusterConfig in future EVE releases.

 	getconfigCtx.pubPhysicalIOAdapters, err = ps.NewPublication(pubsub.PublicationOptions{
 		AgentName: agentName,
 		TopicType: types.PhysicalIOAdapterList{},
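On the persistence question above, the non-persistent variant is a one-line change. The trade-off, as a reading of the reviewer's concern rather than anything stated in the PR: a persistent publication keeps a copy on disk that survives restarts, but that stored schema makes adding fields to EdgeNodeClusterConfig in later EVE releases more painful, whereas a non-persistent publication is simply republished once zedagent reprocesses the controller configuration.

	// Sketch of the non-persistent alternative; identical to the PR's code
	// except that Persistent is omitted (it defaults to false).
	zedagentCtx.pubEdgeNodeClusterConfig, err = ps.NewPublication(pubsub.PublicationOptions{
		AgentName: agentName,
		TopicType: types.EdgeNodeClusterConfig{},
	})
	if err != nil {
		log.Fatal(err)
	}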
@@ -154,7 +154,7 @@ func (z *zedkube) checkAppsStatus() {
 	}

 	pub := z.pubENClusterAppStatus
-	stItmes := pub.GetAll()
+	stItems := pub.GetAll()

Review comment: Please update this function from the POC branch, it changed a lot.

Reply: @zedi-pramodh as discussed, since some of the type booleans changed, updating the POC code in applog.go would require updating many other files. I'll leave those changes to your later PR.

Reply: OK, no problem. I will take care of those and submit my PR once this one is merged.

 	var oldStatus *types.ENClusterAppStatus
 	for _, item := range items {
 		aiconfig := item.(types.AppInstanceConfig)
@@ -179,7 +179,7 @@ func (z *zedkube) checkAppsStatus() {
 		}
 	}

-	for _, st := range stItmes {
+	for _, st := range stItems {
 		aiStatus := st.(types.ENClusterAppStatus)
 		if aiStatus.AppUUID == aiconfig.UUIDandVersion.UUID {
 			oldStatus = &aiStatus
@@ -204,7 +204,7 @@ func (z *zedkube) getnodeNameAndUUID() error {
 		return err
 	}
 	enInfo := NodeInfo.(types.EdgeNodeInfo)
-	z.nodeName = strings.ToLower(enInfo.DeviceName)
+	z.nodeName = strings.ReplaceAll(strings.ToLower(enInfo.DeviceName), "_", "-")
 	z.nodeuuid = enInfo.DeviceID.String()
 	}
 	return nil
Review comment: You can't log.Fatal here, since you only wait for the EdgeNodeInfo if hvTypeKube is set. How about making Eden/Adam send the EdgeNodeInfo and waiting even if not hvTypeKube? That would remove some special cases like this issue.
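A sketch of the non-fatal variant implied by the comment above, assuming the node name is only required for kubevirt clustering; the helper name is hypothetical and would slot into Run() right after the GCComplete loop in domainmgr.go. The alternative the reviewer proposes, having Eden/Adam always send EdgeNodeInfo and waiting unconditionally, would remove the special case entirely.

// mustRetrieveNodeName is a hypothetical helper: only fatal when the node
// name is actually needed (kubevirt clustering); otherwise log and continue,
// since non-kubevirt builds never wait for EdgeNodeInfo.
func mustRetrieveNodeName(ctx *domainContext) {
	if err := ctx.retrieveDeviceNodeName(); err != nil {
		if ctx.hvTypeKube {
			log.Fatal(err)
		}
		log.Warnf("retrieveDeviceNodeName failed: %v", err)
	}
}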