How to process MSMQ messages in parallel
I'm writing a Windows service to consume MSMQ messages. The service will have periods of high activity (80k messages coming in very quickly) and long periods of inactivity (could be several days without a new message).

Processing the messages is very network-bound, so I get a big benefit out of parallelism. But during periods of inactivity, I don't want to tie up a bunch of threads waiting for messages that aren't coming anytime soon.

The MSMQ interface seems to be very focused on a synchronous workflow - get one message, process it, get another, etc. How should I structure my code so that I can take advantage of parallelism during periods of high activity but not tie up a bunch of threads during periods of no activity? Bonus points for using the TPL. Pseudocode would be appreciated.

Connel answered 31/3, 2011 at 17:38 Comment(1)
Whilst the processing of a single message in a callback handler may appear synchronous, the concept of messaging in the grand scheme of things is asynchronous compared to RPC.Hopi
I have done a lot of MSMQ work (including mobile implementations) over the years, and you are correct in the characterization of a "synchronous workflow." It's not that you can't take the various message envelopes and process them across different cores via the TPL; the limiting factor is reading from / writing to the queue, which is inherently a serial operation. You can't receive 8 messages at once on an 8-core machine, for example.

I had a similar need (without using the System.Messaging namespace) and solved it with some help from a book I read "Parallel Programming with Microsoft.NET" by Campbell and Johnson.

Check out their "parallel tasks" chapter, specifically the part about using a global queue that cooperates with per-thread local queues (i.e., the TPL's "work stealing" algorithm) to load-balance the work. I modeled my solution in part on their example. The final version of my system showed a huge difference in performance: from 23 messages per second to over 200.
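Roughly, the shape of that design looks like this. This is a simplified sketch, not my production code: `FetchMessages` and `Process` are placeholder delegates standing in for the serial `MessageQueue.Receive` loop and the network-bound work, and the TPL's thread-pool scheduler supplies the work stealing among the worker tasks.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading.Tasks;

class Pipeline
{
    // One reader drains the queue serially (the inherently serial part);
    // several workers process messages in parallel on the TPL's
    // work-stealing thread pool.
    public static void Run(IEnumerable<string> FetchMessages, Action<string> Process)
    {
        var work = new BlockingCollection<string>(boundedCapacity: 1000);

        var reader = Task.Factory.StartNew(() =>
        {
            foreach (var msg in FetchMessages)   // the serial receive loop
                work.Add(msg);                   // blocks when the buffer is full
            work.CompleteAdding();
        });

        var workers = new Task[Environment.ProcessorCount];
        for (int i = 0; i < workers.Length; i++)
            workers[i] = Task.Factory.StartNew(() =>
            {
                foreach (var msg in work.GetConsumingEnumerable())
                    Process(msg);                // network-bound work happens here
            });

        Task.WaitAll(workers);
        reader.Wait();
    }
}
```

The bounded capacity keeps the single reader from racing too far ahead of the workers, which also caps memory use during an 80K burst.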

Depending on how long it takes your system to go from 0 to the 80,000, you'll want to take the same design and spread it across multiple servers (each with multiple processors and multiple cores). In theory my setup would require a little less than 7 minutes to polish off all 80K (80,000 messages at ~200/second is about 400 seconds), so adding a 2nd computer would cut that down to roughly 3 minutes and 20 seconds, etc., etc., etc. The trick is the work-stealing logic.

Food for thought …

A quick edit: BTW the computer is a Dell T7500 workstation with dual quad core Xeons @ 3GHz, 24 GB of RAM, Windows 7 Ultimate 64-bit edition.

Alligator answered 31/3, 2011 at 21:37 Comment(3)
thanks. I had thought of something like that, but I was trying to avoid having to roll my own implementation of work-stealing queues.Connel
For whatever it's worth, I had never put anything like that together before ... the book helped me a lot and I had it up and running in just a couple of days. Considering I'm not the fastest coder, I was happy with the results. The basic problem of having a bunch of work items (which are separate - no dependencies between them) with varying lifespans is a common one for me, so I've split this into a reusable component and use it in other places too.Alligator
As a follow-up, I'm researching this myself and haven't read it all, but I found that the book mentioned here is available on MSDN for free. Here's a direct link to the parallel tasks chapter: msdn.microsoft.com/en-us/library/ff963549.aspxGoto
Here is a simplified version of what I wound up doing:

while (true) {
    int msgCount = 0;

    Parallel.ForEach(Enumerable.Range(0, 20), i => {
        using (var queue = new MessageQueue(_queuePath)) {
            try {
                Message msg = queue.Receive(TimeSpan.Zero);
                // do work

                Interlocked.Increment(ref msgCount);
            }
            catch (MessageQueueException mqex) {
                if (mqex.MessageQueueErrorCode == MessageQueueErrorCode.IOTimeout)
                    return; // nothing in queue
                throw;
            }
        }
    });

    if (msgCount < 20)
        Thread.Sleep(1000); // nothing more to do, take a break
}

So I try to receive messages 20 at a time, counting the ones I get. For those 20, I let the TPL go to town. At the end, if I've processed fewer than 20 messages, the queue is empty, so I sleep the thread for a second before trying again.
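If the per-iteration MessageQueue construction turns out to be expensive, Parallel.ForEach has an overload with a thread-local initializer, so each worker thread constructs one queue instead of one per iteration. A sketch, under the assumption that a MessageQueue instance should not be shared between threads (`_queuePath` and `msgCount` are the same fields as in the loop above):

```csharp
Parallel.ForEach(
    Enumerable.Range(0, 20),
    () => new MessageQueue(_queuePath),          // localInit: runs once per thread
    (i, loopState, queue) =>
    {
        try
        {
            Message msg = queue.Receive(TimeSpan.Zero);
            // do work

            Interlocked.Increment(ref msgCount);
        }
        catch (MessageQueueException mqex)
        {
            if (mqex.MessageQueueErrorCode != MessageQueueErrorCode.IOTimeout)
                throw;                           // IOTimeout just means "queue empty"
        }
        return queue;                            // carried to this thread's next iteration
    },
    queue => queue.Dispose());                   // localFinally: runs once per thread
```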

Connel answered 8/4, 2011 at 16:17 Comment(4)
Interesting. If I'm reading this correctly you are creating 20 queues to the same path, then synchronously reading from them. I'm curious if you hit any locking mechanism inside Microsoft's implementation that makes them thread safe (as it is a serial operation to receive or send on the queue)? It would also be interesting to profile this ... all the repeated instantiation of the queues (an expensive operation on 80K messages). Bottom line ... for every core on your computer you should see a "factor" drop in processing times (approximately). How did it do?Alligator
@Cirrus yes, I new up a MessageQueue object on each iteration because I do not believe that it would be safe to use a single object from multiple threads at the same time. I haven't profiled the cost of all those object creations, but an easy optimization is to create one per thread instead of one per iteration. Parallel.ForEach has an overload for that.Connel
I had the same question about MessageQueue threadsafety. From the following link "Only the following methods are safe for multithreaded operations: BeginPeek, BeginReceive, EndPeek, EndReceive, GetAllMessages, Peek, and Receive." So presumably you could have one MessageQueue instance referenced closure-style to your parallel operation. msdn.microsoft.com/en-us/library/…Idio
In .Net 4.0/4.5 document of MessageQueue class msdn.microsoft.com/en-us/library/… Microsoft indicates Only the GetAllMessages method is thread safe.. How how does your code address the issue?Ephesus
NServiceBus has a great concept for this very problem. It's called the Distributor. The idea is that the distributor can forward the work to be done and distribute it across any number of running child nodes. Depending upon the type of work being done, e.g. heavy computations vs. disk writes, you could distribute it over multiple processes or even multiple machines.

Anyplace answered 1/4, 2011 at 2:45 Comment(0)
The solution also depends partly on how the messages are being handled.

I used a WorkflowService hosted in Windows Server AppFabric with the Net.Msmq binding and a transactional queue. The transactional net.msmq binding was needed to handle out-of-order message processing. The workflow is a .NET 4.0.1 state machine, and messages arrive on the same queue from different systems. It is possible, for example, for one system to send an update to a state machine instance before another system has sent the message that instantiates it. To enable out-of-order processing, the workflow service host uses BufferedReceive to lock the messages and repeatedly retries getting them from the lock subqueue. BufferedReceive has its maximum pending messages set to the maximum likely batch length, because messages in the lock subqueue are returned to the front of the retry queue.

WF also has a number of throttling settings. My maximum likely batch length is about 20,000, so I have set MaxConcurrentCalls to 64 and MaxConcurrentInstances to 20,000. This results in IIS/WAS handling 64 concurrent calls.

However, and this is the thing: because the Receives in the workflow are one-way, this does not mean that the spawned WF processes terminate as soon as the Receive completes. What happens next in my scenario is that after a message is dequeued and a WF instance is invoked (one of the 64 calls), a number of subsequent steps are scheduled by the workflow engine, one of which is a database operation.

The gotcha is that 64 calls may be the maximum, but if the rate of message drain is higher than the rate of asynchronous process completion, then as the incoming batch of messages is processed there will be a growing number of executing threads (in my case, WF instances). This can cause unexpected things to happen: for example, the ADO.NET connection pool defaults to a maximum of 100 connections, which will cause processes to time out waiting for connections from an exhausted pool. For this particular problem, you could either raise the MaxPoolSize value or use Service Broker to handle the database operations asynchronously too (at the cost of more complexity in the workflow).
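For reference, the throttles described above can be expressed programmatically when self-hosting the workflow service (AppFabric/IIS hosting reads the equivalent values from web.config). This is a sketch, not my deployed configuration; `workflowActivity` is a hypothetical stand-in for the state-machine service definition.

```csharp
using System;
using System.ServiceModel.Activities;
using System.ServiceModel.Activities.Description;
using System.ServiceModel.Description;

// 'workflowActivity' is a hypothetical stand-in for the state-machine service.
var host = new WorkflowServiceHost(workflowActivity);

// 64 concurrent dequeue calls, up to 20,000 in-flight workflow instances.
host.Description.Behaviors.Add(new ServiceThrottlingBehavior
{
    MaxConcurrentCalls = 64,
    MaxConcurrentInstances = 20000
});

// BufferedReceive: cap pending (locked) messages at the max likely batch length,
// since locked messages are returned to the front of the retry queue.
host.Description.Behaviors.Add(new BufferedReceiveServiceBehavior
{
    MaxPendingMessagesPerChannel = 20000
});
```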

Hope this helps someone.

Murcia answered 30/11, 2012 at 9:26 Comment(0)
Here's how I do it (changing the code on the fly, so there might be spelling mistakes):

for (int i = 0; i < numberOfSimultaneousRequests; i++)
    priorityQueue.BeginReceive(TimeSpan.FromDays(30), state, callback);

and the callback looks something like this:

private void ProcessMessage(IAsyncResult asyncResult)
{
    try
    {
        Message msg = priorityQueue.EndReceive(asyncResult);
        // Do something with the message
    }
    finally
    {
        // start waiting for another one
        priorityQueue.BeginReceive(TimeSpan.FromDays(30), null, ProcessMessage);
    }
}
Diophantus answered 29/4, 2013 at 6:22 Comment(0)
Just trying something similar: the TPL seems to throw thread-safety exceptions when it runs into shared physical resources. For example, creating a SqlConnection outside the Parallel.ForEach and using it within the loop body threw an exception for me. Newing up a queue before entering the body and enumerating over a list of strings seemed OK; my code consistently processed 10,000 items in under 500 ms using one-way messaging on an i7 2500, 8 GB RAM, and a local MSMQ queue.

Mouser answered 20/7, 2011 at 6:37 Comment(0)
I have found a complete solution on a blog called CodeRonin. It seems to me that this is the only complete example on the whole internet. Thank you, CodeRonin!

http://code-ronin.blogspot.de/2008/09/msmq-transactional-message-processing.html
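The core of the transactional pattern that post describes is wrapping the receive and the work in a single MessageQueueTransaction, so a failure puts the message back instead of losing it. A minimal sketch (the queue path and `Process` method are hypothetical, not taken from the linked post):

```csharp
using System;
using System.Messaging;

// Transactional receive: if Process() throws, Abort() returns the
// message to the queue instead of losing it.
using (var queue = new MessageQueue(@".\private$\myqueue")) // hypothetical path
using (var tx = new MessageQueueTransaction())
{
    tx.Begin();
    try
    {
        Message msg = queue.Receive(TimeSpan.FromSeconds(1), tx);
        Process(msg);   // hypothetical work method
        tx.Commit();    // the message is removed from the queue only now
    }
    catch
    {
        tx.Abort();     // the message goes back to the queue
        throw;
    }
}
```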

Rancher answered 6/8, 2015 at 14:15 Comment(1)
Correct, if you're not using MSMQ in a transactional mode and you're using parallelism or distributed processing your failure modes likely involve losing or duplicating data. If you're using MSMQ and you're not concerned with failure modes, you probably don't need to be using MSMQ.Therewith
In this situation we don't need a busy-wait loop ( while (true) { //ToDo } ), because that approach is costly. We need a receive event instead. Here is a sample:

Create a MessageQueue instance and a list

private MessageQueue _queue = new MessageQueue(@".\private$\queuename")
{
    Formatter = new BinaryMessageFormatter()
};

List<Message> _messages = new List<Message>();

Subscribe a handler to the ReceiveCompleted event and start the first asynchronous receive (the event only fires for receives started with BeginReceive):

_queue.ReceiveCompleted += _queue_ReceiveCompleted;
_queue.BeginReceive();

public void _queue_ReceiveCompleted(object sender, ReceiveCompletedEventArgs e)
{
    Message message = _queue.EndReceive(e.AsyncResult);
    message.Formatter = new BinaryMessageFormatter();
    _messages.Add(message);

    if (_messages.Count == 1000)
    {
        // Run a Task on the batch and start a new list for the next one
        Task.Factory.StartNew(() => InsertInQueue(_messages.ToList()));
        _messages = new List<Message>();
    }

    _queue.BeginReceive(); // keep listening for the next message
}

private static void InsertInQueue(IList<Message> messages)
{
    // TODO: process the batch of messages
}
Moraine answered 23/1, 2023 at 15:9 Comment(2)
Either this is a ChatGPT answer or the question was misunderstood. This answer doesn't answer the question at all and the code won't even compile. There's no Receiving event and ReceiveCompleted is only used to handle the completion of the operation started with BeginReceive. The question asks how to process messages in parallel, not how to retrieve messages asynchronouslyMicrobiology
You are right but after getting some messages run a task to process and create and repeat that.Moraine
