Skip to content

Commit

Permalink
fix(watcher): queues not polling when status updated (#712)
Browse files Browse the repository at this point in the history
With reference to
#707

This fixes the behaviour of requeues

Co-authored-by: Christoph Bühler <[email protected]>
  • Loading branch information
slacki123 and buehler committed Feb 13, 2024
1 parent fce185e commit 80132ea
Show file tree
Hide file tree
Showing 4 changed files with 83 additions and 15 deletions.
39 changes: 24 additions & 15 deletions src/KubeOps.Operator/Watcher/ResourceWatcher{TEntity}.cs
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,6 @@ private async void OnEntityRequeue(object? sender, (string Name, string? Namespa
return;
}

_entityCache.TryRemove(entity.Uid(), out _);
await ReconcileModification(entity);
}

Expand Down Expand Up @@ -173,16 +172,32 @@ private async void OnEvent(WatchEventType type, TEntity entity)
entity.Name(),
_lastResourceVersion);

_queue.RemoveIfQueued(entity);

try
{
switch (type)
{
case WatchEventType.Added or WatchEventType.Modified:
case WatchEventType.Added:
_entityCache.TryAdd(entity.Uid(), entity.Generation() ?? 0);
await ReconcileModification(entity);
break;
case WatchEventType.Modified:
switch (entity)
{
case { Metadata.DeletionTimestamp: null }:
_entityCache.TryGetValue(entity.Uid(), out var cachedGeneration);

// Check if entity spec has changed through "Generation" value increment. Skip reconcile if not changed.
if (entity.Generation() <= cachedGeneration)
{
_logger.LogDebug(
"""Entity "{kind}/{name}" modification did not modify generation. Skip event.""",
entity.Kind,
entity.Name());
return;
}

// update cached generation since generation now changed
_entityCache.TryUpdate(entity.Uid(), entity.Generation() ?? 1, cachedGeneration);
await ReconcileModification(entity);
break;
case { Metadata: { DeletionTimestamp: not null, Finalizers.Count: > 0 } }:
Expand Down Expand Up @@ -216,31 +231,25 @@ private async void OnEvent(WatchEventType type, TEntity entity)

private async Task ReconcileModification(TEntity entity)
{
var latestGeneration = _entityCache.GetOrAdd(entity.Uid(), 0);
if (entity.Generation() <= latestGeneration)
{
_logger.LogDebug(
"""Entity "{kind}/{name}" modification did not modify generation. Skip event.""",
entity.Kind,
entity.Name());
return;
}

_entityCache.TryUpdate(entity.Uid(), entity.Generation() ?? 1, latestGeneration);
// Re-queue should requested in the controller reconcile method. Invalidate any existing queues.
_queue.RemoveIfQueued(entity);
await using var scope = _provider.CreateAsyncScope();
var controller = scope.ServiceProvider.GetRequiredService<IEntityController<TEntity>>();
await controller.ReconcileAsync(entity);
}

private async Task ReconcileDeletion(TEntity entity)
{
_queue.RemoveIfQueued(entity);
_entityCache.TryRemove(entity.Uid(), out _);
await using var scope = _provider.CreateAsyncScope();
var controller = scope.ServiceProvider.GetRequiredService<IEntityController<TEntity>>();
await controller.DeletedAsync(entity);
}

private async Task ReconcileFinalizer(TEntity entity)
{
_queue.RemoveIfQueued(entity);
var pendingFinalizer = entity.Finalizers();
if (_finalizers.Value.Find(reg =>
reg.EntityType == entity.GetType() && pendingFinalizer.Contains(reg.Identifier)) is not
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,20 @@ public async Task Should_Cancel_Requeue_If_New_Event_Fires()
Services.GetRequiredService<TimedEntityQueue<V1OperatorIntegrationTestEntity>>().Count.Should().Be(0);
}

[Fact]
public async Task Should_Not_Affect_Queues_If_Only_Status_Updated()
{
_mock.TargetInvocationCount = 1;
var result = await _client.CreateAsync(new V1OperatorIntegrationTestEntity("test-entity", "username", _ns.Namespace));
result.Status.Status = "changed";
await _client.UpdateStatusAsync(result);
await _mock.WaitForInvocations;

_mock.Invocations.Count.Should().Be(1);
Services.GetRequiredService<TimedEntityQueue<V1OperatorIntegrationTestEntity>>().Count.Should().Be(1);

}

public override async Task InitializeAsync()
{
await base.InitializeAsync();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,27 @@ void Check(int idx, string username)
}
}

[Fact]
public async Task Should_Not_Call_Reconcile_When_Only_Entity_Status_Changed()
{
_mock.TargetInvocationCount = 1;

var result =
await _client.CreateAsync(new V1OperatorIntegrationTestEntity("test-entity", "username", _ns.Namespace));
result.Status.Status = "changed";
// Update or UpdateStatus do not call Reconcile
await _client.UpdateAsync(result);
await _client.UpdateStatusAsync(result);
await _mock.WaitForInvocations;

_mock.Invocations.Count.Should().Be(1);

(string method, V1OperatorIntegrationTestEntity entity) = _mock.Invocations.Single();
method.Should().Be("ReconcileAsync");
entity.Should().BeOfType<V1OperatorIntegrationTestEntity>();
entity.Spec.Username.Should().Be("username");
}

[Fact]
public async Task Should_Call_Delete_For_Deleted_Entity()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,30 @@ public async Task Should_Requeue_Entity_And_Reconcile()
_mock.Invocations.Count.Should().Be(5);
}

[Fact]
public async Task Should_Separately_And_Reliably_Requeue_And_Reconcile_Multiple_Entities_In_Parallel()
{
_mock.TargetInvocationCount = 100;
await _client.CreateAsync(new V1OperatorIntegrationTestEntity("test-entity1", "username", _ns.Namespace));
await _client.CreateAsync(new V1OperatorIntegrationTestEntity("test-entity2", "username", _ns.Namespace));
await _client.CreateAsync(new V1OperatorIntegrationTestEntity("test-entity3", "username", _ns.Namespace));
await _client.CreateAsync(new V1OperatorIntegrationTestEntity("test-entity4", "username", _ns.Namespace));
await _mock.WaitForInvocations;

// Expecting invocations, but since in parallel, there is a possibility to for target hit while other are in flight.
_mock.Invocations.Count.Should().BeGreaterOrEqualTo(100).And.BeLessThan(105);
var invocationsGroupedById = _mock.Invocations.GroupBy(item => item.Entity.Metadata.Uid).ToList();
invocationsGroupedById.Count.Should().Be(4);
var invocationDistributions = invocationsGroupedById
.Select(g => (double)g.Count() / _mock.Invocations.Count * 100)
.ToList();
invocationDistributions
.All(p => p is >= 15 and <= 35) // Check that invocations are reasonably distributed
.Should()
.BeTrue($"each entity invocation proportion should be within the specified range of total invocations, " +
$"but instead the distributions were: '{string.Join(", ", invocationDistributions)}'");
}

public override async Task InitializeAsync()
{
await base.InitializeAsync();
Expand Down

0 comments on commit 80132ea

Please sign in to comment.