Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Run migration logic with leader election #31

Merged
merged 1 commit into from
Sep 19, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 14 additions & 4 deletions cmd/ipam-controller/app/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,16 +135,25 @@ func RunController(ctx context.Context, config *rest.Config, opts *options.Optio
os.Exit(1)
}

if err := migrator.Migrate(ctx, k8sClient, opts.IPPoolsNamespace); err != nil {
logger.Error(err, fmt.Sprintf("failed to migrate NV-IPAM config from ConfigMap, "+
"set %s env variable to disable migration", migrator.EnvDisableMigration))
return err
migrationChan := make(chan struct{})
m := migrator.Migrator{
IPPoolsNamespace: opts.IPPoolsNamespace,
K8sClient: k8sClient,
MigrationCh: migrationChan,
LeaderElection: opts.EnableLeaderElection,
Logger: logger.WithName("Migrator"),
}
err = mgr.Add(&m)
if err != nil {
logger.Error(err, "failed to add Migrator to the Manager")
os.Exit(1)
}

nodeEventCH := make(chan event.GenericEvent, 1)

if err = (&nodectrl.NodeReconciler{
NodeEventCh: nodeEventCH,
MigrationCh: migrationChan,
PoolsNamespace: opts.IPPoolsNamespace,
Client: mgr.GetClient(),
Scheme: mgr.GetScheme(),
Expand All @@ -158,6 +167,7 @@ func RunController(ctx context.Context, config *rest.Config, opts *options.Optio
PoolsNamespace: opts.IPPoolsNamespace,
Client: mgr.GetClient(),
Scheme: mgr.GetScheme(),
MigrationCh: migrationChan,
}).SetupWithManager(mgr); err != nil {
logger.Error(err, "unable to create controller", "controller", "IPPool")
return err
Expand Down
8 changes: 5 additions & 3 deletions cmd/ipam-controller/app/app_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,9 +171,11 @@ var _ = Describe("App", func() {

go func() {
Expect(app.RunController(logr.NewContext(ctrlCtx, klog.NewKlogr()), cfg, &options.Options{
MetricsAddr: "0", // disable
ProbeAddr: "0", // disable
IPPoolsNamespace: TestNamespace,
MetricsAddr: "0", // disable
ProbeAddr: "0", // disable
IPPoolsNamespace: TestNamespace,
EnableLeaderElection: true,
LeaderElectionNamespace: TestNamespace,
})).NotTo(HaveOccurred())
close(controllerStopped)
}()
Expand Down
6 changes: 6 additions & 0 deletions pkg/ipam-controller/controllers/ippool/ippool.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,13 +46,19 @@ import (
type IPPoolReconciler struct {
// PoolsNamespace is the only namespace Reconcile expects requests from;
// the request namespace is compared against it at the start of Reconcile.
PoolsNamespace string
// NodeEventCh carries controller-runtime generic events.
// NOTE(review): presumably shared with the Node controller — confirm in the wiring code.
NodeEventCh chan event.GenericEvent
// MigrationCh gates Reconcile: every call first blocks until a receive on
// this channel succeeds or the request context is done.
MigrationCh chan struct{}
client.Client
Scheme *runtime.Scheme
// NOTE(review): recorder is presumably used to emit Kubernetes events; where it is set is not visible here — confirm.
recorder record.EventRecorder
}

// Reconcile contains logic to sync IPPool objects
func (r *IPPoolReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
select {
case <-r.MigrationCh:
case <-ctx.Done():
return ctrl.Result{}, fmt.Errorf("canceled")
}
reqLog := log.FromContext(ctx)
if req.Namespace != r.PoolsNamespace {
// this should never happen because of the watcher configuration of the manager from controller-runtime pkg
Expand Down
7 changes: 7 additions & 0 deletions pkg/ipam-controller/controllers/node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ package controllers

import (
"context"
"fmt"

apiErrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand All @@ -33,12 +34,18 @@ import (
type NodeReconciler struct {
// NOTE(review): PoolsNamespace is presumably the namespace that holds the IPPool objects; its use is not visible here — confirm.
PoolsNamespace string
// NodeEventCh carries controller-runtime generic events.
// NOTE(review): presumably used to notify the IPPool controller about Node changes — confirm.
NodeEventCh chan event.GenericEvent
// MigrationCh gates Reconcile: every call first blocks until a receive on
// this channel succeeds or the request context is done.
MigrationCh chan struct{}
client.Client
Scheme *runtime.Scheme
}

// Reconcile contains logic to sync Node objects
func (r *NodeReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
select {
case <-r.MigrationCh:
case <-ctx.Done():
return ctrl.Result{}, fmt.Errorf("canceled")
}
reqLog := log.FromContext(ctx)
reqLog.Info("Notification on Node", "name", req.Name)
node := &corev1.Node{}
Expand Down
29 changes: 27 additions & 2 deletions pkg/ipam-controller/migrator/migrator.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,10 +50,35 @@ const (
DefaultConfigMapName = "nvidia-k8s-ipam-config"
)

// Migrator migrates the IPAM configuration from the ConfigMap to IPPool CRs.
// Its Start method calls Migrate and closes MigrationCh when Migrate succeeds.
type Migrator struct {
// K8sClient is the client passed to Migrate.
K8sClient client.Client
// IPPoolsNamespace is passed to Migrate as the pool namespace.
IPPoolsNamespace string
// MigrationCh is closed by Start after Migrate returns without error;
// it is left open when migration fails.
MigrationCh chan struct{}
// LeaderElection is the value reported by NeedLeaderElection.
LeaderElection bool
// Logger is passed to Migrate and used by Start to report a failed migration.
Logger logr.Logger
}

// Start runs the ConfigMap-to-IPPool migration once. When Migrate succeeds,
// MigrationCh is closed so that everything waiting on the channel can
// proceed. When Migrate fails, the error is logged together with a hint about
// the env variable that disables migration, the channel is left open, and the
// error is returned to the caller.
//
// The signature matches what controller-runtime's manager.Runnable expects.
func (m *Migrator) Start(ctx context.Context) error {
	if migrateErr := Migrate(ctx, m.Logger, m.K8sClient, m.IPPoolsNamespace); migrateErr != nil {
		hint := fmt.Sprintf("failed to migrate NV-IPAM config from ConfigMap, "+
			"set %s env variable to disable migration", EnvDisableMigration)
		m.Logger.Error(migrateErr, hint)
		return migrateErr
	}

	// Closing (rather than sending) releases every waiter at once.
	close(m.MigrationCh)
	return nil
}

// NeedLeaderElection returns the LeaderElection field unchanged.
// It provides the method required by controller-runtime's
// manager.LeaderElectionRunnable interface, which the manager consults to
// decide whether a Runnable must wait for leader election before starting.
func (m *Migrator) NeedLeaderElection() bool {
return m.LeaderElection
}

// Migrate reads the ConfigMap with the IPAM configuration, reads the allocations
// from the Nodes' annotations, creates IPPool CRs, and deletes the ConfigMap and annotations.
func Migrate(ctx context.Context, c client.Client, poolNamespace string) error {
logger := logr.FromContextOrDiscard(ctx).WithName("migrator")
func Migrate(ctx context.Context, logger logr.Logger, c client.Client, poolNamespace string) error {
if os.Getenv(EnvDisableMigration) != "" {
logger.Info(fmt.Sprintf("%s set, skip controller migration", EnvDisableMigration))
return nil
Expand Down
13 changes: 7 additions & 6 deletions pkg/ipam-controller/migrator/migrator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
apiErrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/klog/v2"
"sigs.k8s.io/controller-runtime/pkg/client"

"github.com/Mellanox/nvidia-k8s-ipam/pkg/ipam-controller/config"
Expand Down Expand Up @@ -145,7 +146,7 @@ var _ = Describe("Controller Migrator", func() {
Expect(updateNode(node2))

By("Run migrator")
Expect(migrator.Migrate(ctx, k8sClient, TestNamespace)).NotTo(HaveOccurred())
Expect(migrator.Migrate(ctx, klog.NewKlogr(), k8sClient, TestNamespace)).NotTo(HaveOccurred())

By("Verify Pool1 Spec")
pool1 := &ipamv1alpha1.IPPool{}
Expand Down Expand Up @@ -183,7 +184,7 @@ var _ = Describe("Controller Migrator", func() {

It("No ConfigMap", func() {
By("Run migrator")
Expect(migrator.Migrate(ctx, k8sClient, TestNamespace)).NotTo(HaveOccurred())
Expect(migrator.Migrate(ctx, klog.NewKlogr(), k8sClient, TestNamespace)).NotTo(HaveOccurred())
})

Context("Negative flows", func() {
Expand All @@ -197,12 +198,12 @@ var _ = Describe("Controller Migrator", func() {
}
Expect(k8sClient.Create(ctx, cm)).NotTo(HaveOccurred())
By("Run migrator - should fail")
Expect(migrator.Migrate(ctx, k8sClient, TestNamespace)).To(HaveOccurred())
Expect(migrator.Migrate(ctx, klog.NewKlogr(), k8sClient, TestNamespace)).To(HaveOccurred())

By("Create invalid cfg - not a json data")
updateConfigMap("{{")
By("Run migrator - should fail")
Expect(migrator.Migrate(ctx, k8sClient, TestNamespace)).To(HaveOccurred())
Expect(migrator.Migrate(ctx, klog.NewKlogr(), k8sClient, TestNamespace)).To(HaveOccurred())

By("Create invalid cfg - Gateway not in subnet")
var inValidConfig = `
Expand All @@ -212,7 +213,7 @@ var _ = Describe("Controller Migrator", func() {
}
}`
updateConfigMap(inValidConfig)
Expect(migrator.Migrate(ctx, k8sClient, TestNamespace)).To(HaveOccurred())
Expect(migrator.Migrate(ctx, klog.NewKlogr(), k8sClient, TestNamespace)).To(HaveOccurred())

By("Create valid cfg - IPPool exists with different spec")
updateConfigMap(validConfig)
Expand All @@ -228,7 +229,7 @@ var _ = Describe("Controller Migrator", func() {
},
}
Expect(k8sClient.Create(ctx, pool1)).NotTo(HaveOccurred())
Expect(migrator.Migrate(ctx, k8sClient, TestNamespace)).To(HaveOccurred())
Expect(migrator.Migrate(ctx, klog.NewKlogr(), k8sClient, TestNamespace)).To(HaveOccurred())
})
})
})