Is there an easy way to subscribe to the default error queue in EasyNetQ?
Asked Answered
S

2

6

In my test application I can see messages that were processed with an exception being automatically inserted into the default EasyNetQ_Default_Error_Queue, which is great. I can then successfully dump or requeue these messages using the Hosepipe, which also works fine, but requires dropping down to the command line and calling against both Hosepipe and the RabbitMQ API to purge the queue of retried messages.

So I'm thinking the easiest approach for my application is to simply subscribe to the error queue, so I can re-process them using the same infrastructure. But in EastNetQ, the error queue seems to be special. We need to subscribe using a proper type and routing ID, so I'm not sure what these values should be for the error queue:

bus.Subscribe<WhatShouldThisBe>("and-this", ReprocessErrorMessage);

Can I use the simple API to subscribe to the error queue, or do I need to dig into the advanced API?

If the type of my original message was TestMessage, then I'd like to be able to do something like this:

bus.Subscribe<ErrorMessage<TestMessage>>("???", ReprocessErrorMessage);

where ErrorMessage is a class provided by EasyNetQ to wrap all errors. Is this possible?

Spoonerism answered 25/2, 2013 at 2:11 Comment(0)
C
3

You can't use the simple API to subscribe to the error queue because it doesn't follow EasyNetQ queue type naming conventions - maybe that's something that should be fixed ;)

But the Advanced API works fine. You won't get the original message back, but it's easy to get the JSON representation which you could de-serialize yourself quite easily (using Newtonsoft.JSON). Here's an example of what your subscription code should look like:

[Test]
[Explicit("Requires a RabbitMQ server on localhost")]
public void Should_be_able_to_subscribe_to_error_messages()
{
    var errorQueueName = new Conventions().ErrorQueueNamingConvention();
    var queue = Queue.DeclareDurable(errorQueueName);
    var autoResetEvent = new AutoResetEvent(false);

    bus.Advanced.Subscribe<SystemMessages.Error>(queue, (message, info) =>
    {
        var error = message.Body;

        Console.Out.WriteLine("error.DateTime = {0}", error.DateTime);
        Console.Out.WriteLine("error.Exception = {0}", error.Exception);
        Console.Out.WriteLine("error.Message = {0}", error.Message);
        Console.Out.WriteLine("error.RoutingKey = {0}", error.RoutingKey);

        autoResetEvent.Set();
        return Task.Factory.StartNew(() => { });
    });

    autoResetEvent.WaitOne(1000);
}

I had to fix a small bug in the error message writing code in EasyNetQ before this worked, so please get a version >= 0.9.2.73 before trying it out. You can see the code example here

Clustered answered 25/2, 2013 at 10:15 Comment(5)
Thanks for your help Mike. Do you think it would be a good idea to expose a friendly wrapper for this? By default, the simple API writes to this error queue, so to me it makes sense for it to have a mechanism to transparently consume these errors too. Even with your code above, I think I need to have some kind of switch statement in the error message handler (switch (error.BasicProperties.Type)), which will then allow me to deserialize the original message to the correct type, which is a bit ugly. It would be nice if I could subscribe to the specific type of error in which I am interested.Spoonerism
BTW I love the EastNetQ library. Thanks for your hard work and great documentation.Spoonerism
Thanks for the kind words! I've added your suggestion to the issues list github.com/mikehadlow/EasyNetQ/issues/71 but I'm not completely convinced.Clustered
The code above no longer compiles, the link to the source is no longer valid, and I can't find anything that answers the question anywhere. Was this rewritten in the last year so it followed the naming convention? How is it done now?Waler
@TraderhutGames Yes, the API has been considerably changed since then. You still have to use the advanced API to directly access the queue though.Clustered
W
0

Code that works: (I took a guess)

The screwyness with the 'foo' is because if I just pass that function HandleErrorMessage2 into the Consume call, it can't figure out that it returns a void and not a Task, so can't figure out which overload to use. (VS 2012) Assigning to a var makes it happy. You will want to catch the return value of the call to be able to unsubscribe by disposing the object.

Also note that Someone used a System Object name (Queue) instead of making it a EasyNetQueue or something, so you have to add the using clarification for the compiler, or fully specify it.

 using Queue = EasyNetQ.Topology.Queue;

  private const string QueueName = "EasyNetQ_Default_Error_Queue";
  public static void Should_be_able_to_subscribe_to_error_messages(IBus bus)
  {
     Action <IMessage<Error>, MessageReceivedInfo> foo = HandleErrorMessage2;

     IQueue queue = new Queue(QueueName,false);
     bus.Advanced.Consume<Error>(queue, foo);
  }

  private static void HandleErrorMessage2(IMessage<Error> msg, MessageReceivedInfo info)
 {
 }
Waler answered 17/7, 2014 at 19:56 Comment(0)

© 2022 - 2024 — McMap. All rights reserved.