1. How to retry messages whose processing has failed?
To keep things simple and not involve any re-queuing and scheduling, I am simply using Polly to allow a few attempts, with some minimal gradual backoff. Only if all attempts fail does the exception propagate and is the message moved to the error queue.
public TimeSpan BaseDelay { get; init; } = TimeSpan.FromSeconds(2); // Balance chance to succeed with throughput
private ResiliencePipeline HandlerRetryPipeline => _handlerRetryPipeline ??= new ResiliencePipelineBuilder()
.AddRetry(new RetryStrategyOptions()
{
ShouldHandle = new PredicateBuilder().Handle<Exception>(),
BackoffType = DelayBackoffType.Exponential,
MaxRetryAttempts = 4,
// Balance chance to succeed with throughput
DelayGenerator = args => ValueTask.FromResult(args.AttemptNumber switch
{
0 => TimeSpan.Zero, // E.g. at 00.000
1 => 1 * BaseDelay, // E.g. at 02.000
2 => 2 * BaseDelay, // E.g. at 06.000
3 => 3 * BaseDelay, // E.g. at 14.000
_ => (TimeSpan?)null,
}),
OnRetry = _ => ValueTask.CompletedTask,
})
.Build();
private ResiliencePipeline? _handlerRetryPipeline;
private async Task ExecuteWithRetriesAsync<TMessage>(
string stableHandlerName,
Func<TMessage, CancellationToken, Task> handler,
TMessage @event,
CancellationToken cancellationToken)
{
try
{
await HandlerRetryPipeline.ExecuteAsync(
callback: (state, cancellationToken) => new ValueTask(
state.Handler.Invoke(state.Event, cancellationToken)),
state: (Handler: handler, Event: @event),
cancellationToken: cancellationToken);
}
catch (Exception e)
{
logger.LogError(e,
"{Event} handler failed, now requiring the problem to be fixed and the problematic messages to be shoveled back to {MainQueue} from the error queue: {Message}",
typeof(TMessage).Name, stableHandlerName, e.Message);
throw;
}
}
Bear in mind that each application replica will work on a limited number of messages simultaneously, which means throughput gets affected if processing takes longer, which means we should not delay for long while holding onto a message. You may need shorter delays.
2. How to have a dedicated error queue per message type?
IConventions
will let us do this, and most easily so by subclassing Conventions
.
services.RegisterEasyNetQ("ConnectionString", services =>
{
services.Register<IConventions>(serviceProvider => serviceProvider.Resolve<CustomNamingConvention>());
// Snip
});
public sealed class CustomNamingConvention : Conventions
{
public CustomNamingConvention(ITypeNameSerializer typeNameSerializer)
: base(typeNameSerializer)
{
QueueNamingConvention = this.GetQueueName; // Own method
ExchangeNamingConvention = this.GetExchangeName; // Own method
ErrorQueueNamingConvention = info => $"{info.Queue}_error";
ErrorExchangeNamingConvention = info => $"{info.Exchange}_error";
}
// Snip
}
3. How to [manually] requeue messages for reprocessing?
Essentially, if we install RabbitMQ's shovel plugin, we can simply go to the management UI, find an error queue, and move all of its messages into the corresponding regular queue. However...
EasyNetQ wraps messages in Error
objects. They are now of the wrong type and cannot be processed using our existing handler. EasyNetQ makes it surprisingly difficult to deal with this without resorting to the advanced API (giving up many advantages).
The least invasive approach I could come up with is as follows. The undesirable message wrapping is done by DefaultConsumerErrorStrategy
, but its relevant methods are private and nonvirtual. Luckily, it delegates the work to two injected interfaces: ISerializer
(for the message body) and ITypeNameSerializer
(for the type name of the message body). The former is easy to manipulate, and the latter takes some trickery.
For ISerializer.MessageToBytes()
, we will check if we are receiving an Error
type and object. If so, we will extract its Message
and serialize that instead.
For ITypeNameSerializer.Serialize()
, we only receive a Type
(which will be Error
in the problematic case), but nothing to let us know the original type. The trickery is to use an AsyncLocal<T>
just before the method is called, to remember the desired type name in such a way that only the current flow of execution can see it. If the Serialize()
method sees that such a type was stored, we will have it use it. This approach is concurrency-safe even if our class is a singleton.
The code here assumes serialization with System.Text.Json
, but that can easily be replaced.
services.RegisterEasyNetQ("ConnectionString", services =>
{
services.EnableSystemTextJsonWithErrorUnwrapping(new JsonSerializerOptions(JsonSerializerDefaults.General)
{
// Or whatever options you like
Converters = { new JsonStringEnumConverter() },
});
// Snip
});
/// <summary>
/// Registers a <see cref="System.Text.Json"/> serializer for <see cref="EasyNetQ"/> that also unwraps error messages, to enable shoveling.
/// </summary>
private static IServiceRegister EnableSystemTextJsonWithErrorUnwrapping(this IServiceRegister serviceRegister, JsonSerializerOptions options)
{
serviceRegister.Register(Options.Create(options));
serviceRegister.Register<SystemTextJsonSerializerWithErrorUnwrapping>(Lifetime.Singleton);
serviceRegister.Register<ISerializer>(register => register.Resolve<SystemTextJsonSerializerWithErrorUnwrapping>());
serviceRegister.Register<IConsumerErrorStrategy>(register => register.Resolve<SystemTextJsonSerializerWithErrorUnwrapping>());
return serviceRegister;
}
/// <summary>
/// <para>
/// A combined <see cref="System.Text.Json"/> serializer and error handler for <see cref="EasyNetQ"/> that also unwraps <see cref="EasyNetQ.SystemMessages.Error"/> messages, to enable shoveling.
/// </para>
/// <para>
/// By default, <see cref="EasyNetQ"/> stores an <see cref="EasyNetQ.SystemMessages.Error"/> wrapper that cannot be handled.
/// </para>
/// </summary>
internal sealed class SystemTextJsonSerializerWithErrorUnwrapping : ISerializer, IConsumerErrorStrategy
{
private CustomizableTypeNameSerializer TypeNameSerializer { get; }
private EasyNetQ.Serialization.SystemTextJson.SystemTextJsonSerializer DecoratedSerializer { get; } // TODO: Move to SystemTextJsonSerializerV2 once 8.0.0 is released
private DefaultConsumerErrorStrategy DecoratedErrorStrategy { get; }
public SystemTextJsonSerializerWithErrorUnwrapping(
IOptions<JsonSerializerOptions> serializerOptions,
EasyNetQ.Logging.ILogger<DefaultConsumerErrorStrategy> logger,
IConsumerConnection connection,
IConventions conventions,
ITypeNameSerializer typeNameSerializer,
IErrorMessageSerializer errorMessageSerializer,
ConnectionConfiguration configuration)
{
TypeNameSerializer = new CustomizableTypeNameSerializer(typeNameSerializer);
DecoratedSerializer = new EasyNetQ.Serialization.SystemTextJson.SystemTextJsonSerializer(serializerOptions.Value);
DecoratedErrorStrategy = new DefaultConsumerErrorStrategy(logger, connection, serializer: this, conventions, TypeNameSerializer, errorMessageSerializer, configuration);
}
public void Dispose()
{
}
public IMemoryOwner<byte> MessageToBytes(Type messageType, object message)
{
if (messageType == typeof(EasyNetQ.SystemMessages.Error) && message is EasyNetQ.SystemMessages.Error error)
return SerializeMessageFromError(error);
return DecoratedSerializer.MessageToBytes(messageType, message);
// Local function that serializes the message that was wrapped in an Error object
static IMemoryOwner<byte> SerializeMessageFromError(EasyNetQ.SystemMessages.Error error)
{
ArrayPooledMemoryStream? result = null;
try
{
result = new ArrayPooledMemoryStream();
result.SetLength(4 * error.Message.Length);
Encoding.UTF8.TryGetBytes(error.Message, result.Memory.Span, out var byteCount);
result.SetLength(byteCount);
return result;
}
catch
{
result?.Dispose();
throw;
}
}
}
public object BytesToMessage(Type messageType, in ReadOnlyMemory<byte> bytes)
{
return DecoratedSerializer.BytesToMessage(messageType, in bytes);
}
public async Task<AckStrategy> HandleConsumerErrorAsync(ConsumerExecutionContext context, Exception exception, CancellationToken cancellationToken)
{
// Ambiently customize the TypeNameSerializer to use the actual message's type (instead of Error)
// This affects only the current flow of execution
TypeNameSerializer.CurrentTypeName.Value = context.Properties.Type;
// The handler will delegate to the type name serializer and the serializer, which will return the unwrapped message type and message
var result = await DecoratedErrorStrategy.HandleConsumerErrorAsync(context, exception, cancellationToken);
return result;
}
public Task<AckStrategy> HandleConsumerCancelledAsync(ConsumerExecutionContext context, CancellationToken cancellationToken = default)
{
return DecoratedErrorStrategy.HandleConsumerCancelledAsync(context, cancellationToken);
}
/// <summary>
/// An <see cref="ITypeNameSerializer"/> decorator that allows the serialized result to be customized.
/// </summary>
private sealed class CustomizableTypeNameSerializer(
ITypeNameSerializer decoratedTypeNameSerializer)
: ITypeNameSerializer
{
/// <summary>
/// Assign the <see cref="AsyncLocal{T}.Value"/> to customize the serialized result in the current execution flow.
/// </summary>
internal AsyncLocal<string> CurrentTypeName { get; } = new AsyncLocal<string>();
public string Serialize(Type type)
{
return CurrentTypeName.Value ?? decoratedTypeNameSerializer.Serialize(type);
}
public Type DeSerialize(string typeName)
{
return decoratedTypeNameSerializer.DeSerialize(typeName);
}
}
}