Skip to content

Commit

Permalink
feat: configure consumer offset without deploy
Browse files Browse the repository at this point in the history
  • Loading branch information
jvandaal committed Dec 5, 2024
1 parent 47482e2 commit 050f236
Show file tree
Hide file tree
Showing 7 changed files with 192 additions and 37 deletions.
8 changes: 4 additions & 4 deletions paket.dependencies
Original file line number Diff line number Diff line change
Expand Up @@ -90,10 +90,10 @@ nuget Be.Vlaanderen.Basisregisters.GrAr.Provenance.AcmIdm 21.14.1
nuget Be.Vlaanderen.Basisregisters.GrAr.Extracts 21.14.1
nuget Be.Vlaanderen.Basisregisters.GrAr.Oslo 21.14.1

nuget Be.Vlaanderen.Basisregisters.MessageHandling.AwsSqs.Simple 5.0.1
nuget Be.Vlaanderen.Basisregisters.MessageHandling.Kafka.Simple 5.0.1
nuget Be.Vlaanderen.Basisregisters.MessageHandling.Kafka.Producer 5.0.1
nuget Be.Vlaanderen.Basisregisters.MessageHandling.Kafka.Consumer.SqlServer 5.0.1
nuget Be.Vlaanderen.Basisregisters.MessageHandling.AwsSqs.Simple 5.2.0
nuget Be.Vlaanderen.Basisregisters.MessageHandling.Kafka.Simple 5.2.0
nuget Be.Vlaanderen.Basisregisters.MessageHandling.Kafka.Producer 5.2.0
nuget Be.Vlaanderen.Basisregisters.MessageHandling.Kafka.Consumer.SqlServer 5.2.0

nuget Be.Vlaanderen.Basisregisters.Shaperon 10.0.2

Expand Down
8 changes: 4 additions & 4 deletions paket.lock
Original file line number Diff line number Diff line change
Expand Up @@ -305,19 +305,19 @@ NUGET
Be.Vlaanderen.Basisregisters.GrAr.Common (21.14.1)
Be.Vlaanderen.Basisregisters.GrAr.Provenance (21.14.1)
Microsoft.CSharp (>= 4.7)
Be.Vlaanderen.Basisregisters.MessageHandling.AwsSqs.Simple (5.0.1)
Be.Vlaanderen.Basisregisters.MessageHandling.AwsSqs.Simple (5.2)
AWSSDK.Core (>= 3.7.302.15)
AWSSDK.SQS (>= 3.7.300.54)
Microsoft.Extensions.Logging (>= 8.0)
Be.Vlaanderen.Basisregisters.MessageHandling.Kafka.Consumer.SqlServer (5.0.1)
Be.Vlaanderen.Basisregisters.MessageHandling.Kafka.Consumer.SqlServer (5.2)
Confluent.Kafka (>= 2.3)
Microsoft.EntityFrameworkCore.SqlServer (>= 8.0.2)
Microsoft.Extensions.Logging (>= 8.0)
Newtonsoft.Json (>= 13.0.3)
Be.Vlaanderen.Basisregisters.MessageHandling.Kafka.Producer (5.0.1)
Be.Vlaanderen.Basisregisters.MessageHandling.Kafka.Producer (5.2)
Confluent.Kafka (>= 2.3)
Newtonsoft.Json (>= 13.0.3)
Be.Vlaanderen.Basisregisters.MessageHandling.Kafka.Simple (5.0.1)
Be.Vlaanderen.Basisregisters.MessageHandling.Kafka.Simple (5.2)
Confluent.Kafka (>= 2.3)
Microsoft.Extensions.Logging (>= 8.0)
Be.Vlaanderen.Basisregisters.Middleware.AddProblemJsonHeader (3.0)
Expand Down
29 changes: 6 additions & 23 deletions src/ParcelRegistry.Consumer.Address.Console/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ namespace ParcelRegistry.Consumer.Address.Console
{
using System;
using System.IO;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Api.BackOffice.Abstractions;
Expand All @@ -13,6 +12,7 @@ namespace ParcelRegistry.Consumer.Address.Console
using Be.Vlaanderen.Basisregisters.EventHandling;
using Be.Vlaanderen.Basisregisters.MessageHandling.Kafka;
using Be.Vlaanderen.Basisregisters.MessageHandling.Kafka.Consumer;
using Be.Vlaanderen.Basisregisters.MessageHandling.Kafka.Consumer.Extensions;
using Destructurama;
using Microsoft.EntityFrameworkCore;
using Microsoft.Extensions.Configuration;
Expand Down Expand Up @@ -103,7 +103,7 @@ public static async Task Main(string[] args)

builder.Register(c =>
{
var bootstrapServers = hostContext.Configuration["Kafka:BootstrapServers"];
var bootstrapServers = hostContext.Configuration["Kafka:BootstrapServers"]!;
var topic = $"{hostContext.Configuration["AddressTopic"]}" ?? throw new ArgumentException("Configuration has no AddressTopic.");
var suffix = hostContext.Configuration["GroupSuffix"];
var consumerGroupId = $"ParcelRegistry.BackOfficeConsumer.{topic}{suffix}";
Expand All @@ -115,28 +115,11 @@ public static async Task Main(string[] args)
EventsJsonSerializerSettingsProvider.CreateSerializerSettings());

consumerOptions.ConfigureSaslAuthentication(new SaslAuthentication(
hostContext.Configuration["Kafka:SaslUserName"],
hostContext.Configuration["Kafka:SaslPassword"]));
hostContext.Configuration["Kafka:SaslUserName"]!,
hostContext.Configuration["Kafka:SaslPassword"]!));

var offset = hostContext.Configuration["AddressTopicOffset"];

if (!string.IsNullOrWhiteSpace(offset) && long.TryParse(offset, out var result))
{
var ignoreDataCheck = hostContext.Configuration.GetValue<bool>("IgnoreAddressTopicOffsetDataCheck", false);

if (!ignoreDataCheck)
{
using var ctx = c.Resolve<ConsumerAddressContext>();

if (ctx.AddressConsumerItems.Any())
{
throw new InvalidOperationException(
$"Cannot set Kafka offset to {offset} because {nameof(ctx.AddressConsumerItems)} has data.");
}
}

consumerOptions.ConfigureOffset(new Offset(result));
}
using var ctx = c.Resolve<ConsumerAddressContext>();
ctx.OverrideConfigureOffset(consumerOptions);

return consumerOptions;
});
Expand Down
10 changes: 6 additions & 4 deletions src/ParcelRegistry.Consumer.Address/ConsumerAddressContext.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ namespace ParcelRegistry.Consumer.Address
using System.IO;
using System.Linq;
using System.Reflection;
using Be.Vlaanderen.Basisregisters.MessageHandling.Kafka.Consumer;
using Be.Vlaanderen.Basisregisters.MessageHandling.Kafka.Consumer.SqlServer;
using Be.Vlaanderen.Basisregisters.ProjectionHandling.Runner.SqlServer.MigrationExtensions;
using Microsoft.EntityFrameworkCore;
Expand All @@ -15,11 +16,12 @@ namespace ParcelRegistry.Consumer.Address
using Parcel.DataStructures;
using ParcelRegistry.Infrastructure;

public class ConsumerAddressContext : SqlServerConsumerDbContext<ConsumerAddressContext>, IAddresses
public class ConsumerAddressContext : SqlServerConsumerDbContext<ConsumerAddressContext>, IAddresses, IOffsetOverrideDbSet
{
public override string ProcessedMessagesSchema => Schema.ConsumerAddress;

public DbSet<AddressConsumerItem> AddressConsumerItems { get; set; }
public DbSet<AddressConsumerItem> AddressConsumerItems => Set<AddressConsumerItem>();
public DbSet<OffsetOverride> OffsetOverrides => Set<OffsetOverride>();

// This needs to be here to please EF
public ConsumerAddressContext()
Expand Down Expand Up @@ -88,8 +90,8 @@ protected override void OnModelCreating(ModelBuilder modelBuilder)
{
base.OnModelCreating(modelBuilder);

modelBuilder
.ApplyConfigurationsFromAssembly(typeof(ConsumerAddressContext).GetTypeInfo().Assembly);
modelBuilder.ApplyConfigurationsFromAssembly(typeof(ConsumerAddressContext).GetTypeInfo().Assembly);
modelBuilder.ApplyConfiguration(new OffsetOverrideConfiguration(Schema.ConsumerAddress));
}
}

Expand Down

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
using Microsoft.EntityFrameworkCore.Migrations;

#nullable disable

namespace ParcelRegistry.Consumer.Address.Migrations
{
/// <inheritdoc />
public partial class AddOffsetOverrides : Migration
{
/// <inheritdoc />
protected override void Up(MigrationBuilder migrationBuilder)
{
migrationBuilder.CreateTable(
name: "OffsetOverrides",
schema: "ParcelRegistryConsumerAddress",
columns: table => new
{
ConsumerGroupId = table.Column<string>(type: "nvarchar(450)", nullable: false),
Offset = table.Column<long>(type: "bigint", nullable: false),
Configured = table.Column<bool>(type: "bit", nullable: false)
},
constraints: table =>
{
table.PrimaryKey("PK_OffsetOverrides", x => x.ConsumerGroupId);
});

migrationBuilder.CreateIndex(
name: "IX_Addresses_Position",
schema: "ParcelRegistryConsumerAddress",
table: "Addresses",
column: "Position");
}

/// <inheritdoc />
protected override void Down(MigrationBuilder migrationBuilder)
{
migrationBuilder.DropTable(
name: "OffsetOverrides",
schema: "ParcelRegistryConsumerAddress");

migrationBuilder.DropIndex(
name: "IX_Addresses_Position",
schema: "ParcelRegistryConsumerAddress",
table: "Addresses");
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,26 @@ protected override void BuildModel(ModelBuilder modelBuilder)
{
#pragma warning disable 612, 618
modelBuilder
.HasAnnotation("ProductVersion", "6.0.3")
.HasAnnotation("ProductVersion", "8.0.3")
.HasAnnotation("Relational:MaxIdentifierLength", 128);

SqlServerModelBuilderExtensions.UseIdentityColumns(modelBuilder, 1L, 1);
SqlServerModelBuilderExtensions.UseIdentityColumns(modelBuilder);

modelBuilder.Entity("Be.Vlaanderen.Basisregisters.MessageHandling.Kafka.Consumer.OffsetOverride", b =>
{
b.Property<string>("ConsumerGroupId")
.HasColumnType("nvarchar(450)");

b.Property<bool>("Configured")
.HasColumnType("bit");

b.Property<long>("Offset")
.HasColumnType("bigint");

b.HasKey("ConsumerGroupId");

b.ToTable("OffsetOverrides", "ParcelRegistryConsumerAddress");
});

modelBuilder.Entity("Be.Vlaanderen.Basisregisters.MessageHandling.Kafka.Consumer.ProcessedMessage", b =>
{
Expand Down Expand Up @@ -76,6 +92,8 @@ protected override void BuildModel(ModelBuilder modelBuilder)

b.HasIndex("IsRemoved");

b.HasIndex("Position");

b.ToTable("Addresses", "ParcelRegistryConsumerAddress");
});
#pragma warning restore 612, 618
Expand Down

0 comments on commit 050f236

Please sign in to comment.