From 2417b58d3166126d8d2927af7bb760d4939c7ee3 Mon Sep 17 00:00:00 2001 From: Ethan Mosbaugh Date: Wed, 27 Nov 2024 08:20:52 -0600 Subject: [PATCH] chore(ec): send upgrade events instead of the operator (#5017) * chore(ec): send upgrade events instead of the operator * f * f * f * increase preflight timeout * feedback * explain --- e2e/playwright/tests/@smoke-test/test.spec.ts | 2 +- pkg/embeddedcluster/metrics.go | 107 ++++++++++++++++++ pkg/embeddedcluster/upgrade.go | 9 ++ pkg/embeddedcluster/util.go | 15 +++ pkg/operator/operator.go | 76 +++++++++++-- pkg/upgradeservice/deploy/deploy.go | 31 +++++ 6 files changed, 232 insertions(+), 8 deletions(-) create mode 100644 pkg/embeddedcluster/metrics.go diff --git a/e2e/playwright/tests/@smoke-test/test.spec.ts b/e2e/playwright/tests/@smoke-test/test.spec.ts index 214517f7e0..cc7ea645e0 100644 --- a/e2e/playwright/tests/@smoke-test/test.spec.ts +++ b/e2e/playwright/tests/@smoke-test/test.spec.ts @@ -27,7 +27,7 @@ test("smoke test", async ({ page }) => { ); await page.getByRole("button", { name: "Continue" }).click(); await expect(page.locator("#app")).toContainText("Results", { - timeout: 30000, + timeout: 60 * 1000, }); await expect(page.locator("#app")).toContainText("Sequence is 0"); await page.getByRole("button", { name: "Deploy" }).click(); diff --git a/pkg/embeddedcluster/metrics.go b/pkg/embeddedcluster/metrics.go new file mode 100644 index 0000000000..4523b0df84 --- /dev/null +++ b/pkg/embeddedcluster/metrics.go @@ -0,0 +1,107 @@ +package embeddedcluster + +import ( + "bytes" + "context" + "encoding/json" + "fmt" + "net/http" + "time" + + embeddedclusterv1beta1 "github.com/replicatedhq/embedded-cluster/kinds/apis/v1beta1" + "github.com/replicatedhq/kots/pkg/logger" +) + +// UpgradeStartedEvent is sent back home when the upgrade starts. 
+type UpgradeStartedEvent struct { + ClusterID string `json:"clusterID"` + TargetVersion string `json:"targetVersion"` + InitialVersion string `json:"initialVersion"` + AppVersion string `json:"appVersion"` +} + +// UpgradeFailedEvent is sent back home when the upgrade fails. +type UpgradeFailedEvent struct { + ClusterID string `json:"clusterID"` + TargetVersion string `json:"targetVersion"` + InitialVersion string `json:"initialVersion"` + Reason string `json:"reason"` +} + +// UpgradeSucceededEvent is sent back home when the upgrade succeeds. +type UpgradeSucceededEvent struct { + ClusterID string `json:"clusterID"` + TargetVersion string `json:"targetVersion"` + InitialVersion string `json:"initialVersion"` +} + +// NotifyUpgradeStarted notifies the metrics server that an upgrade has started. +func NotifyUpgradeStarted(ctx context.Context, baseURL string, ins, prev *embeddedclusterv1beta1.Installation, versionLabel string) error { + if ins.Spec.AirGap { + return nil + } + return sendEvent(ctx, "UpgradeStarted", baseURL, UpgradeStartedEvent{ + ClusterID: ins.Spec.ClusterID, + TargetVersion: ins.Spec.Config.Version, + InitialVersion: prev.Spec.Config.Version, + AppVersion: versionLabel, + }) +} + +// NotifyUpgradeFailed notifies the metrics server that an upgrade has failed. +func NotifyUpgradeFailed(ctx context.Context, baseURL string, ins, prev *embeddedclusterv1beta1.Installation, reason string) error { + if ins.Spec.AirGap { + return nil + } + return sendEvent(ctx, "UpgradeFailed", baseURL, UpgradeFailedEvent{ + ClusterID: ins.Spec.ClusterID, + TargetVersion: ins.Spec.Config.Version, + InitialVersion: prev.Spec.Config.Version, + Reason: reason, + }) +} + +// NotifyUpgradeSucceeded notifies the metrics server that an upgrade has succeeded. 
+func NotifyUpgradeSucceeded(ctx context.Context, baseURL string, ins, prev *embeddedclusterv1beta1.Installation) error { + if ins.Spec.AirGap { + return nil + } + return sendEvent(ctx, "UpgradeSucceeded", baseURL, UpgradeSucceededEvent{ + ClusterID: ins.Spec.ClusterID, + TargetVersion: ins.Spec.Config.Version, + InitialVersion: prev.Spec.Config.Version, + }) +} + +// sendEvent sends the received event to the metrics server through a post request. +func sendEvent(ctx context.Context, evname, baseURL string, ev interface{}) error { + url := fmt.Sprintf("%s/embedded_cluster_metrics/%s", baseURL, evname) + + logger.Infof("Sending event %s to %s", evname, url) + + body := map[string]interface{}{"event": ev} + buf := bytes.NewBuffer(nil) + if err := json.NewEncoder(buf).Encode(body); err != nil { + return err + } + + req, err := http.NewRequestWithContext(ctx, http.MethodPost, url, buf) + if err != nil { + return err + } + req.Header.Set("Content-Type", "application/json") + + client := &http.Client{ + Timeout: 5 * time.Second, + } + resp, err := client.Do(req) + if err != nil { + return fmt.Errorf("failed to send event: %w", err) + } + defer resp.Body.Close() + + if resp.StatusCode != http.StatusOK { + return fmt.Errorf("failed to send event: %s", resp.Status) + } + return nil +} diff --git a/pkg/embeddedcluster/upgrade.go b/pkg/embeddedcluster/upgrade.go index 918f0b2372..2868d0cc33 100644 --- a/pkg/embeddedcluster/upgrade.go +++ b/pkg/embeddedcluster/upgrade.go @@ -22,6 +22,7 @@ import ( dockerregistrytypes "github.com/replicatedhq/kots/pkg/docker/registry/types" "github.com/replicatedhq/kots/pkg/imageutil" "github.com/replicatedhq/kots/pkg/k8sutil" + "github.com/replicatedhq/kots/pkg/logger" registrytypes "github.com/replicatedhq/kots/pkg/registry/types" "github.com/replicatedhq/kots/pkg/util" kotsv1beta1 "github.com/replicatedhq/kotskinds/apis/kots/v1beta1" @@ -75,8 +76,16 @@ func startClusterUpgrade( log.Printf("Starting cluster upgrade to version %s...", 
newcfg.Version) + // We cannot notify the upgrade started until the new install is available + if err := NotifyUpgradeStarted(ctx, license.Spec.Endpoint, newInstall, current, versionLabel); err != nil { + logger.Errorf("Failed to notify upgrade started: %v", err) + } + err = runClusterUpgrade(ctx, k8sClient, newInstall, registrySettings, license, versionLabel) if err != nil { + if err := NotifyUpgradeFailed(ctx, license.Spec.Endpoint, newInstall, current, err.Error()); err != nil { + logger.Errorf("Failed to notify upgrade failed: %v", err) + } return fmt.Errorf("run cluster upgrade: %w", err) } diff --git a/pkg/embeddedcluster/util.go b/pkg/embeddedcluster/util.go index 5a7945ea99..aaf0095521 100644 --- a/pkg/embeddedcluster/util.go +++ b/pkg/embeddedcluster/util.go @@ -68,6 +68,21 @@ func GetCurrentInstallation(ctx context.Context, kbClient kbclient.Client) (*emb return &installations[0], nil } +// GetPreviousInstallation returns the second most recent installation object from the cluster. 
+func GetPreviousInstallation(ctx context.Context, kbClient kbclient.Client) (*embeddedclusterv1beta1.Installation, error) { + installations, err := ListInstallations(ctx, kbClient) + if err != nil { + return nil, fmt.Errorf("failed to list installations: %w", err) + } + if len(installations) < 2 { + return nil, nil + } + sort.SliceStable(installations, func(i, j int) bool { + return installations[j].Name < installations[i].Name + }) + return &installations[1], nil +} + func ListInstallations(ctx context.Context, kbClient kbclient.Client) ([]embeddedclusterv1beta1.Installation, error) { var installationList embeddedclusterv1beta1.InstallationList if err := kbClient.List(ctx, &installationList, &kbclient.ListOptions{}); err != nil { diff --git a/pkg/operator/operator.go b/pkg/operator/operator.go index a4348efe17..2e750b5ef8 100644 --- a/pkg/operator/operator.go +++ b/pkg/operator/operator.go @@ -12,6 +12,7 @@ import ( "time" "github.com/pkg/errors" + embeddedclusterv1beta1 "github.com/replicatedhq/embedded-cluster/kinds/apis/v1beta1" downstreamtypes "github.com/replicatedhq/kots/pkg/api/downstream/types" "github.com/replicatedhq/kots/pkg/app" apptypes "github.com/replicatedhq/kots/pkg/app/types" @@ -53,6 +54,7 @@ import ( "k8s.io/client-go/kubernetes" "k8s.io/client-go/kubernetes/scheme" "k8s.io/client-go/tools/cache" + kbclient "sigs.k8s.io/controller-runtime/pkg/client" ) var ( @@ -933,7 +935,7 @@ func (o *Operator) reconcileDeployment(cm *corev1.ConfigMap) (finalError error) if cm.Data["requires-cluster-upgrade"] == "true" { // wait for cluster upgrade even if the embedded cluster version doesn't match yet // in order to continuously report progress to the user - if err := o.waitForClusterUpgrade(cm.Data["app-slug"]); err != nil { + if err := o.waitForClusterUpgrade(cm.Data["app-id"], cm.Data["app-slug"]); err != nil { return errors.Wrap(err, "failed to wait for cluster upgrade") } } @@ -1035,21 +1037,31 @@ func (o *Operator) reconcileDeployment(cm 
*corev1.ConfigMap) (finalError error) return nil } -func (o *Operator) waitForClusterUpgrade(appSlug string) error { - kbClient, err := k8sutil.GetKubeClient(context.Background()) +func (o *Operator) waitForClusterUpgrade(appID string, appSlug string) error { + ctx := context.Background() + + kbClient, err := k8sutil.GetKubeClient(ctx) if err != nil { return errors.Wrap(err, "failed to get kube client") } - logger.Infof("waiting for cluster upgrade to finish") + logger.Infof("Waiting for cluster upgrade to finish") for { - ins, err := embeddedcluster.GetCurrentInstallation(context.Background(), kbClient) + ins, err := embeddedcluster.GetCurrentInstallation(ctx, kbClient) if err != nil { return errors.Wrap(err, "failed to wait for embedded cluster installation") } - if embeddedcluster.InstallationSucceeded(context.Background(), ins) { + if embeddedcluster.InstallationSucceeded(ctx, ins) { + logger.Infof("Cluster upgrade succeeded") + if err := o.notifyClusterUpgradeSucceeded(ctx, kbClient, ins, appID); err != nil { + logger.Errorf("Failed to notify upgrade succeeded: %v", err) + } return nil } - if embeddedcluster.InstallationFailed(context.Background(), ins) { + if embeddedcluster.InstallationFailed(ctx, ins) { + logger.Infof("Cluster upgrade failed") + if err := o.notifyClusterUpgradeFailed(ctx, kbClient, ins, appID); err != nil { + logger.Errorf("Failed to notify upgrade failed: %v", err) + } if err := upgradeservicetask.SetStatusUpgradeFailed(appSlug, ins.Status.Reason); err != nil { return errors.Wrap(err, "failed to set task status to failed") } @@ -1061,3 +1073,53 @@ func (o *Operator) waitForClusterUpgrade(appSlug string) error { time.Sleep(5 * time.Second) } } + +// notifyClusterUpgradeSucceeded sends a metrics event to the api that the upgrade succeeded. 
+func (o *Operator) notifyClusterUpgradeSucceeded(ctx context.Context, kbClient kbclient.Client, ins *embeddedclusterv1beta1.Installation, appID string) error { + if ins.Spec.AirGap { + return nil + } + + license, err := o.store.GetLatestLicenseForApp(appID) + if err != nil { + return errors.Wrapf(err, "failed to get latest license for app %s", appID) + } + + prev, err := embeddedcluster.GetPreviousInstallation(ctx, kbClient) + if err != nil { + return errors.Wrap(err, "failed to get previous installation") + } else if prev == nil { + return errors.New("previous installation not found") + } + + err = embeddedcluster.NotifyUpgradeSucceeded(ctx, license.Spec.Endpoint, ins, prev) + if err != nil { + return errors.Wrap(err, "failed to send event") + } + return nil +} + +// notifyClusterUpgradeFailed sends a metrics event to the api that the upgrade failed. +func (o *Operator) notifyClusterUpgradeFailed(ctx context.Context, kbClient kbclient.Client, ins *embeddedclusterv1beta1.Installation, appID string) error { + if ins.Spec.AirGap { + return nil + } + + license, err := o.store.GetLatestLicenseForApp(appID) + if err != nil { + return errors.Wrapf(err, "failed to get latest license for app %s", appID) + } + + prev, err := embeddedcluster.GetPreviousInstallation(ctx, kbClient) + if err != nil { + return errors.Wrap(err, "failed to get previous installation") + } else if prev == nil { + return errors.New("previous installation not found") + } + + err = embeddedcluster.NotifyUpgradeFailed(ctx, license.Spec.Endpoint, ins, prev, ins.Status.Reason) + if err != nil { + return errors.Wrap(err, "failed to send event") + } + return nil +} diff --git a/pkg/upgradeservice/deploy/deploy.go b/pkg/upgradeservice/deploy/deploy.go index 1ffae96153..0c00795735 100644 --- a/pkg/upgradeservice/deploy/deploy.go +++ b/pkg/upgradeservice/deploy/deploy.go @@ -23,6 +23,7 @@ import ( corev1 "k8s.io/api/core/v1" kuberneteserrors "k8s.io/apimachinery/pkg/api/errors" metav1 
"k8s.io/apimachinery/pkg/apis/meta/v1" + kbclient "sigs.k8s.io/controller-runtime/pkg/client" ) type CanDeployOptions struct { @@ -141,6 +142,11 @@ func Deploy(opts DeployOptions) error { tgzArchiveKey: tgzArchiveKey, requiresClusterUpgrade: true, }); err != nil { + // The operator is responsible for notifying of upgrade success/failure using the deployment. + // If we cannot create the deployment, the operator cannot take over and we need to notify of failure here. + if err := notifyClusterUpgradeFailed(context.Background(), kbClient, opts, err.Error()); err != nil { + logger.Errorf("Failed to notify upgrade failed: %v", err) + } return errors.Wrap(err, "failed to create deployment") } @@ -150,6 +156,31 @@ func Deploy(opts DeployOptions) error { return nil } +// notifyClusterUpgradeFailed sends a metrics event to the api that the upgrade failed. +func notifyClusterUpgradeFailed(ctx context.Context, kbClient kbclient.Client, opts DeployOptions, reason string) error { + ins, err := embeddedcluster.GetCurrentInstallation(ctx, kbClient) + if err != nil { + return fmt.Errorf("failed to get current installation: %w", err) + } + + if ins.Spec.AirGap { + return nil + } + + prev, err := embeddedcluster.GetPreviousInstallation(ctx, kbClient) + if err != nil { + return errors.Wrap(err, "failed to get previous installation") + } else if prev == nil { + return errors.New("previous installation not found") + } + + err = embeddedcluster.NotifyUpgradeFailed(ctx, opts.KotsKinds.License.Spec.Endpoint, ins, prev, reason) + if err != nil { + return errors.Wrap(err, "failed to send event") + } + return nil +} + type createDeploymentOptions struct { ctx context.Context isSkipPreflights bool