Skip to content

Commit

Permalink
Merge pull request #26 from ykulazhenkov/remove-stale-pool
Browse files Browse the repository at this point in the history
ipam-node: Remove stale pool entries from the store
  • Loading branch information
adrianchiris authored Aug 8, 2023
2 parents d45ca3f + 9d0c23e commit 9bd60b3
Show file tree
Hide file tree
Showing 6 changed files with 106 additions and 24 deletions.
2 changes: 1 addition & 1 deletion cmd/ipam-node/app/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -243,7 +243,7 @@ func RunNodeDaemon(ctx context.Context, config *rest.Config, opts *options.Optio
}
return
}
c := cleaner.New(mgr.GetClient(), store, time.Minute, 3)
c := cleaner.New(mgr.GetClient(), store, poolManager, time.Minute, 3)
c.Start(innerCtx)
logger.Info("cleaner stopped")
}()
Expand Down
36 changes: 26 additions & 10 deletions pkg/ipam-node/cleaner/cleaner.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,12 +28,14 @@ import (

storePkg "github.com/Mellanox/nvidia-k8s-ipam/pkg/ipam-node/store"
"github.com/Mellanox/nvidia-k8s-ipam/pkg/ipam-node/types"
"github.com/Mellanox/nvidia-k8s-ipam/pkg/pool"
)

// Cleaner is the interface of the cleaner package.
// The cleaner periodically scans the store and checks for allocations which don't have
// a related Pod in the k8s API. If an allocation has no Pod for more than X checks, then the cleaner
// will release the allocation.
// will release the allocation. Also, the cleaner will remove pool entries in the store if the pool has no
// allocation and pool configuration is unavailable in the Kubernetes API.
type Cleaner interface {
// Start starts the cleaner loop.
// The cleaner loop discovers stale allocations and cleans them up.
Expand All @@ -43,12 +45,13 @@ type Cleaner interface {
// New creates and initializes a new cleaner instance.
// "checkInterval" defines the delay between checks for stale allocations.
// "checkCountBeforeRelease" defines how many checks to do before removing the allocation.
func New(client client.Client, store storePkg.Store,
func New(client client.Client, store storePkg.Store, poolConfReader pool.ConfigReader,
checkInterval time.Duration,
checkCountBeforeRelease int) Cleaner {
return &cleaner{
client: client,
store: store,
poolConfReader: poolConfReader,
checkInterval: checkInterval,
checkCountBeforeRelease: checkCountBeforeRelease,
staleAllocations: make(map[string]int),
Expand All @@ -58,6 +61,7 @@ func New(client client.Client, store storePkg.Store,
type cleaner struct {
client client.Client
store storePkg.Store
poolConfReader pool.ConfigReader
checkInterval time.Duration
checkCountBeforeRelease int
// key is <pool_name>|<container_id>|<interface_name>, value is count of failed checks
Expand All @@ -84,13 +88,19 @@ func (c *cleaner) Start(ctx context.Context) {

func (c *cleaner) loop(ctx context.Context) error {
logger := logr.FromContextOrDiscard(ctx)
store, err := c.store.Open(ctx)
session, err := c.store.Open(ctx)
if err != nil {
return fmt.Errorf("failed to open store: %v", err)
}
allReservations := map[string]struct{}{}
for _, poolName := range store.ListPools() {
for _, reservation := range store.ListReservations(poolName) {
emptyPools := []string{}
for _, poolName := range session.ListPools() {
poolReservations := session.ListReservations(poolName)
if len(poolReservations) == 0 {
emptyPools = append(emptyPools, poolName)
continue
}
for _, reservation := range poolReservations {
resLogger := logger.WithValues("pool", poolName,
"container_id", reservation.ContainerID, "interface_name", reservation.InterfaceName)
key := c.getStaleAllocKey(poolName, reservation)
Expand All @@ -105,7 +115,7 @@ func (c *cleaner) loop(ctx context.Context) error {
Name: reservation.Metadata.PodName,
}, pod)
if err != nil && !apiErrors.IsNotFound(err) {
store.Cancel()
session.Cancel()
return fmt.Errorf("failed to read Pod info from the cache: %v", err)
}
if apiErrors.IsNotFound(err) ||
Expand All @@ -128,13 +138,19 @@ func (c *cleaner) loop(ctx context.Context) error {
// release reservations which were marked as stale multiple times
if count > c.checkCountBeforeRelease {
keyFields := strings.SplitN(k, "|", 3)
pool, containerID, ifName := keyFields[0], keyFields[1], keyFields[2]
logger.Info("stale reservation released", "pool", pool,
poolName, containerID, ifName := keyFields[0], keyFields[1], keyFields[2]
logger.Info("stale reservation released", "poolName", poolName,
"container_id", containerID, "interface_name", ifName)
store.ReleaseReservationByID(pool, containerID, ifName)
session.ReleaseReservationByID(poolName, containerID, ifName)
}
}
// remove empty pools if they don't have configuration in the k8s API
for _, emptyPool := range emptyPools {
if p := c.poolConfReader.GetPoolByName(emptyPool); p == nil {
session.RemovePool(emptyPool)
}
}
if err := store.Commit(); err != nil {
if err := session.Commit(); err != nil {
return fmt.Errorf("failed to commit changes to the store: %v", err)
}
return nil
Expand Down
42 changes: 29 additions & 13 deletions pkg/ipam-node/cleaner/cleaner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ import (
cleanerPkg "github.com/Mellanox/nvidia-k8s-ipam/pkg/ipam-node/cleaner"
storePkg "github.com/Mellanox/nvidia-k8s-ipam/pkg/ipam-node/store"
"github.com/Mellanox/nvidia-k8s-ipam/pkg/ipam-node/types"
poolPkg "github.com/Mellanox/nvidia-k8s-ipam/pkg/pool"
poolMockPkg "github.com/Mellanox/nvidia-k8s-ipam/pkg/pool/mocks"
)

const (
Expand All @@ -35,6 +37,7 @@ const (
testPodName2 = "test-pod2"
testPool1 = "pool1"
testPool2 = "pool2"
testPool3 = "pool3"
testIFName = "net0"
)

Expand All @@ -54,62 +57,75 @@ var _ = Describe("Cleaner", func() {
It("Cleanup test", func() {
done := make(chan interface{})
go func() {
defer GinkgoRecover()
defer close(done)
storePath := filepath.Join(GinkgoT().TempDir(), "test_store")
storeMgr := storePkg.New(storePath)
cleaner := cleanerPkg.New(k8sClient, storeMgr, time.Millisecond*100, 3)
store := storePkg.New(storePath)

pod1UID := createPod(testPodName1, testNamespace)
_ = createPod(testPodName2, testNamespace)
poolManager := poolMockPkg.NewManager(GinkgoT())
// pool2 has no config in the k8s API
poolManager.On("GetPoolByName", testPool2).Return(nil)
// pool3 has config in the k8s API
poolManager.On("GetPoolByName", testPool3).Return(&poolPkg.IPPool{})

store, err := storeMgr.Open(ctx)
session, err := store.Open(ctx)
Expect(err).NotTo(HaveOccurred())
// this will create empty pool config
session.SetLastReservedIP(testPool3, net.ParseIP("192.168.33.100"))

cleaner := cleanerPkg.New(k8sClient, store, poolManager, time.Millisecond*100, 3)

pod1UID := createPod(testPodName1, testNamespace)
_ = createPod(testPodName2, testNamespace)

// should keep these reservations
Expect(store.Reserve(testPool1, "id1", testIFName, types.ReservationMetadata{
Expect(session.Reserve(testPool1, "id1", testIFName, types.ReservationMetadata{
CreateTime: time.Now().Format(time.RFC3339Nano),
PodUUID: pod1UID,
PodName: testPodName1,
PodNamespace: testNamespace,
}, net.ParseIP("192.168.1.100"))).NotTo(HaveOccurred())

Expect(store.Reserve(testPool1, "id2", testIFName, types.ReservationMetadata{},
Expect(session.Reserve(testPool1, "id2", testIFName, types.ReservationMetadata{},
net.ParseIP("192.168.1.101"))).NotTo(HaveOccurred())

// should remove these reservations
Expect(store.Reserve(testPool1, "id3", testIFName, types.ReservationMetadata{
Expect(session.Reserve(testPool1, "id3", testIFName, types.ReservationMetadata{
CreateTime: time.Now().Format(time.RFC3339Nano),
PodName: "unknown",
PodNamespace: testNamespace,
}, net.ParseIP("192.168.1.102"))).NotTo(HaveOccurred())
Expect(store.Reserve(testPool2, "id4", testIFName, types.ReservationMetadata{
Expect(session.Reserve(testPool2, "id4", testIFName, types.ReservationMetadata{
CreateTime: time.Now().Format(time.RFC3339Nano),
PodName: "unknown2",
PodNamespace: testNamespace,
}, net.ParseIP("192.168.2.100"))).NotTo(HaveOccurred())
Expect(store.Reserve(testPool2, "id5", testIFName, types.ReservationMetadata{
Expect(session.Reserve(testPool2, "id5", testIFName, types.ReservationMetadata{
CreateTime: time.Now().Format(time.RFC3339Nano),
PodUUID: "something", // differ from the reservation
PodName: testPodName2,
PodNamespace: testNamespace,
}, net.ParseIP("192.168.2.101"))).NotTo(HaveOccurred())

Expect(store.Commit()).NotTo(HaveOccurred())
Expect(session.Commit()).NotTo(HaveOccurred())

go func() {
cleaner.Start(ctx)
}()
Eventually(func(g Gomega) {
store, err := storeMgr.Open(ctx)
store, err := store.Open(ctx)
g.Expect(err).NotTo(HaveOccurred())
defer store.Cancel()
g.Expect(store.GetReservationByID(testPool1, "id1", testIFName)).NotTo(BeNil())
g.Expect(store.GetReservationByID(testPool1, "id2", testIFName)).NotTo(BeNil())
g.Expect(store.GetReservationByID(testPool1, "id3", testIFName)).To(BeNil())
g.Expect(store.GetReservationByID(testPool2, "id4", testIFName)).To(BeNil())
g.Expect(store.GetReservationByID(testPool2, "id5", testIFName)).To(BeNil())
g.Expect(store.ListPools()).To(And(
ContainElements(testPool1, testPool3),
Not(ContainElement(testPool2))))
}, 10).Should(Succeed())

close(done)
}()
Eventually(done, time.Minute).Should(BeClosed())
})
Expand Down
33 changes: 33 additions & 0 deletions pkg/ipam-node/store/mocks/Session.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

9 changes: 9 additions & 0 deletions pkg/ipam-node/store/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,8 @@ type Session interface {
ListReservations(pool string) []types.Reservation
// ListPools returns a list with the names of all known pools
ListPools() []string
// RemovePool removes information about the pool from the store
RemovePool(pool string)
// GetLastReservedIP returns last reserved IP for the pool or nil
GetLastReservedIP(pool string) net.IP
// SetLastReservedIP sets the last reserved IP for the pool
Expand Down Expand Up @@ -256,6 +258,13 @@ func (s *session) ListPools() []string {
return pools
}

// RemovePool is the Session interface implementation for session.
// It deletes the entry for the given pool from the session's working data
// (s.tmpData.Pools) and flags the session as modified.
// The modified flag is set unconditionally, even when the pool was not present.
func (s *session) RemovePool(pool string) {
// NOTE(review): checkClosed is defined elsewhere; presumably it guards against
// using a session that was already committed or canceled - confirm.
s.checkClosed()
// the built-in delete is a no-op when the key is absent from the map
delete(s.tmpData.Pools, pool)
// NOTE(review): presumably consulted by Commit to decide whether to persist - confirm.
s.isModified = true
}

// GetLastReservedIP is the Session interface implementation for session
func (s *session) GetLastReservedIP(pool string) net.IP {
s.checkClosed()
Expand Down
8 changes: 8 additions & 0 deletions pkg/ipam-node/store/store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -230,4 +230,12 @@ var _ = Describe("Store", func() {
s.Reserve(testPoolName, "other", testNetIfName,
types.ReservationMetadata{}, net.ParseIP(testIP))).To(MatchError(storePkg.ErrIPAlreadyReserved))
})
// Checks that RemovePool drops the pool's reservations as seen from the same session.
It("Remove pool data", func() {
s, err := store.Open(context.Background())
Expect(err).NotTo(HaveOccurred())
// createTestReservation is a helper defined elsewhere in this file; the assertion
// below expects it to leave at least one reservation in testPoolName
createTestReservation(s)
Expect(s.ListReservations(testPoolName)).NotTo(BeEmpty())
s.RemovePool(testPoolName)
// after removal the pool must report no reservations
Expect(s.ListReservations(testPoolName)).To(BeEmpty())
})
})

0 comments on commit 9bd60b3

Please sign in to comment.