From 80132ea63f5dd98f8c64283752452ece395512ce Mon Sep 17 00:00:00 2001 From: slacki123 <41322879+slacki123@users.noreply.github.com> Date: Mon, 12 Feb 2024 12:15:24 +0000 Subject: [PATCH] fix(watcher): queues not polling when status updated (#712) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit With reference to https://github.com/buehler/dotnet-operator-sdk/issues/707 This fixes the behaviour of requeues Co-authored-by: Christoph Bühler --- .../Watcher/ResourceWatcher{TEntity}.cs | 39 ++++++++++++------- .../CancelEntityRequeue.Integration.Test.cs | 14 +++++++ .../EntityController.Integration.Test.cs | 21 ++++++++++ .../EntityRequeue.Integration.Test.cs | 24 ++++++++++++ 4 files changed, 83 insertions(+), 15 deletions(-) diff --git a/src/KubeOps.Operator/Watcher/ResourceWatcher{TEntity}.cs b/src/KubeOps.Operator/Watcher/ResourceWatcher{TEntity}.cs index 72822b3e..2a517700 100644 --- a/src/KubeOps.Operator/Watcher/ResourceWatcher{TEntity}.cs +++ b/src/KubeOps.Operator/Watcher/ResourceWatcher{TEntity}.cs @@ -111,7 +111,6 @@ private async void OnEntityRequeue(object? sender, (string Name, string? Namespa return; } - _entityCache.TryRemove(entity.Uid(), out _); await ReconcileModification(entity); } @@ -173,16 +172,32 @@ private async void OnEvent(WatchEventType type, TEntity entity) entity.Name(), _lastResourceVersion); - _queue.RemoveIfQueued(entity); - try { switch (type) { - case WatchEventType.Added or WatchEventType.Modified: + case WatchEventType.Added: + _entityCache.TryAdd(entity.Uid(), entity.Generation() ?? 0); + await ReconcileModification(entity); + break; + case WatchEventType.Modified: switch (entity) { case { Metadata.DeletionTimestamp: null }: + _entityCache.TryGetValue(entity.Uid(), out var cachedGeneration); + + // Check if entity spec has changed through "Generation" value increment. Skip reconcile if not changed. + if (entity.Generation() <= cachedGeneration) + { + _logger.LogDebug( + """Entity "{kind}/{name}" modification did not modify generation. Skip event.""", + entity.Kind, + entity.Name()); + return; + } + + // update cached generation since generation now changed + _entityCache.TryUpdate(entity.Uid(), entity.Generation() ?? 1, cachedGeneration); await ReconcileModification(entity); break; case { Metadata: { DeletionTimestamp: not null, Finalizers.Count: > 0 } }: @@ -216,17 +231,8 @@ private async void OnEvent(WatchEventType type, TEntity entity) private async Task ReconcileModification(TEntity entity) { - var latestGeneration = _entityCache.GetOrAdd(entity.Uid(), 0); - if (entity.Generation() <= latestGeneration) - { - _logger.LogDebug( - """Entity "{kind}/{name}" modification did not modify generation. Skip event.""", - entity.Kind, - entity.Name()); - return; - } - - _entityCache.TryUpdate(entity.Uid(), entity.Generation() ?? 1, latestGeneration); + // Re-queue should requested in the controller reconcile method. Invalidate any existing queues. + _queue.RemoveIfQueued(entity); await using var scope = _provider.CreateAsyncScope(); var controller = scope.ServiceProvider.GetRequiredService>(); await controller.ReconcileAsync(entity); @@ -234,6 +240,8 @@ private async Task ReconcileModification(TEntity entity) private async Task ReconcileDeletion(TEntity entity) { + _queue.RemoveIfQueued(entity); + _entityCache.TryRemove(entity.Uid(), out _); await using var scope = _provider.CreateAsyncScope(); var controller = scope.ServiceProvider.GetRequiredService>(); await controller.DeletedAsync(entity); @@ -241,6 +249,7 @@ private async Task ReconcileDeletion(TEntity entity) private async Task ReconcileFinalizer(TEntity entity) { + _queue.RemoveIfQueued(entity); var pendingFinalizer = entity.Finalizers(); if (_finalizers.Value.Find(reg => reg.EntityType == entity.GetType() && pendingFinalizer.Contains(reg.Identifier)) is not diff --git a/test/KubeOps.Operator.Test/Controller/CancelEntityRequeue.Integration.Test.cs b/test/KubeOps.Operator.Test/Controller/CancelEntityRequeue.Integration.Test.cs index f26bb8d1..48a61c3f 100644 --- a/test/KubeOps.Operator.Test/Controller/CancelEntityRequeue.Integration.Test.cs +++ b/test/KubeOps.Operator.Test/Controller/CancelEntityRequeue.Integration.Test.cs @@ -34,6 +34,20 @@ public async Task Should_Cancel_Requeue_If_New_Event_Fires() Services.GetRequiredService>().Count.Should().Be(0); } + [Fact] + public async Task Should_Not_Affect_Queues_If_Only_Status_Updated() + { + _mock.TargetInvocationCount = 1; + var result = await _client.CreateAsync(new V1OperatorIntegrationTestEntity("test-entity", "username", _ns.Namespace)); + result.Status.Status = "changed"; + await _client.UpdateStatusAsync(result); + await _mock.WaitForInvocations; + + _mock.Invocations.Count.Should().Be(1); + Services.GetRequiredService>().Count.Should().Be(1); + + } + public override async Task InitializeAsync() { await base.InitializeAsync(); diff --git a/test/KubeOps.Operator.Test/Controller/EntityController.Integration.Test.cs b/test/KubeOps.Operator.Test/Controller/EntityController.Integration.Test.cs index b02144d4..93f70905 100644 --- a/test/KubeOps.Operator.Test/Controller/EntityController.Integration.Test.cs +++ b/test/KubeOps.Operator.Test/Controller/EntityController.Integration.Test.cs @@ -57,6 +57,27 @@ void Check(int idx, string username) } } + [Fact] + public async Task Should_Not_Call_Reconcile_When_Only_Entity_Status_Changed() + { + _mock.TargetInvocationCount = 1; + + var result = + await _client.CreateAsync(new V1OperatorIntegrationTestEntity("test-entity", "username", _ns.Namespace)); + result.Status.Status = "changed"; + // Update or UpdateStatus do not call Reconcile + await _client.UpdateAsync(result); + await _client.UpdateStatusAsync(result); + await _mock.WaitForInvocations; + + _mock.Invocations.Count.Should().Be(1); + + (string method, V1OperatorIntegrationTestEntity entity) = _mock.Invocations.Single(); + method.Should().Be("ReconcileAsync"); + entity.Should().BeOfType(); + entity.Spec.Username.Should().Be("username"); + } + [Fact] public async Task Should_Call_Delete_For_Deleted_Entity() { diff --git a/test/KubeOps.Operator.Test/Controller/EntityRequeue.Integration.Test.cs b/test/KubeOps.Operator.Test/Controller/EntityRequeue.Integration.Test.cs index b6cfcc7e..a601f238 100644 --- a/test/KubeOps.Operator.Test/Controller/EntityRequeue.Integration.Test.cs +++ b/test/KubeOps.Operator.Test/Controller/EntityRequeue.Integration.Test.cs @@ -35,6 +35,30 @@ public async Task Should_Requeue_Entity_And_Reconcile() _mock.Invocations.Count.Should().Be(5); } + [Fact] + public async Task Should_Separately_And_Reliably_Requeue_And_Reconcile_Multiple_Entities_In_Parallel() + { + _mock.TargetInvocationCount = 100; + await _client.CreateAsync(new V1OperatorIntegrationTestEntity("test-entity1", "username", _ns.Namespace)); + await _client.CreateAsync(new V1OperatorIntegrationTestEntity("test-entity2", "username", _ns.Namespace)); + await _client.CreateAsync(new V1OperatorIntegrationTestEntity("test-entity3", "username", _ns.Namespace)); + await _client.CreateAsync(new V1OperatorIntegrationTestEntity("test-entity4", "username", _ns.Namespace)); + await _mock.WaitForInvocations; + + // Expecting invocations, but since in parallel, there is a possibility to for target hit while other are in flight. + _mock.Invocations.Count.Should().BeGreaterOrEqualTo(100).And.BeLessThan(105); + var invocationsGroupedById = _mock.Invocations.GroupBy(item => item.Entity.Metadata.Uid).ToList(); + invocationsGroupedById.Count.Should().Be(4); + var invocationDistributions = invocationsGroupedById + .Select(g => (double)g.Count() / _mock.Invocations.Count * 100) + .ToList(); + invocationDistributions + .All(p => p is >= 15 and <= 35) // Check that invocations are reasonably distributed + .Should() + .BeTrue($"each entity invocation proportion should be within the specified range of total invocations, " + + $"but instead the distributions were: '{string.Join(", ", invocationDistributions)}'"); + } + public override async Task InitializeAsync() { await base.InitializeAsync();