Skip to content

Commit

Permalink
zarusz#347 Refactor IConsumerErrorHandler to return a 'retry' respons…
Browse files Browse the repository at this point in the history
…e instead of supplying a 'retry' delegate
  • Loading branch information
EtherZa committed Dec 29, 2024
1 parent 78afac0 commit 28091d8
Show file tree
Hide file tree
Showing 31 changed files with 710 additions and 590 deletions.
29 changes: 20 additions & 9 deletions docs/intro.md
Original file line number Diff line number Diff line change
Expand Up @@ -1059,28 +1059,39 @@ public class LoggingConsumerInterceptor<TMessage> : IConsumerInterceptor<TMessag

## Error Handling

Message processing by consumers or handlers may result in exceptions.
Starting with version 2.3.0, SMB introduces a standard way to integrate custom error handling logic across different transports.

The interface [IConsumerErrorHandler<T>](../src/SlimMessageBus.Host/Consumer/ErrorHandling/IConsumerErrorHandler.cs) enables the definition of custom error handling for specific message types:
Message processing by consumers or handlers may result in exceptions. The [IConsumerErrorHandler<T>](../src/SlimMessageBus.Host/Consumer/ErrorHandling/IConsumerErrorHandler.cs) provides a standard way to integrate custom error handling logic across different transports.

```cs
public interface IConsumerErrorHandler<in T>
{
/// <summary>
/// Executed when the message consumer (or handler) errors out. This interface allows to intercept and handle the exception.
/// Use the consumer context to get ahold of transport specific options to proceed (acknowledge/reject message).
/// <para>
/// Executed when the message consumer (or handler) errors out. The interface allows for interception of
/// exceptions to manipulate the processing pipeline (success/fail/retry).
/// </para>
/// <para>
/// The consumer context is available to apply transport specific operations (acknowledge/reject/dead letter/etc).
/// </para>
/// <para>
/// If message execution is to be re-attempted, any delays/jitter should be applied before the method returns.
/// </para>
/// </summary>
/// <param name="message">The message that failed to process.</param>
/// <param name="retry">Performs another message processing try. The return value is relevant if the consumer was a request handler (it will be its response value). Ensure to pass the return value to the result of the error handler.</param>
/// <param name="consumerContext">The consumer context for the message processing pipeline.</param>
/// <param name="exception">Exception that occurred during message processing.</param>
/// <param name="attempts">The number of times the message has been attempted to be processed.</param>
/// <returns>The error handling result.</returns>
Task<ConsumerErrorHandlerResult> OnHandleError(T message, Func<Task<object>> retry, IConsumerContext consumerContext, Exception exception);
Task<ConsumerErrorHandlerResult> OnHandleError(T message, IConsumerContext consumerContext, Exception exception, int attempts);
}
```

> The `retry()` parameter allows the message processing pipeline, including consumer interceptors, to retry processing when transient errors occur and retries are desired.
The returned `ConsumerErrorHandlerResult` object is used to override the execution for the remainder of the execution pipeline.
| Result | Description |
|---------|-------------|
| Failure | The message failed to be processed and should be returned to the queue |
| Success | The pipeline must treat the message as having been processed successfully |
| SuccessWithResponse | The pipeline to treat the messagage as having been processed successfully, returning the response to the request/response invocation ([IRequestResponseBus<T>](../src/SlimMessageBus/RequestResponse/IRequestResponseBus.cs)) |
| Retry | Execute the pipeline again (any delay/jitter should be applied before returning from method) |

To enable SMB to recognize the error handler, it must be registered within the Microsoft Dependency Injection (MSDI) framework:

Expand Down
13 changes: 8 additions & 5 deletions docs/intro.t.md
Original file line number Diff line number Diff line change
Expand Up @@ -1059,14 +1059,17 @@ public class LoggingConsumerInterceptor<TMessage> : IConsumerInterceptor<TMessag

## Error Handling

Message processing by consumers or handlers may result in exceptions.
Starting with version 2.3.0, SMB introduces a standard way to integrate custom error handling logic across different transports.

The interface [IConsumerErrorHandler<T>](../src/SlimMessageBus.Host/Consumer/ErrorHandling/IConsumerErrorHandler.cs) enables the definition of custom error handling for specific message types:
Message processing by consumers or handlers may result in exceptions. The [IConsumerErrorHandler<T>](../src/SlimMessageBus.Host/Consumer/ErrorHandling/IConsumerErrorHandler.cs) provides a standard way to integrate custom error handling logic across different transports.

@[:cs](../src/SlimMessageBus.Host/Consumer/ErrorHandling/IConsumerErrorHandler.cs,Interface)

> The `retry()` parameter allows the message processing pipeline, including consumer interceptors, to retry processing when transient errors occur and retries are desired.
The returned `ConsumerErrorHandlerResult` object is used to override the execution for the remainder of the execution pipeline.
| Result | Description |
|---------|-------------|
| Failure | The message failed to be processed and should be returned to the queue |
| Success | The pipeline must treat the message as having been processed successfully |
| SuccessWithResponse | The pipeline to treat the messagage as having been processed successfully, returning the response to the request/response invocation ([IRequestResponseBus<T>](../src/SlimMessageBus/RequestResponse/IRequestResponseBus.cs)) |
| Retry | Execute the pipeline again (any delay/jitter should be applied before returning from method) |

To enable SMB to recognize the error handler, it must be registered within the Microsoft Dependency Injection (MSDI) framework:

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
namespace SlimMessageBus.Host.AmazonSQS;

public interface ISqsConsumerErrorHandler<in T> : IConsumerErrorHandler<T>
{
}
public interface ISqsConsumerErrorHandler<in T> : IConsumerErrorHandler<T>;

public abstract class SqsConsumerErrorHandler<T> : ConsumerErrorHandler<T>;
2 changes: 1 addition & 1 deletion src/SlimMessageBus.Host.AmazonSQS/GlobalUsings.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,4 +7,4 @@
global using Microsoft.Extensions.DependencyInjection.Extensions;
global using Microsoft.Extensions.Logging;

global using SlimMessageBus.Host.Serialization;
global using SlimMessageBus.Host.Consumer.ErrorHandling;
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
namespace SlimMessageBus.Host.AzureEventHub;

public interface IEventHubConsumerErrorHandler<in T> : IConsumerErrorHandler<T>
{
}
public interface IEventHubConsumerErrorHandler<in T> : IConsumerErrorHandler<T>;

public abstract class EventHubConsumerErrorHandler<T> : ConsumerErrorHandler<T>;
12 changes: 6 additions & 6 deletions src/SlimMessageBus.Host.AzureEventHub/GlobalUsings.cs
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
global using Microsoft.Extensions.Logging;
global using Azure.Messaging.EventHubs;
global using Azure.Messaging.EventHubs.Producer;
global using Azure.Storage.Blobs;

global using Microsoft.Extensions.Logging;

global using SlimMessageBus.Host;
global using SlimMessageBus.Host.Collections;
global using SlimMessageBus.Host.Consumer.ErrorHandling;
global using SlimMessageBus.Host.Services;

global using Azure.Messaging.EventHubs;
global using Azure.Messaging.EventHubs.Producer;
global using Azure.Storage.Blobs;
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
namespace SlimMessageBus.Host.AzureServiceBus;

public interface IServiceBusConsumerErrorHandler<in T> : IConsumerErrorHandler<T>
{
}
public interface IServiceBusConsumerErrorHandler<in T> : IConsumerErrorHandler<T>;

public abstract class ServiceBusConsumerErrorHandler<T> : ConsumerErrorHandler<T>;
10 changes: 5 additions & 5 deletions src/SlimMessageBus.Host.AzureServiceBus/GlobalUsings.cs
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
global using Microsoft.Extensions.Logging;
global using Azure.Messaging.ServiceBus;
global using Azure.Messaging.ServiceBus.Administration;

global using Microsoft.Extensions.Logging;

global using SlimMessageBus.Host;
global using SlimMessageBus.Host.Collections;
global using SlimMessageBus.Host.Consumer.ErrorHandling;
global using SlimMessageBus.Host.Services;

global using Azure.Messaging.ServiceBus;
global using Azure.Messaging.ServiceBus.Administration;
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
namespace SlimMessageBus.Host.Kafka;

public interface IKafkaConsumerErrorHandler<in T> : IConsumerErrorHandler<T>
{
}
public interface IKafkaConsumerErrorHandler<in T> : IConsumerErrorHandler<T>;

public abstract class KafkaConsumerErrorHandler<T> : ConsumerErrorHandler<T>;
4 changes: 2 additions & 2 deletions src/SlimMessageBus.Host.Kafka/GlobalUsings.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

global using Microsoft.Extensions.Logging;

global using SlimMessageBus.Host;
global using SlimMessageBus.Host.Serialization;
global using SlimMessageBus.Host.Collections;
global using SlimMessageBus.Host.Consumer.ErrorHandling;
global using SlimMessageBus.Host.Serialization;
global using SlimMessageBus.Host.Services;
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
namespace SlimMessageBus.Host.Memory;

public interface IMemoryConsumerErrorHandler<in T> : IConsumerErrorHandler<T>
{
}
public interface IMemoryConsumerErrorHandler<in T> : IConsumerErrorHandler<T>;

public abstract class MemoryConsumerErrorHandler<T> : ConsumerErrorHandler<T>;
3 changes: 2 additions & 1 deletion src/SlimMessageBus.Host.Memory/GlobalUsings.cs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
global using Microsoft.Extensions.Logging;

global using SlimMessageBus.Host.Serialization;
global using SlimMessageBus.Host.Consumer.ErrorHandling;
global using SlimMessageBus.Host.Serialization;
3 changes: 2 additions & 1 deletion src/SlimMessageBus.Host.Mqtt/GlobalUsings.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,4 +4,5 @@
global using MQTTnet.Client;
global using MQTTnet.Extensions.ManagedClient;

global using SlimMessageBus.Host.Services;
global using SlimMessageBus.Host.Consumer.ErrorHandling;
global using SlimMessageBus.Host.Services;
6 changes: 3 additions & 3 deletions src/SlimMessageBus.Host.Mqtt/IMqttConsumerErrorHandler.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
namespace SlimMessageBus.Host.Mqtt;

public interface IMqttConsumerErrorHandler<in T> : IConsumerErrorHandler<T>
{
}
public interface IMqttConsumerErrorHandler<in T> : IConsumerErrorHandler<T>;

public abstract class MqttConsumerErrorHandler<T> : ConsumerErrorHandler<T>;
5 changes: 3 additions & 2 deletions src/SlimMessageBus.Host.Nats/GlobalUsings.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
global using Microsoft.Extensions.Logging;

global using SlimMessageBus.Host.Services;
global using NATS.Client.Core;

global using NATS.Client.Core;
global using SlimMessageBus.Host.Consumer.ErrorHandling;
global using SlimMessageBus.Host.Services;
6 changes: 3 additions & 3 deletions src/SlimMessageBus.Host.Nats/INatsConsumerErrorHandler.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
namespace SlimMessageBus.Host.Nats;

public interface INatsConsumerErrorHandler<in T> : IConsumerErrorHandler<T>
{
}
public interface INatsConsumerErrorHandler<in T> : IConsumerErrorHandler<T>;

public abstract class NatsConsumerErrorHandler<T> : ConsumerErrorHandler<T>;
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
namespace SlimMessageBus.Host.RabbitMQ;

public interface IRabbitMqConsumerErrorHandler<in T> : IConsumerErrorHandler<T>
{
}
public interface IRabbitMqConsumerErrorHandler<in T> : IConsumerErrorHandler<T>;

public abstract class RabbitMqConsumerErrorHandler<T> : ConsumerErrorHandler<T>;
4 changes: 2 additions & 2 deletions src/SlimMessageBus.Host.RabbitMQ/GlobalUsings.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,5 +3,5 @@
global using RabbitMQ.Client;
global using RabbitMQ.Client.Events;

global using SlimMessageBus.Host.Serialization;
global using SlimMessageBus.Host.Services;
global using SlimMessageBus.Host.Consumer.ErrorHandling;
global using SlimMessageBus.Host.Services;
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
namespace SlimMessageBus.Host.Redis;

public interface IRedisConsumerErrorHandler<in T> : IConsumerErrorHandler<T>
{
}
public interface IRedisConsumerErrorHandler<in T> : IConsumerErrorHandler<T>;

public abstract class RedisConsumerErrorHandler<T> : ConsumerErrorHandler<T>;
3 changes: 2 additions & 1 deletion src/SlimMessageBus.Host.Redis/GlobalUsings.cs
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
global using Microsoft.Extensions.Logging;

global using SlimMessageBus.Host.Collections;
global using SlimMessageBus.Host.Consumer.ErrorHandling;
global using SlimMessageBus.Host.Serialization;
global using SlimMessageBus.Host.Services;

global using StackExchange.Redis;
global using StackExchange.Redis;
4 changes: 2 additions & 2 deletions src/SlimMessageBus.Host/Collections/RuntimeTypeCache.cs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ public class RuntimeTypeCache : IRuntimeTypeCache
public IGenericTypeCache<Func<object, object, Func<Task<object>>, IConsumerContext, Task<object>>> ConsumerInterceptorType { get; }
public IGenericTypeCache2<Func<object, object, object, IConsumerContext, Task>> HandlerInterceptorType { get; }

public IGenericTypeCache<Func<object, object, Func<Task<object>>, IConsumerContext, Exception, Task<ConsumerErrorHandlerResult>>> ConsumerErrorHandlerType { get; }
public IGenericTypeCache<Func<object, object, IConsumerContext, Exception, int, Task<ConsumerErrorHandlerResult>>> ConsumerErrorHandlerType { get; }

public RuntimeTypeCache()
{
Expand Down Expand Up @@ -78,7 +78,7 @@ public RuntimeTypeCache()
typeof(IRequestHandlerInterceptor<,>),
nameof(IRequestHandlerInterceptor<object, object>.OnHandle));

ConsumerErrorHandlerType = new GenericTypeCache<Func<object, object, Func<Task<object>>, IConsumerContext, Exception, Task<ConsumerErrorHandlerResult>>>(
ConsumerErrorHandlerType = new GenericTypeCache<Func<object, object, IConsumerContext, Exception, int, Task<ConsumerErrorHandlerResult>>>(
typeof(IConsumerErrorHandler<>),
nameof(IConsumerErrorHandler<object>.OnHandleError));
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
namespace SlimMessageBus.Host.Consumer.ErrorHandling;

public abstract class ConsumerErrorHandler<T> : BaseConsumerErrorHandler, IConsumerErrorHandler<T>
{
public abstract Task<ConsumerErrorHandlerResult> OnHandleError(T message, IConsumerContext consumerContext, Exception exception, int attempts);
}

public abstract class BaseConsumerErrorHandler
{
public static ConsumerErrorHandlerResult Failure() => ConsumerErrorHandlerResult.Failure;
public static ConsumerErrorHandlerResult Retry() => ConsumerErrorHandlerResult.Retry;
public static ConsumerErrorHandlerResult Success(object response = null) => response == null ? ConsumerErrorHandlerResult.Success : ConsumerErrorHandlerResult.SuccessWithResponse(response);
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,23 +2,42 @@

public record ConsumerErrorHandlerResult
{
private static readonly object NoResponse = new();
private static readonly object _noResponse = new();

public bool Handled { get; private set; }
private ConsumerErrorHandlerResult(ConsumerErrorHandlerResultEnum result, object response = null)
{
Result = result;
Response = response ?? _noResponse;
}

public ConsumerErrorHandlerResultEnum Result { get; private set; }
public object Response { get; private set; }
public bool HasResponse => !ReferenceEquals(Response, NoResponse);
public bool HasResponse => !ReferenceEquals(Response, _noResponse);

/// <summary>
/// The error handler was not able to handle the exception.
/// The message should be placed back into the queue.
/// </summary>
public static readonly ConsumerErrorHandlerResult Failure = new() { Handled = false, Response = NoResponse };
public static readonly ConsumerErrorHandlerResult Failure = new(ConsumerErrorHandlerResultEnum.Fail);

/// <summary>
/// The error handler was able to handle the exception.
/// The message processor should evaluate the message as having been processed successfully.
/// </summary>
public static readonly ConsumerErrorHandlerResult Success = new() { Handled = true, Response = NoResponse };
public static readonly ConsumerErrorHandlerResult Success = new(ConsumerErrorHandlerResultEnum.Success);

/// <summary>
/// The error handler was able to handle the exception, and has a fallback response for the <see cref="IRequestHandler{TRequest}"/> or <see cref="IRequestHandler{TRequest, TResponse}"/>.
/// The message processor should evaluate the message as having been processed successfully and use the specified fallback response for the <see cref="IRequestHandler{TRequest}"/> or <see cref="IRequestHandler{TRequest, TResponse}"/>.
/// </summary>
public static ConsumerErrorHandlerResult SuccessWithResponse(object response) => new() { Handled = true, Response = response };
public static ConsumerErrorHandlerResult SuccessWithResponse(object response) => new(ConsumerErrorHandlerResultEnum.Success, response);

/// <summary>
/// Retry processing the message without placing it back in the queue.
/// </summary>
public static readonly ConsumerErrorHandlerResult Retry = new(ConsumerErrorHandlerResultEnum.Retry);
}

public enum ConsumerErrorHandlerResultEnum
{
Fail,
Retry,
Success
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,22 @@
public interface IConsumerErrorHandler<in T>
{
/// <summary>
/// Executed when the message consumer (or handler) errors out. This interface allows to intercept and handle the exception.
/// Use the consumer context to get ahold of transport specific options to proceed (acknowledge/reject message).
/// <para>
/// Executed when the message consumer (or handler) errors out. The interface allows for interception of
/// exceptions to manipulate the processing pipeline (success/fail/retry).
/// </para>
/// <para>
/// The consumer context is available to apply transport specific operations (acknowledge/reject/dead letter/etc).
/// </para>
/// <para>
/// If message execution is to be re-attempted, any delays/jitter should be applied before the method returns.
/// </para>
/// </summary>
/// <param name="message">The message that failed to process.</param>
/// <param name="retry">Performs another message processing try. The return value is relevant if the consumer was a request handler (it will be its response value). Ensure to pass the return value to the result of the error handler.</param>
/// <param name="consumerContext">The consumer context for the message processing pipeline.</param>
/// <param name="exception">Exception that occurred during message processing.</param>
/// <param name="attempts">The number of times the message has been attempted to be processed.</param>
/// <returns>The error handling result.</returns>
Task<ConsumerErrorHandlerResult> OnHandleError(T message, Func<Task<object>> retry, IConsumerContext consumerContext, Exception exception);
Task<ConsumerErrorHandlerResult> OnHandleError(T message, IConsumerContext consumerContext, Exception exception, int attempts);
}
// doc:fragment:Interface
// doc:fragment:Interface
Loading

0 comments on commit 28091d8

Please sign in to comment.