From 47482e2cb60a1533edbae6bda2f3302ec733619e Mon Sep 17 00:00:00 2001 From: jvandaal Date: Thu, 5 Dec 2024 09:25:12 +0100 Subject: [PATCH] fix: don't use CancellationToken for kafka message handler --- .../appsettings.json | 2 -- .../BackOfficeConsumer.cs | 24 ++++++++++++------- 2 files changed, 16 insertions(+), 10 deletions(-) diff --git a/src/ParcelRegistry.Consumer.Address.Console/appsettings.json b/src/ParcelRegistry.Consumer.Address.Console/appsettings.json index 1ce8dbbe..eca3427a 100644 --- a/src/ParcelRegistry.Consumer.Address.Console/appsettings.json +++ b/src/ParcelRegistry.Consumer.Address.Console/appsettings.json @@ -17,8 +17,6 @@ "GroupSuffix": "", "AddressTopic": "dev.address", - "AddressTopicOffset": "", - "IgnoreAddressTopicOffsetDataCheck": false, "BaseUrl": "https://api.staging-basisregisters.vlaanderen/", diff --git a/src/ParcelRegistry.Consumer.Address/BackOfficeConsumer.cs b/src/ParcelRegistry.Consumer.Address/BackOfficeConsumer.cs index 79641938..5aef93fd 100644 --- a/src/ParcelRegistry.Consumer.Address/BackOfficeConsumer.cs +++ b/src/ParcelRegistry.Consumer.Address/BackOfficeConsumer.cs @@ -57,14 +57,7 @@ protected override async Task ExecuteAsync(CancellationToken stoppingToken) { await _kafkaIdemIdompotencyConsumer.ConsumeContinuously(async (message, context) => { - _logger.LogInformation("Handling next message"); - - await commandHandlingProjector.ProjectAsync(commandHandler, message, stoppingToken).ConfigureAwait(false); - await backOfficeProjector.ProjectAsync(context, message, stoppingToken).ConfigureAwait(false); - - //CancellationToken.None to prevent halfway consumption - await context.SaveChangesAsync(CancellationToken.None); - + await ConsumeHandler(commandHandlingProjector, backOfficeProjector, commandHandler, message, context); }, stoppingToken); } catch (Exception) @@ -73,5 +66,20 @@ await _kafkaIdemIdompotencyConsumer.ConsumeContinuously(async (message, context) throw; } } + + private async Task ConsumeHandler( + ConnectedProjector commandHandlingProjector, + ConnectedProjector backOfficeProjector, + CommandHandler commandHandler, + object message, + ConsumerAddressContext context) + { + _logger.LogInformation("Handling next message"); + + await commandHandlingProjector.ProjectAsync(commandHandler, message, CancellationToken.None).ConfigureAwait(false); + await backOfficeProjector.ProjectAsync(context, message, CancellationToken.None).ConfigureAwait(false); + + await context.SaveChangesAsync(CancellationToken.None); + } } }