MSMQ and polling to receive messages?

I've got a Windows service that does some image conversion. It fires whenever a file in a particular folder is renamed (i.e. a rename file watcher). It works great until a massive number of images is dumped (and renamed) in that folder; then the CPU redlines.

So, I was going to change my code to use MSMQ to queue all the files that need to be converted. Fine. Every time a file is renamed and the file watcher fires, I add a new message to the queue. Cool.

The problem is this: how do I grab one message at a time from the queue?

Do I need to make a timer object that polls the queue every xxx seconds? Or is there a way to constantly keep peeking at the first item in the queue? Once a message exists, extract it, process it, then continue (which means keep peeking until the world blows up).

I've wondered if I just need to put a while loop around the Receive method. Pseudocode is below (in Edit #2).

Anyone have any experience with this and have some suggestions?

Thanks kindly!

EDIT:

If WCF is the way to go, can someone provide some sample code instead?

EDIT 2:

Here's some pseudocode I was thinking of:

// Windows service start method.
protected override void OnStart(string[] args)
{
   // some initialisation stuff...

   // Start polling the queue.
   StartPollingMSMQ();

   // ....
}

private static void StartPollingMSMQ()
{
    // NOTE: This code should check if the queue exists, instead of just assuming it does.
    //       Left out for brevity.
    MessageQueue messageQueue = new MessageQueue(".\\Foo");

    while (true)
    {
        // This blocks/hangs here until a message is received.
        Message message = messageQueue.Receive(new TimeSpan(0, 0, 1));

        // Woot! we have something.. now process it...
        DoStuffWithMessage(message);

        // Now repeat for eva and eva and boomski...
    }
}
Sacred answered 29/7, 2009 at 5:27 Comment(0)
B
26

If you use a local queue, you don't need WCF.

This is how my sample service looks (a service class from a Windows service project):

using System.Messaging;
using System.ServiceProcess;

public partial class MQProcessTest1 : ServiceBase
{
    // Path of the local private queue.
    private const string MqName = @".\Private$\test1";
    // Backing field for the lazily created queue.
    private static MessageQueue _mq;
    // Lazy property through which we access the queue (it creates and opens the queue on first use).
    private static MessageQueue mq
    {
        get
        {
            if (_mq == null)
            {
                if (!MessageQueue.Exists(MqName))
                    MessageQueue.Create(MqName);
                _mq = new MessageQueue(MqName, QueueAccessMode.ReceiveAndAdmin);
                _mq.Formatter = new BinaryMessageFormatter();
            }
            return _mq;
        }
    }

    public MQProcessTest1()
    {
        InitializeComponent();
        // Subscribe to the event raised when an asynchronous receive completes.
        mq.ReceiveCompleted += new ReceiveCompletedEventHandler(mq_ReceiveCompleted);
    }

    // Handles a completed asynchronous receive.
    private void mq_ReceiveCompleted(object sender, ReceiveCompletedEventArgs e)
    {
        // The queue that received the message.
        MessageQueue cmq = (MessageQueue)sender;
        try
        {
            // The received message (it has already been removed from the queue).
            Message msg = cmq.EndReceive(e.AsyncResult);
            // Process the message here.
        }
        catch
        {
            // Errors are swallowed so that receiving continues; log them in real code.
        }
        // Refresh the queue's properties in case anything changed (optional).
        cmq.Refresh();
        // Ask the queue to deliver the next message when it arrives.
        cmq.BeginReceive();
    }

    protected override void OnStart(string[] args)
    {
        // Start waiting for the first message.
        if (mq != null)
            mq.BeginReceive();
        // Add any extra handling for mq == null here.
    }

    protected override void OnStop()
    {
        // Close the queue when the service stops.
        if (mq != null)
            mq.Close();
    }
}
Burble answered 29/7, 2009 at 6:41 Comment(2)
This is a good solution, but I don't think it works for transactional queues (as there's no option to specify a transaction in BeginReceive or EndReceive). - Crater
Years have passed, but: avoid MessageQueue.Create(MqName) here. You can run into problems accessing the queue from other apps. Create and manage the queue some other way (a PowerShell script, manually via the Windows UI, etc.), with specific permissions and so on. - Obrien
S
1

I was under the impression that MSMQ was built to be compatible with IBM's MQ product. If that is the case, you can call MQGET with a timeout and not worry about polling at all.

Just get a message off the queue with a two-second timeout (for example). If there was one there, process it. Then either exit the service if desired or go back to the MQGET with wait.

This means your service will not suck up CPU time unnecessarily, but it will still be able to exit in a timely fashion if signaled.

Generally, you would have something like:

Set up all queue stuff.
while true:
    Read from queue with 10-second timeout.
    If message was read:
        Process message
    If signaled to exit:
        break
Tear down queue stuff.
Exit.
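For MSMQ specifically, the loop above might look something like the sketch below. It assumes System.Messaging on the .NET Framework; QueuePoller, Run, and the process callback are made-up names for illustration, not anything from the question. One thing worth knowing: MessageQueue.Receive(TimeSpan) does not return null on timeout, it throws a MessageQueueException whose error code is IOTimeout, so that case has to be caught.

```csharp
using System;
using System.Messaging;
using System.Threading;

public static class QueuePoller
{
    // Hypothetical helper: receives messages until stopSignal is set.
    public static void Run(string queuePath, WaitHandle stopSignal, Action<Message> process)
    {
        using (MessageQueue queue = new MessageQueue(queuePath))
        {
            // WaitOne(0) checks the signal without blocking.
            while (!stopSignal.WaitOne(0))
            {
                Message message;
                try
                {
                    // Blocks for up to 10 seconds, so an idle queue costs no CPU.
                    message = queue.Receive(TimeSpan.FromSeconds(10));
                }
                catch (MessageQueueException ex)
                {
                    // A timeout only means the queue was empty; go round again.
                    if (ex.MessageQueueErrorCode == MessageQueueErrorCode.IOTimeout)
                        continue;
                    throw;
                }

                // Only reached when a message was actually received.
                process(message);
            }
        }
    }
}
```

The timeout doubles as the longest delay before the loop notices a stop request, so it is a trade-off between shutdown latency and how often the loop wakes up.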
Sanjuanitasank answered 29/7, 2009 at 6:16 Comment(6)
Hi Pax, I just updated my initial post while you were answering this question. Can you please re-read it and see if that changes anything? - Sacred
No, not really. It's a good approach, assuming the timeout is not too small (note that new TimeSpan(0, 0, 1) is one second, not one minute). You want it large enough that your loop isn't running continuously but small enough that you can react to non-MSMQ stuff. Part of that "stuff" would be exiting that while-true loop if your service has been told to shut down. - Sanjuanitasank
And you only want to call DoStuffWithMessage if a message was returned, not if the timeout happened without a message. - Sanjuanitasank
But with the timeout, it's not going to keep reading the same message again, right? - Sacred
@PureKrome: No, if you read a message, it comes off the queue and you process it. Then, when you go back to the queue, you'll get the next message. What you're talking about is peeking. - Sanjuanitasank
With IBM MQ, you would start a transaction, then MQGET, then assuming you process it successfully, MQCOMMIT (or MQROLLBACK if unsuccessful). Whether equivalent things exist in MSMQ I don't know for sure. - Sanjuanitasank
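On the transaction question: System.Messaging does have a MessageQueueTransaction class, and the synchronous Receive has an overload that takes one. A rough sketch of the begin / receive / commit-or-abort pattern follows. It assumes the queue was created as transactional; TransactionalReceiver, TryProcessOne, and the process callback are hypothetical names.

```csharp
using System;
using System.Messaging;

public static class TransactionalReceiver
{
    // Hypothetical helper: returns true if a message was received and committed,
    // false if the receive timed out on an empty queue.
    public static bool TryProcessOne(MessageQueue queue, TimeSpan timeout, Action<Message> process)
    {
        using (MessageQueueTransaction tx = new MessageQueueTransaction())
        {
            tx.Begin();
            try
            {
                // The message only leaves the queue for good once the transaction commits.
                Message message = queue.Receive(timeout, tx);
                process(message);
                tx.Commit();
                return true;
            }
            catch (MessageQueueException ex)
            {
                tx.Abort();
                if (ex.MessageQueueErrorCode == MessageQueueErrorCode.IOTimeout)
                    return false; // nothing to receive
                throw;
            }
            catch
            {
                // Processing failed: abort so the message goes back on the queue.
                tx.Abort();
                throw;
            }
        }
    }
}
```

Because a failed message returns to the queue, a message that always fails would be retried forever; real code would likely need some poison-message handling, which this sketch leaves out.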
C
1

Get your message from the event args rather than casting sender to a MessageQueue as in ViktorJ's example; surely this is more efficient? Then use the static field mq to call mq.BeginReceive, otherwise you're burning memory.
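As a sketch of what that could look like: ReceiveCompletedEventArgs exposes a Message property that completes the asynchronous receive the first time it is read, so EndReceive does not have to be called separately. ReceiveHandlerSketch, Start, and the _queue field are stand-in names for illustration. Note that casting sender is only a reference conversion and does not create a second queue object, so any saving here is probably small.

```csharp
using System;
using System.Messaging;

public static class ReceiveHandlerSketch
{
    // Hypothetical stand-in for the static queue field in the earlier answer.
    private static MessageQueue _queue;

    public static void Start(MessageQueue queue)
    {
        _queue = queue;
        _queue.ReceiveCompleted += OnReceiveCompleted;
        _queue.BeginReceive();
    }

    private static void OnReceiveCompleted(object sender, ReceiveCompletedEventArgs e)
    {
        try
        {
            // Reading e.Message completes the asynchronous receive.
            Message msg = e.Message;
            // Process msg here.
        }
        catch (MessageQueueException ex)
        {
            // Report the failure but keep listening.
            Console.Error.WriteLine("Receive failed: " + ex.MessageQueueErrorCode);
        }

        // Wait for the next message using the shared queue instance.
        _queue.BeginReceive();
    }
}
```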

Cyanine answered 24/2, 2012 at 22:0 Comment(0)
C
0

Sounds like you need to look into WCF.

Queues in Windows Communication Foundation

Load leveling. Sending applications can overwhelm receiving applications with messages. Queues can manage mismatched message production and consumption rates so that a receiver is not overwhelmed.

Here's an example using WCF and MSMQ

Cq answered 29/7, 2009 at 5:33 Comment(2)
WCF uses MSMQ behind the scenes ... so I'm not sure how WCF helps me here. - Sacred
Please have a look at the example in the link I added to my answer. - Cq
U
0

Be aware that the service example will block at OnStart(). Instead, start a worker thread:

    protected override void OnStart(string[] args)
    {
        // Report SERVICE_START_PENDING to the service control manager.
        // myServiceStatus, State and SetServiceStatus are declared elsewhere in the service class.
        IntPtr handle = this.ServiceHandle;
        myServiceStatus.currentState = (int)State.SERVICE_START_PENDING;
        SetServiceStatus(handle, ref myServiceStatus);

        // Start a separate thread that does the actual work,
        // unless one is already running.
        if ((workerThread == null) ||
            ((workerThread.ThreadState &
             (System.Threading.ThreadState.Unstarted | System.Threading.ThreadState.Stopped)) != 0))
        {
            workerThread = new Thread(new ThreadStart(ServiceWorkerMethod));
            workerThread.Start();
        }
    }

Then call BeginReceive() from the worker.
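A stripped-down version of the same idea, without the service status calls, might look like the sketch below. QueueWorkerService, WorkLoop, and the _stop event are illustrative names only, and the loop body is a placeholder for whatever queue work the service does. The point is that OnStart returns straight away and OnStop has a way to ask the worker to finish.

```csharp
using System;
using System.ServiceProcess;
using System.Threading;

public class QueueWorkerService : ServiceBase
{
    // Set by OnStop to ask the worker to finish.
    private readonly ManualResetEvent _stop = new ManualResetEvent(false);
    private Thread _worker;

    protected override void OnStart(string[] args)
    {
        _stop.Reset();
        _worker = new Thread(WorkLoop);
        _worker.IsBackground = true;
        _worker.Start();
        // OnStart returns immediately; the work happens on _worker.
    }

    protected override void OnStop()
    {
        _stop.Set();
        // Give the worker a bounded amount of time to finish its current item.
        _worker.Join(TimeSpan.FromSeconds(30));
    }

    private void WorkLoop()
    {
        // WaitOne(1000) returns true once _stop is set, otherwise false after one second.
        while (!_stop.WaitOne(1000))
        {
            // Placeholder: receive and process queue messages here.
        }
    }
}
```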

Unarm answered 13/11, 2009 at 20:54 Comment(2)
Er - I don't get it. So, this is a Windows service. Check. It has an overridable method called OnStart(..). Check. I override it. Check. But now you're suggesting that something gets blocked, so I need to do some of this new stuff in a new thread. - Sacred
I think what he's trying to say is what is said here. - Doings
