Skip to content

Commit

Permalink
feat: stop host with unhandled exceptions and seperate healthcheck/or…
Browse files Browse the repository at this point in the history
…ganization commands from RoadNetworkCommandModule (#1576)
  • Loading branch information
rikdepeuter authored Nov 12, 2024
1 parent 8cf1887 commit 41aa484
Show file tree
Hide file tree
Showing 77 changed files with 605 additions and 394 deletions.
3 changes: 2 additions & 1 deletion src/RoadRegistry.BackOffice.Api/Infrastructure/Startup.cs
Original file line number Diff line number Diff line change
Expand Up @@ -281,7 +281,6 @@ public void ConfigureServices(IServiceCollection services)
sp.GetService<IClock>(),
sp.GetRequiredService<UseOvoCodeInChangeRoadNetworkFeatureToggle>(),
sp.GetService<IExtractUploadFailedEmailClient>(),
sp.GetService<IRoadNetworkEventWriter>(),
sp.GetService<ILoggerFactory>()
),
new RoadNetworkExtractCommandModule(
Expand Down Expand Up @@ -327,7 +326,9 @@ public void ConfigureServices(IServiceCollection services)
.AddTicketing()
.AddRoadRegistrySnapshot()
.AddSingleton(new ApplicationMetadata(RoadRegistryApplication.BackOffice))
.AddHealthCommandQueue()
.AddRoadNetworkCommandQueue()
.AddOrganizationCommandQueue()
.AddRoadNetworkSnapshotStrategyOptions()
.AddSingleton(apiOptions)
.Configure<ResponseOptions>(_configuration)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
namespace RoadRegistry.BackOffice.Api.Infrastructure.SystemHealthCheck.HealthChecks;

using System;
using System.Collections.Generic;
using System.Reflection;
using System.Threading;
Expand All @@ -15,18 +14,18 @@ internal class CommandHostSystemHealthCheck : ISystemHealthCheck
{
private readonly ITicketing _ticketing;
private readonly RoadNetworkUploadsBlobClient _uploadsBlobClient;
private readonly IRoadNetworkCommandQueue _roadNetworkCommandQueue;
private readonly IHealthCommandQueue _healthCommandQueue;
private readonly SystemHealthCheckOptions _options;

public CommandHostSystemHealthCheck(
ITicketing ticketing,
RoadNetworkUploadsBlobClient uploadsBlobClient,
IRoadNetworkCommandQueue roadNetworkCommandQueue,
IHealthCommandQueue healthCommandQueue,
SystemHealthCheckOptions options)
{
_ticketing = ticketing;
_uploadsBlobClient = uploadsBlobClient;
_roadNetworkCommandQueue = roadNetworkCommandQueue;
_healthCommandQueue = healthCommandQueue;
_options = options;
}

Expand All @@ -46,7 +45,7 @@ public async Task<HealthCheckResult> CheckHealthAsync(HealthCheckContext context
AssemblyVersion = Assembly.GetExecutingAssembly().GetName().Version?.ToString(),
FileName = fileName
});
await _roadNetworkCommandQueue.WriteAsync(command, cancellationToken);
await _healthCommandQueue.WriteAsync(command, cancellationToken);

return await _ticketing.WaitUntilCompleteOrTimeout(ticketId, _options.CheckTimeout, cancellationToken);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ public async Task<IActionResult> Change(
};
await validator.ValidateAndThrowAsync(command, cancellationToken);

await RoadNetworkCommandQueue
await OrganizationCommandQueue
.WriteAsync(new Command(command), HttpContext.RequestAborted);

return Accepted();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ public async Task<IActionResult> Create(
};
await validator.ValidateAndThrowAsync(command, cancellationToken);

await RoadNetworkCommandQueue
await OrganizationCommandQueue
.WriteAsync(new Command(command), HttpContext.RequestAborted);

return Accepted();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ public async Task<IActionResult> Delete(
};
await validator.ValidateAndThrowAsync(command, cancellationToken);

await RoadNetworkCommandQueue
await OrganizationCommandQueue
.WriteAsync(new Command(command), HttpContext.RequestAborted);

return Accepted();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ public async Task<IActionResult> Rename(
};
await validator.ValidateAndThrowAsync(command, cancellationToken);

await RoadNetworkCommandQueue
await OrganizationCommandQueue
.WriteAsync(new Command(command), HttpContext.RequestAborted);

return Accepted();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,14 @@ public partial class OrganizationsController : BackofficeApiController
public OrganizationsController(
IStreamStore store,
IMediator mediator,
IRoadNetworkCommandQueue roadNetworkCommandQueue)
IOrganizationCommandQueue organizationCommandQueue)
{
Store = store;
Mediator = mediator;
RoadNetworkCommandQueue = roadNetworkCommandQueue;
OrganizationCommandQueue = organizationCommandQueue;
}

protected IStreamStore Store { get; }
protected IMediator Mediator { get; }
protected IRoadNetworkCommandQueue RoadNetworkCommandQueue { get; }
protected IOrganizationCommandQueue OrganizationCommandQueue { get; }
}
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ public CommandHostHealthModule(

try
{
await context.RoadNetworks.Get(cancellationToken);
await snapshotReader.ReadSnapshotVersionAsync(cancellationToken);

var blobClient = container.Resolve<RoadNetworkUploadsBlobClient>();
await blobClient.GetBlobAsync(new BlobName(command.Body.FileName), cancellationToken);
Expand Down
67 changes: 27 additions & 40 deletions src/RoadRegistry.BackOffice.CommandHost/CommandProcessor.cs
Original file line number Diff line number Diff line change
@@ -1,27 +1,24 @@
namespace RoadRegistry.BackOffice.CommandHost;

using System;
using System.IO;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;
using Abstractions;
using Be.Vlaanderen.Basisregisters.EventHandling;
using Framework;
using Hosts;
using Messages;
using Microsoft.Data.SqlClient;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using Newtonsoft.Json;
using SqlStreamStore;
using SqlStreamStore.Streams;
using SqlStreamStore.Subscriptions;
using System;
using System.IO;
using System.Linq;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;

public class CommandProcessor : RoadRegistryHostedService
{
private static readonly EventMapping CommandMapping = new EventMapping(RoadNetworkCommands.All.ToDictionary(command => command.Name));

private static readonly TimeSpan ResubscribeAfter = TimeSpan.FromSeconds(5);

private static readonly JsonSerializerSettings SerializerSettings =
Expand All @@ -32,21 +29,24 @@ public class CommandProcessor : RoadRegistryHostedService
private readonly CancellationTokenSource _messagePumpCancellation;
private readonly Scheduler _scheduler;
private readonly RoadRegistryApplication _applicationProcessor;
private DistributedStreamStoreLock _distributedStreamStoreLock;
private readonly DistributedStreamStoreLock _distributedStreamStoreLock;

public CommandProcessor(
IHostApplicationLifetime hostApplicationLifetime,
IStreamStore streamStore,
StreamName queue,
ICommandProcessorPositionStore positionStore,
EventMapping commandMapping,
CommandHandlerDispatcher dispatcher,
Scheduler scheduler,
RoadRegistryApplication applicationProcessor,
DistributedStreamStoreLockOptions distributedStreamStoreLockOptions,
ILogger<CommandProcessor> logger)
: base(logger)
ILoggerFactory loggerFactory)
: base(loggerFactory)
{
ArgumentNullException.ThrowIfNull(streamStore);
ArgumentNullException.ThrowIfNull(positionStore);
ArgumentNullException.ThrowIfNull(commandMapping);
ArgumentNullException.ThrowIfNull(dispatcher);

_scheduler = scheduler.ThrowIfNull();
Expand All @@ -62,8 +62,6 @@ public CommandProcessor(
});
_messagePump = Task.Factory.StartNew(async () =>
{
//TODO-rik bij failure toch blijven proberen zodat commandhost niet herstart moet worden

IStreamSubscription subscription = null;
try
{
Expand All @@ -74,12 +72,12 @@ public CommandProcessor(
switch (message)
{
case Subscribe:
logger.LogInformation("Subscribing ...");
Logger.LogInformation("Subscribing ...");
subscription?.Dispose();
var version = await positionStore
.ReadVersion(queue.ToString(), _messagePumpCancellation.Token)
.ConfigureAwait(false);
logger.LogInformation("Subscribing as of {0}", version ?? -1);
Logger.LogInformation("Subscribing as of {0}", version ?? -1);
subscription = streamStore.SubscribeToStream(
queue.ToString(),
version,
Expand Down Expand Up @@ -110,7 +108,7 @@ await _messageChannel.Writer
case ProcessStreamMessage process:
try
{
logger.LogInformation(
Logger.LogInformation(
"Processing {MessageType} at {Position}",
process.Message.Type, process.Message.Position);

Expand All @@ -123,15 +121,15 @@ await _distributedStreamStoreLock.RetryRunUntilLockAcquiredAsync(async () =>
await process.Message
.GetJsonData(_messagePumpCancellation.Token)
.ConfigureAwait(false),
CommandMapping.GetEventType(process.Message.Type),
commandMapping.GetEventType(process.Message.Type),
SerializerSettings);
var command = new Command(body).WithMessageId(process.Message.MessageId);
await dispatcher(command, _messagePumpCancellation.Token).ConfigureAwait(false);
}, _messagePumpCancellation.Token);
}
else
{
logger.LogInformation("Skipping {MessageType} at {Position} - Message Processor '{MessageProcessor}' does not match '{Processor}'", process.Message.Type, process.Message.Position, messageProcessor, _applicationProcessor);
Logger.LogInformation("Skipping {MessageType} at {Position} - Message Processor '{MessageProcessor}' does not match '{Processor}'", process.Message.Type, process.Message.Position, messageProcessor, _applicationProcessor);
}

await positionStore
Expand All @@ -141,30 +139,26 @@ await positionStore
.ConfigureAwait(false);
process.Complete();

logger.LogInformation(
Logger.LogInformation(
"Processed {MessageType} at {Position}",
process.Message.Type, process.Message.Position);
}
catch (Exception exception)
{
logger.LogError(exception, exception.Message);
Logger.LogError(exception, "Error while processing {MessageType} at {Position}",
process.Message.Type, process.Message.Position);

// how are we going to recover from this? do we even need to recover from this?
// prediction: it's going to be a serialization error, a data quality error, or a bug
// if (process.Message.StreamVersion == 0)
// {
// await positionStore.WriteVersion(RoadNetworkCommandQueue,
// process.Message.StreamVersion,
// _messagePumpCancellation.Token);
// }
process.Fault(exception);
throw;
}

break;
case SubscriptionDropped dropped:
if (dropped.Reason == SubscriptionDroppedReason.StreamStoreError)
{
logger.LogError(dropped.Exception,
Logger.LogError(dropped.Exception,
"Subscription was dropped because of a stream store error.");
await scheduler.Schedule(async token =>
{
Expand All @@ -176,7 +170,7 @@ await scheduler.Schedule(async token =>
}
else if (dropped.Reason == SubscriptionDroppedReason.SubscriberError)
{
logger.LogError(dropped.Exception,
Logger.LogError(dropped.Exception,
"Subscription was dropped because of a subscriber error.");

if (CanResumeFrom(dropped))
Expand All @@ -198,23 +192,16 @@ await scheduler.Schedule(async token =>
}
catch (TaskCanceledException)
{
if (logger.IsEnabled(LogLevel.Information))
{
logger.Log(LogLevel.Information, "CommandProcessor message pump is exiting due to cancellation.");
}
Logger.LogInformation("CommandProcessor message pump is exiting due to task cancellation.");
}
catch (OperationCanceledException)
{
if (logger.IsEnabled(LogLevel.Information))
{
logger.Log(LogLevel.Information, "CommandProcessor message pump is exiting due to cancellation.");
}
Logger.LogInformation("CommandProcessor message pump is exiting due to operation cancellation.");
}
catch (Exception exception)
{
logger.LogError(exception, "CommandProcessor message pump is exiting due to a bug.");
await StopAsync(_messagePumpCancellation.Token);
throw;
Logger.LogError(exception, "CommandProcessor message pump is exiting due to a bug.");
hostApplicationLifetime.StopApplication();
}
finally
{
Expand Down
32 changes: 32 additions & 0 deletions src/RoadRegistry.BackOffice.CommandHost/HealthCommandProcessor.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
namespace RoadRegistry.BackOffice.CommandHost;

using Abstractions;
using Framework;
using Hosts;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using SqlStreamStore;

public class HealthCommandProcessor : CommandProcessor
{
public HealthCommandProcessor(
IHostApplicationLifetime hostApplicationLifetime,
IStreamStore streamStore,
ICommandProcessorPositionStore positionStore,
CommandHandlerDispatcher dispatcher,
Scheduler scheduler,
DistributedStreamStoreLockOptions distributedStreamStoreLockOptions,
ILoggerFactory loggerFactory)
: base(hostApplicationLifetime,
streamStore,
HealthCommandQueue.Stream,
positionStore,
HealthCommandQueue.CommandMapping,
dispatcher,
scheduler,
RoadRegistryApplication.BackOffice,
distributedStreamStoreLockOptions,
loggerFactory)
{
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
namespace RoadRegistry.BackOffice.CommandHost;

using Abstractions;
using Framework;
using Hosts;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using SqlStreamStore;

public class OrganizationCommandProcessor : CommandProcessor
{
public OrganizationCommandProcessor(
IHostApplicationLifetime hostApplicationLifetime,
IStreamStore streamStore,
ICommandProcessorPositionStore positionStore,
CommandHandlerDispatcher dispatcher,
Scheduler scheduler,
DistributedStreamStoreLockOptions distributedStreamStoreLockOptions,
ILoggerFactory loggerFactory)
: base(hostApplicationLifetime,
streamStore,
OrganizationCommandQueue.Stream,
positionStore,
OrganizationCommandQueue.CommandMapping,
dispatcher,
scheduler,
RoadRegistryApplication.BackOffice,
distributedStreamStoreLockOptions,
loggerFactory)
{
}
}
Loading

0 comments on commit 41aa484

Please sign in to comment.