From d704d65dcac8f79cf6f797cc0d54eb82216d2546 Mon Sep 17 00:00:00 2001 From: Arne Dumarey Date: Mon, 19 Aug 2024 16:22:32 +0200 Subject: [PATCH] feat(consumer): add offset as projection state to read consumer --- paket.dependencies | 10 +++++----- paket.lock | 10 +++++----- .../ConsumerPostal.cs | 3 ++- 3 files changed, 12 insertions(+), 11 deletions(-) diff --git a/paket.dependencies b/paket.dependencies index c919bac71..18eb3672b 100644 --- a/paket.dependencies +++ b/paket.dependencies @@ -84,11 +84,11 @@ nuget Be.Vlaanderen.Basisregisters.GrAr.Provenance 21.14.0 nuget Be.Vlaanderen.Basisregisters.GrAr.Provenance.AcmIdm 21.14.0 nuget Be.Vlaanderen.Basisregisters.GrAr.Extracts 21.14.0 -nuget Be.Vlaanderen.Basisregisters.MessageHandling.AwsSqs.Simple 5.0.1 -nuget Be.Vlaanderen.Basisregisters.MessageHandling.Kafka.Consumer 5.0.1 -nuget Be.Vlaanderen.Basisregisters.MessageHandling.Kafka.Consumer.SqlServer 5.0.1 -nuget Be.Vlaanderen.Basisregisters.MessageHandling.Kafka.Producer 5.0.1 -nuget Be.Vlaanderen.Basisregisters.MessageHandling.Kafka.Simple 5.0.1 +nuget Be.Vlaanderen.Basisregisters.MessageHandling.AwsSqs.Simple 5.1.0 +nuget Be.Vlaanderen.Basisregisters.MessageHandling.Kafka.Consumer 5.1.0 +nuget Be.Vlaanderen.Basisregisters.MessageHandling.Kafka.Consumer.SqlServer 5.1.0 +nuget Be.Vlaanderen.Basisregisters.MessageHandling.Kafka.Producer 5.1.0 +nuget Be.Vlaanderen.Basisregisters.MessageHandling.Kafka.Simple 5.1.0 nuget Be.Vlaanderen.Basisregisters.Shaperon 10.0.2 diff --git a/paket.lock b/paket.lock index d9514ae7d..705695480 100644 --- a/paket.lock +++ b/paket.lock @@ -296,24 +296,24 @@ NUGET Be.Vlaanderen.Basisregisters.GrAr.Common (21.14) Be.Vlaanderen.Basisregisters.GrAr.Provenance (21.14) Microsoft.CSharp (>= 4.7) - Be.Vlaanderen.Basisregisters.MessageHandling.AwsSqs.Simple (5.0.1) + Be.Vlaanderen.Basisregisters.MessageHandling.AwsSqs.Simple (5.1) AWSSDK.Core (>= 3.7.302.15) AWSSDK.SQS (>= 3.7.300.54) Microsoft.Extensions.Logging (>= 8.0) - Be.Vlaanderen.Basisregisters.MessageHandling.Kafka.Consumer (5.0.1) + Be.Vlaanderen.Basisregisters.MessageHandling.Kafka.Consumer (5.1) Confluent.Kafka (>= 2.3) Microsoft.EntityFrameworkCore (>= 8.0.2) Microsoft.Extensions.Logging (>= 8.0) Newtonsoft.Json (>= 13.0.3) - Be.Vlaanderen.Basisregisters.MessageHandling.Kafka.Consumer.SqlServer (5.0.1) + Be.Vlaanderen.Basisregisters.MessageHandling.Kafka.Consumer.SqlServer (5.1) Confluent.Kafka (>= 2.3) Microsoft.EntityFrameworkCore.SqlServer (>= 8.0.2) Microsoft.Extensions.Logging (>= 8.0) Newtonsoft.Json (>= 13.0.3) - Be.Vlaanderen.Basisregisters.MessageHandling.Kafka.Producer (5.0.1) + Be.Vlaanderen.Basisregisters.MessageHandling.Kafka.Producer (5.1) Confluent.Kafka (>= 2.3) Newtonsoft.Json (>= 13.0.3) - Be.Vlaanderen.Basisregisters.MessageHandling.Kafka.Simple (5.0.1) + Be.Vlaanderen.Basisregisters.MessageHandling.Kafka.Simple (5.1) Confluent.Kafka (>= 2.3) Microsoft.Extensions.Logging (>= 8.0) Be.Vlaanderen.Basisregisters.Middleware.AddProblemJsonHeader (3.0) diff --git a/src/StreetNameRegistry.Consumer.Read.Postal/ConsumerPostal.cs b/src/StreetNameRegistry.Consumer.Read.Postal/ConsumerPostal.cs index 462fb94e9..afa84765c 100644 --- a/src/StreetNameRegistry.Consumer.Read.Postal/ConsumerPostal.cs +++ b/src/StreetNameRegistry.Consumer.Read.Postal/ConsumerPostal.cs @@ -37,7 +37,7 @@ protected override async Task ExecuteAsync(CancellationToken stoppingToken) try { - await _consumer.ConsumeContinuously(async message => + await _consumer.ConsumeContinuously(async (message, messageContext) => { _logger.LogInformation("Handling next message"); @@ -45,6 +45,7 @@ await _consumer.ConsumeContinuously(async message => await projector.ProjectAsync(context, message, stoppingToken).ConfigureAwait(false); //CancellationToken.None to prevent halfway consumption + await context.UpdateProjectionState(typeof(ConsumerPostal).FullName, messageContext.Offset, stoppingToken); await context.SaveChangesAsync(CancellationToken.None); }, stoppingToken);