Skip to content

Commit

Permalink
feat: add postal read consumer
Browse files Browse the repository at this point in the history
  • Loading branch information
pgallik committed Oct 9, 2023
1 parent a3fc1bb commit a4df93c
Show file tree
Hide file tree
Showing 24 changed files with 1,049 additions and 1 deletion.
7 changes: 7 additions & 0 deletions StreetNameRegistry.sln
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,8 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "StreetNameRegistry.Api.Back
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "StreetNameRegistry.Snapshot.Verifier", "src\StreetNameRegistry.Snapshot.Verifier\StreetNameRegistry.Snapshot.Verifier.csproj", "{06F8F677-2696-4996-B4AE-B8E3B747A696}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "StreetNameRegistry.Consumer.Read.Postal", "src\StreetNameRegistry.Consumer.Read.Postal\StreetNameRegistry.Consumer.Read.Postal.csproj", "{83C7D129-ACEA-4FC6-A55C-748D8D31135A}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
Expand Down Expand Up @@ -185,6 +187,10 @@ Global
{06F8F677-2696-4996-B4AE-B8E3B747A696}.Debug|Any CPU.Build.0 = Debug|Any CPU
{06F8F677-2696-4996-B4AE-B8E3B747A696}.Release|Any CPU.ActiveCfg = Release|Any CPU
{06F8F677-2696-4996-B4AE-B8E3B747A696}.Release|Any CPU.Build.0 = Release|Any CPU
{83C7D129-ACEA-4FC6-A55C-748D8D31135A}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{83C7D129-ACEA-4FC6-A55C-748D8D31135A}.Debug|Any CPU.Build.0 = Debug|Any CPU
{83C7D129-ACEA-4FC6-A55C-748D8D31135A}.Release|Any CPU.ActiveCfg = Release|Any CPU
{83C7D129-ACEA-4FC6-A55C-748D8D31135A}.Release|Any CPU.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
Expand Down Expand Up @@ -215,6 +221,7 @@ Global
{C1C797C7-4A24-4E9C-B7D0-7E097562342E} = {65EA04DE-5928-430F-92CA-24C11B6E5A00}
{EFDF165C-DCFE-4577-A00F-04691197FB1B} = {81641B0B-BEFB-476D-8519-3774313E944B}
{06F8F677-2696-4996-B4AE-B8E3B747A696} = {65EA04DE-5928-430F-92CA-24C11B6E5A00}
{83C7D129-ACEA-4FC6-A55C-748D8D31135A} = {65EA04DE-5928-430F-92CA-24C11B6E5A00}
EndGlobalSection
GlobalSection(ExtensibilityGlobals) = postSolution
SolutionGuid = {3B44FE80-30FE-47B7-A0DE-8610218FFF63}
Expand Down
1 change: 1 addition & 0 deletions paket.dependencies
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ nuget Be.Vlaanderen.Basisregisters.GrAr.Provenance.AcmIdm 19.0.0
nuget Be.Vlaanderen.Basisregisters.GrAr.Extracts 19.0.0

nuget Be.Vlaanderen.Basisregisters.MessageHandling.AwsSqs.Simple 4.8.3
nuget Be.Vlaanderen.Basisregisters.MessageHandling.Kafka.Consumer 4.8.3
nuget Be.Vlaanderen.Basisregisters.MessageHandling.Kafka.Consumer.SqlServer 4.8.3
nuget Be.Vlaanderen.Basisregisters.MessageHandling.Kafka.Producer 4.8.3
nuget Be.Vlaanderen.Basisregisters.MessageHandling.Kafka.Simple 4.8.3
Expand Down
5 changes: 5 additions & 0 deletions paket.lock
Original file line number Diff line number Diff line change
Expand Up @@ -334,6 +334,11 @@ NUGET
AWSSDK.Core (>= 3.7.106.8) - restriction: || (&& (== net472) (>= net6.0)) (== net6.0)
AWSSDK.SQS (>= 3.7.100.101) - restriction: || (&& (== net472) (>= net6.0)) (== net6.0)
Microsoft.Extensions.Logging (>= 6.0) - restriction: || (&& (== net472) (>= net6.0)) (== net6.0)
Be.Vlaanderen.Basisregisters.MessageHandling.Kafka.Consumer (4.8.3)
Confluent.Kafka (>= 1.8.2) - restriction: || (&& (== net472) (>= net6.0)) (== net6.0)
Microsoft.EntityFrameworkCore (>= 6.0.3) - restriction: || (&& (== net472) (>= net6.0)) (== net6.0)
Microsoft.Extensions.Logging (>= 6.0) - restriction: || (&& (== net472) (>= net6.0)) (== net6.0)
Newtonsoft.Json (>= 13.0.1) - restriction: || (&& (== net472) (>= net6.0)) (== net6.0)
Be.Vlaanderen.Basisregisters.MessageHandling.Kafka.Consumer.SqlServer (4.8.3)
Confluent.Kafka (>= 1.8.2) - restriction: || (&& (== net472) (>= net6.0)) (== net6.0)
Microsoft.EntityFrameworkCore.SqlServer (>= 6.0.3) - restriction: || (&& (== net472) (>= net6.0)) (== net6.0)
Expand Down
1 change: 1 addition & 0 deletions src/EF.MigrationsHelper/EF.MigrationsHelper.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
<ProjectReference Include="..\StreetNameRegistry.Api.BackOffice\StreetNameRegistry.Api.BackOffice.csproj" />
<ProjectReference Include="..\StreetNameRegistry.Consumer\StreetNameRegistry.Consumer.csproj" />
<ProjectReference Include="..\StreetNameRegistry.Producer\StreetNameRegistry.Producer.csproj" />
<ProjectReference Include="..\StreetNameRegistry.Consumer.Read.Postal\StreetNameRegistry.Consumer.Read.Postal.csproj" />
<ProjectReference Include="..\StreetNameRegistry.Producer.Snapshot.Oslo\StreetNameRegistry.Producer.Snapshot.Oslo.csproj" />
<ProjectReference Include="..\StreetNameRegistry.Projections.BackOffice\StreetNameRegistry.Projections.BackOffice.csproj" />
</ItemGroup>
Expand Down
2 changes: 1 addition & 1 deletion src/EF.MigrationsHelper/appsettings.json
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
"LastChangedListAdmin": "x",
"BackOffice": "x",
"ProducerProjectionsAdmin": "x",
"BackOfficeProjectionsAdmin": "x"
"ConsumerPostalAdmin": "x"
},

"Serilog": {
Expand Down
60 changes: 60 additions & 0 deletions src/StreetNameRegistry.Consumer.Read.Postal/ConsumerPostal.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
namespace StreetNameRegistry.Consumer.Read.Postal
{
using System;
using System.Threading;
using System.Threading.Tasks;
using Be.Vlaanderen.Basisregisters.MessageHandling.Kafka.Consumer;
using Be.Vlaanderen.Basisregisters.ProjectionHandling.Connector;
using Microsoft.EntityFrameworkCore;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using Projections;

public class ConsumerPostal : BackgroundService
{
private readonly IHostApplicationLifetime _hostApplicationLifetime;
private readonly IDbContextFactory<ConsumerPostalContext> _consumerPostalDbContextFactory;
private readonly IConsumer _consumer;
private readonly ILogger<ConsumerPostal> _logger;

public ConsumerPostal(
IHostApplicationLifetime hostApplicationLifetime,
IDbContextFactory<ConsumerPostalContext> consumerPostalDbContextFactory,
IConsumer consumer,
ILoggerFactory loggerFactory)
{
_hostApplicationLifetime = hostApplicationLifetime;
_consumerPostalDbContextFactory = consumerPostalDbContextFactory;
_consumer = consumer;
_logger = loggerFactory.CreateLogger<ConsumerPostal>();
}

protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
var projector =
new ConnectedProjector<ConsumerPostalContext>(
Resolve.WhenEqualToHandlerMessageType(new PostalKafkaProjection().Handlers));

try
{
await _consumer.ConsumeContinuously(async message =>
{
_logger.LogInformation("Handling next message");

await using var context = await _consumerPostalDbContextFactory.CreateDbContextAsync(stoppingToken);
await projector.ProjectAsync(context, message, stoppingToken).ConfigureAwait(false);

//CancellationToken.None to prevent halfway consumption
await context.SaveChangesAsync(CancellationToken.None);

}, stoppingToken);
}
catch (Exception exception)
{
_logger.LogCritical(exception, $"Critical error occured in {nameof(ConsumerPostal)}.");
_hostApplicationLifetime.StopApplication();
throw;
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
namespace StreetNameRegistry.Consumer.Read.Postal
{
using System;
using System.IO;
using System.Threading;
using System.Threading.Tasks;
using Be.Vlaanderen.Basisregisters.ProjectionHandling.Runner;
using Be.Vlaanderen.Basisregisters.ProjectionHandling.Runner.MigrationExtensions;
using StreetNameRegistry.Infrastructure;
using Microsoft.EntityFrameworkCore;
using Microsoft.EntityFrameworkCore.Design;
using Microsoft.Extensions.Configuration;

public class ConsumerPostalContext : RunnerDbContext<ConsumerPostalContext>
{
public DbSet<PostalConsumerItem> PostalConsumerItems { get; set; }

// This needs to be here to please EF
public ConsumerPostalContext()
{ }

// This needs to be DbContextOptions<T> for Autofac!
public ConsumerPostalContext(DbContextOptions<ConsumerPostalContext> options)
: base(options)
{ }

public override string ProjectionStateSchema => Schema.ConsumerReadPostal;
}

public class ConsumerContextFactory : IDesignTimeDbContextFactory<ConsumerPostalContext>
{
public ConsumerPostalContext CreateDbContext(string[] args)
{
const string migrationConnectionStringName = "ConsumerPostalAdmin";

var configuration = new ConfigurationBuilder()
.SetBasePath(Directory.GetCurrentDirectory())
.AddJsonFile("appsettings.json", optional: false, reloadOnChange: false)
.AddJsonFile($"appsettings.{Environment.MachineName.ToLowerInvariant()}.json", optional: true, reloadOnChange: false)
.AddEnvironmentVariables()
.Build();

var builder = new DbContextOptionsBuilder<ConsumerPostalContext>();

var connectionString = configuration.GetConnectionString(migrationConnectionStringName);
if (string.IsNullOrEmpty(connectionString))
throw new InvalidOperationException($"Could not find a connection string with name '{migrationConnectionStringName}'");

builder
.UseSqlServer(connectionString, sqlServerOptions =>
{
sqlServerOptions.EnableRetryOnFailure();
sqlServerOptions.MigrationsHistoryTable(MigrationTables.ConsumerReadPostal, Schema.ConsumerReadPostal);
sqlServerOptions.UseNetTopologySuite();
})
.UseExtendedSqlServerMigrations();

return new ConsumerPostalContext(builder.Options);
}
}
}
23 changes: 23 additions & 0 deletions src/StreetNameRegistry.Consumer.Read.Postal/Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
FROM mcr.microsoft.com/dotnet/runtime-deps:6.0.3

# create group & user
RUN addgroup --gid 1000 --system app && adduser --uid 1000 -system app --gid 1000

# create work dir and set permissions as WORKDIR sets permissions as root
RUN mkdir /app && chown -R app:app /app
WORKDIR /app

LABEL maintainer "Digitaal Vlaanderen <[email protected]>"
LABEL registry="building-registry"

COPY / /app
WORKDIR /app

RUN apt-get update && \
apt-get install curl jq -y && \
chmod +x ./init.sh

# switch to created user
USER app

ENTRYPOINT ["./init.sh"]
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
namespace StreetNameRegistry.Consumer.Read.Postal.Infrastructure
{
using System;
using System.Threading;
using System.Threading.Tasks;
using StreetNameRegistry.Infrastructure;
using Microsoft.Data.SqlClient;
using Microsoft.EntityFrameworkCore;
using Microsoft.Extensions.Logging;
using Polly;

public class MigrationsLogger { }

public static class MigrationsHelper
{
public static Task RunAsync(
string connectionString,
ILoggerFactory loggerFactory,
CancellationToken cancellationToken = default)
{
var logger = loggerFactory.CreateLogger<MigrationsLogger>();

return Policy
.Handle<SqlException>()
.WaitAndRetryAsync(
5,
retryAttempt =>
{
var value = Math.Pow(2, retryAttempt) / 4;
var randomValue = new Random().Next((int)value * 3, (int)value * 5);
logger?.LogInformation("Retrying after {Seconds} seconds...", randomValue);
return TimeSpan.FromSeconds(randomValue);
})
.ExecuteAsync(async ct =>
{
logger?.LogInformation("Running EF Migrations.");
await RunInternal(connectionString, loggerFactory, ct);
},
cancellationToken);
}

private static async Task RunInternal(string connectionString, ILoggerFactory loggerFactory, CancellationToken cancellationToken)
{
var migratorOptions = new DbContextOptionsBuilder<ConsumerPostalContext>()
.UseSqlServer(
connectionString,
sqlServerOptions =>
{
sqlServerOptions.EnableRetryOnFailure();
sqlServerOptions.MigrationsHistoryTable(MigrationTables.ConsumerReadPostal, Schema.ConsumerReadPostal);
sqlServerOptions.UseNetTopologySuite();
});

migratorOptions = migratorOptions.UseLoggerFactory(loggerFactory);

await using var migrator = new ConsumerPostalContext(migratorOptions.Options);
await migrator.Database.MigrateAsync(cancellationToken);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
namespace StreetNameRegistry.Consumer.Read.Postal.Infrastructure.Modules
{
using System;
using Autofac;
using Be.Vlaanderen.Basisregisters.DataDog.Tracing.Sql.EntityFrameworkCore;
using StreetNameRegistry.Infrastructure;
using Microsoft.Data.SqlClient;
using Microsoft.EntityFrameworkCore;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;

public class ConsumerPostalModule : Module
{
public ConsumerPostalModule(
IConfiguration configuration,
IServiceCollection services,
ILoggerFactory loggerFactory,
ServiceLifetime serviceLifetime = ServiceLifetime.Scoped)
{
var logger = loggerFactory.CreateLogger<ConsumerPostalModule>();
var connectionString = configuration.GetConnectionString("ConsumerPostal");

var hasConnectionString = !string.IsNullOrWhiteSpace(connectionString);
if (hasConnectionString)
{
RunOnSqlServer(configuration, services, serviceLifetime, loggerFactory, connectionString);
}
else
{
RunInMemoryDb(services, loggerFactory, logger);
}
}

private static void RunOnSqlServer(
IConfiguration configuration,
IServiceCollection services,
ServiceLifetime serviceLifetime,
ILoggerFactory loggerFactory,
string consumerProjectionsConnectionString)
{
services
.AddScoped(s => new TraceDbConnection<ConsumerPostalContext>(
new SqlConnection(consumerProjectionsConnectionString),
configuration["DataDog:ServiceName"]))
.AddDbContext<ConsumerPostalContext>((provider, options) => options
.UseLoggerFactory(loggerFactory)
.UseSqlServer(provider.GetRequiredService<TraceDbConnection<ConsumerPostalContext>>(), sqlServerOptions =>
{
sqlServerOptions.EnableRetryOnFailure();
sqlServerOptions.MigrationsHistoryTable(MigrationTables.ConsumerReadPostal, Schema.ConsumerReadPostal);
sqlServerOptions.UseNetTopologySuite();
}), serviceLifetime);
}

private static void RunInMemoryDb(
IServiceCollection services,
ILoggerFactory loggerFactory,
ILogger logger)
{
services
.AddDbContext<ConsumerPostalContext>(options => options
.UseLoggerFactory(loggerFactory)
.UseInMemoryDatabase(Guid.NewGuid().ToString(), sqlServerOptions => { }));

logger.LogWarning("Running InMemory for {Context}!", nameof(ConsumerPostalContext));
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
namespace StreetNameRegistry.Consumer.Read.Postal.Infrastructure.Modules
{
using System;
using Autofac;
using Destructurama;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using Serilog;
using Serilog.Debugging;

public class LoggingModule : Module
{
public LoggingModule(
IConfiguration configuration,
IServiceCollection services)
{
SelfLog.Enable(Console.WriteLine);

Log.Logger = new LoggerConfiguration()
.ReadFrom.Configuration(configuration)
.Enrich.FromLogContext()
.Enrich.WithMachineName()
.Enrich.WithThreadId()
.Enrich.WithEnvironmentUserName()
.Destructure.JsonNetTypes()
.CreateLogger();

services.AddLogging(l =>
{
l.ClearProviders();
l.AddSerilog(Log.Logger);
});
}
}
}
Loading

0 comments on commit a4df93c

Please sign in to comment.