How to do error handling with EasyNetQ / RabbitMQ
Asked Answered
E

5

25

I'm using RabbitMQ in C# with the EasyNetQ library. I'm using a pub/sub pattern here. I still have a few issues that I hope anyone can help me with:

  1. When there's an error while consuming a message, it's automatically moved to an error queue. How can I implement retries (so that it's placed back on the originating queue, and when it fails to process X times, it's moved to a dead letter queue)?
  2. As far as I can see there's always 1 error queue that's used to dump messages from all other queues. How can I have 1 error queue per type, so that each queue has its own associated error queue?
  3. How can I easily retry messages that are in an error queue? I tried Hosepipe, but it justs republishes the messages to the error queue instead of the originating queue. I don't really like this option either because I don't want to be fiddling around in a console. Preferably I'd just program against the error queue.

Anyone?

Ellenaellender answered 18/6, 2015 at 11:52 Comment(0)
G
18

The problem you are running into with EasyNetQ/RabbitMQ is that it's much more "raw" when compared to other messaging services like SQS or Azure Service Bus/Queues, but I'll do my best to point you in the right direction.

Question 1.

This will be on you to do. The simplest way is that you can No-Ack a message in RabbitMQ/EasyNetQ, and it will be placed at the head of the queue for you to retry. This is not really advisable because it will be retried almost immediately (With no time delay), and will also block other messages from being processed (If you have a single subscriber with a prefetch count of 1).

I've seen other implementations of using a "MessageEnvelope". So a wrapper class that when a message fails, you increment a retry variable on the MessageEnvelope and redeliver the message back onto the queue. YOU would have to do this and write the wrapping code around your message handlers, it would not be a function of EasyNetQ.

Using the above, I've also seen people use envelopes, but allow the message to be dead lettered. Once it's on the dead letter queue, there is another application/worker reading items from the dead letter queue.

All of these approaches above have a small issue in that there isn't really any nice way to have a logarithmic/exponential/any sort of increasing delay in processing the message. You can "hold" the message in code for some time before returning it to the queue, but it's not a nice way around.

Out of all of these options, your own custom application reading the dead letter queue and deciding whether to reroute the message based on an envelope that contains the retry count is probably the best way.

Question 2.

You can specify a dead letter exchange per queue using the advanced API. (https://github.com/EasyNetQ/EasyNetQ/wiki/The-Advanced-API#declaring-queues). However this means you will have to use the advanced API pretty much everywhere as using the simple IBus implementation of subscribe/publish looks for queues that are named based on both the message type and subscriber name. Using a custom declare of queue means you are going to be handling the naming of your queues yourself, which means when you subscribe, you will need to know the name of what you want etc. No more auto subscribing for you!

Question 3

An Error Queue/Dead Letter Queue is just another queue. You can listen to this queue and do what you need to do with it. But there is not really any out of the box solution that sounds like it would fit your needs.

Guib answered 22/6, 2015 at 4:32 Comment(1)
We found there was really no practical use for the standard EasyNetQ implementation outside of a single quick demonstration, tied to some shared .NET classes, for first-time users. After that, switch to Easy's super-simple "advanced" API. Yes, you can do advanced things but honestly it's a beautifully simple API to use. Definitely a fan of Easy's Advanced API for any and all work.Clipboard
E
9

I've implemented exactly what you describe. Here are some tips based on my experience and related to each of your questions.

Q1 (how to retry X times):

For this, you can use IMessage.Body.BasicProperties.Headers. When you consume a message off an error queue, just add a header with a name that you choose. Look for this header on each message that comes into the error queue and increment it. This will give you a running retry count.

It's very important that you have a strategy for what to do when a message exceeds the retry limit of X. You don't want to lose that message. In my case, I write the message to disk at that point. It gives you lots of helpful debugging information to come back to later, because EasyNetQ automatically wraps your originating message with error info. It also has the original message so that you can, if you like, manually (or maybe automated, through some batch re-processing code) requeue the message later in some controlled way.

You can look at the code in the Hosepipe utility to see a good way of doing this. In fact, if you follow the pattern you see there then you can even use Hosepipe later to requeue the messages if you need to.

Q2 (how to create an error queue per originating queue):

You can use the EasyNetQ Advanced Bus to do this cleanly. Use IBus.Advanced.Container.Resolve<IConventions> to get at the conventions interface. Then you can set the conventions for the error queue naming with conventions.ErrorExchangeNamingConvention and conventions.ErrorQueueNamingConvention. In my case I set the convention to be based on the name of the originating queue so that I get a queue/queue_error pair of queues every time I create a queue.

Q3 (how to process messages in the error queues):

You can declare a consumer for the error queue the same way you do any other queue. Again, the AdvancedBus lets you do this cleanly by specifying that the type coming off of the queue is EasyNetQ.SystemMessage.Error. So, IAdvancedBus.Consume<EasyNetQ.SystemMessage.Error>() will get you there. Retrying simply means republishing to the original exchange (paying attention to the retry count you put in the header (see my answer to Q1, above), and information in the Error message that you consumed off the error queue can help you find the target for republishing.

Euphorbia answered 29/9, 2015 at 18:11 Comment(1)
I got it working, see example here #32077544Pittman
O
5

I know this is an old post but - just in case it helps someone else - here is my self-answered question (I needed to ask it because existing help was not enough) that explains how I implemented retrying failed messages on their original queues. The following should answer your question #1 and #3. For #2, you may have to use the Advanced API, which I haven't used (and I think it defeats the purpose of EasyNetQ; one might as well use RabbitMQ client directly). Also consider implementing IConsumerErrorStrategy, though.

1) Since there can be multiple consumers of a message and all may not need to retry a msg, I have a Dictionary<consumerId, RetryInfo> in the body of the message, as EasyNetQ does not (out of the box) support complex types in message headers.

public interface IMessageType
{
    int MsgTypeId { get; }

    Dictionary<string, TryInfo> MsgTryInfo {get; set;}

}

2) I have implemented a class RetryEnabledErrorMessageSerializer : IErrorMessageSerializer that just updates the TryCount and other information every time it is called by the framework. I attach this custom serializer to the framework on a per-consumer basis via the IoC support provided by EasyNetQ.

 public class RetryEnabledErrorMessageSerializer<T> : IErrorMessageSerializer where T : class, IMessageType
 {
        public string Serialize(byte[] messageBody)
        {
             string stringifiedMsgBody = Encoding.UTF8.GetString(messageBody);
             var objectifiedMsgBody = JObject.Parse(stringifiedMsgBody);

             // Add/update RetryInformation into objectifiedMsgBody here
             // I have a dictionary that saves <key:consumerId, val: TryInfoObj>

             return JsonConvert.SerializeObject(objectifiedMsgBody);
        }
  }

And in my EasyNetQ wrapper class:

    public void SetupMessageBroker(string givenSubscriptionId, bool enableRetry = false)
    {
        if (enableRetry)
        {
            _defaultBus = RabbitHutch.CreateBus(currentConnString,
                                                        serviceRegister => serviceRegister.Register<IErrorMessageSerializer>(serviceProvider => new RetryEnabledErrorMessageSerializer<IMessageType>(givenSubscriptionId))
                                                );
        }
        else // EasyNetQ's DefaultErrorMessageSerializer will wrap error messages
        {
            _defaultBus = RabbitHutch.CreateBus(currentConnString);
        }
    }

    public bool SubscribeAsync<T>(Func<T, Task> eventHandler, string subscriptionId)
    {
        IMsgHandler<T> currMsgHandler = new MsgHandler<T>(eventHandler, subscriptionId);
        // Using the msgHandler allows to add a mediator between EasyNetQ and the actual callback function
        // The mediator can transmit the retried msg or choose to ignore it
        return _defaultBus.SubscribeAsync<T>(subscriptionId, currMsgHandler.InvokeMsgCallbackFunc).Queue != null;
    }

3) Once the message is added to the default error queue, you can have a simple console app/windows service that periodically republishes existing error messages on their original queues. Something like:

var client = new ManagementClient(AppConfig.BaseAddress, AppConfig.RabbitUsername, AppConfig.RabbitPassword);
var vhost = client.GetVhostAsync("/").Result;
var aliveRes = client.IsAliveAsync(vhost).Result;
var errQueue = client.GetQueueAsync(Constants.EasyNetQErrorQueueName, vhost).Result;
var crit = new GetMessagesCriteria(long.MaxValue, Ackmodes.ack_requeue_false);
var errMsgs = client.GetMessagesFromQueueAsync(errQueue, crit).Result;
foreach (var errMsg in errMsgs)
{
    var innerMsg = JsonConvert.DeserializeObject<Error>(errMsg.Payload);
    var pubInfo = new PublishInfo(innerMsg.RoutingKey, innerMsg.Message);
    pubInfo.Properties.Add("type", innerMsg.BasicProperties.Type);
    pubInfo.Properties.Add("correlation_id", innerMsg.BasicProperties.CorrelationId);
    pubInfo.Properties.Add("delivery_mode", innerMsg.BasicProperties.DeliveryMode);
    var pubRes = client.PublishAsync(client.GetExchangeAsync(innerMsg.Exchange, vhost).Result, pubInfo).Result;
}

4) I have a MessageHandler class that contains a callback func. Whenever a message is delivered to the consumer, it goes to the MessageHandler, which decides if the message try is valid and calls the actual callback if so. If try is not valid (maxRetriesExceeded/the consumer does not need to retry anyway), I ignore the message. You can choose to Dead Letter the message in this case.

public interface IMsgHandler<T> where T: class, IMessageType
{
    Task InvokeMsgCallbackFunc(T msg);
    Func<T, Task> MsgCallbackFunc { get; set; }
    bool IsTryValid(T msg, string refSubscriptionId); // Calls callback only 
                                                      // if Retry is valid
}

Here is the mediator function in MsgHandler that invokes the callback:

    public async Task InvokeMsgCallbackFunc(T msg)
    {
        if (IsTryValid(msg, CurrSubscriptionId))
        {
            await this.MsgCallbackFunc(msg);
        }
        else
        {
            // Do whatever you want
        }
    }
Outtalk answered 15/3, 2019 at 10:22 Comment(0)
L
1

Here, I have implemented a Nuget package (EasyDeadLetter) for this purpose, which can be easily implemented with the minimum changes in any project. All you need to do is follow the four steps :

  1. First of all, Decorate your class object with QeueuAttribute

     [Queue(“Product.Report”, ExchangeName = “Product.Report”)]
     public class ProductReport { }
    
  2. The second step is to define your dead-letter queue with the same QueueAttribute and also inherit the dead-letter object from the Main object class.

    [Queue(“Product.Report.DeadLetter”, ExchangeName = 
    “Product.Report.DeadLetter”)]
    public class ProductReportDeadLetter : ProductReport { }
    
  3. Now, it’s time to decorate your main queue object with the EasyDeadLetter attribute and set the type of dead-letter queue.

    [EasyDeadLetter(DeadLetterType =    
         typeof(ProductReportDeadLetter))]
    [Queue(“Product.Report”, ExchangeName = “Product.Report”)]
    public class ProductReport { }
    
  4. In the final step, you need to register EasyDeadLetterStrategy as the default error handler (IConsumerErrorStrategy).

     services.AddSingleton<IBus> 
     (RabbitHutch.CreateBus(“connectionString”,
        serviceRegister =>
        {
           serviceRegister.Register<IConsumerErrorStrategy, 
           EasyDeadLetterStrategy>();
        }));
    

That’s all. from now on any failed message will be moved to the related dead-letter queue.

See more detail here : GitHub Repository NuGet Package

Longtin answered 16/4, 2022 at 14:36 Comment(0)
A
1

1. How to retry messages whose processing has failed?

To keep things simple and not involve any re-queuing and scheduling, I am simply using Polly to allow a few attempts, with some minimal gradual backoff. Only if all attempts fail does the exception propagate and is the message moved to the error queue.

public TimeSpan BaseDelay { get; init; } = TimeSpan.FromSeconds(2); // Balance chance to succeed with throughput

private ResiliencePipeline HandlerRetryPipeline => _handlerRetryPipeline ??= new ResiliencePipelineBuilder()
    .AddRetry(new RetryStrategyOptions()
    {
        ShouldHandle = new PredicateBuilder().Handle<Exception>(),
        BackoffType = DelayBackoffType.Exponential,
        MaxRetryAttempts = 4,
        // Balance chance to succeed with throughput
        DelayGenerator = args => ValueTask.FromResult(args.AttemptNumber switch
        {
            0 => TimeSpan.Zero, // E.g. at 00.000
            1 => 1 * BaseDelay, // E.g. at 02.000
            2 => 2 * BaseDelay, // E.g. at 06.000
            3 => 3 * BaseDelay, // E.g. at 14.000
            _ => (TimeSpan?)null,
        }),
        OnRetry = _ => ValueTask.CompletedTask,
    })
    .Build();
private ResiliencePipeline? _handlerRetryPipeline;

private async Task ExecuteWithRetriesAsync<TMessage>(
    string stableHandlerName,
    Func<TMessage, CancellationToken, Task> handler,
    TMessage @event,
    CancellationToken cancellationToken)
{
    try
    {
        await HandlerRetryPipeline.ExecuteAsync(
            callback: (state, cancellationToken) => new ValueTask(
                state.Handler.Invoke(state.Event, cancellationToken)),
            state: (Handler: handler, Event: @event),
            cancellationToken: cancellationToken);
    }
    catch (Exception e)
    {
        logger.LogError(e,
            "{Event} handler failed, now requiring the problem to be fixed and the problematic messages to be shoveled back to {MainQueue} from the error queue: {Message}",
            typeof(TMessage).Name, stableHandlerName, e.Message);

        throw;
    }
}

Bear in mind that each application replica will work on a limited number of messages simultaneously, which means throughput gets affected if processing takes longer, which means we should not delay for long while holding onto a message. You may need shorter delays.

2. How to have a dedicated error queue per message type?

IConventions will let us do this, and most easily so by subclassing Conventions.

services.RegisterEasyNetQ("ConnectionString", services =>
{
    services.Register<IConventions>(serviceProvider => serviceProvider.Resolve<CustomNamingConvention>());

    // Snip
});

public sealed class CustomNamingConvention : Conventions
{
    public CustomNamingConvention(ITypeNameSerializer typeNameSerializer)
        : base(typeNameSerializer)
    {
        QueueNamingConvention = this.GetQueueName; // Own method
        ExchangeNamingConvention = this.GetExchangeName; // Own method
        ErrorQueueNamingConvention = info => $"{info.Queue}_error";
        ErrorExchangeNamingConvention = info => $"{info.Exchange}_error";
    }

    // Snip
}

3. How to [manually] requeue messages for reprocessing?

Essentially, if we install RabbitMQ's shovel plugin, we can simply go to the management UI, find an error queue, and move all of its messages into the corresponding regular queue. However...

EasyNetQ wraps messages in Error objects. They are now of the wrong type and cannot be processed using our existing handler. EasyNetQ makes it surprisingly difficult to deal with this without resorting to the advanced API (giving up many advantages).

The least invasive approach I could come up with is as follows. The undesirable message wrapping is done by DefaultConsumerErrorStrategy, but its relevant methods are private and nonvirtual. Luckily, it delegates the work to two injected interfaces: ISerializer (for the message body) and ITypeNameSerializer (for the type name of the message body). The former is easy to manipulate, and the latter takes some trickery.

For ISerializer.MessageToBytes(), we will check if we are receiving an Error type and object. If so, we will extract its Message and serialize that instead.

For ITypeNameSerializer.Serialize(), we only receive a Type (which will be Error in the problematic case), but nothing to let us know the original type. The trickery is to use an AsyncLocal<T> just before the method is called, to remember the desired type name in such a way that only the current flow of execution can see it. If the Serialize() method sees that such a type was stored, we will have it use it. This approach is concurrency-safe even if our class is a singleton.

The code here assumes serialization with System.Text.Json, but that can easily be replaced.

services.RegisterEasyNetQ("ConnectionString", services =>
{
    services.EnableSystemTextJsonWithErrorUnwrapping(new JsonSerializerOptions(JsonSerializerDefaults.General)
    {
        // Or whatever options you like
        Converters = { new JsonStringEnumConverter() },
    });

    // Snip
});

/// <summary>
/// Registers a <see cref="System.Text.Json"/> serializer for <see cref="EasyNetQ"/> that also unwraps error messages, to enable shoveling.
/// </summary>
private static IServiceRegister EnableSystemTextJsonWithErrorUnwrapping(this IServiceRegister serviceRegister, JsonSerializerOptions options)
{
    serviceRegister.Register(Options.Create(options));
    serviceRegister.Register<SystemTextJsonSerializerWithErrorUnwrapping>(Lifetime.Singleton);
    serviceRegister.Register<ISerializer>(register => register.Resolve<SystemTextJsonSerializerWithErrorUnwrapping>());
    serviceRegister.Register<IConsumerErrorStrategy>(register => register.Resolve<SystemTextJsonSerializerWithErrorUnwrapping>());

    return serviceRegister;
}


/// <summary>
/// <para>
/// A combined <see cref="System.Text.Json"/> serializer and error handler for <see cref="EasyNetQ"/> that also unwraps <see cref="EasyNetQ.SystemMessages.Error"/> messages, to enable shoveling.
/// </para>
/// <para>
/// By default, <see cref="EasyNetQ"/> stores an <see cref="EasyNetQ.SystemMessages.Error"/> wrapper that cannot be handled.
/// </para>
/// </summary>
internal sealed class SystemTextJsonSerializerWithErrorUnwrapping : ISerializer, IConsumerErrorStrategy
{
    private CustomizableTypeNameSerializer TypeNameSerializer { get; }
    private EasyNetQ.Serialization.SystemTextJson.SystemTextJsonSerializer DecoratedSerializer { get; } // TODO: Move to SystemTextJsonSerializerV2 once 8.0.0 is released
    private DefaultConsumerErrorStrategy DecoratedErrorStrategy { get; }

    public SystemTextJsonSerializerWithErrorUnwrapping(
        IOptions<JsonSerializerOptions> serializerOptions,
        EasyNetQ.Logging.ILogger<DefaultConsumerErrorStrategy> logger,
        IConsumerConnection connection,
        IConventions conventions,
        ITypeNameSerializer typeNameSerializer,
        IErrorMessageSerializer errorMessageSerializer,
        ConnectionConfiguration configuration)
    {
        TypeNameSerializer = new CustomizableTypeNameSerializer(typeNameSerializer);
        DecoratedSerializer = new EasyNetQ.Serialization.SystemTextJson.SystemTextJsonSerializer(serializerOptions.Value);
        DecoratedErrorStrategy = new DefaultConsumerErrorStrategy(logger, connection, serializer: this, conventions, TypeNameSerializer, errorMessageSerializer, configuration);
    }

    public void Dispose()
    {
    }

    public IMemoryOwner<byte> MessageToBytes(Type messageType, object message)
    {
        if (messageType == typeof(EasyNetQ.SystemMessages.Error) && message is EasyNetQ.SystemMessages.Error error)
            return SerializeMessageFromError(error);

        return DecoratedSerializer.MessageToBytes(messageType, message);

        // Local function that serializes the message that was wrapped in an Error object
        static IMemoryOwner<byte> SerializeMessageFromError(EasyNetQ.SystemMessages.Error error)
        {
            ArrayPooledMemoryStream? result = null;
            try
            {
                result = new ArrayPooledMemoryStream();
                result.SetLength(4 * error.Message.Length);
                Encoding.UTF8.TryGetBytes(error.Message, result.Memory.Span, out var byteCount);
                result.SetLength(byteCount);
                return result;
            }
            catch
            {
                result?.Dispose();
                throw;
            }
        }
    }

    public object BytesToMessage(Type messageType, in ReadOnlyMemory<byte> bytes)
    {
        return DecoratedSerializer.BytesToMessage(messageType, in bytes);
    }

    public async Task<AckStrategy> HandleConsumerErrorAsync(ConsumerExecutionContext context, Exception exception, CancellationToken cancellationToken)
    {
        // Ambiently customize the TypeNameSerializer to use the actual message's type (instead of Error)
        // This affects only the current flow of execution
        TypeNameSerializer.CurrentTypeName.Value = context.Properties.Type;

        // The handler will delegate to the type name serializer and the serializer, which will return the unwrapped message type and message
        var result = await DecoratedErrorStrategy.HandleConsumerErrorAsync(context, exception, cancellationToken);
        return result;
    }

    public Task<AckStrategy> HandleConsumerCancelledAsync(ConsumerExecutionContext context, CancellationToken cancellationToken = default)
    {
        return DecoratedErrorStrategy.HandleConsumerCancelledAsync(context, cancellationToken);
    }

    /// <summary>
    /// An <see cref="ITypeNameSerializer"/> decorator that allows the serialized result to be customized.
    /// </summary>
    private sealed class CustomizableTypeNameSerializer(
        ITypeNameSerializer decoratedTypeNameSerializer)
        : ITypeNameSerializer
    {
        /// <summary>
        /// Assign the <see cref="AsyncLocal{T}.Value"/> to customize the serialized result in the current execution flow.
        /// </summary>
        internal AsyncLocal<string> CurrentTypeName { get; } = new AsyncLocal<string>();

        public string Serialize(Type type)
        {
            return CurrentTypeName.Value ?? decoratedTypeNameSerializer.Serialize(type);
        }

        public Type DeSerialize(string typeName)
        {
            return decoratedTypeNameSerializer.DeSerialize(typeName);
        }
    }
}
Asphyxia answered 18/4 at 15:6 Comment(0)

© 2022 - 2024 — McMap. All rights reserved.