Skip to content

Commit

Permalink
add cluster mover (#7790)
Browse files Browse the repository at this point in the history
  • Loading branch information
tatlat authored Mar 22, 2024
1 parent f757a59 commit 5f7b94d
Show file tree
Hide file tree
Showing 4 changed files with 296 additions and 0 deletions.
137 changes: 137 additions & 0 deletions pkg/clustermanager/eksa_mover.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,137 @@
package clustermanager

import (
"context"
"math"
"time"

"github.com/go-logr/logr"
"github.com/pkg/errors"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"

"github.com/aws/eks-anywhere/pkg/api/v1alpha1"
"github.com/aws/eks-anywhere/pkg/clients/kubernetes"
"github.com/aws/eks-anywhere/pkg/cluster"
"github.com/aws/eks-anywhere/pkg/retrier"
)

// MoverOpt is a functional option that customizes a Mover while it is being
// built by NewMover.
type MoverOpt func(*Mover)

// Mover moves the EKS-A Cluster object and the child objects of a cluster
// spec from a source cluster to a destination cluster, retrying on failure.
// It does not wait for the moved objects to be reconciled.
type Mover struct {
// log receives the progress messages emitted by Move.
log logr.Logger
// clientFactory is stored by NewMover; Move itself receives its clients as
// arguments and does not read this field.
clientFactory ClientFactory
// moveClusterTimeout is the timeout handed to the retrier that drives Move.
moveClusterTimeout time.Duration
// retryBackOff is the back-off handed to the retrier's retry policy.
retryBackOff time.Duration
}

// NewMover builds a Mover that logs through log and keeps clientFactory.
//
// The retry timeout and back-off start from applyClusterSpecTimeout and
// retryBackOff, both defined elsewhere in this package, and can be overridden
// through opts. Options are applied in the order they are given.
func NewMover(log logr.Logger, clientFactory ClientFactory, opts ...MoverOpt) *Mover {
	mover := new(Mover)
	mover.log = log
	mover.clientFactory = clientFactory
	mover.moveClusterTimeout = applyClusterSpecTimeout
	mover.retryBackOff = retryBackOff

	// Options run last so that they can override any of the defaults above.
	for _, apply := range opts {
		apply(mover)
	}

	return mover
}

// WithMoverNoTimeouts effectively disables the retry timeout of the Mover by
// raising it to the largest value a time.Duration can hold.
func WithMoverNoTimeouts() MoverOpt {
	const noTimeout = time.Duration(math.MaxInt64)

	return func(m *Mover) {
		m.moveClusterTimeout = noTimeout
	}
}

// WithMoverApplyClusterTimeout sets how long the Mover keeps retrying a
// failed move before giving up.
// Generally only used in tests.
func WithMoverApplyClusterTimeout(timeout time.Duration) MoverOpt {
	setTimeout := func(mover *Mover) {
		mover.moveClusterTimeout = timeout
	}

	return setTimeout
}

// WithMoverRetryBackOff sets how long the Mover waits between two attempts
// to move the cluster objects.
// Generally only used in tests.
func WithMoverRetryBackOff(backOff time.Duration) MoverOpt {
	setBackOff := func(mover *Mover) {
		mover.retryBackOff = backOff
	}

	return setBackOff
}

// Move copies the EKS-A Cluster named in spec, together with the spec's child
// objects, from the cluster behind fromClient to the cluster behind toClient.
//
// Every attempt reads the Cluster from the source, pauses its reconciliation
// there, and then creates the Cluster and its children on the destination.
// Failed attempts are retried using the Mover's back-off and timeout; the
// error reported by the retrier is returned as is. Move does not wait for the
// moved objects to be reconciled on the destination.
func (m *Mover) Move(ctx context.Context, spec *cluster.Spec, fromClient, toClient kubernetes.Client) error {
	m.log.V(3).Info("Moving the cluster object")

	r := retrier.New(
		m.moveClusterTimeout,
		retrier.WithRetryPolicy(retrier.BackOffPolicy(m.retryBackOff)),
	)

	// attempt is a single, self-contained try at moving everything.
	attempt := func() error {
		// Read the Cluster afresh on every try instead of reusing a copy
		// from an earlier attempt.
		source := &v1alpha1.Cluster{}
		if err := fromClient.Get(ctx, spec.Cluster.Name, spec.Cluster.Namespace, source); err != nil {
			return errors.Wrap(err, "reading cluster from source")
		}

		// Pause the Cluster on the source before it shows up on the destination.
		source.PauseReconcile()
		if err := fromClient.Update(ctx, source); err != nil {
			return errors.Wrap(err, "updating cluster on source")
		}

		if err := moveClusterResource(ctx, source, toClient); err != nil {
			return err
		}

		return moveChildObjects(ctx, spec, fromClient, toClient)
	}

	return r.Retry(attempt)
}

// moveClusterResource creates the given EKS-A Cluster through client.
//
// The ResourceVersion and UID of cluster are cleared in place before the
// create call, so the caller's object is modified. An "already exists"
// response from the API is treated as success.
func moveClusterResource(ctx context.Context, cluster *v1alpha1.Cluster, client kubernetes.Client) error {
	// Drop the identity fields carried over from the source cluster.
	cluster.UID = ""
	cluster.ResourceVersion = ""

	err := client.Create(ctx, cluster)
	if err == nil || apierrors.IsAlreadyExists(err) {
		return nil
	}

	return errors.Wrapf(err, "moving cluster %s", cluster.Name)
}

// moveChildObjects reads every child object of spec through fromClient and
// creates it through toClient.
//
// Each object is fetched as unstructured content, stripped of its resource
// version, UID and owner references, and then created on the destination.
// An "already exists" response is treated as success; any other failure stops
// the loop and is returned.
func moveChildObjects(ctx context.Context, spec *cluster.Spec, fromClient, toClient kubernetes.Client) error {
	for _, child := range spec.ChildObjects() {
		gvk := child.GetObjectKind().GroupVersionKind()

		// The kind has to be set up front so the client knows what to fetch.
		moved := &unstructured.Unstructured{}
		moved.SetGroupVersionKind(gvk)

		if err := fromClient.Get(ctx, child.GetName(), child.GetNamespace(), moved); err != nil {
			return errors.Wrapf(err, "reading child object %s %s", gvk.Kind, child.GetName())
		}

		// Clear the metadata that ties the object to the source cluster.
		moved.SetOwnerReferences(nil)
		moved.SetUID("")
		moved.SetResourceVersion("")

		err := toClient.Create(ctx, moved)
		if err == nil || apierrors.IsAlreadyExists(err) {
			continue
		}

		return errors.Wrapf(err, "moving child object %s %s", moved.GetObjectKind().GroupVersionKind().Kind, moved.GetName())
	}

	return nil
}
126 changes: 126 additions & 0 deletions pkg/clustermanager/eksa_mover_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
package clustermanager_test

import (
"context"
"testing"
"time"

"github.com/go-logr/logr"
"github.com/golang/mock/gomock"
"github.com/onsi/gomega"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime"

"github.com/aws/eks-anywhere/internal/test"
"github.com/aws/eks-anywhere/pkg/clients/kubernetes"
"github.com/aws/eks-anywhere/pkg/cluster"
"github.com/aws/eks-anywhere/pkg/clustermanager"
"github.com/aws/eks-anywhere/pkg/clustermanager/mocks"
"github.com/aws/eks-anywhere/pkg/controller/clientutil"
"github.com/aws/eks-anywhere/pkg/types"
)

// moverTest bundles the fixtures shared by the Mover tests.
type moverTest struct {
// Gomega is embedded so assertions can be written as tt.Expect(...).
gomega.Gomega
tb testing.TB
// clientFactory is the mock passed to clustermanager.NewMover.
clientFactory *mocks.MockClientFactory
ctx context.Context
// spec is the cluster spec whose objects the tests move.
spec *cluster.Spec
// fromClient and toClient are populated by buildClients.
fromClient kubernetes.Client
toClient kubernetes.Client
log logr.Logger
// mgmtCluster and bootstrap are filled in by newMoverTest.
mgmtCluster *types.Cluster
bootstrap *types.Cluster
}

// newMoverTest assembles the shared fixtures for a Mover test: a gomega
// instance bound to tb, a mock client factory, a vSphere cluster spec named
// after the running test, a null logger and two placeholder clusters.
// The fake clients are not created here; call buildClients for that.
func newMoverTest(tb testing.TB) *moverTest {
	mockCtrl := gomock.NewController(tb)

	g := gomega.NewWithT(tb)
	factory := mocks.NewMockClientFactory(mockCtrl)
	ctx := context.Background()
	spec := test.VSphereClusterSpec(tb, tb.Name())
	logger := test.NewNullLogger()

	tt := &moverTest{
		Gomega:        g,
		tb:            tb,
		ctx:           ctx,
		log:           logger,
		spec:          spec,
		clientFactory: factory,
	}
	tt.bootstrap = &types.Cluster{KubeconfigFile: "bootstrap-config"}
	tt.mgmtCluster = &types.Cluster{KubeconfigFile: "my-config"}

	return tt
}

// buildClients creates the fake source and destination clients, preloading
// them with fromObjs and toObjs respectively.
func (tt *moverTest) buildClients(fromObjs, toObjs []kubernetes.Object) {
	sourceObjs := clientutil.ObjectsToClientObjects(fromObjs)
	tt.fromClient = test.NewFakeKubeClient(sourceObjs...)

	destinationObjs := clientutil.ObjectsToClientObjects(toObjs)
	tt.toClient = test.NewFakeKubeClient(destinationObjs...)
}

// TestMoverSuccess moves a cluster and its children into an empty destination
// and checks that every object arrives there with the same spec.
func TestMoverSuccess(t *testing.T) {
	tt := newMoverTest(t)
	expected := tt.spec.ClusterAndChildren()
	tt.buildClients(expected, nil)

	opts := []clustermanager.MoverOpt{
		clustermanager.WithMoverRetryBackOff(time.Millisecond),
		clustermanager.WithMoverNoTimeouts(),
	}
	mover := clustermanager.NewMover(tt.log, tt.clientFactory, opts...)

	moveErr := mover.Move(tt.ctx, tt.spec, tt.fromClient, tt.toClient)
	tt.Expect(moveErr).To(gomega.Succeed())

	for _, want := range expected {
		got := &unstructured.Unstructured{}
		got.SetGroupVersionKind(want.GetObjectKind().GroupVersionKind())

		getErr := tt.toClient.Get(tt.ctx, want.GetName(), want.GetNamespace(), got)
		tt.Expect(getErr).To(gomega.Succeed())

		wantContent, err := runtime.DefaultUnstructuredConverter.ToUnstructured(want)
		tt.Expect(err).To(gomega.Succeed())

		// Only the spec is compared: metadata is rewritten during the move.
		tt.Expect(got.Object["spec"]).To(gomega.BeComparableTo(wantContent["spec"]))
	}
}

// TestMoverFailReadCluster runs Move against an empty source and expects the
// error raised while reading the Cluster.
func TestMoverFailReadCluster(t *testing.T) {
	tt := newMoverTest(t)
	tt.buildClients(nil, nil)

	opts := []clustermanager.MoverOpt{
		clustermanager.WithMoverRetryBackOff(time.Millisecond),
		clustermanager.WithMoverApplyClusterTimeout(time.Millisecond),
	}
	mover := clustermanager.NewMover(tt.log, tt.clientFactory, opts...)

	moveErr := mover.Move(tt.ctx, tt.spec, tt.fromClient, tt.toClient)

	wantMsg := gomega.ContainSubstring("reading cluster from source")
	tt.Expect(moveErr).To(gomega.MatchError(wantMsg))
}

// TestMoverFailGetChildren seeds the source with the Cluster only, so Move is
// expected to fail once it tries to read a child object.
func TestMoverFailGetChildren(t *testing.T) {
	tt := newMoverTest(t)
	clusterOnly := []kubernetes.Object{tt.spec.Cluster}
	tt.buildClients(clusterOnly, nil)

	opts := []clustermanager.MoverOpt{
		clustermanager.WithMoverRetryBackOff(time.Millisecond),
		clustermanager.WithMoverApplyClusterTimeout(time.Millisecond),
	}
	mover := clustermanager.NewMover(tt.log, tt.clientFactory, opts...)

	moveErr := mover.Move(tt.ctx, tt.spec, tt.fromClient, tt.toClient)

	wantMsg := gomega.ContainSubstring("reading child object")
	tt.Expect(moveErr).To(gomega.MatchError(wantMsg))
}

// TestMoverAlreadyMoved preloads the destination with every object and checks
// that Move succeeds and leaves the existing objects as they were.
func TestMoverAlreadyMoved(t *testing.T) {
	tt := newMoverTest(t)
	existing := tt.spec.ClusterAndChildren()
	tt.buildClients(existing, existing)

	opts := []clustermanager.MoverOpt{
		clustermanager.WithMoverRetryBackOff(time.Millisecond),
		clustermanager.WithMoverApplyClusterTimeout(time.Millisecond),
	}
	mover := clustermanager.NewMover(tt.log, tt.clientFactory, opts...)

	moveErr := mover.Move(tt.ctx, tt.spec, tt.fromClient, tt.toClient)
	tt.Expect(moveErr).To(gomega.Succeed())

	for _, want := range existing {
		got := &unstructured.Unstructured{}
		got.SetGroupVersionKind(want.GetObjectKind().GroupVersionKind())

		getErr := tt.toClient.Get(tt.ctx, want.GetName(), want.GetNamespace(), got)
		tt.Expect(getErr).To(gomega.Succeed())

		wantContent, err := runtime.DefaultUnstructuredConverter.ToUnstructured(want)
		tt.Expect(err).To(gomega.Succeed())

		// The object already existed in the destination, so the whole
		// content, metadata and status included, has to match.
		tt.Expect(got.Object).To(gomega.BeComparableTo(wantContent))
	}
}
21 changes: 21 additions & 0 deletions pkg/dependencies/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,7 @@ type Dependencies struct {
EksaInstaller *clustermanager.EKSAInstaller
DeleteClusterDefaulter cli.DeleteClusterDefaulter
ClusterDeleter clustermanager.Deleter
ClusterMover *clustermanager.Mover
}

// KubeClients defines super struct that exposes all behavior.
Expand Down Expand Up @@ -1216,6 +1217,26 @@ func (f *Factory) WithClusterDeleter() *Factory {
return f
}

// WithClusterMover registers the build step that creates the cluster mover
// and stores it in the ClusterMover dependency.
//
// The mover needs the logger and the unauthenticated kube client, so both are
// requested first. When the factory is configured without timeouts, the mover
// is built with its retry timeout disabled as well.
func (f *Factory) WithClusterMover() *Factory {
	// Each prerequisite only needs to be requested once.
	f.WithLogger().WithUnAuthKubeClient()

	f.buildSteps = append(f.buildSteps, func(_ context.Context) error {
		var opts []clustermanager.MoverOpt
		if f.config.noTimeouts {
			opts = append(opts, clustermanager.WithMoverNoTimeouts())
		}

		f.dependencies.ClusterMover = clustermanager.NewMover(
			f.dependencies.Logger,
			f.dependencies.UnAuthKubeClient,
			opts...,
		)
		return nil
	})
	return f
}

// WithValidatorClients builds KubeClients.
func (f *Factory) WithValidatorClients() *Factory {
f.WithKubectl().WithUnAuthKubeClient()
Expand Down
12 changes: 12 additions & 0 deletions pkg/dependencies/factory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -645,6 +645,18 @@ func TestFactoryBuildWithClusterDeleterNoTimeout(t *testing.T) {
tt.Expect(deps.ClusterApplier).NotTo(BeNil())
}

// TestFactoryBuildWithClusterMoverNoTimeout checks that a factory configured
// without timeouts builds successfully and produces a cluster mover.
func TestFactoryBuildWithClusterMoverNoTimeout(t *testing.T) {
	tt := newTest(t, vsphere)
	deps, err := dependencies.NewFactory().
		WithLocalExecutables().
		WithNoTimeouts().
		WithClusterMover().
		Build(context.Background())

	tt.Expect(err).To(BeNil())
	// Assert on the dependency this test is actually about: WithClusterMover
	// populates ClusterMover, not ClusterApplier.
	tt.Expect(deps.ClusterMover).NotTo(BeNil())
}

func TestFactoryBuildWithAwsIamAuthNoTimeout(t *testing.T) {
tt := newTest(t, vsphere)
deps, err := dependencies.NewFactory().
Expand Down

0 comments on commit 5f7b94d

Please sign in to comment.