How to enforce message queue sequence with multiple WCF service instances

I want to create a WCF service which uses an MSMQ binding as I have a high volume of notifications the service is to process. It is important that clients are not held up by the service and that the notifications are processed in the order they are raised, hence the queue implementation.

Another consideration is resilience. I know I could cluster MSMQ itself to make the queue more robust, but I want to be able to run an instance of my service on different servers, so if a server crashes notifications do not build up in the queue but another server carries on processing.

I have experimented with the MSMQ binding and found that you can have multiple instances of a service listening on the same queue, and left to themselves they end up doing a sort of round-robin with the load spread across the available services. This is great, but I end up losing the sequencing of the queue as different instances take a different amount of time to process the request.

I've been using a simple console app to experiment, which is the epic code dump below. When it's run I get an output like this:

host1 open
host2 open
S1: 01
S1: 03
S1: 05
S2: 02
S1: 06
S1: 08
S1: 09
S2: 04
S1: 10
host1 closed
S2: 07
host2 closed

What I want to happen is:

host1 open
host2 open
S1: 01
<pause while S2 completes>
S2: 02
S1: 03
<pause while S2 completes>
S2: 04
S1: 05
S1: 06
etc.

I would have thought that, as S2 has not completed, it might still fail and return the message it was processing to the queue. Therefore S1 should not be allowed to pull another message off the queue. My queue is transactional and I have tried setting TransactionScopeRequired = true on the service, but to no avail.

Is this even possible? Am I going about it the wrong way? Is there some other way to build a failover service without some kind of central synchronisation mechanism?

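using System;
using System.Messaging;      // MessageQueue
using System.ServiceModel;   // ServiceHost, NetMsmqBinding, ChannelFactory
using System.Threading;      // Timer, Thread

class WcfMsmqProgram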
{
    private const string QueueName = "testq1";

    static void Main()
    {
        // Create a transactional queue
        string qPath = ".\\private$\\" + QueueName;
        if (!MessageQueue.Exists(qPath))
            MessageQueue.Create(qPath, true);
        else
            new MessageQueue(qPath).Purge();

        // S1 processes as fast as it can
        IService s1 = new ServiceImpl("S1");
        // S2 is slow
        IService s2 = new ServiceImpl("S2", 2000);

        // MSMQ binding
        NetMsmqBinding binding = new NetMsmqBinding(NetMsmqSecurityMode.None);

        // Host S1
        ServiceHost host1 = new ServiceHost(s1, new Uri("net.msmq://localhost/private"));
        ConfigureService(host1, binding);
        host1.Open();
        Console.WriteLine("host1 open");

        // Host S2
        ServiceHost host2 = new ServiceHost(s2, new Uri("net.msmq://localhost/private"));
        ConfigureService(host2, binding);
        host2.Open();
        Console.WriteLine("host2 open");

        // Create a client 
        ChannelFactory<IService> factory = new ChannelFactory<IService>(binding, new EndpointAddress("net.msmq://localhost/private/" + QueueName));
        IService client = factory.CreateChannel();

        // Periodically call the service with a new number
        int counter = 1;
        using (Timer t = new Timer(o => client.EchoNumber(counter++), null, 0, 500))
        {
            // Enter to stop
            Console.ReadLine();
        }

        host1.Close();
        Console.WriteLine("host1 closed");
        host2.Close();
        Console.WriteLine("host2 closed");

        // Wait for exit
        Console.ReadLine();
    }

    static void ConfigureService(ServiceHost host, NetMsmqBinding binding)
    {
        var endpoint = host.AddServiceEndpoint(typeof(IService), binding, QueueName);
    }

    [ServiceContract]
    interface IService
    {
        [OperationContract(IsOneWay = true)]
        void EchoNumber(int number);
    }

    [ServiceBehavior(InstanceContextMode = InstanceContextMode.Single)]
    class ServiceImpl : IService
    {
        public ServiceImpl(string name, int sleep = 0)
        {
            this.name = name;
            this.sleep = sleep;
        }

        private string name;
        private int sleep;

        public void EchoNumber(int number)
        {
            Thread.Sleep(this.sleep);
            Console.WriteLine("{0}: {1:00}", this.name, number);
        }
    }
}
Laroy answered 10/12, 2012 at 13:57 Comment(1)
You'll have a hard time doing this with WCF's MSMQ binding (see #730112). However, you could do it using transactional MSMQ and some reconciliation code (without WCF).Keil

batwad,

You are trying to manually create a service bus. Why don't you try to use an existing one?

NServiceBus, MassTransit, ServiceStack

At least 2 of those work with MSMQ.

Furthermore, if you absolutely need order, it may actually be for another reason: you want to be able to send a message and you don't want dependent messages to be processed before the first message. You are looking for the Saga Pattern. NServiceBus and MassTransit both allow you to manage sagas easily: you trigger the initial message and then trigger the remaining messages based on conditions. This makes implementing the plumbing of your distributed application a snap.

You can then even scale up to thousands of clients, queue servers and message processors without having to write a single line of code nor have any issues.

We tried to implement our own service bus over MSMQ here; we gave up because new issues kept creeping up. We went with NServiceBus, but MassTransit is also an excellent product (it's 100% open source, NServiceBus isn't). ServiceStack is awesome at making APIs and using message queues - I'm sure you could use it to make services that act as queue front-ends in minutes.

Oh, did I mention that in the case of NSB and MT both only require under 10 lines of code to fully implement queues, senders and handlers?

----- ADDED -----

Udi Dahan (one of the main contributors to NServiceBus) talks about this in "In-Order Messaging a Myth" and in "Message Ordering: Is it Cost Effective?"

Chris Patterson (one of the main contributors to MassTransit) covers it in the question "Using Sagas to ensure proper sequential message order"

StackOverflow questions/answers: "Preserve message order when consuming MSMQ messages in a WCF application"

----- QUESTION -----

I must say that I'm baffled as to why you need to guarantee message order - would you be in the same position if you were using an HTTP/SOAP protocol? My guess is no, so why is it a problem in MSMQ?

Good luck, hope this helps,

Lynettelynn answered 13/12, 2012 at 18:29 Comment(6)
I am raising notifications which are dependent on each other. For example, a train can't leave a station before it has arrived there. I want to make sure the OnDepart notification is handled after OnArrive, otherwise the OnDepart handling might not work (e.g.: it couldn't work out how long the train spent at the station if it hadn't handled the corresponding OnArrive yet)Laroy
You should never raise OnArrive before OnDepart, which also means you should not send it. Look at the Saga Pattern, it's exactly what you mean - you need to manage state outside of MSMQ.Lynettelynn
Ok, I think I finally got it... You are sending an OnDepart event, however you are concerned that when the OnArrive event gets triggered later on the OnDepart event may not have been processed yet... Yes I see the issue - but unfortunately I can't really give you a golden solution other than perhaps: Use some sort of a unique identifier, if you can't guarantee that MSMQ sends a message before another one (train origin network crashes therefore OnDepart is stuck there, and eventually train arrives) then you have to code around it. If you get an OnArrive event and the OnDepartLynettelynn
event hasn't been processed then I would build contingency code that may allow triggering the event even if it seems that the departure was never executed - this is your only way of doing this. If you include a unique identifier for the entire saga then you will be able to reference the out-of-order events and avoid creating duplicate database records because of the mixed-up sequence. In the train example you would likely create a "left station" record and an "arrived" record on each event. I would say here create both records on OnArrive but leave the depart time emptyLynettelynn
and on the OnDepart event, again use the same unique identifier to see if the records already exist (or something else) and if they do - simply log the information that needs to be logged. Discard the extra events that shouldn't run because of the fact that you got the messages out of order. If you can't fix the issue of out of order messages, then at least plan for it. Hope this helpsLynettelynn
Sorry I didn't reply sooner; Christmas etc. got in the way. I'm not sure I really have a saga but you've at least confirmed it's a hard problem and I haven't missed something easy!Laroy
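The contingency approach described in the comments above could be sketched roughly as follows. This is only an illustration under stated assumptions: `VisitTracker`, `StationVisit` and the `visitId` key are hypothetical names, and an in-memory dictionary stands in for whatever database the real handlers would write to. Each handler creates the record if it is missing and fills in only its own field, so the result is the same whichever event is processed first.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical record: one row per station visit, keyed by a shared identifier.
public sealed class StationVisit
{
    public DateTime? ArrivedAt;
    public DateTime? DepartedAt;

    // Null until both events have been handled (lifted DateTime? subtraction).
    public TimeSpan? TimeAtStation
    {
        get { return DepartedAt - ArrivedAt; }
    }
}

// Hypothetical handler host; the dictionary stands in for a database table.
public sealed class VisitTracker
{
    private readonly Dictionary<Guid, StationVisit> visits = new Dictionary<Guid, StationVisit>();
    private readonly object gate = new object();

    public StationVisit OnArrive(Guid visitId, DateTime when)
    {
        // A repeated OnArrive is ignored rather than overwriting the first one.
        return Upsert(visitId, v => { if (!v.ArrivedAt.HasValue) v.ArrivedAt = when; });
    }

    public StationVisit OnDepart(Guid visitId, DateTime when)
    {
        // Works even if OnArrive has not been handled yet: the record is created
        // with an empty arrival time and completed later.
        return Upsert(visitId, v => { if (!v.DepartedAt.HasValue) v.DepartedAt = when; });
    }

    private StationVisit Upsert(Guid visitId, Action<StationVisit> apply)
    {
        lock (gate)
        {
            StationVisit visit;
            if (!visits.TryGetValue(visitId, out visit))
                visits[visitId] = visit = new StationVisit();
            apply(visit);
            return visit;
        }
    }
}
```

With this shape, the time spent at the station becomes available as soon as both events have been seen, regardless of which service instance handled which message, provided both instances write to the same store.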

Ensuring in-order delivery of messages is one of the de-facto sticky issues with high volume messaging.

In an ideal world, your message destinations should be able to handle out-of-order messaging. This can be achieved by ensuring that your message source includes some kind of sequencing information. Again ideally this takes the form of some kind of x-of-n batch stamp (message 1 of 10, 2 of 10, etc). Your message destination is then required to assemble the data into order once it has been delivered.
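As a rough illustration of assembling messages back into order at the destination, here is a minimal sketch. It assumes each message carries a gap-free sequence number assigned by the source and that every message of a given sequence reaches the same `Resequencer` instance; the class and its members are hypothetical names, not part of WCF or MSMQ.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical helper: buffers out-of-order items and releases them in sequence.
public sealed class Resequencer<T>
{
    private readonly SortedDictionary<long, T> pending = new SortedDictionary<long, T>();
    private readonly Action<T> deliver;
    private readonly object gate = new object();
    private long next; // sequence number we are waiting for

    public Resequencer(long firstSequence, Action<T> deliver)
    {
        this.next = firstSequence;
        this.deliver = deliver;
    }

    public void Accept(long sequence, T item)
    {
        lock (gate)
        {
            // Drop anything already delivered or already buffered.
            if (sequence < next || pending.ContainsKey(sequence))
                return;

            pending[sequence] = item;

            // Release the longest run of consecutive items starting at 'next'.
            T ready;
            while (pending.TryGetValue(next, out ready))
            {
                pending.Remove(next);
                next++;
                deliver(ready);
            }
        }
    }
}
```

A handler would call `Accept` for every incoming message. Anything that arrives early is held until the gap before it has been filled, so a lost message stalls everything behind it; a real implementation would need a timeout or a dead-letter policy for that case.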

However, in the real world there often is no scope for changing downstream systems to handle messages arriving out of order. In this instance you have two choices:

  1. Go entirely single threaded - actually you can usually find some kind of 'grouping id' which means you can go single-threaded in a for-each-group sense, meaning you still have concurrency across different message groups.
  2. Implement a re-sequencer wrapper around each of your consumer systems you want to receive in-order messages.

Neither solution is very nice, but I think those are the only ways you can have both concurrency and in-order message delivery.
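The first option (single-threaded per group, concurrent across groups) might look something like the sketch below. It is in-process only and assumes a single reader pulls messages off the queue in order and hands them to `Enqueue`; `GroupedDispatcher` and the `groupOf` selector are hypothetical names. Items with the same grouping id always land in the same lane, and each lane has exactly one worker, so order is preserved within a group.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;

// Hypothetical helper: routes items to a fixed set of single-threaded lanes by group.
public sealed class GroupedDispatcher<T> : IDisposable
{
    private readonly BlockingCollection<T>[] lanes;
    private readonly Task[] workers;
    private readonly Func<T, string> groupOf;

    public GroupedDispatcher(int laneCount, Func<T, string> groupOf, Action<T> handle)
    {
        this.groupOf = groupOf;
        lanes = new BlockingCollection<T>[laneCount];
        workers = new Task[laneCount];

        for (int i = 0; i < laneCount; i++)
        {
            // Declared inside the loop so each worker captures its own lane.
            var lane = lanes[i] = new BlockingCollection<T>();
            workers[i] = Task.Factory.StartNew(() =>
            {
                // One consumer per lane: items are handled in the order they were added.
                foreach (var item in lane.GetConsumingEnumerable())
                    handle(item);
            }, TaskCreationOptions.LongRunning);
        }
    }

    public void Enqueue(T item)
    {
        // Same group id -> same hash -> same lane (stable within one process).
        int index = (groupOf(item).GetHashCode() & int.MaxValue) % lanes.Length;
        lanes[index].Add(item);
    }

    public void Dispose()
    {
        // Let the workers drain what is queued, then release resources.
        foreach (var lane in lanes) lane.CompleteAdding();
        Task.WaitAll(workers);
        foreach (var lane in lanes) lane.Dispose();
    }
}
```

This does not by itself solve failover across servers: the single reader is still one process, so it only shows how concurrency can be recovered once ordering has been pinned to a grouping id.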

Intermittent answered 10/12, 2012 at 14:25 Comment(2)
Interesting, but if one ServerA takes message 1-of-10 off of the queue, and ServerB takes message 2-of-10 off of the queue, neither will ever get a complete batch to resequence.Laroy
So in this configuration you need one queue per group. It's not very practical.Intermittent
