Skip to content

Commit

Permalink
add / upgrade / remove extensions via ws
Browse files Browse the repository at this point in the history
  • Loading branch information
sgalsaleh committed Dec 19, 2024
1 parent aacda4e commit a37dfb5
Show file tree
Hide file tree
Showing 5 changed files with 297 additions and 30 deletions.
File renamed without changes.
104 changes: 104 additions & 0 deletions pkg/plan/extensions.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
package plan

import (
"context"
"reflect"

"github.com/pkg/errors"
ecv1beta1 "github.com/replicatedhq/embedded-cluster/kinds/apis/v1beta1"
"github.com/replicatedhq/kots/pkg/embeddedcluster"
"github.com/replicatedhq/kots/pkg/plan/types"
"github.com/replicatedhq/kots/pkg/websocket"
kbclient "sigs.k8s.io/controller-runtime/pkg/client"
)

type ExtensionsDiffResult struct {
Added []ecv1beta1.Chart
Removed []ecv1beta1.Chart
Modified []ecv1beta1.Chart
}

func executeECExtensionAdd(p *types.Plan, step *types.PlanStep) error {
in, ok := step.Input.(types.PlanStepInputECExtension)
if !ok {
return errors.New("invalid input for embedded cluster extension add step")
}
if err := websocket.AddExtension(in.Repos, in.Chart, p.AppSlug, p.VersionLabel, step.ID); err != nil {
return errors.Wrap(err, "add extension")
}
return nil
}

func executeECExtensionUpgrade(p *types.Plan, step *types.PlanStep) error {
in, ok := step.Input.(types.PlanStepInputECExtension)
if !ok {
return errors.New("invalid input for embedded cluster extension upgrade step")
}
if err := websocket.UpgradeExtension(in.Repos, in.Chart, p.AppSlug, p.VersionLabel, step.ID); err != nil {
return errors.Wrap(err, "upgrade extension")
}
return nil
}

func executeECExtensionRemove(p *types.Plan, step *types.PlanStep) error {
in, ok := step.Input.(types.PlanStepInputECExtension)
if !ok {
return errors.New("invalid input for embedded cluster extension remove step")
}
if err := websocket.RemoveExtension(in.Repos, in.Chart, p.AppSlug, p.VersionLabel, step.ID); err != nil {
return errors.Wrap(err, "remove extension")
}
return nil
}

func getExtensions(kcli kbclient.Client, newSpec *ecv1beta1.ConfigSpec) (ecv1beta1.Extensions, ecv1beta1.Extensions, error) {
currInstall, err := embeddedcluster.GetCurrentInstallation(context.Background(), kcli)
if err != nil {
return ecv1beta1.Extensions{}, ecv1beta1.Extensions{}, errors.Wrap(err, "get current embedded cluster installation")
}
return currInstall.Spec.Config.Extensions, newSpec.Extensions, nil
}

func diffExtensions(oldExts, newExts ecv1beta1.Extensions) ExtensionsDiffResult {
oldCharts := make(map[string]ecv1beta1.Chart)
newCharts := make(map[string]ecv1beta1.Chart)

if oldExts.Helm != nil {
for _, chart := range oldExts.Helm.Charts {
oldCharts[chart.Name] = chart
}
}
if newExts.Helm != nil {
for _, chart := range newExts.Helm.Charts {
newCharts[chart.Name] = chart
}
}

var added, removed, modified []ecv1beta1.Chart

// find removed and modified charts.
for name, oldChart := range oldCharts {
newChart, exists := newCharts[name]
if !exists {
// chart was removed.
removed = append(removed, oldChart)
} else if !reflect.DeepEqual(oldChart, newChart) {
// chart was modified.
modified = append(modified, newChart)
}
}

// find added charts.
for name, newChart := range newCharts {
if _, exists := oldCharts[name]; !exists {
// chart was added.
added = append(added, newChart)
}
}

return ExtensionsDiffResult{
Added: added,
Removed: removed,
Modified: modified,
}
}
106 changes: 97 additions & 9 deletions pkg/plan/plan.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,16 +87,74 @@ func PlanUpgrade(s store.Store, kcli kbclient.Client, opts PlanUpgradeOptions) (
})
}

// TODO (@salah) implement our EC addons upgrade (have to use EC release metadata?). use same diff logic below

currExts, newExts, err := getExtensions(kcli, newECConfigSpec)
if err != nil {
return nil, errors.Wrap(err, "get extensions")
}

extsDiff := diffExtensions(currExts, newExts)
newRepos := newExts.Helm.Repositories

// added extensions
for _, chart := range extsDiff.Added {
p.Steps = append(p.Steps, &types.PlanStep{
ID: ksuid.New().String(),
Name: "Extension Add",
Type: types.StepTypeECExtensionAdd,
Status: types.StepStatusPending,
StatusDescription: "Pending extension addition",
Input: types.PlanStepInputECExtension{
Repos: newRepos,
Chart: chart,
},
Owner: types.StepOwnerECManager,
})
}

// modified extensions
for _, chart := range extsDiff.Modified {
p.Steps = append(p.Steps, &types.PlanStep{
ID: ksuid.New().String(),
Name: "Extension Upgrade",
Type: types.StepTypeECExtensionUpgrade,
Status: types.StepStatusPending,
StatusDescription: "Pending extension upgrade",
Input: types.PlanStepInputECExtension{
Repos: newRepos,
Chart: chart,
},
Owner: types.StepOwnerECManager,
})
}

// removed extensions
for _, chart := range extsDiff.Removed {
p.Steps = append(p.Steps, &types.PlanStep{
ID: ksuid.New().String(),
Name: "Extension Remove",
Type: types.StepTypeECExtensionRemove,
Status: types.StepStatusPending,
StatusDescription: "Pending extension removal",
Input: types.PlanStepInputECExtension{
Repos: newRepos,
Chart: chart,
},
Owner: types.StepOwnerECManager,
})
}

// app upgrade
p.Steps = append(p.Steps, &types.PlanStep{
ID: ksuid.New().String(),
Name: "Application Upgrade",
Type: types.StepTypeAppUpgrade,
Status: types.StepStatusPending,
StatusDescription: "Pending application upgrade",
Owner: types.StepOwnerKOTS,
// the input here is the app upgrade service output
})
// p.Steps = append(p.Steps, &types.PlanStep{
// ID: ksuid.New().String(),
// Name: "Application Upgrade",
// Type: types.StepTypeAppUpgrade,
// Status: types.StepStatusPending,
// StatusDescription: "Pending application upgrade",
// Owner: types.StepOwnerKOTS,
// // the input here is the app upgrade service output
// })

return &p, nil
}
Expand Down Expand Up @@ -202,6 +260,36 @@ func executeStep(s store.Store, p *types.Plan, step *types.PlanStep) (finalError
return errors.Wrap(err, "wait for embedded cluster upgrade")
}

case types.StepTypeECExtensionAdd:
if step.Status == types.StepStatusPending {
if err := executeECExtensionAdd(p, step); err != nil {
return errors.Wrap(err, "execute embedded cluster extension add")
}
}
if err := waitForStep(s, p, step.ID); err != nil {
return errors.Wrap(err, "wait for embedded cluster extension add")
}

case types.StepTypeECExtensionUpgrade:
if step.Status == types.StepStatusPending {
if err := executeECExtensionUpgrade(p, step); err != nil {
return errors.Wrap(err, "execute embedded cluster extension upgrade")
}
}
if err := waitForStep(s, p, step.ID); err != nil {
return errors.Wrap(err, "wait for embedded cluster extension upgrade")
}

case types.StepTypeECExtensionRemove:
if step.Status == types.StepStatusPending {
if err := executeECExtensionRemove(p, step); err != nil {
return errors.Wrap(err, "execute embedded cluster extension remove")
}
}
if err := waitForStep(s, p, step.ID); err != nil {
return errors.Wrap(err, "wait for embedded cluster extension remove")
}

case types.StepTypeAppUpgrade:
if err := executeAppUpgrade(s, p, step); err != nil {
return errors.Wrap(err, "execute app upgrade")
Expand Down
15 changes: 12 additions & 3 deletions pkg/plan/types/types.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package types

import (
k0sv1beta1 "github.com/k0sproject/k0s/pkg/apis/k0s/v1beta1"
ecv1beta1 "github.com/replicatedhq/embedded-cluster/kinds/apis/v1beta1"
upgradeservicetypes "github.com/replicatedhq/kots/pkg/upgradeservice/types"
kotsv1beta1 "github.com/replicatedhq/kotskinds/apis/kots/v1beta1"
Expand Down Expand Up @@ -37,9 +38,12 @@ type PlanStep struct {
type PlanStepType string

const (
StepTypeAppUpgradeService PlanStepType = "app-upgrade-service"
StepTypeAppUpgrade PlanStepType = "app-upgrade"
StepTypeECUpgrade PlanStepType = "ec-upgrade"
StepTypeAppUpgradeService PlanStepType = "app-upgrade-service"
StepTypeAppUpgrade PlanStepType = "app-upgrade"
StepTypeECUpgrade PlanStepType = "ec-upgrade"
StepTypeECExtensionAdd PlanStepType = "ec-extension-add"
StepTypeECExtensionUpgrade PlanStepType = "ec-extension-upgrade"
StepTypeECExtensionRemove PlanStepType = "ec-extension-remove"
)

type PlanStepStatus string
Expand Down Expand Up @@ -70,6 +74,11 @@ type PlanStepInputECUpgrade struct {
IsDisasterRecoverySupported bool `json:"isDisasterRecoverySupported" yaml:"isDisasterRecoverySupported"`
}

type PlanStepInputECExtension struct {
Repos []k0sv1beta1.Repository `json:"repos" yaml:"repos"`
Chart ecv1beta1.Chart `json:"chart" yaml:"chart"`
}

func (p *Plan) HasEnded() bool {
status := p.GetStatus()
return status == StepStatusFailed || status == StepStatusComplete
Expand Down
102 changes: 84 additions & 18 deletions pkg/websocket/upgrade.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"encoding/json"

"github.com/gorilla/websocket"
k0sv1beta1 "github.com/k0sproject/k0s/pkg/apis/k0s/v1beta1"
"github.com/pkg/errors"
ecv1beta1 "github.com/replicatedhq/embedded-cluster/kinds/apis/v1beta1"
"github.com/replicatedhq/kots/pkg/logger"
Expand All @@ -12,23 +13,6 @@ import (

// UpgradeCluster sends an upgrade command to the first available websocket from the active ones
func UpgradeCluster(installation *ecv1beta1.Installation, appSlug, versionLabel, stepID string) error {
wsMutex.Lock()
defer wsMutex.Unlock()

if len(wsClients) == 0 {
return errors.New("no active websocket connections available")
}

var selectedClient types.WSClient
var nodeName string

// get the first client in the map
for name, client := range wsClients {
nodeName = name
selectedClient = client
break
}

marshalledInst, err := json.Marshal(installation)
if err != nil {
return errors.Wrap(err, "marshal installation")
Expand All @@ -52,11 +36,93 @@ func UpgradeCluster(installation *ecv1beta1.Installation, appSlug, versionLabel,
return errors.Wrap(err, "marshal command message")
}

wscli, nodeName, err := firstActiveWSClient()
if err != nil {
return errors.Wrap(err, "get first active websocket client")
}

logger.Infof("Sending cluster upgrade command to websocket of node %s with message: %s", nodeName, string(message))

if err := selectedClient.Conn.WriteMessage(websocket.TextMessage, message); err != nil {
if err := wscli.Conn.WriteMessage(websocket.TextMessage, message); err != nil {
return errors.Wrap(err, "send upgrade command to websocket")
}

return nil
}

func AddExtension(repos []k0sv1beta1.Repository, chart ecv1beta1.Chart, appSlug, versionLabel, stepID string) error {
return sendExtensionCommand("add-extension", repos, chart, appSlug, versionLabel, stepID)
}

func UpgradeExtension(repos []k0sv1beta1.Repository, chart ecv1beta1.Chart, appSlug, versionLabel, stepID string) error {
return sendExtensionCommand("upgrade-extension", repos, chart, appSlug, versionLabel, stepID)
}

func RemoveExtension(repos []k0sv1beta1.Repository, chart ecv1beta1.Chart, appSlug, versionLabel, stepID string) error {
return sendExtensionCommand("remove-extension", repos, chart, appSlug, versionLabel, stepID)
}

func sendExtensionCommand(command string, repos []k0sv1beta1.Repository, chart ecv1beta1.Chart, appSlug, versionLabel, stepID string) error {
marshalledRepos, err := json.Marshal(repos)
if err != nil {
return errors.Wrap(err, "marshal repos")
}

marshalledChart, err := json.Marshal(chart)
if err != nil {
return errors.Wrap(err, "marshal chart")
}

data, err := json.Marshal(map[string]string{
"repos": string(marshalledRepos),
"chart": string(marshalledChart),
"appSlug": appSlug,
"versionLabel": versionLabel,
"stepID": stepID,
})
if err != nil {
return errors.Wrap(err, "marshal data")
}

message, err := json.Marshal(map[string]interface{}{
"command": command,
"data": string(data),
})
if err != nil {
return errors.Wrap(err, "marshal command message")
}

wscli, nodeName, err := firstActiveWSClient()
if err != nil {
return errors.Wrap(err, "get first active websocket client")
}

logger.Infof("Sending extension %s command to websocket of node %s with message: %s", command, nodeName, string(message))

if err := wscli.Conn.WriteMessage(websocket.TextMessage, message); err != nil {
return errors.Wrap(err, "send upgrade command to websocket")
}

return nil
}

func firstActiveWSClient() (types.WSClient, string, error) {
wsMutex.Lock()
defer wsMutex.Unlock()

if len(wsClients) == 0 {
return types.WSClient{}, "", errors.New("no active websocket connections available")
}

var wscli types.WSClient
var nodeName string

// get the first client in the map
for name, client := range wsClients {
nodeName = name
wscli = client
break
}

return wscli, nodeName, nil
}

0 comments on commit a37dfb5

Please sign in to comment.