Skip to content

Commit

Permalink
fix: don't use CancellationToken for kafka message handler
Browse files Browse the repository at this point in the history
  • Loading branch information
jvandaal committed Dec 5, 2024
1 parent c32fd65 commit 47482e2
Show file tree
Hide file tree
Showing 2 changed files with 16 additions and 10 deletions.
2 changes: 0 additions & 2 deletions src/ParcelRegistry.Consumer.Address.Console/appsettings.json
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,6 @@

"GroupSuffix": "",
"AddressTopic": "dev.address",
"AddressTopicOffset": "",
"IgnoreAddressTopicOffsetDataCheck": false,

"BaseUrl": "https://api.staging-basisregisters.vlaanderen/",

Expand Down
24 changes: 16 additions & 8 deletions src/ParcelRegistry.Consumer.Address/BackOfficeConsumer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -57,14 +57,7 @@ protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
await _kafkaIdemIdompotencyConsumer.ConsumeContinuously(async (message, context) =>
{
_logger.LogInformation("Handling next message");

await commandHandlingProjector.ProjectAsync(commandHandler, message, stoppingToken).ConfigureAwait(false);
await backOfficeProjector.ProjectAsync(context, message, stoppingToken).ConfigureAwait(false);

//CancellationToken.None to prevent halfway consumption
await context.SaveChangesAsync(CancellationToken.None);

await ConsumeHandler(commandHandlingProjector, backOfficeProjector, commandHandler, message, context);
}, stoppingToken);
}
catch (Exception)
Expand All @@ -73,5 +66,20 @@ await _kafkaIdemIdompotencyConsumer.ConsumeContinuously(async (message, context)
throw;
}
}

private async Task ConsumeHandler(
ConnectedProjector<CommandHandler> commandHandlingProjector,
ConnectedProjector<ConsumerAddressContext> backOfficeProjector,
CommandHandler commandHandler,
object message,
ConsumerAddressContext context)
{
_logger.LogInformation("Handling next message");

await commandHandlingProjector.ProjectAsync(commandHandler, message, CancellationToken.None).ConfigureAwait(false);
await backOfficeProjector.ProjectAsync(context, message, CancellationToken.None).ConfigureAwait(false);

await context.SaveChangesAsync(CancellationToken.None);
}
}
}

0 comments on commit 47482e2

Please sign in to comment.