Skip to content

Commit

Permalink
eventually exit cluster state update loop, only persist to DB on stat…
Browse files Browse the repository at this point in the history
…e change
  • Loading branch information
laverya committed Dec 7, 2023
1 parent 37c2b22 commit 31f2291
Show file tree
Hide file tree
Showing 2 changed files with 28 additions and 16 deletions.
31 changes: 22 additions & 9 deletions pkg/embeddedcluster/monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,37 +38,50 @@ func MaybeStartClusterUpgrade(ctx context.Context, store store.Store, conf *v1be
}

// watchClusterState checks the status of the installation object and updates the cluster state
// after the cluster state has been 'ready' for 5 minutes, it will exit the loop.
// after the cluster state has been 'installed' for 30 seconds, it will exit the loop.
// this function is blocking and should be run in a goroutine.
// if it is called multiple times, only one instance will run.
// TODO: implement exit condition
func watchClusterState(ctx context.Context, store store.Store) {
stateMut.Lock()
for {
numReady := 0
lastState := ""
for numReady < 6 {
select {
case <-ctx.Done():
return
case <-time.After(time.Second * 5):
}
if err := updateClusterState(ctx, store); err != nil {
state, err := updateClusterState(ctx, store, lastState)
if err != nil {
logger.Errorf("embeddedcluster monitor: fail updating state: %v", err)
}

if state == v1beta1.InstallationStateInstalled {
numReady++
} else {
numReady = 0
}
lastState = state
}
}

// updateClusterState updates the cluster state in the database. Gets the state from the cluster
// by reading the latest embedded cluster installation CRD.
func updateClusterState(ctx context.Context, store store.Store) error {
// If the lastState is the same as the current state, it will not update the database.
func updateClusterState(ctx context.Context, store store.Store, lastState string) (string, error) {
installation, err := GetCurrentInstallation(ctx)
if err != nil {
return fmt.Errorf("failed to get current installation: %w", err)
return "", fmt.Errorf("failed to get current installation: %w", err)
}
state := v1beta1.InstallationStateUnknown
if installation.Status.State != "" {
state = installation.Status.State
}
if err := store.SetEmbeddedClusterState(state); err != nil {
return fmt.Errorf("failed to update embedded cluster state: %w", err)
// only update the state if it has changed
if state != lastState {
if err := store.SetEmbeddedClusterState(state); err != nil {
return "", fmt.Errorf("failed to update embedded cluster state: %w", err)
}
}
return nil
return state, nil
}
13 changes: 6 additions & 7 deletions pkg/store/kotsstore/embedded_cluster_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (
"fmt"
"time"

"github.com/pkg/errors"
"github.com/rqlite/gorqlite"

"github.com/replicatedhq/kots/pkg/persistence"
Expand All @@ -23,7 +22,7 @@ func (s *KOTSStore) SetEmbeddedClusterInstallCommandRoles(roles []string) (strin
Arguments: []interface{}{installID},
})
if err != nil {
return "", fmt.Errorf("delete embedded_cluster join token: %v: %v", err, wr.Err)
return "", fmt.Errorf("delete embedded_cluster join token: %w: %v", err, wr.Err)
}

jsonRoles, err := json.Marshal(roles)
Expand All @@ -37,7 +36,7 @@ func (s *KOTSStore) SetEmbeddedClusterInstallCommandRoles(roles []string) (strin
Arguments: []interface{}{installID, string(jsonRoles)},
})
if err != nil {
return "", fmt.Errorf("insert embedded_cluster join token: %v: %v", err, wr.Err)
return "", fmt.Errorf("insert embedded_cluster join token: %w: %v", err, wr.Err)
}

return installID, nil
Expand All @@ -51,7 +50,7 @@ func (s *KOTSStore) GetEmbeddedClusterInstallCommandRoles(token string) ([]strin
Arguments: []interface{}{token},
})
if err != nil {
return nil, fmt.Errorf("failed to query: %v: %v", err, rows.Err)
return nil, fmt.Errorf("failed to query: %w: %v", err, rows.Err)
}
if !rows.Next() {
return nil, ErrNotFound
Expand Down Expand Up @@ -83,7 +82,7 @@ on conflict (updated_at) do update set
Arguments: []interface{}{time.Now().Unix(), state},
})
if err != nil {
return fmt.Errorf("failed to write: %v: %v", err, wr.Err)
return fmt.Errorf("failed to write: %w: %v", err, wr.Err)
}
return nil
}
Expand All @@ -96,14 +95,14 @@ func (s *KOTSStore) GetEmbeddedClusterState() (string, error) {
Arguments: []interface{}{},
})
if err != nil {
return "", fmt.Errorf("failed to query: %v: %v", err, rows.Err)
return "", fmt.Errorf("failed to query: %w: %v", err, rows.Err)
}
if !rows.Next() {
return "", nil
}
var state gorqlite.NullString
if err := rows.Scan(&state); err != nil {
return "", errors.Wrap(err, "failed to scan")
return "", fmt.Errorf("failed to scan: %w", err)
}
return state.String, nil
}

0 comments on commit 31f2291

Please sign in to comment.