From bfa73918877529cabc48915fb31db575d753c5cb Mon Sep 17 00:00:00 2001 From: Fred Rolland Date: Mon, 14 Aug 2023 11:12:37 +0300 Subject: [PATCH] Node: watch IPPool CR Move from watching Nodes object and read IP range from annotation, to watch IPPools objects and get Allocations from their Status. Signed-off-by: Fred Rolland --- cmd/ipam-node/app/app.go | 28 ++++--- cmd/ipam-node/app/app_suite_test.go | 17 +++- cmd/ipam-node/app/app_test.go | 84 ++++++++++++++----- cmd/ipam-node/app/options/options.go | 29 ++++--- deploy/nv-ipam.yaml | 14 +++- .../{node/node.go => ippool/ippool.go} | 54 ++++++------ pkg/pool/manager.go | 57 +++++-------- pkg/pool/manager_test.go | 14 +--- 8 files changed, 174 insertions(+), 123 deletions(-) rename pkg/ipam-node/controllers/{node/node.go => ippool/ippool.go} (52%) diff --git a/cmd/ipam-node/app/app.go b/cmd/ipam-node/app/app.go index a1b49f1..1b18d09 100644 --- a/cmd/ipam-node/app/app.go +++ b/cmd/ipam-node/app/app.go @@ -31,8 +31,6 @@ import ( "github.com/google/renameio/v2" "github.com/spf13/cobra" "google.golang.org/grpc" - corev1 "k8s.io/api/core/v1" - "k8s.io/apimachinery/pkg/fields" "k8s.io/apimachinery/pkg/runtime" clientgoscheme "k8s.io/client-go/kubernetes/scheme" "k8s.io/client-go/rest" @@ -40,7 +38,6 @@ import ( "k8s.io/component-base/term" "k8s.io/klog/v2" ctrl "sigs.k8s.io/controller-runtime" - "sigs.k8s.io/controller-runtime/pkg/cache" "sigs.k8s.io/controller-runtime/pkg/healthz" // register json format for logger @@ -50,13 +47,14 @@ import ( _ "k8s.io/client-go/plugin/pkg/client/auth" nodev1 "github.com/Mellanox/nvidia-k8s-ipam/api/grpc/nvidia/ipam/node/v1" + ipamv1alpha1 "github.com/Mellanox/nvidia-k8s-ipam/api/v1alpha1" "github.com/Mellanox/nvidia-k8s-ipam/cmd/ipam-node/app/options" "github.com/Mellanox/nvidia-k8s-ipam/pkg/cmdutils" cniTypes "github.com/Mellanox/nvidia-k8s-ipam/pkg/cni/types" "github.com/Mellanox/nvidia-k8s-ipam/pkg/common" "github.com/Mellanox/nvidia-k8s-ipam/pkg/ipam-node/allocator" "github.com/Mellanox/nvidia-k8s-ipam/pkg/ipam-node/cleaner" - nodectrl "github.com/Mellanox/nvidia-k8s-ipam/pkg/ipam-node/controllers/node" + ippoolctrl "github.com/Mellanox/nvidia-k8s-ipam/pkg/ipam-node/controllers/ippool" "github.com/Mellanox/nvidia-k8s-ipam/pkg/ipam-node/grpc/middleware" "github.com/Mellanox/nvidia-k8s-ipam/pkg/ipam-node/handlers" "github.com/Mellanox/nvidia-k8s-ipam/pkg/ipam-node/migrator" @@ -116,7 +114,8 @@ func RunNodeDaemon(ctx context.Context, config *rest.Config, opts *options.Optio ctrl.SetLogger(logger) logger.Info("start IPAM node daemon", - "version", version.GetVersionString(), "node", opts.NodeName) + "version", version.GetVersionString(), "node", opts.NodeName, + "IPPools Namespace", opts.PoolsNamespace) if err := deployShimCNI(logger, opts); err != nil { return err @@ -129,15 +128,16 @@ func RunNodeDaemon(ctx context.Context, config *rest.Config, opts *options.Optio return err } + if err := ipamv1alpha1.AddToScheme(scheme); err != nil { + logger.Error(err, "failed to register ipamv1alpha1 scheme") + return err + } + poolManager := poolPkg.NewManager() mgr, err := ctrl.NewManager(config, ctrl.Options{ - Scheme: scheme, - NewCache: cache.BuilderWithOptions(cache.Options{ - SelectorsByObject: cache.SelectorsByObject{&corev1.Node{}: cache.ObjectSelector{ - Field: fields.ParseSelectorOrDie(fmt.Sprintf("metadata.name=%s", opts.NodeName)), - }}, - }), + Scheme: scheme, + Namespace: opts.PoolsNamespace, MetricsBindAddress: opts.MetricsAddr, Port: 9443, HealthProbeBindAddress: opts.ProbeAddr, @@ -146,12 +146,14 @@ func RunNodeDaemon(ctx context.Context, config *rest.Config, opts *options.Optio logger.Error(err, "unable to initialize manager") return err } - if err = (&nodectrl.NodeReconciler{ + + if err = (&ippoolctrl.IPPoolReconciler{ PoolManager: poolManager, Client: mgr.GetClient(), Scheme: mgr.GetScheme(), + NodeName: opts.NodeName, }).SetupWithManager(mgr); err != nil { - logger.Error(err, "unable to create controller", "controller", "Node") + logger.Error(err, "unable to create controller", "controller", "IPPool") return err } diff --git a/cmd/ipam-node/app/app_suite_test.go b/cmd/ipam-node/app/app_suite_test.go index 7d88376..8de4ff0 100644 --- a/cmd/ipam-node/app/app_suite_test.go +++ b/cmd/ipam-node/app/app_suite_test.go @@ -19,9 +19,12 @@ import ( . "github.com/onsi/ginkgo/v2" . "github.com/onsi/gomega" + "k8s.io/client-go/kubernetes/scheme" "k8s.io/client-go/rest" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/envtest" + + ipamv1alpha1 "github.com/Mellanox/nvidia-k8s-ipam/api/v1alpha1" ) var ( @@ -39,17 +42,25 @@ func TestApp(t *testing.T) { var _ = BeforeSuite(func() { By("bootstrapping test environment") - testEnv = &envtest.Environment{} + var err error + err = ipamv1alpha1.AddToScheme(scheme.Scheme) + Expect(err).NotTo(HaveOccurred()) + + testEnv = &envtest.Environment{ + CRDDirectoryPaths: []string{"../../../deploy/crds"}, + CRDInstallOptions: envtest.CRDInstallOptions{ + ErrorIfPathMissing: true, + }, + } ctx, cFunc = context.WithCancel(context.Background()) - var err error // cfg is defined in this file globally. cfg, err = testEnv.Start() Expect(err).NotTo(HaveOccurred()) Expect(cfg).NotTo(BeNil()) - k8sClient, err = client.New(cfg, client.Options{}) + k8sClient, err = client.New(cfg, client.Options{Scheme: scheme.Scheme}) Expect(err).NotTo(HaveOccurred()) Expect(k8sClient).NotTo(BeNil()) }) diff --git a/cmd/ipam-node/app/app_test.go b/cmd/ipam-node/app/app_test.go index 1457550..cab85f5 100644 --- a/cmd/ipam-node/app/app_test.go +++ b/cmd/ipam-node/app/app_test.go @@ -27,12 +27,13 @@ import ( "google.golang.org/grpc/status" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/types" "k8s.io/klog/v2" nodev1 "github.com/Mellanox/nvidia-k8s-ipam/api/grpc/nvidia/ipam/node/v1" + ipamv1alpha1 "github.com/Mellanox/nvidia-k8s-ipam/api/v1alpha1" "github.com/Mellanox/nvidia-k8s-ipam/cmd/ipam-node/app" "github.com/Mellanox/nvidia-k8s-ipam/cmd/ipam-node/app/options" - "github.com/Mellanox/nvidia-k8s-ipam/pkg/pool" ) const ( @@ -43,27 +44,67 @@ const ( testNamespace = "default" ) -func createTestNode() *corev1.Node { - nodeObj := &corev1.Node{ - ObjectMeta: metav1.ObjectMeta{Name: testNodeName}, - } - ExpectWithOffset(1, pool.SetIPBlockAnnotation(nodeObj, map[string]*pool.IPPool{ - testPoolName1: { - Name: testPoolName1, - Subnet: "192.168.0.0/16", - StartIP: "192.168.0.2", - EndIP: "192.168.0.254", - Gateway: "192.168.0.1", +func createTestPools() { + pool1 := &ipamv1alpha1.IPPool{ + ObjectMeta: metav1.ObjectMeta{Name: testPoolName1, Namespace: testNamespace}, + Spec: ipamv1alpha1.IPPoolSpec{ + Subnet: "192.168.0.0/16", + PerNodeBlockSize: 252, + Gateway: "192.168.0.1", }, - testPoolName2: {Name: testPoolName2, - Subnet: "10.100.0.0/16", - StartIP: "10.100.0.2", - EndIP: "10.100.0.254", - Gateway: "10.100.0.1", + } + ExpectWithOffset(1, k8sClient.Create(ctx, pool1)) + + pool2 := &ipamv1alpha1.IPPool{ + ObjectMeta: metav1.ObjectMeta{Name: testPoolName2, Namespace: testNamespace}, + Spec: ipamv1alpha1.IPPoolSpec{ + Subnet: "10.100.0.0/16", + PerNodeBlockSize: 252, + Gateway: "10.100.0.1", }, - })).NotTo(HaveOccurred()) - ExpectWithOffset(1, k8sClient.Create(ctx, nodeObj)) - return nodeObj + } + ExpectWithOffset(1, k8sClient.Create(ctx, pool2)) + + // Update statuses with range allocation + Eventually(func(g Gomega) error { + status := ipamv1alpha1.IPPoolStatus{ + Allocations: []ipamv1alpha1.Allocation{ + { + NodeName: testNodeName, + StartIP: "192.168.0.2", + EndIP: "192.168.0.254", + }, + }, + } + return updatePoolStatus(testPoolName1, status) + }, 30, 5).Should(Not(HaveOccurred())) + + Eventually(func(g Gomega) error { + status := ipamv1alpha1.IPPoolStatus{ + Allocations: []ipamv1alpha1.Allocation{ + { + NodeName: testNodeName, + StartIP: "10.100.0.2", + EndIP: "10.100.0.254", + }, + }, + } + return updatePoolStatus(testPoolName2, status) + }, 30, 5).Should(Not(HaveOccurred())) +} + +func updatePoolStatus(poolName string, status ipamv1alpha1.IPPoolStatus) error { + pool := &ipamv1alpha1.IPPool{} + err := k8sClient.Get(ctx, types.NamespacedName{Name: poolName, Namespace: testNamespace}, pool) + if err != nil { + return err + } + pool.Status = status + err = k8sClient.Status().Update(ctx, pool) + if err != nil { + return err + } + return nil } func createTestPod() *corev1.Pod { @@ -98,6 +139,7 @@ func getOptions(testDir string) *options.Options { opts.CNIBinDir = cniBinDir opts.CNIConfDir = cniConfDir opts.CNIDaemonSocket = daemonSocket + opts.PoolsNamespace = testNamespace return opts } @@ -122,7 +164,7 @@ var _ = Describe("IPAM Node daemon", func() { testDir := GinkgoT().TempDir() opts := getOptions(testDir) - createTestNode() + createTestPools() pod := createTestPod() ctx = logr.NewContext(ctx, klog.NewKlogr()) diff --git a/cmd/ipam-node/app/options/options.go b/cmd/ipam-node/app/options/options.go index 8e4ed1f..8194262 100644 --- a/cmd/ipam-node/app/options/options.go +++ b/cmd/ipam-node/app/options/options.go @@ -36,12 +36,13 @@ const ( // New initialize and return new Options object func New() *Options { return &Options{ - Options: *cmdoptions.New(), - MetricsAddr: ":8080", - ProbeAddr: ":8081", - NodeName: "", - BindAddress: DefaultBindAddress, - StoreFile: DefaultStoreFile, + Options: *cmdoptions.New(), + MetricsAddr: ":8080", + ProbeAddr: ":8081", + NodeName: "", + BindAddress: DefaultBindAddress, + StoreFile: DefaultStoreFile, + PoolsNamespace: "kube-system", // shim CNI parameters CNIBinDir: "/opt/cni/bin", CNIBinFile: "/nv-ipam", @@ -58,11 +59,12 @@ func New() *Options { // Options holds command line options for controller type Options struct { cmdoptions.Options - MetricsAddr string - ProbeAddr string - NodeName string - BindAddress string - StoreFile string + MetricsAddr string + ProbeAddr string + NodeName string + PoolsNamespace string + BindAddress string + StoreFile string // shim CNI parameters CNIBinDir string CNIBinFile string @@ -91,6 +93,8 @@ func (o *Options) AddNamedFlagSets(sharedFS *cliflag.NamedFlagSets) { o.ProbeAddr, "The address the probe endpoint binds to.") daemonFS.StringVar(&o.NodeName, "node-name", o.NodeName, "The name of the Node on which the daemon runs") + daemonFS.StringVar(&o.PoolsNamespace, "ippools-namespace", + o.PoolsNamespace, "The name of the namespace to watch for IPPools CRs") daemonFS.StringVar(&o.BindAddress, "bind-address", o.BindAddress, "GPRC server bind address. e.g.: tcp://127.0.0.1:9092, unix:///var/lib/foo") daemonFS.StringVar(&o.StoreFile, "store-file", o.StoreFile, @@ -122,6 +126,9 @@ func (o *Options) Validate() error { if len(o.NodeName) == 0 { return fmt.Errorf("node-name is required parameter") } + if len(o.PoolsNamespace) == 0 { + return fmt.Errorf("ippools-namespace is required parameter") + } _, _, err := ParseBindAddress(o.BindAddress) if err != nil { return fmt.Errorf("bind-address is invalid: %v", err) diff --git a/deploy/nv-ipam.yaml b/deploy/nv-ipam.yaml index 21380a7..507300f 100644 --- a/deploy/nv-ipam.yaml +++ b/deploy/nv-ipam.yaml @@ -7,12 +7,19 @@ rules: - apiGroups: - "" resources: - - nodes - pods verbs: - get - list - watch + - apiGroups: + - nv-ipam.nvidia.com + resources: + - ippools + verbs: + - get + - list + - watch --- kind: ClusterRoleBinding apiVersion: rbac.authorization.k8s.io/v1 @@ -70,6 +77,10 @@ spec: valueFrom: fieldRef: fieldPath: spec.nodeName + - name: IPPOOLS_NAMESPACE + valueFrom: + fieldRef: + fieldPath: metadata.namespace command: [ "/ipam-node" ] args: - --node-name=$(NODE_NAME) @@ -83,6 +94,7 @@ spec: - --cni-conf-dir=/etc/cni/net.d/nv-ipam.d - --cni-log-file=/var/log/nv-ipam-cni.log - --cni-log-level=info # log level for shim CNI + - --ippools-namespace=$(IPPOOLS_NAMESPACE) resources: requests: cpu: "100m" diff --git a/pkg/ipam-node/controllers/node/node.go b/pkg/ipam-node/controllers/ippool/ippool.go similarity index 52% rename from pkg/ipam-node/controllers/node/node.go rename to pkg/ipam-node/controllers/ippool/ippool.go index 13df8d7..49d9ef2 100644 --- a/pkg/ipam-node/controllers/node/node.go +++ b/pkg/ipam-node/controllers/ippool/ippool.go @@ -16,58 +16,58 @@ package controllers import ( "context" - corev1 "k8s.io/api/core/v1" apiErrors "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/runtime" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" - "sigs.k8s.io/controller-runtime/pkg/event" "sigs.k8s.io/controller-runtime/pkg/log" - "sigs.k8s.io/controller-runtime/pkg/predicate" + ipamv1alpha1 "github.com/Mellanox/nvidia-k8s-ipam/api/v1alpha1" "github.com/Mellanox/nvidia-k8s-ipam/pkg/pool" ) -// NodeReconciler reconciles Node objects -type NodeReconciler struct { +// IPPoolReconciler reconciles Node objects +type IPPoolReconciler struct { PoolManager pool.Manager client.Client - Scheme *runtime.Scheme + Scheme *runtime.Scheme + NodeName string } -// Reconcile contains logic to sync Node objects -func (r *NodeReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { +// Reconcile contains logic to sync IPPool objects +func (r *IPPoolReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { reqLog := log.FromContext(ctx) - node := &corev1.Node{} - err := r.Client.Get(ctx, req.NamespacedName, node) + ipPool := &ipamv1alpha1.IPPool{} + err := r.Client.Get(ctx, req.NamespacedName, ipPool) if err != nil { if apiErrors.IsNotFound(err) { + reqLog.Info("Pool not found, removing from PoolManager") + r.PoolManager.RemovePool(req.Name) return ctrl.Result{}, nil } + reqLog.Error(err, "failed to get Pool object from the cache") return ctrl.Result{}, err } - if err := r.PoolManager.Update(node); err != nil { - reqLog.Info("pool config from the node object is not updated, reset pool config", - "reason", err.Error()) - r.PoolManager.Reset() - } else { - reqLog.Info("pools configuration updated", "data", r.PoolManager.GetPools()) + reqLog.Info("Notification on Pool", "name", ipPool.Name) + for _, alloc := range ipPool.Status.Allocations { + if alloc.NodeName == r.NodeName { + pool := &pool.IPPool{ + Name: ipPool.Name, + Subnet: ipPool.Spec.Subnet, + Gateway: ipPool.Spec.Gateway, + StartIP: alloc.StartIP, + EndIP: alloc.EndIP, + } + r.PoolManager.UpdatePool(pool) + break + } } return ctrl.Result{}, nil } // SetupWithManager sets up the controller with the Manager. -func (r *NodeReconciler) SetupWithManager(mgr ctrl.Manager) error { +func (r *IPPoolReconciler) SetupWithManager(mgr ctrl.Manager) error { return ctrl.NewControllerManagedBy(mgr). - For(&corev1.Node{}). - WithEventFilter(predicate.Funcs{ - UpdateFunc: func(e event.UpdateEvent) bool { - if e.ObjectOld == nil || e.ObjectNew == nil { - return true - } - return e.ObjectOld.GetAnnotations()[pool.IPBlocksAnnotation] != - e.ObjectNew.GetAnnotations()[pool.IPBlocksAnnotation] - }, - }). + For(&ipamv1alpha1.IPPool{}). Complete(r) } diff --git a/pkg/pool/manager.go b/pkg/pool/manager.go index 37ea6fa..995d13c 100644 --- a/pkg/pool/manager.go +++ b/pkg/pool/manager.go @@ -13,70 +13,53 @@ package pool -import ( - "fmt" - "sync" - - corev1 "k8s.io/api/core/v1" -) +import "sync" // Manager provide access to pools configuration // //go:generate mockery --name Manager type Manager interface { ConfigReader - // Update Pool's configs from node object, - // returns an error if node object doesn't contain valid config - Update(node *corev1.Node) error - // Reset clean Pool config which is cached in memory - Reset() + // Update Pool's config from IPPool CR + UpdatePool(pool *IPPool) + // Remove Pool's config + RemovePool(poolName string) } // NewManager create and initialize new manager instance func NewManager() Manager { - return &manager{} + return &manager{ + poolByName: make(map[string]*IPPool), + } } type manager struct { - lock sync.Mutex - reader ConfigReader + lock sync.Mutex + poolByName map[string]*IPPool } -// GetPoolByName is the Manager interface implementation for the manager -func (m *manager) GetPoolByName(name string) *IPPool { +func (m *manager) UpdatePool(pool *IPPool) { m.lock.Lock() defer m.lock.Unlock() - if m.reader == nil { - return nil - } - return m.reader.GetPoolByName(name) + m.poolByName[pool.Name] = pool } -// GetPools is the Manager interface implementation for the manager -func (m *manager) GetPools() map[string]*IPPool { +func (m *manager) RemovePool(poolName string) { m.lock.Lock() defer m.lock.Unlock() - if m.reader == nil { - return nil - } - return m.reader.GetPools() + delete(m.poolByName, poolName) } -// Update is the Manager interface implementation for the manager -func (m *manager) Update(node *corev1.Node) error { +// GetPoolByName is the Manager interface implementation for the manager +func (m *manager) GetPoolByName(name string) *IPPool { m.lock.Lock() defer m.lock.Unlock() - r, err := NewConfigReader(node) - if err != nil { - return fmt.Errorf("failed to update pools configuration from the node object: %v", err) - } - m.reader = r - return nil + return m.poolByName[name] } -// Reset is the Manager interface implementation for the manager -func (m *manager) Reset() { +// GetPools is the Manager interface implementation for the manager +func (m *manager) GetPools() map[string]*IPPool { m.lock.Lock() defer m.lock.Unlock() - m.reader = nil + return m.poolByName } diff --git a/pkg/pool/manager_test.go b/pkg/pool/manager_test.go index 585c316..af5e7b7 100644 --- a/pkg/pool/manager_test.go +++ b/pkg/pool/manager_test.go @@ -17,31 +17,25 @@ import ( . "github.com/onsi/ginkgo/v2" . "github.com/onsi/gomega" - corev1 "k8s.io/api/core/v1" - "github.com/Mellanox/nvidia-k8s-ipam/pkg/pool" ) var _ = Describe("Manager", func() { It("Update pool data", func() { - testPools := make(map[string]*pool.IPPool) testPoolName := "my-pool-1" - testPools[testPoolName] = &pool.IPPool{ - Name: "my-pool-1", + testPool := &pool.IPPool{ + Name: testPoolName, Subnet: "192.168.0.0/16", StartIP: "192.168.0.2", EndIP: "192.168.0.254", Gateway: "192.168.0.1", } - node := &corev1.Node{} - Expect(pool.SetIPBlockAnnotation(node, testPools)).NotTo(HaveOccurred()) - mgr := pool.NewManager() Expect(mgr.GetPoolByName(testPoolName)).To(BeNil()) - Expect(mgr.Update(node)).NotTo(HaveOccurred()) + mgr.UpdatePool(testPool) Expect(mgr.GetPoolByName(testPoolName)).NotTo(BeNil()) Expect(mgr.GetPools()).To(HaveLen(1)) - mgr.Reset() + mgr.RemovePool(testPoolName) Expect(mgr.GetPoolByName(testPoolName)).To(BeNil()) }) })