Using QueueClient.OnMessage in an Azure worker role

I have an Azure worker role that is responsible for checking 4 Service Bus queues. Currently, I just use a polling loop to check the queues manually.

while(true)
{
    //loop through my queues to check for messages
}

Azure SDK 2.0 added the ability to listen for messages rather than poll for them. But every example I've seen uses a console app with Console.ReadKey(). Is there a way to have the worker role sit and wait on messages too?

I tried:

public override void Run()
{
    _queueProcessors.ForEach(x => x.OnMessage(Process));
}

where _queueProcessors is a list of QueueClients and Process is a private method that handles the messages. However, the worker role registers the handlers and then restarts.

Does anyone know how to make a queue client sit and wait for messages?

Katushka answered 23/5, 2013 at 13:13 Comment(0)

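OnMessage only starts the message pump and then returns, so Run() has to be kept from returning; otherwise the role is recycled. The following code sample does this by blocking Run() on a ManualResetEvent that OnStop() sets: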

using Microsoft.ServiceBus;
using Microsoft.ServiceBus.Messaging;
using Microsoft.WindowsAzure.ServiceRuntime;
using System.Diagnostics;
using System.Net;
using System.Threading;

namespace WorkerRoleWithSBQueue1
{
    public class WorkerRole : RoleEntryPoint
    {
        // The name of your queue
        const string QueueName = "demoapp";
        ManualResetEvent CompletedEvent = new ManualResetEvent(false);

        // QueueClient is thread-safe. Recommended that you cache 
        // rather than recreating it on every request
        QueueClient Client;

        public override void Run()
        {
            OnMessageOptions options = new OnMessageOptions();
            // Have the message pump call Complete on each message after the callback returns.
            options.AutoComplete = true;
            // Maximum number of concurrent calls to the callback that the pump should initiate.
            options.MaxConcurrentCalls = 1;
            // Get notified of any errors encountered by the message pump.
            options.ExceptionReceived += LogErrors;

            Trace.WriteLine("Starting processing of messages");
            // Start receiving messages. OnMessage starts the message pump and invokes the
            // callback for each message received; calling Close on the client stops the pump.
            Client.OnMessage((receivedMessage) =>
                {
                    try
                    {
                        // Process the message
                        Trace.WriteLine("Processing Service Bus message: " + receivedMessage.SequenceNumber.ToString());
                    }
                    catch
                    {
                        // Handle any message processing specific exceptions here
                    }
                }, options);

            // Block here until OnStop signals the event, so that Run does not return.
            CompletedEvent.WaitOne();
        }

        private void LogErrors(object sender, ExceptionReceivedEventArgs e)
        {
            if (e.Exception != null)
            {
                Trace.WriteLine("Error: " + e.Exception.Message);
            }
        }

        public override bool OnStart()
        {
            // Set the maximum number of concurrent connections 
            ServicePointManager.DefaultConnectionLimit = 12;

            // Create the queue if it does not exist already
            Trace.WriteLine("Creating Queue");
            string connectionString = "*** provide your connection string here***";
            var namespaceManager = NamespaceManager.CreateFromConnectionString(connectionString);
            if (!namespaceManager.QueueExists(QueueName))
            {
                namespaceManager.CreateQueue(QueueName);
            }

            // Initialize the connection to Service Bus Queue
            Client = QueueClient.CreateFromConnectionString(connectionString, QueueName);
        
            Trace.WriteLine("Sending messages...");
            // populate some messages
            for (int ctr = 0; ctr < 10; ctr++)
            {
                Client.Send(new BrokeredMessage());
            }
        
            return base.OnStart();
        }

        public override void OnStop()
        {
            // Close the connection to Service Bus Queue
            Client.Close();
            CompletedEvent.Set(); // complete the Run function
            base.OnStop();
        }
    }
}
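
To adapt the sample to several queues, as in the question, a minimal sketch might look like the following. It assumes the same SDK as the sample above; the class name, the queue names, and the Process body are hypothetical placeholders, and the queues are assumed to exist already.

```csharp
using System.Collections.Generic;
using System.Diagnostics;
using System.Threading;
using Microsoft.ServiceBus.Messaging;
using Microsoft.WindowsAzure.ServiceRuntime;

public class MultiQueueWorkerRole : RoleEntryPoint
{
    // Hypothetical queue names; replace with your own. Assumed to exist already.
    static readonly string[] QueueNames = { "queue1", "queue2", "queue3", "queue4" };

    readonly List<QueueClient> _queueProcessors = new List<QueueClient>();
    readonly ManualResetEvent _completedEvent = new ManualResetEvent(false);

    public override bool OnStart()
    {
        string connectionString = "*** provide your connection string here ***";
        foreach (string name in QueueNames)
        {
            _queueProcessors.Add(
                QueueClient.CreateFromConnectionString(connectionString, name));
        }
        return base.OnStart();
    }

    public override void Run()
    {
        foreach (QueueClient client in _queueProcessors)
        {
            var options = new OnMessageOptions
            {
                AutoComplete = true,
                MaxConcurrentCalls = 1
            };
            options.ExceptionReceived += (sender, e) =>
            {
                if (e.Exception != null)
                {
                    Trace.WriteLine("Error: " + e.Exception.Message);
                }
            };

            // OnMessage returns immediately; each client runs its own message pump.
            client.OnMessage(Process, options);
        }

        // Keep Run from returning until OnStop signals the event.
        _completedEvent.WaitOne();
    }

    private void Process(BrokeredMessage message)
    {
        // Placeholder: handle the message here.
        Trace.WriteLine("Processing message: " + message.SequenceNumber);
    }

    public override void OnStop()
    {
        // Closing each client stops its message pump.
        foreach (QueueClient client in _queueProcessors)
        {
            client.Close();
        }
        _completedEvent.Set();
        base.OnStop();
    }
}
```

A single ManualResetEvent is enough here because every pump runs on its own background threads; Run() only needs to stay blocked for the lifetime of the role.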
Paramilitary answered 24/5, 2013 at 16:32 Comment(7)
mccow002, you have to give @abhishek a little credit. He is a PM on the Windows Azure team. He may very well have written the sample before OnMessage was public. :) - Highmuckamuck
Very nice. A side benefit of this approach is that you can break out of OnMessage whenever you want. I was looking for some form of state.Break (similar to Parallel.For), but this works just fine. Thanks. - Meter
So, what happens if the OnMessage() handler is processing a message when the OnStop() method is invoked? I guess the processing is simply terminated without giving it a chance to complete? - Hunger
That is correct. The client connection will close and any subsequent message.Complete operations will not happen, so the message will be returned to the queue and become available to the next receiver/processor. - Paramilitary
With AutoComplete = true, will an explicit receivedMessage.Abandon() call (i.e., in the catch { } block) actually abandon the message, or will the auto-complete setting override that? - Pammy
I'm curious to know the same, @Pammy - Ankerite
I believe it will abandon it, but you will subsequently get an exception raised through ExceptionReceived when AutoComplete tries to call .Complete(). (I might be wrong though; I observed this behaviour when I had AutoComplete on but had left manual .Complete() calls in.) - Albinaalbinism
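
One way to sidestep the uncertainty discussed in these comments is to turn AutoComplete off and settle each message explicitly. The sketch below assumes the client receives in the default PeekLock mode; the ExplicitSettlement class, the StartPump method, and the handle delegate are hypothetical names introduced for illustration.

```csharp
using System;
using System.Diagnostics;
using Microsoft.ServiceBus.Messaging;

public static class ExplicitSettlement
{
    // Hypothetical helper: starts a pump that completes or abandons each message itself.
    public static void StartPump(QueueClient client, Action<BrokeredMessage> handle)
    {
        var options = new OnMessageOptions
        {
            AutoComplete = false,   // the pump will not call Complete for us
            MaxConcurrentCalls = 1
        };
        options.ExceptionReceived += (sender, e) =>
        {
            if (e.Exception != null)
            {
                Trace.WriteLine("Pump error: " + e.Exception.Message);
            }
        };

        client.OnMessage(message =>
        {
            try
            {
                handle(message);
                // Processing succeeded: remove the message from the queue.
                message.Complete();
            }
            catch (Exception ex)
            {
                Trace.WriteLine("Processing failed: " + ex.Message);
                // Release the lock so the message can be delivered again.
                message.Abandon();
            }
        }, options);
    }
}
```

With this arrangement there is exactly one settlement call per message, so the Complete and Abandon calls cannot conflict with each other.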
