Serial processing of a certain message type in Rebus
Asked Answered
S

1

2

We have a Rebus message handler that talks to a third party webservice. Due to reasons beyond our immediate control, this WCF service frequently throws an exception because it encountered a database deadlock in its own database. Rebus will then try to process this message five times, which in most cases means that one of those five times will be lucky and not get a deadlock. But it frequently happens that a message does get deadlock after deadlock and ends up in our error queue.

Besides fixing the source of the deadlocks, which would be a longterm goal, I can think of two options:

  1. Keep trying with only this particular message type until it succeeds. Preferably I would be able to set a timeout, so "if five deadlocks then try again in 5 minutes" rather than choke the process up even more by trying continuously. I already do a Thread.Sleep(random) to spread the messages somewhat, but it will still give up after five tries.

  2. Send this particular message type to a different queue that has only one worker that processes the message, so that this happens serially rather than in parallel. Our current configuration uses 8 worker threads, but this just makes the deadlock situation worse as the webservice now gets called concurrently and the messages get in each other's way.

Option #2 has my preference, but I'm not sure if this is possible. Our configuration on the receiving side currently looks like this:

var adapter = new Rebus.Ninject.NinjectContainerAdapter(this.Kernel);

var bus = Rebus.Configuration.Configure.With(adapter)
    .Logging(x => x.Log4Net())
   .Transport(t => t.UseMsmqAndGetInputQueueNameFromAppConfig())
   .MessageOwnership(d => d.FromRebusConfigurationSection())
   .CreateBus().Start();

And the .config for the receiving side:

<rebus inputQueue="app.msg.input" errorQueue="app.msg.error" workers="8">
  <endpoints>
  </endpoints>
</rebus>

From what I can tell from the config, it's only possible to set one input queue to 'listen' to. I can't really find a way to do this via the fluent mapping API either. That seems to take only one input- and error queue as well:

.Transport(t =>t.UseMsmq("input", "error"))

Basically, what I'm looking for is something along the lines of:

<rebus workers="8">
  <input name="app.msg.input" error="app.msg.error" />
  <input name="another.input.queue" error="app.msg.error" />
</rebus>

Any tips on how to handle my requirements?

Shalon answered 30/5, 2013 at 13:33 Comment(0)
S
2

I suggest you make use of a saga and Rebus' timeout service to implement a retry strategy that fits your needs. This way, in your Rebus-enabled web service facade, you could do something like this:

public void Handle(TryMakeWebServiceCall message)
{
    try
    {
        var result = client.MakeWebServiceCall(whatever);

        bus.Reply(new ResponseWithTheResult{ ... });
    }
    catch(Exception e)
    {
        Data.FailedAttempts++;

        if (Data.FailedAttempts < 10)
        {
            bus.Defer(TimeSpan.FromSeconds(1), message);
            return;
        }

        // oh no! we failed 10 times... this is probably where we'd
        // go and do something like this:
        emailService.NotifyAdministrator("Something went wrong!");
    }
}

where Data is the saga data that is made magically available to you and persisted between calls.

For inspiration on how to create a saga, check out the wiki page on coordinating stuff that happens over time where you can see an example on how a service might have some state (i.e. number of failed attempts in your case) stored locally that is made available between handling messages.

When the time comes to make bus.Defer work, you have two options: 1) use an external timeout service (which I usually have installed one of on each server), or 2) just use "yourself" as a timeout service.

At configuration time, you go

Configure.With(...)
    .(...)
    .Timeouts(t => // configure it here)

where you can either StoreInMemory, StoreInSqlServer, StoreInMongoDb, StoreInRavenDb, or UseExternalTimeoutManager.

If you choose (1), you need to check out the Rebus code and build Rebus.Timeout yourself - it's basically just a configurable, Topshelf-enabled console application that has a Rebus endpoint inside.

Please let me know if you need more help making this work - bus.Defer is where your system becomes awesome, and will be capable of overcoming all of the little glitches that make all others' go down :)

Stearoptene answered 30/5, 2013 at 19:3 Comment(6)
Ok, cool! I'll definitely take a look at sagas tomorrow. I don't immediately see how to configure where the Saga state is stored (probably SQL Server for us) but I'm sure I can find that somewhere. Thanks! I'll let you know if this works for me. EDIT: Already found where :)Shalon
I can see some potential for growth here, as this webservice is used to place an order and we make a promise to customers that if they place the order before a certain time, they get it delivered the next day. Using this approach, we could start sending panic emails/texts in case that 'deadline' approaches and there's still messages to be processed.Shalon
I've attempted to make this work and I've come a long way, but I've ran into a problem. When the bus.Defer occurs it serializes the ISagaData class I've defined, but upon deserialization Newtonsoft.Json claims that it can't load the assembly that the type is defined in. Contents of message (in SQL Server): { "$type": "CeyennePOS.Tasks.MQ.ExternalOrderSagaData, CeyennePOS.Tasks.MQ", "OrderID": 100, "Attempts": 1, "Id": "4b758670-7f39-4e51-8502-85a27a34e933", "Revision": 1 }. This assembly is a plugin that's been loaded with Assembly.LoadFile.Shalon
It does work btw when I use a "core" assembly that's always loaded, instead of an assembly that gets dynamically loaded. The thing is, I know for certain that the plugin assembly has been loaded by the time the Rebus IBus has been set up. So I don't know why it's not being found. I would rather define the saga types in the plugin assemblies so that only those plugins have a dependency on Rebus.Shalon
Is there a particular reason why, during the bus.Defer call, you create a new message instance, instead of passing the original message variable from the method argument?Stradivarius
No reason - in fact, I think that makes my example slightly confusing... I'll fix that right away!Stearoptene

© 2022 - 2024 — McMap. All rights reserved.