So I came up with this solution, which replaces EasyNetQ's default consumer error strategy.
/// <summary>
/// Consumer error strategy that rejects a failed message without requeueing it (so a queue
/// configured with a dead-letter exchange can cycle it back for another attempt) until the
/// message's "x-death" header records <see cref="MaxDeadLetterCount"/> deaths, and then
/// delegates to the default EasyNetQ error handling.
/// </summary>
public class DeadLetterStrategy : DefaultConsumerErrorStrategy
{
    /// <summary>
    /// Total number of deaths recorded in "x-death" at which the message is handed to the base strategy.
    /// </summary>
    private const int MaxDeadLetterCount = 3;

    public DeadLetterStrategy(IConnectionFactory connectionFactory, ISerializer serializer, IEasyNetQLogger logger, IConventions conventions, ITypeNameSerializer typeNameSerializer)
        : base(connectionFactory, serializer, logger, conventions, typeNameSerializer)
    {
    }

    /// <summary>
    /// Decides how to acknowledge a message whose consumer threw.
    /// </summary>
    /// <param name="context">Execution context of the failed delivery; its headers are inspected for "x-death".</param>
    /// <param name="exception">The exception raised by the consumer; forwarded to the base strategy.</param>
    /// <returns>
    /// <see cref="AckStrategies.NackWithoutRequeue"/> while fewer than <see cref="MaxDeadLetterCount"/>
    /// deaths are recorded; otherwise the result of the base strategy.
    /// </returns>
    public override AckStrategy HandleConsumerError(ConsumerExecutionContext context, Exception exception)
    {
        object deathHeaderObject;
        if (!context.Properties.Headers.TryGetValue("x-death", out deathHeaderObject))
        {
            // No "x-death" header yet: the message has not been dead-lettered before.
            return AckStrategies.NackWithoutRequeue;
        }

        var deathHeaders = deathHeaderObject as IList;
        if (deathHeaders == null)
        {
            return AckStrategies.NackWithoutRequeue;
        }

        var retries = 0;
        foreach (var entry in deathHeaders)
        {
            // 'as' instead of a cast: an unexpected element type must not throw from the error handler.
            retries += ReadDeathCount(entry as IDictionary);

            // Each contribution is clamped, so stopping here also keeps the sum from overflowing.
            if (retries >= MaxDeadLetterCount)
            {
                return base.HandleConsumerError(context, exception);
            }
        }

        return AckStrategies.NackWithoutRequeue;
    }

    /// <summary>
    /// Reads the "count" field of a single "x-death" entry, clamped to <see cref="MaxDeadLetterCount"/>.
    /// An entry that is not a dictionary, has no "count", or has an unparsable or non-positive "count"
    /// is treated as one death, so a malformed header can never cause an endless dead-letter cycle.
    /// </summary>
    private static int ReadDeathCount(IDictionary header)
    {
        var rawCount = header != null && header.Contains("count") ? header["count"] : null;
        if (rawCount == null)
        {
            return 1;
        }

        long parsed;
        var text = Convert.ToString(rawCount, System.Globalization.CultureInfo.InvariantCulture);
        var isNumber = long.TryParse(
            text,
            System.Globalization.NumberStyles.Integer,
            System.Globalization.CultureInfo.InvariantCulture,
            out parsed);

        if (!isNumber || parsed < 1)
        {
            return 1;
        }

        return (int)Math.Min(parsed, MaxDeadLetterCount);
    }
}
You replace it like this:
// Swap the default IConsumerErrorStrategy for DeadLetterStrategy while the bus is being created.
RabbitHutch.CreateBus(
    "host=localhost",
    services => services.Register<IConsumerErrorStrategy, DeadLetterStrategy>())
You have to use the advanced bus (`bus.Advanced`), so you have to set everything up manually:
// Create the bus with DeadLetterStrategy registered as the consumer error strategy,
// then declare the exchanges and queue by hand through the advanced API.
using (var bus = RabbitHutch.CreateBus("host=localhost", serviceRegister => serviceRegister.Register<IConsumerErrorStrategy, DeadLetterStrategy>()))
{
    var deadExchange = bus.Advanced.ExchangeDeclare("exchange.text.dead", ExchangeType.Direct);
    var textExchange = bus.Advanced.ExchangeDeclare("exchange.text", ExchangeType.Direct);

    // The queue dead-letters into deadExchange, and deadExchange is bound back to the same
    // queue, so a rejected message is delivered to the consumer again.
    var queue = bus.Advanced.QueueDeclare("queue.text", deadLetterExchange: deadExchange.Name);
    bus.Advanced.Bind(deadExchange, queue, "");
    bus.Advanced.Bind(textExchange, queue, "");

    bus.Advanced.Consume<TextMessage>(queue, (message, info) => HandleTextMessage(message, info));

    // Block here: leaving the using block disposes the bus, which would shut the consumer
    // down before any message could be handled.
    Console.ReadLine();
}
This will dead-letter a failed message 3 times. After that, the message goes to the default error queue that EasyNetQ provides for error handling. You can subscribe to that queue.
A message is dead-lettered when an exception propagates out of your consumer method. For example, this handler would trigger a dead letter:
/// <summary>
/// Demo handler that fails on every call by throwing, so the exception propagates out of the consumer.
/// </summary>
/// <param name="textMessage">The received message; not inspected.</param>
/// <param name="info">Delivery metadata; not inspected.</param>
static void HandleTextMessage(IMessage<TextMessage> textMessage, MessageReceivedInfo info)
{
    var failure = new Exception("This is a test!");
    throw failure;
}