From e9fbd3004c3c9520e3a16521cf312d4cf9dfea08 Mon Sep 17 00:00:00 2001 From: Fred Rolland Date: Mon, 18 Sep 2023 13:45:06 +0300 Subject: [PATCH] Run migration logic with leader election - Add Controller Migrator to the Manager - Add a channel to block reconcile until Migrator finishes Signed-off-by: Fred Rolland --- cmd/ipam-controller/app/app.go | 18 +++++++++--- cmd/ipam-controller/app/app_test.go | 8 +++-- .../controllers/ippool/ippool.go | 6 ++++ pkg/ipam-controller/controllers/node/node.go | 7 +++++ pkg/ipam-controller/migrator/migrator.go | 29 +++++++++++++++++-- pkg/ipam-controller/migrator/migrator_test.go | 13 +++++---- 6 files changed, 66 insertions(+), 15 deletions(-) diff --git a/cmd/ipam-controller/app/app.go b/cmd/ipam-controller/app/app.go index fc9a7fa..611f298 100644 --- a/cmd/ipam-controller/app/app.go +++ b/cmd/ipam-controller/app/app.go @@ -135,16 +135,25 @@ func RunController(ctx context.Context, config *rest.Config, opts *options.Optio os.Exit(1) } - if err := migrator.Migrate(ctx, k8sClient, opts.IPPoolsNamespace); err != nil { - logger.Error(err, fmt.Sprintf("failed to migrate NV-IPAM config from ConfigMap, "+ - "set %s env variable to disable migration", migrator.EnvDisableMigration)) - return err + migrationChan := make(chan struct{}) + m := migrator.Migrator{ + IPPoolsNamespace: opts.IPPoolsNamespace, + K8sClient: k8sClient, + MigrationCh: migrationChan, + LeaderElection: opts.EnableLeaderElection, + Logger: logger.WithName("Migrator"), + } + err = mgr.Add(m) + if err != nil { + logger.Error(err, "failed to add Migrator to the Manager") + os.Exit(1) } nodeEventCH := make(chan event.GenericEvent, 1) if err = (&nodectrl.NodeReconciler{ NodeEventCh: nodeEventCH, + MigrationCh: migrationChan, PoolsNamespace: opts.IPPoolsNamespace, Client: mgr.GetClient(), Scheme: mgr.GetScheme(), @@ -158,6 +167,7 @@ func RunController(ctx context.Context, config *rest.Config, opts *options.Optio PoolsNamespace: opts.IPPoolsNamespace, Client: mgr.GetClient(), Scheme: mgr.GetScheme(), + MigrationCh: migrationChan, }).SetupWithManager(mgr); err != nil { logger.Error(err, "unable to create controller", "controller", "IPPool") return err diff --git a/cmd/ipam-controller/app/app_test.go b/cmd/ipam-controller/app/app_test.go index f606ea8..b52aab7 100644 --- a/cmd/ipam-controller/app/app_test.go +++ b/cmd/ipam-controller/app/app_test.go @@ -171,9 +171,11 @@ var _ = Describe("App", func() { go func() { Expect(app.RunController(logr.NewContext(ctrlCtx, klog.NewKlogr()), cfg, &options.Options{ - MetricsAddr: "0", // disable - ProbeAddr: "0", // disable - IPPoolsNamespace: TestNamespace, + MetricsAddr: "0", // disable + ProbeAddr: "0", // disable + IPPoolsNamespace: TestNamespace, + EnableLeaderElection: true, + LeaderElectionNamespace: TestNamespace, })).NotTo(HaveOccurred()) close(controllerStopped) }() diff --git a/pkg/ipam-controller/controllers/ippool/ippool.go b/pkg/ipam-controller/controllers/ippool/ippool.go index b889fef..9221ba0 100644 --- a/pkg/ipam-controller/controllers/ippool/ippool.go +++ b/pkg/ipam-controller/controllers/ippool/ippool.go @@ -46,6 +46,7 @@ import ( type IPPoolReconciler struct { PoolsNamespace string NodeEventCh chan event.GenericEvent + MigrationCh chan struct{} client.Client Scheme *runtime.Scheme recorder record.EventRecorder @@ -53,6 +54,11 @@ type IPPoolReconciler struct { // Reconcile contains logic to sync IPPool objects func (r *IPPoolReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { + select { + case <-r.MigrationCh: + case <-ctx.Done(): + return ctrl.Result{}, fmt.Errorf("canceled") + } reqLog := log.FromContext(ctx) if req.Namespace != r.PoolsNamespace { // this should never happen because of the watcher configuration of the manager from controller-runtime pkg diff --git a/pkg/ipam-controller/controllers/node/node.go b/pkg/ipam-controller/controllers/node/node.go index 495f092..ade513f 100644 --- a/pkg/ipam-controller/controllers/node/node.go +++ b/pkg/ipam-controller/controllers/node/node.go @@ -15,6 +15,7 @@ package controllers import ( "context" + "fmt" apiErrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -33,12 +34,18 @@ import ( type NodeReconciler struct { PoolsNamespace string NodeEventCh chan event.GenericEvent + MigrationCh chan struct{} client.Client Scheme *runtime.Scheme } // Reconcile contains logic to sync Node objects func (r *NodeReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { + select { + case <-r.MigrationCh: + case <-ctx.Done(): + return ctrl.Result{}, fmt.Errorf("canceled") + } reqLog := log.FromContext(ctx) reqLog.Info("Notification on Node", "name", req.Name) node := &corev1.Node{} diff --git a/pkg/ipam-controller/migrator/migrator.go b/pkg/ipam-controller/migrator/migrator.go index 34faa97..30ea34f 100644 --- a/pkg/ipam-controller/migrator/migrator.go +++ b/pkg/ipam-controller/migrator/migrator.go @@ -50,10 +50,35 @@ const ( DefaultConfigMapName = "nvidia-k8s-ipam-config" ) +// Migrator migrate from CM config to IPPool CR +type Migrator struct { + K8sClient client.Client + IPPoolsNamespace string + MigrationCh chan struct{} + LeaderElection bool + Logger logr.Logger +} + +// Implements manager.Runnable +func (m Migrator) Start(ctx context.Context) error { + err := Migrate(ctx, m.Logger, m.K8sClient, m.IPPoolsNamespace) + if err != nil { + m.Logger.Error(err, fmt.Sprintf("failed to migrate NV-IPAM config from ConfigMap, "+ + "set %s env variable to disable migration", EnvDisableMigration)) + return err + } + close(m.MigrationCh) + return nil +} + +// Implements manager.NeedLeaderElection +func (m Migrator) NeedLeaderElection() bool { + return m.LeaderElection +} + // Migrate reads the ConfigMap with the IPAM configuration, reads the allocations // from the Nodes annotation, create IPPool CRs and delete the ConfigMap and annotations -func Migrate(ctx context.Context, c client.Client, poolNamespace string) error { - logger := logr.FromContextOrDiscard(ctx).WithName("migrator") +func Migrate(ctx context.Context, logger logr.Logger, c client.Client, poolNamespace string) error { if os.Getenv(EnvDisableMigration) != "" { logger.Info(fmt.Sprintf("%s set, skip controller migration", EnvDisableMigration)) return nil diff --git a/pkg/ipam-controller/migrator/migrator_test.go b/pkg/ipam-controller/migrator/migrator_test.go index ce6b85a..e1bac62 100644 --- a/pkg/ipam-controller/migrator/migrator_test.go +++ b/pkg/ipam-controller/migrator/migrator_test.go @@ -23,6 +23,7 @@ import ( apiErrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" + "k8s.io/klog/v2" "sigs.k8s.io/controller-runtime/pkg/client" "github.com/Mellanox/nvidia-k8s-ipam/pkg/ipam-controller/config" @@ -145,7 +146,7 @@ var _ = Describe("Controller Migrator", func() { Expect(updateNode(node2)) By("Run migrator") - Expect(migrator.Migrate(ctx, k8sClient, TestNamespace)).NotTo(HaveOccurred()) + Expect(migrator.Migrate(ctx, klog.NewKlogr(), k8sClient, TestNamespace)).NotTo(HaveOccurred()) By("Verify Pool1 Spec") pool1 := &ipamv1alpha1.IPPool{} @@ -183,7 +184,7 @@ var _ = Describe("Controller Migrator", func() { It("No ConfigMap", func() { By("Run migrator") - Expect(migrator.Migrate(ctx, k8sClient, TestNamespace)).NotTo(HaveOccurred()) + Expect(migrator.Migrate(ctx, klog.NewKlogr(), k8sClient, TestNamespace)).NotTo(HaveOccurred()) }) Context("Negative flows", func() { @@ -197,12 +198,12 @@ var _ = Describe("Controller Migrator", func() { } Expect(k8sClient.Create(ctx, cm)).NotTo(HaveOccurred()) By("Run migrator - should fail") - Expect(migrator.Migrate(ctx, k8sClient, TestNamespace)).To(HaveOccurred()) + Expect(migrator.Migrate(ctx, klog.NewKlogr(), k8sClient, TestNamespace)).To(HaveOccurred()) By("Create invalid cfg - not a json data") updateConfigMap("{{") By("Run migrator - should fail") - Expect(migrator.Migrate(ctx, k8sClient, TestNamespace)).To(HaveOccurred()) + Expect(migrator.Migrate(ctx, klog.NewKlogr(), k8sClient, TestNamespace)).To(HaveOccurred()) By("Create invalid cfg - Gateway not in subnet") var inValidConfig = ` @@ -212,7 +213,7 @@ var _ = Describe("Controller Migrator", func() { } }` updateConfigMap(inValidConfig) - Expect(migrator.Migrate(ctx, k8sClient, TestNamespace)).To(HaveOccurred()) + Expect(migrator.Migrate(ctx, klog.NewKlogr(), k8sClient, TestNamespace)).To(HaveOccurred()) By("Create valid cfg - IPPool exists with different spec") updateConfigMap(validConfig) @@ -228,7 +229,7 @@ var _ = Describe("Controller Migrator", func() { }, } Expect(k8sClient.Create(ctx, pool1)).NotTo(HaveOccurred()) - Expect(migrator.Migrate(ctx, k8sClient, TestNamespace)).To(HaveOccurred()) + Expect(migrator.Migrate(ctx, klog.NewKlogr(), k8sClient, TestNamespace)).To(HaveOccurred()) }) }) })