Service not receiving messages after Message Queuing service restarted

We have a service that receives messages from n message queues. However, if the Message Queuing service is restarted, the message retrieval service stops receiving messages even after the Message Queuing service has restarted successfully.

I have tried to specifically catch the MessageQueueException that is thrown in the message retrieval service and invoke the queue's BeginReceive method again. However, in the 2 seconds or so that it takes the Message Queuing service to restart, I get about 1875 instances of the exception and then the service stops functioning when another MessageQueueException is thrown in our StartListening method.

Is there an elegant way to recover from a Message Queuing service restart?

    private void OnReceiveCompleted(object sender, ReceiveCompletedEventArgs e)
    {
        MessageQueue queue = (MessageQueue)sender;

        try
        {
            Message message = queue.EndReceive(e.AsyncResult);

            this.StartListening(queue);

            if (this.MessageReceived != null)
                this.MessageReceived(this, new MessageReceivedEventArgs(message));
        }
        catch (MessageQueueException)
        {
            LogUtility.LogError(String.Format(CultureInfo.InvariantCulture, StringResource.LogMessage_QueueManager_MessageQueueException, queue.MachineName, queue.QueueName, queue.Path));
            this.StartListening(queue);
        }            
    }

    public void StartListening(MessageQueue queue)
    {
        queue.BeginReceive();
    }

I still need to deal with the infinite loop issue this causes and clean it up a bit, but you get the idea.

When the MessageQueueException occurs, invoke the RecoverQueue method.

    private void RecoverQueue(MessageQueue queue)
    {
        // Capture the path up front; the old instance is disposed below.
        string queuePath      = queue.Path;
        bool   queueRecovered = false;

        while (!queueRecovered)
        {
            try
            {
                // Release the stale instance (harmless to repeat on retries).
                this.StopListening(queue);
                queue.Close();
                queue.Dispose();

                // Give the Message Queuing service time to come back up.
                Thread.Sleep(2000);

                MessageQueue newQueue = this.CreateQueue(queuePath);

                newQueue.ReceiveCompleted += new ReceiveCompletedEventHandler(this.OnReceiveCompleted);

                this.StartListening(newQueue);

                LogUtility.LogInformation(String.Format(CultureInfo.InvariantCulture, "Message queue {0} recovered successfully.", newQueue.QueueName));

                queueRecovered = true;
            }
            catch (Exception ex)
            {
                // Log the captured path rather than touching the disposed queue object.
                LogUtility.LogError(String.Format(CultureInfo.InvariantCulture, "The following error occurred while trying to recover queue: {0} error: {1}", queuePath, ex.Message));
            }
        }
    }

    public void StopListening(MessageQueue queue)
    {
        queue.ReceiveCompleted -= new ReceiveCompletedEventHandler(this.OnReceiveCompleted);            
    }

Alage answered 26/4, 2012 at 15:31
Comments (1):
Posted the RecoverQueue method code. - Alage

When you receive the exception caused by the service restarting, you have to release the old MessageQueue: unhook your ReceiveCompleted handler, dispose the MessageQueue, and so on. Then create a new MessageQueue instance and subscribe to ReceiveCompleted again on that new instance.
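The release-and-recreate steps above can be sketched roughly as follows. This is a minimal sketch rather than a definitive implementation: `ResilientQueueListener`, `BackoffDelay`, the `handler` callback, and the 1 to 30 second backoff values are all hypothetical names and numbers chosen for illustration, not part of the original code. Only `MessageQueue`, `ReceiveCompleted`, `BeginReceive`, `EndReceive`, and `MessageQueueException` come from System.Messaging.

```csharp
using System;
using System.Messaging;
using System.Threading;

// Hypothetical helper (not from the original post): listens on one queue and
// rebuilds the MessageQueue after a failure, waiting longer between attempts.
public sealed class ResilientQueueListener
{
    private readonly string queuePath;
    private readonly Action<Message> handler;
    private MessageQueue queue;
    private int failures;

    public ResilientQueueListener(string queuePath, Action<Message> handler)
    {
        this.queuePath = queuePath;
        this.handler = handler;
    }

    // Delay before the next reconnect attempt: 1s, 2s, 4s, ... capped at 30s.
    // The values are an assumption; tune them for your service.
    public static TimeSpan BackoffDelay(int failures)
    {
        int exponent = Math.Min(Math.Max(failures, 0), 5);
        return TimeSpan.FromSeconds(Math.Min(1 << exponent, 30));
    }

    public void Start()
    {
        this.queue = new MessageQueue(this.queuePath);
        this.queue.ReceiveCompleted += this.OnReceiveCompleted;
        this.queue.BeginReceive();
    }

    private void OnReceiveCompleted(object sender, ReceiveCompletedEventArgs e)
    {
        Message message = null;
        try
        {
            message = this.queue.EndReceive(e.AsyncResult);
            this.failures = 0;          // a successful receive resets the backoff
            this.queue.BeginReceive();  // keep listening
        }
        catch (MessageQueueException)
        {
            this.Recover();
        }

        if (message != null)
            this.handler(message);
    }

    private void Release()
    {
        // Unhook the handler and dispose the instance; safe to call repeatedly.
        this.queue.ReceiveCompleted -= this.OnReceiveCompleted;
        this.queue.Dispose();
    }

    private void Recover()
    {
        this.Release();

        while (true)
        {
            // Sleeping avoids the tight exception loop while MSMQ is down.
            Thread.Sleep(BackoffDelay(this.failures++));
            try
            {
                this.Start();
                return;
            }
            catch (MessageQueueException)
            {
                // MSMQ is still unavailable: drop the half-built instance and retry.
                this.Release();
            }
        }
    }
}
```

Note that `Recover` blocks the callback thread while it sleeps and retries forever; a real service would probably want a stop flag or cancellation so it can shut down cleanly during an outage.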

Alternatively, you can poll: on a fixed interval, create a new MessageQueue instance and call MessageQueue.Receive(TimeSpan), which waits until a message arrives or the timeout elapses. If a message arrives, handle it; either way, dispose the MessageQueue instance and start the next iteration.

By recreating the MessageQueue each time, you get built-in recovery. The overhead of creating the MessageQueue is also minimal because the underlying queue is cached internally.

Pseudo-code...

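    // Requires: using System; using System.Messaging;
    // "done" and "queuePath" are assumed to be defined by the surrounding service.
    while (!done) // or use a timer or periodic task of some sort...
    {
        try
        {
            // A fresh MessageQueue per iteration, so a stale handle never outlives an MSMQ restart.
            using (MessageQueue queue = new MessageQueue(queuePath))
            {
                Message message = queue.Receive(TimeSpan.FromMilliseconds(500));

                // process message
            }
        }
        catch (MessageQueueException ex)
        {
            // Receive(TimeSpan) signals "no message within the timeout" with IOTimeout.
            if (ex.MessageQueueErrorCode == MessageQueueErrorCode.IOTimeout)
                continue;

            // handle other exceptions (log, back off) and let the loop retry
        }
    }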

Corabelle answered 26/4, 2012 at 18:24
Comments (3):
Okay, so you're just basically saying: "Use a new queue every time." - Manche
@Bob Horn - Yes. Because the overhead is low due to the internal caching, it makes it easier to handle the issues where the MSMQ service was restarted or otherwise becomes unresponsive. - Corabelle
@Jim - I declare the queue when the service first starts and then set a handler for OnReceived. I have experienced the same issue. The handler gets triggered, encounters a QueueException when doing an EndReceive and reading the message, but that stops the service from ever receiving future messages. How do I handle this exception when I am not using a timer as you described? - Banausic
