Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Host] Refactor 'Retry' delegate in IConsumerErrorHandler as response type #354

Merged
merged 1 commit into from
Dec 29, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 42 additions & 9 deletions docs/intro.md
Original file line number Diff line number Diff line change
Expand Up @@ -1059,28 +1059,41 @@ public class LoggingConsumerInterceptor<TMessage> : IConsumerInterceptor<TMessag

## Error Handling

Message processing by consumers or handlers may result in exceptions.
Starting with version 2.3.0, SMB introduces a standard way to integrate custom error handling logic across different transports.

The interface [IConsumerErrorHandler<T>](../src/SlimMessageBus.Host/Consumer/ErrorHandling/IConsumerErrorHandler.cs) enables the definition of custom error handling for specific message types:
Message processing by consumers or handlers may result in exceptions. The [IConsumerErrorHandler<T>](../src/SlimMessageBus.Host/Consumer/ErrorHandling/IConsumerErrorHandler.cs) provides a standard way to integrate custom error handling logic across different transports.

```cs
public interface IConsumerErrorHandler<in T>
{
/// <summary>
/// Executed when the message consumer (or handler) errors out. This interface allows to intercept and handle the exception.
/// Use the consumer context to get ahold of transport specific options to proceed (acknowledge/reject message).
/// <para>
/// Executed when the message consumer (or handler) errors out. The interface allows for interception of
/// exceptions to manipulate the processing pipeline (success/fail/retry).
/// </para>
/// <para>
/// The consumer context is available to apply transport specific operations (acknowledge/reject/dead letter/etc).
/// </para>
/// <para>
/// If message execution is to be re-attempted, any delays/jitter should be applied before the method returns.
/// </para>
/// </summary>
/// <param name="message">The message that failed to process.</param>
/// <param name="retry">Performs another message processing try. The return value is relevant if the consumer was a request handler (it will be its response value). Ensure to pass the return value to the result of the error handler.</param>
/// <param name="consumerContext">The consumer context for the message processing pipeline.</param>
/// <param name="exception">Exception that occurred during message processing.</param>
/// <param name="attempts">The number of times the message has been attempted to be processed.</param>
/// <returns>The error handling result.</returns>
Task<ConsumerErrorHandlerResult> OnHandleError(T message, Func<Task<object>> retry, IConsumerContext consumerContext, Exception exception);
Task<ConsumerErrorHandlerResult> OnHandleError(T message, IConsumerContext consumerContext, Exception exception, int attempts);
}
```

> The `retry()` parameter allows the message processing pipeline, including consumer interceptors, to retry processing when transient errors occur and retries are desired.
The returned `ConsumerErrorHandlerResult` object is used to override the execution for the remainder of the execution pipeline.
| Result | Description |
|---------|-------------|
| Failure | The message failed to be processed and should be returned to the queue |
| Success | The pipeline must treat the message as having been processed successfully |
| SuccessWithResponse | The pipeline to treat the messagage as having been processed successfully, returning the response to the request/response invocation ([IRequestResponseBus<T>](../src/SlimMessageBus/RequestResponse/IRequestResponseBus.cs)) |
| Retry | Execute the pipeline again (any delay/jitter should be applied before returning from method)[^1] |

[^1]: `Retry` will recreate the message scope on every atttempt if `PerMessageScopeEnabled` has been enabled.

To enable SMB to recognize the error handler, it must be registered within the Microsoft Dependency Injection (MSDI) framework:

Expand All @@ -1106,6 +1119,26 @@ Transport plugins provide specialized error handling interfaces. Examples includ

This approach allows for transport-specific error handling, ensuring that specialized handlers can be prioritized.

Sample retry with exponential back-off:
```cs
public class RetryHandler<T> : ConsumerErrorHandler<T>
{
private static readonly Random _random = new();

public override async Task<ConsumerErrorHandlerResult> OnHandleError(T message, IConsumerContext consumerContext, Exception exception, int attempts)
{
if (attempts < 3)
{
var delay = (attempts * 1000) + (_random.Next(1000) - 500);
await Task.Delay(delay, consumerContext.CancellationToken);
return Retry();
}

return Failure();
}
}
```

## Logging

SlimMessageBus uses [Microsoft.Extensions.Logging.Abstractions](https://www.nuget.org/packages/Microsoft.Extensions.Logging.Abstractions):
Expand Down
35 changes: 30 additions & 5 deletions docs/intro.t.md
Original file line number Diff line number Diff line change
Expand Up @@ -1059,14 +1059,19 @@ public class LoggingConsumerInterceptor<TMessage> : IConsumerInterceptor<TMessag

## Error Handling

Message processing by consumers or handlers may result in exceptions.
Starting with version 2.3.0, SMB introduces a standard way to integrate custom error handling logic across different transports.

The interface [IConsumerErrorHandler<T>](../src/SlimMessageBus.Host/Consumer/ErrorHandling/IConsumerErrorHandler.cs) enables the definition of custom error handling for specific message types:
Message processing by consumers or handlers may result in exceptions. The [IConsumerErrorHandler<T>](../src/SlimMessageBus.Host/Consumer/ErrorHandling/IConsumerErrorHandler.cs) provides a standard way to integrate custom error handling logic across different transports.

@[:cs](../src/SlimMessageBus.Host/Consumer/ErrorHandling/IConsumerErrorHandler.cs,Interface)

> The `retry()` parameter allows the message processing pipeline, including consumer interceptors, to retry processing when transient errors occur and retries are desired.
The returned `ConsumerErrorHandlerResult` object is used to override the execution for the remainder of the execution pipeline.
| Result | Description |
|---------|-------------|
| Failure | The message failed to be processed and should be returned to the queue |
| Success | The pipeline must treat the message as having been processed successfully |
| SuccessWithResponse | The pipeline to treat the messagage as having been processed successfully, returning the response to the request/response invocation ([IRequestResponseBus<T>](../src/SlimMessageBus/RequestResponse/IRequestResponseBus.cs)) |
| Retry | Execute the pipeline again (any delay/jitter should be applied before returning from method)[^1] |

[^1]: `Retry` will recreate the message scope on every atttempt if `PerMessageScopeEnabled` has been enabled.

To enable SMB to recognize the error handler, it must be registered within the Microsoft Dependency Injection (MSDI) framework:

zarusz marked this conversation as resolved.
Show resolved Hide resolved
Expand All @@ -1092,6 +1097,26 @@ Transport plugins provide specialized error handling interfaces. Examples includ

This approach allows for transport-specific error handling, ensuring that specialized handlers can be prioritized.

Sample retry with exponential back-off:
```cs
public class RetryHandler<T> : ConsumerErrorHandler<T>
{
private static readonly Random _random = new();

public override async Task<ConsumerErrorHandlerResult> OnHandleError(T message, IConsumerContext consumerContext, Exception exception, int attempts)
{
if (attempts < 3)
{
var delay = (attempts * 1000) + (_random.Next(1000) - 500);
await Task.Delay(delay, consumerContext.CancellationToken);
return Retry();
}

return Failure();
}
}
```

## Logging

SlimMessageBus uses [Microsoft.Extensions.Logging.Abstractions](https://www.nuget.org/packages/Microsoft.Extensions.Logging.Abstractions):
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
namespace SlimMessageBus.Host.AmazonSQS;

public interface ISqsConsumerErrorHandler<in T> : IConsumerErrorHandler<T>
{
}
public interface ISqsConsumerErrorHandler<in T> : IConsumerErrorHandler<T>;

public abstract class SqsConsumerErrorHandler<T> : ConsumerErrorHandler<T>;
2 changes: 1 addition & 1 deletion src/SlimMessageBus.Host.AmazonSQS/GlobalUsings.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,4 +7,4 @@
global using Microsoft.Extensions.DependencyInjection.Extensions;
global using Microsoft.Extensions.Logging;

global using SlimMessageBus.Host.Serialization;
global using SlimMessageBus.Host.Consumer.ErrorHandling;
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
namespace SlimMessageBus.Host.AzureEventHub;

public interface IEventHubConsumerErrorHandler<in T> : IConsumerErrorHandler<T>
{
}
public interface IEventHubConsumerErrorHandler<in T> : IConsumerErrorHandler<T>;

public abstract class EventHubConsumerErrorHandler<T> : ConsumerErrorHandler<T>;
12 changes: 6 additions & 6 deletions src/SlimMessageBus.Host.AzureEventHub/GlobalUsings.cs
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
global using Microsoft.Extensions.Logging;
global using Azure.Messaging.EventHubs;
global using Azure.Messaging.EventHubs.Producer;
global using Azure.Storage.Blobs;

global using Microsoft.Extensions.Logging;

global using SlimMessageBus.Host;
global using SlimMessageBus.Host.Collections;
global using SlimMessageBus.Host.Consumer.ErrorHandling;
global using SlimMessageBus.Host.Services;

global using Azure.Messaging.EventHubs;
global using Azure.Messaging.EventHubs.Producer;
global using Azure.Storage.Blobs;
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
namespace SlimMessageBus.Host.AzureServiceBus;

public interface IServiceBusConsumerErrorHandler<in T> : IConsumerErrorHandler<T>
{
}
public interface IServiceBusConsumerErrorHandler<in T> : IConsumerErrorHandler<T>;

public abstract class ServiceBusConsumerErrorHandler<T> : ConsumerErrorHandler<T>;
10 changes: 5 additions & 5 deletions src/SlimMessageBus.Host.AzureServiceBus/GlobalUsings.cs
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
global using Microsoft.Extensions.Logging;
global using Azure.Messaging.ServiceBus;
global using Azure.Messaging.ServiceBus.Administration;

global using Microsoft.Extensions.Logging;

global using SlimMessageBus.Host;
global using SlimMessageBus.Host.Collections;
global using SlimMessageBus.Host.Consumer.ErrorHandling;
global using SlimMessageBus.Host.Services;

global using Azure.Messaging.ServiceBus;
global using Azure.Messaging.ServiceBus.Administration;
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
namespace SlimMessageBus.Host.Kafka;

public interface IKafkaConsumerErrorHandler<in T> : IConsumerErrorHandler<T>
{
}
public interface IKafkaConsumerErrorHandler<in T> : IConsumerErrorHandler<T>;

public abstract class KafkaConsumerErrorHandler<T> : ConsumerErrorHandler<T>;
4 changes: 2 additions & 2 deletions src/SlimMessageBus.Host.Kafka/GlobalUsings.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

global using Microsoft.Extensions.Logging;

global using SlimMessageBus.Host;
global using SlimMessageBus.Host.Serialization;
global using SlimMessageBus.Host.Collections;
global using SlimMessageBus.Host.Consumer.ErrorHandling;
global using SlimMessageBus.Host.Serialization;
global using SlimMessageBus.Host.Services;
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
namespace SlimMessageBus.Host.Memory;

public interface IMemoryConsumerErrorHandler<in T> : IConsumerErrorHandler<T>
{
}
public interface IMemoryConsumerErrorHandler<in T> : IConsumerErrorHandler<T>;

public abstract class MemoryConsumerErrorHandler<T> : ConsumerErrorHandler<T>;
3 changes: 2 additions & 1 deletion src/SlimMessageBus.Host.Memory/GlobalUsings.cs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
global using Microsoft.Extensions.Logging;

global using SlimMessageBus.Host.Serialization;
global using SlimMessageBus.Host.Consumer.ErrorHandling;
global using SlimMessageBus.Host.Serialization;
3 changes: 2 additions & 1 deletion src/SlimMessageBus.Host.Mqtt/GlobalUsings.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,4 +4,5 @@
global using MQTTnet.Client;
global using MQTTnet.Extensions.ManagedClient;

global using SlimMessageBus.Host.Services;
global using SlimMessageBus.Host.Consumer.ErrorHandling;
global using SlimMessageBus.Host.Services;
6 changes: 3 additions & 3 deletions src/SlimMessageBus.Host.Mqtt/IMqttConsumerErrorHandler.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
namespace SlimMessageBus.Host.Mqtt;

public interface IMqttConsumerErrorHandler<in T> : IConsumerErrorHandler<T>
{
}
public interface IMqttConsumerErrorHandler<in T> : IConsumerErrorHandler<T>;

public abstract class MqttConsumerErrorHandler<T> : ConsumerErrorHandler<T>;
5 changes: 3 additions & 2 deletions src/SlimMessageBus.Host.Nats/GlobalUsings.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
global using Microsoft.Extensions.Logging;

global using SlimMessageBus.Host.Services;
global using NATS.Client.Core;

global using NATS.Client.Core;
global using SlimMessageBus.Host.Consumer.ErrorHandling;
global using SlimMessageBus.Host.Services;
6 changes: 3 additions & 3 deletions src/SlimMessageBus.Host.Nats/INatsConsumerErrorHandler.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
namespace SlimMessageBus.Host.Nats;

public interface INatsConsumerErrorHandler<in T> : IConsumerErrorHandler<T>
{
}
public interface INatsConsumerErrorHandler<in T> : IConsumerErrorHandler<T>;

public abstract class NatsConsumerErrorHandler<T> : ConsumerErrorHandler<T>;
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
namespace SlimMessageBus.Host.RabbitMQ;

public interface IRabbitMqConsumerErrorHandler<in T> : IConsumerErrorHandler<T>
{
}
public interface IRabbitMqConsumerErrorHandler<in T> : IConsumerErrorHandler<T>;

public abstract class RabbitMqConsumerErrorHandler<T> : ConsumerErrorHandler<T>;
4 changes: 2 additions & 2 deletions src/SlimMessageBus.Host.RabbitMQ/GlobalUsings.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,5 +3,5 @@
global using RabbitMQ.Client;
global using RabbitMQ.Client.Events;

global using SlimMessageBus.Host.Serialization;
global using SlimMessageBus.Host.Services;
global using SlimMessageBus.Host.Consumer.ErrorHandling;
global using SlimMessageBus.Host.Services;
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
namespace SlimMessageBus.Host.Redis;

public interface IRedisConsumerErrorHandler<in T> : IConsumerErrorHandler<T>
{
}
public interface IRedisConsumerErrorHandler<in T> : IConsumerErrorHandler<T>;

public abstract class RedisConsumerErrorHandler<T> : ConsumerErrorHandler<T>;
3 changes: 2 additions & 1 deletion src/SlimMessageBus.Host.Redis/GlobalUsings.cs
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
global using Microsoft.Extensions.Logging;

global using SlimMessageBus.Host.Collections;
global using SlimMessageBus.Host.Consumer.ErrorHandling;
global using SlimMessageBus.Host.Serialization;
global using SlimMessageBus.Host.Services;

global using StackExchange.Redis;
global using StackExchange.Redis;
4 changes: 2 additions & 2 deletions src/SlimMessageBus.Host/Collections/RuntimeTypeCache.cs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ public class RuntimeTypeCache : IRuntimeTypeCache
public IGenericTypeCache<Func<object, object, Func<Task<object>>, IConsumerContext, Task<object>>> ConsumerInterceptorType { get; }
public IGenericTypeCache2<Func<object, object, object, IConsumerContext, Task>> HandlerInterceptorType { get; }

public IGenericTypeCache<Func<object, object, Func<Task<object>>, IConsumerContext, Exception, Task<ConsumerErrorHandlerResult>>> ConsumerErrorHandlerType { get; }
public IGenericTypeCache<Func<object, object, IConsumerContext, Exception, int, Task<ConsumerErrorHandlerResult>>> ConsumerErrorHandlerType { get; }

public RuntimeTypeCache()
{
Expand Down Expand Up @@ -78,7 +78,7 @@ public RuntimeTypeCache()
typeof(IRequestHandlerInterceptor<,>),
nameof(IRequestHandlerInterceptor<object, object>.OnHandle));

ConsumerErrorHandlerType = new GenericTypeCache<Func<object, object, Func<Task<object>>, IConsumerContext, Exception, Task<ConsumerErrorHandlerResult>>>(
ConsumerErrorHandlerType = new GenericTypeCache<Func<object, object, IConsumerContext, Exception, int, Task<ConsumerErrorHandlerResult>>>(
typeof(IConsumerErrorHandler<>),
nameof(IConsumerErrorHandler<object>.OnHandleError));
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
namespace SlimMessageBus.Host.Consumer.ErrorHandling;

public abstract class ConsumerErrorHandler<T> : BaseConsumerErrorHandler, IConsumerErrorHandler<T>
{
public abstract Task<ConsumerErrorHandlerResult> OnHandleError(T message, IConsumerContext consumerContext, Exception exception, int attempts);
}

public abstract class BaseConsumerErrorHandler
{
public static ConsumerErrorHandlerResult Failure() => ConsumerErrorHandlerResult.Failure;
public static ConsumerErrorHandlerResult Retry() => ConsumerErrorHandlerResult.Retry;
public static ConsumerErrorHandlerResult Success(object response = null) => response == null ? ConsumerErrorHandlerResult.Success : ConsumerErrorHandlerResult.SuccessWithResponse(response);
}
Loading