Awful MSMQ Performance Using ReceiveById
Just 20 messages per second! That is all I got! Here is the code: it peeks 50 messages from the queue and receives them in parallel using ReceiveById. The total number of messages in the queue is 500 (I have tested other numbers too), but the ceiling stays at 20 messages per second. Am I doing something completely wrong?

Edit 1:

1 - I need the queue to be recoverable. The interesting part is that even if I set the Recoverable option to false, the ceiling is still 20 messages/sec.

2 - I am forced to use MSMQ here because some legacy apps are involved. But if this code is correct and this ceiling of 20 really exists, I can persuade the group to switch, so any recommendation (based on actual experience) for replacing MSMQ is welcome. (Please note that we need to persist our messages in case of failure of any kind.)

3 - I have set the number of threads in the ThreadPool to a high number in case it helps, although this code only ever creates 100 - 200 threads. I have tested numbers from 50 to 10,000 and it made no difference.

4 - A new MessageQueue is created in each task because ReceiveById is not thread safe.

5 - As you can see in the code, the message size is very small: just a string plus an int.

Edit 2: [Very Strange New Result]

I've played with every bit of this code and found this: if I comment out the line singleLocal.UseJournalQueue = false; in my task, I can read up to 1,200 messages per second. Not impressive, but acceptable in my case. The strange part is that the default value of UseJournalQueue is false; why should setting it to false again make such a difference in performance?

using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Messaging;
using System.Threading;
using System.Threading.Tasks;

// Data is a simple serializable type defined elsewhere in this partial class,
// e.g. public class Data { public string Name; public int Value; }
static partial class Program
{
    static void Main(string[] args)
    {
        ThreadPool.SetMaxThreads(15000, 30000);
        ThreadPool.SetMinThreads(10000, 20000);

        var qName = @".\private$\deep_den";

        if (!MessageQueue.Exists(qName))
        {
            var q = MessageQueue.Create(qName);
        }

        var single = new MessageQueue(qName);
        single.UseJournalQueue = false;
        single.DefaultPropertiesToSend.AttachSenderId = false;
        single.DefaultPropertiesToSend.Recoverable = true;
        single.Formatter = new XmlMessageFormatter(new[] { typeof(Data) });

        var count = 500;
        var watch = new Stopwatch();

        watch.Start();
        for (int i = 0; i < count; i++)
        {
            var data = new Data { Name = string.Format("name_{0}", i), Value = i };

            single.Send(new Message(data));
        }
        watch.Stop();

        Console.WriteLine("sent {0} msec/message", watch.Elapsed.TotalMilliseconds / count);
        Console.WriteLine("sent {0} message/sec", count / watch.Elapsed.TotalSeconds);

        var enu = single.GetMessageEnumerator2();

        watch.Reset();
        watch.Start();
        while (Interlocked.Read(ref __counter) < count)
        {
            var list = new List<Message>();
            var peekCount = 50;

            while (peekCount > 0 && enu.MoveNext(TimeSpan.FromMilliseconds(10)))
            {
                try
                {
                    list.Add(enu.Current);
                    peekCount--;
                }
                catch (Exception ex2)
                {
                    Trace.WriteLine(ex2.ToString());
                    break;
                }
            }

            var tlist = new List<Task>();
            foreach (var message in list)
            {
                var stupid_closure = message;

                var t = new Task(() =>
                {
                    using (var singleLocal = new MessageQueue(qName))
                    {
                        singleLocal.UseJournalQueue = false;
                        singleLocal.DefaultPropertiesToSend.AttachSenderId = false;
                        singleLocal.DefaultPropertiesToSend.Recoverable = true;
                        singleLocal.Formatter = new XmlMessageFormatter(new[] { typeof(Data) });

                        try
                        {
                            // processing the message and insert it into database
                            // workflow completed here, so we can safely remove the message from queue

                            var localM = singleLocal.ReceiveById(stupid_closure.Id);
                            var localSample = (Data)localM.Body;

                            Interlocked.Increment(ref __counter);
                            Console.WriteLine(Interlocked.Read(ref __counter));
                        }
                        catch (MessageQueueException ex) { if (ex.MessageQueueErrorCode != MessageQueueErrorCode.IOTimeout) Trace.WriteLine(ex.ToString()); }
                        catch (Exception ex2) { Trace.WriteLine(ex2.ToString()); }
                    }
                }, TaskCreationOptions.PreferFairness);

                tlist.Add(t);
            }

            foreach (var t in tlist) t.Start();

            Task.WaitAll(tlist.ToArray());

            list.Clear();
        }
        watch.Stop();
        Console.WriteLine("rcvd {0} msec/message", watch.Elapsed.TotalMilliseconds / count);
        Console.WriteLine("rcvd {0} message/sec", count / watch.Elapsed.TotalSeconds);

        Console.WriteLine("press any key to continue ...");
        Console.ReadKey();
    }
    static long __counter = 0;
}
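A likely explanation for Edit 2 (this is my reading of the .NET Reference Source, so treat it as an assumption): MessageQueue property setters such as UseJournalQueue do not just cache a value; they write the queue's properties back to the queue manager on every assignment, which means one extra administrative round-trip per message when done inside every task. Below is a minimal sketch of a per-message helper that only reads, never assigns, queue properties. Receiver, ProcessOne, and the inline Data type are illustrative names mirroring the code above, not part of it.

```csharp
using System.Messaging;
using System.Threading;

public class Data
{
    public string Name;
    public int Value;
}

static class Receiver
{
    static long __counter;

    // Sketch: per-message work with no queue-property assignments.
    // Assigning properties like UseJournalQueue here would push the
    // queue's settings back to the queue manager on every call.
    public static void ProcessOne(string queuePath, string messageId)
    {
        using (var q = new MessageQueue(queuePath))
        {
            q.Formatter = new XmlMessageFormatter(new[] { typeof(Data) });

            var m = q.ReceiveById(messageId);
            var sample = (Data)m.Body;

            Interlocked.Increment(ref __counter);
        }
    }
}
```

This keeps the hot path down to the ReceiveById call plus deserialization; all queue configuration would happen once at startup instead.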
Cosimo answered 5/1, 2013 at 0:10 Comment(5)
You're asking the ThreadPool for a minimum of 10,000? Does that actually work? That seems like an extremely high number. Also, each task creates and uses its own MessageQueue object? Try running with just one thread as an experiment. – Culmination
@Chris O Thanks for the comment. I have edited the question to reflect your points. – Cosimo
Why do you peek and not just pull from a loop? Your problem is not the performance; your problem is a bad approach to taking messages out of the queue. For "recoverable", this is what transactions were created for. – Regional
@Regional I do not know how to pull in MSMQ. But if you mean first peek a message, then process it, and then receive it in a transaction: I cannot do that, because the processing part - in my project - takes ~50 ms (~20 messages/second) and I have to process at least 100 messages/second. – Cosimo
@KavehShahbazian If you do not need ordering, then just receive in worker threads. – Regional
Kaveh, the MessageQueue constructor you are using sets the UseJournalQueue property to true if the Message Queuing object's journal setting is enabled. Somehow it is deciding that .\private$\deep_den's journal setting is enabled. EDIT - Are you using a pre-created queue?
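A quick way to see what System.Messaging thinks the journal setting is: a minimal sketch that only reads the property (reading fetches the queue's stored settings; assigning would write them back to the queue manager).

```csharp
using System;
using System.Messaging;

class JournalCheck
{
    static void Main()
    {
        // Read-only check of the journal flag the constructor picked up
        // for the question's queue; never assign it here.
        using (var q = new MessageQueue(@".\private$\deep_den"))
        {
            Console.WriteLine("UseJournalQueue = {0}", q.UseJournalQueue);
        }
    }
}
```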

Hypersensitive answered 8/1, 2013 at 11:19 Comment(1)
The queue is created at the line var q = MessageQueue.Create(qName); and I have checked that var single = new MessageQueue(qName); sets the default value of UseJournalQueue to false (.NET 4.0, VS 2012). – Cosimo

When benchmarking, it's important to keep the code to a bare minimum in order to avoid background noise interfering with the test.

Unfortunately, your test is so noisy that it is quite hard to find what exactly is causing the delay:

  • Don't use threads. Multithreading is rarely helpful here and usually causes more harm than good.
  • Test only one thing. When testing ReceiveById, don't use GetMessageEnumerator2; it is costly and you would need to subtract its cost from the results at the end.
  • Create the MessageQueue only once and reuse it. We are only testing ReceiveById, not the creation of new MessageQueue instances.

I have rewritten the test and get much better results. MSMQ is not the fastest queue on the block, but it is not slow either.

    var qName = @".\private$\deep_den";

    if (!MessageQueue.Exists(qName))
    {
        var q = MessageQueue.Create(qName);
    }

    var single = new MessageQueue(qName);
    single.UseJournalQueue = true;
    single.DefaultPropertiesToSend.AttachSenderId = false;
    single.DefaultPropertiesToSend.Recoverable = true;
    single.Formatter = new XmlMessageFormatter(new[] { typeof(Data) });

    var count = 500;
    var watch = new Stopwatch();

    watch.Start();
    for (int i = 0; i < count; i++)
    {
        var data = new Data { Name = string.Format("name_{0}", i), Value = i };

        single.Send(new Message(data));
    }
    watch.Stop();

    Console.WriteLine("sent {0} msec/message", watch.Elapsed.TotalMilliseconds / count);
    Console.WriteLine("sent {0} message/sec", count / watch.Elapsed.TotalSeconds);

    var enu = single.GetMessageEnumerator2();

    watch.Reset();
    watch.Start();

    var queue = new MessageQueue(qName);
    queue.UseJournalQueue = true;
    queue.DefaultPropertiesToSend.AttachSenderId = false;
    queue.DefaultPropertiesToSend.Recoverable = true;
    queue.Formatter = new XmlMessageFormatter(new[] { typeof(Data) });

    List<Data> lst = new List<Data>();
    while (lst.Count != count && enu.MoveNext(TimeSpan.FromDays(1)))
    {
        var message = queue.ReceiveById(enu.Current.Id);
        lst.Add((Data)message.Body);
    }
    watch.Stop();
    Console.WriteLine("rcvd {0} msec/message", watch.Elapsed.TotalMilliseconds / count);
    Console.WriteLine("rcvd {0} message/sec", count / watch.Elapsed.TotalSeconds);

    Console.WriteLine("press any key to continue ...");
    Console.ReadKey();

Discrown answered 1/4, 2014 at 21:0 Comment(1)
Ronen's code runs pretty fast (I've set UseJournalQueue to false). My results (i5 notebook, regular disk, 8 GB RAM): sent 0.10515438 msec/message; sent 9509.82736049606 message/sec; rcvd 0.1163098 msec/message; rcvd 8597.727792499 message/sec. – Recuperate

Kaveh, I could be completely wrong here, but I think your problem is the XML serialization. Even after an XmlSerializer is created it can still be slow, but the constructor is what really takes time.

I would suggest either removing the serialization completely and reading the data as strings, or else creating a single XmlSerializer or XmlMessageFormatter beforehand and passing it to the threads. I'd say be careful of the threading issues, but it looks like you have a good grasp of that.
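A minimal sketch of the second option, caching a single serializer instead of paying the XmlSerializer constructor cost (assembly generation) per message. Data here is a stand-in matching the question's type, and SerializerCache/RoundTrip are illustrative names, not part of the question's code; XmlSerializer is documented as thread safe for Serialize/Deserialize once constructed.

```csharp
using System.IO;
using System.Xml.Serialization;

public class Data
{
    public string Name;
    public int Value;
}

public static class SerializerCache
{
    // One shared instance: the expensive constructor runs exactly once.
    static readonly XmlSerializer serializer = new XmlSerializer(typeof(Data));

    // Serialize and immediately deserialize a Data instance,
    // reusing the cached serializer for both directions.
    public static Data RoundTrip(Data d)
    {
        using (var ms = new MemoryStream())
        {
            serializer.Serialize(ms, d);
            ms.Position = 0;
            return (Data)serializer.Deserialize(ms);
        }
    }
}
```

The same idea applies to XmlMessageFormatter: build it once and hand it to each MessageQueue instance rather than constructing a new one per task.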

Mellow answered 29/1, 2013 at 12:58 Comment(1)
I do not think so, because after removing [q].UseJournalQueue = false; it works fine. I wonder why setting it to false causes this catastrophic drop in performance when the default value is false. – Cosimo

© 2022 - 2024 — McMap. All rights reserved.