Just 20 message per second! That is all I got! Here is the code that peeks 50 message from queue and receives them in parallel using ReceiveById. Total number of messages in queue is 500. I have tested other numbers too. But the top limit is 20 message per second! Am I somewhere totally out of the way?
Edit 1:
1 - I need the queue to be recoverable. BUT the interesting part is even if I set the recoverable option to false; still the top limit is 20 message/sec.
2 - I am forced to use MSMQ here because there are some legacy apps involved. But if this code is correct and this top 20 limit really exists I can persuade the group to switch. So any recommendation (based on actual experience) for replacing MSMQ is really welcome (And please note that we need to persist our messages in case of any failure of any kind).
3 - I have set the number of threads in ThreadPool to a high number in case it helps but actually in this code it will cause 100 - 200 threads be created. I have tested different number from 50 to 10000 and no difference made.
4 - In each task a new MessageQueue created because ReceiveById is not thread safe.
5 - As one can see in the code the message size is very low; it's just a string plus an int.
Edit 2: [Very Strange New Result]
I've played with every bit of this code and found this: if I comment out the line singleLocal.UseJournalQueue = false; in my task, I can read up to 1200 messages per second. Not impressive but acceptable in my case. The strange part is the default value of UseJournalQueue is false; why setting it to false again should make such difference in performance?
static partial class Program
{
static void Main(string[] args)
{
ThreadPool.SetMaxThreads(15000, 30000);
ThreadPool.SetMinThreads(10000, 20000);
var qName = @".\private$\deep_den";
if (!MessageQueue.Exists(qName))
{
var q = MessageQueue.Create(qName);
}
var single = new MessageQueue(qName);
single.UseJournalQueue = false;
single.DefaultPropertiesToSend.AttachSenderId = false;
single.DefaultPropertiesToSend.Recoverable = true;
single.Formatter = new XmlMessageFormatter(new[] { typeof(Data) });
var count = 500;
var watch = new Stopwatch();
watch.Start();
for (int i = 0; i < count; i++)
{
var data = new Data { Name = string.Format("name_{0}", i), Value = i };
single.Send(new Message(data));
}
watch.Stop();
Console.WriteLine("sent {0} msec/message", watch.Elapsed.TotalMilliseconds / count);
Console.WriteLine("sent {0} message/sec", count / watch.Elapsed.TotalSeconds);
var enu = single.GetMessageEnumerator2();
watch.Reset();
watch.Start();
while (Interlocked.Read(ref __counter) < count)
{
var list = new List<Message>();
var peekCount = 50;
while (peekCount > 0 && enu.MoveNext(TimeSpan.FromMilliseconds(10)))
{
try
{
list.Add(enu.Current);
peekCount--;
}
catch (Exception ex2)
{
Trace.WriteLine(ex2.ToString());
break;
}
}
var tlist = new List<Task>();
foreach (var message in list)
{
var stupid_closure = message;
var t = new Task(() =>
{
using (var singleLocal = new MessageQueue(qName))
{
singleLocal.UseJournalQueue = false;
singleLocal.DefaultPropertiesToSend.AttachSenderId = false;
singleLocal.DefaultPropertiesToSend.Recoverable = true;
singleLocal.Formatter = new XmlMessageFormatter(new[] { typeof(Data) });
try
{
// processing the message and insert it into database
// workflow completed here, so we can safely remove the message from queue
var localM = singleLocal.ReceiveById(stupid_closure.Id);
var localSample = (Data)localM.Body;
Interlocked.Increment(ref __counter);
Console.WriteLine(Interlocked.Read(ref __counter));
}
catch (MessageQueueException ex) { if (ex.MessageQueueErrorCode != MessageQueueErrorCode.IOTimeout) Trace.WriteLine(ex.ToString()); }
catch (Exception ex2) { Trace.WriteLine(ex2.ToString()); }
}
}, TaskCreationOptions.PreferFairness);
tlist.Add(t);
}
foreach (var t in tlist) t.Start();
Task.WaitAll(tlist.ToArray());
list.Clear();
}
watch.Stop();
Console.WriteLine("rcvd {0} msec/message", watch.Elapsed.TotalMilliseconds / count);
Console.WriteLine("rcvd {0} message/sec", count / watch.Elapsed.TotalSeconds);
Console.WriteLine("press any key to continue ...");
Console.ReadKey();
}
static long __counter = 0;
}