Stopping a service that reads MSMQ

I'm a Java programmer who has been asked to make some changes to C# applications. I've been working with C# for a week now, and I've finally hit a point where looking at the documentation isn't helping and I can't find solutions when I google.

In this case I have a Windows Service that processes messages arriving in an MSMQ queue. When a message is received, the currently listening thread picks it up and goes off to do an operation that takes a couple of seconds.

public void Start()
{
    this.listen = true;
    for (int i = 0; i < Constants.ThreadMaxCount; i++)
    {
        ThreadPool.QueueUserWorkItem(new WaitCallback(this.StartListening), i);
    }
    ...

private void StartListening(Object threadContext)
{
    int threadId = (int)threadContext;
    threads[threadId] = Thread.CurrentThread;
    PostRequest postReq;
    while (this.listen)
    {
        System.Threading.Monitor.Enter(locker);
        try
        {
            postReq = GettingAMessage();
        }
        finally
        {
            System.Threading.Monitor.Exit(locker);
        }
    }
    ...
}

GettingAMessage() has the following lines that listen for a message:

Task<Message> ts = Task.Factory.FromAsync<Message>
    (queue.BeginReceive(), queue.EndReceive);
ts.Wait();

The problem is that when the Stop() method is called and no messages are going into the MSMQ queue, the threads all sit there waiting for a message. I have tried using timeouts, but that approach doesn't seem elegant to me (and having switched over to the Task Factory, I'm not sure how to implement them currently). My solution was to add a reference to each thread to an array, so that I could cancel them. The following is called by each worker thread after it is created.

threads[threadId] = Thread.CurrentThread;

and the threads are then supposed to be aborted by

public void Stop()
{
    try
    {
        this.listen = false;
        foreach(Thread a in threads) {
            a.Abort();
        }
    }
    catch
    {...}
}

Any advice on why this isn't shutting the threads down? (Or even better, can anyone tell me where I should look for how to cancel the ts.Wait() properly?)

Pursuit answered 10/11, 2014 at 16:48 Comment(2)
Never use Thread.Abort(). As to why it's not doing anything: Thread.Abort() can only abort a thread if it's executing managed code, which waiting on an MSMQ message is not. - Reata
Without having delved into the specifics of MSMQ, I do have an idea that's worked for me in other messaging systems: you can send a message to yourself that says "stop processing now". This unblocks the receiver, which then simply exits. Of course, this is only possible if you're free to define a new message type. - Reata
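The "stop message" idea from the comment above can be sketched roughly as follows. This is only an illustration, not MSMQ code: a BlockingCollection<string> stands in for the queue, and the StopMessage sentinel, the StopMessageDemo class and its Run method are hypothetical names invented for the example. The point it tries to show is that with several workers you need one stop message per worker, because each worker consumes exactly one before exiting.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;

public static class StopMessageDemo
{
    // Hypothetical sentinel value; a real queue would need an agreed-upon message type.
    private const string StopMessage = "__STOP__";

    // Starts workerCount workers, feeds them the work items, then sends one stop
    // message per worker and waits for all workers to exit.
    // Returns the number of ordinary messages that were processed.
    public static int Run(int workerCount, IEnumerable<string> workItems)
    {
        var queue = new BlockingCollection<string>(); // stand-in for the MSMQ queue (FIFO)
        int processed = 0;
        var workers = new List<Thread>();

        for (int i = 0; i < workerCount; i++)
        {
            var worker = new Thread(() =>
            {
                while (true)
                {
                    string message = queue.Take(); // blocks, like a receive without a timeout
                    if (message == StopMessage)
                    {
                        return; // this worker has consumed its stop message
                    }
                    Interlocked.Increment(ref processed);
                }
            });
            worker.Start();
            workers.Add(worker);
        }

        foreach (string item in workItems)
        {
            queue.Add(item);
        }

        // One stop message per worker unblocks every waiting receiver.
        for (int i = 0; i < workerCount; i++)
        {
            queue.Add(StopMessage);
        }

        foreach (Thread worker in workers)
        {
            worker.Join();
        }
        return processed;
    }
}
```

Calling StopMessageDemo.Run(3, new[] { "a", "b", "c", "d" }) processes the four items and then lets all three workers exit. Whether this translates cleanly to MSMQ depends on being free to define such a message, as the comment already notes.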

Use the ManualResetEvent class to stop your running threads gracefully.

In addition, don't use the ThreadPool for long-running threads; create your own threads instead. Otherwise, with lots of long-running tasks, you could end up with thread-pool starvation, possibly even leading to deadlock:

public class MsmqListener
{
    private readonly ManualResetEvent _stopRequested = new ManualResetEvent(false);
    private readonly object _locker = new object();
    private List<Thread> _listenerThreads;

    //-----------------------------------------------------------------------------------------------------

    public MsmqListener()
    {
        CreateListenerThreads();
    }

    //-----------------------------------------------------------------------------------------------------

    public void Start()
    {
        StartListenerThreads();
    }

    //-----------------------------------------------------------------------------------------------------

    public void Stop()
    {
        try
        {
            _stopRequested.Set(); // Signal every listener thread to stop
            foreach (Thread thread in _listenerThreads)
            {
                thread.Join(); // Wait for all threads to complete gracefully
            }
        }
        catch (Exception ex)
        {...}
    }

    //-----------------------------------------------------------------------------------------------------

    private void StartListening()
    {
        PostRequest postReq;
        // WaitOne(0) returns immediately: true once stop has been signaled, false otherwise
        while (!_stopRequested.WaitOne(0))
        {
            lock (_locker)
            {
                // This call has to return periodically (e.g. via a receive timeout);
                // otherwise the stop check above is never reached while the queue is empty
                postReq = GettingAMessage();
            }
            ...
        }
    }

    //-----------------------------------------------------------------------------------------------------

    private void CreateListenerThreads()
    {
        _listenerThreads = new List<Thread>();
        for (int i = 0; i < Constants.ThreadMaxCount; i++)
        {
            Thread listenerThread = new Thread(StartListening);
            _listenerThreads.Add(listenerThread);
        }
    }

    //-----------------------------------------------------------------------------------------------------

    private void StartListenerThreads()
    {
        foreach (var thread in _listenerThreads)
        {
            thread.Start();
        }
    }
}

UPDATE: I replaced AutoResetEvent with ManualResetEvent in order to support stopping multiple waiting threads. With ManualResetEvent, once you signal it, all waiting threads are notified and are free to proceed with their job (in your case, to stop polling for messages).

Using a volatile bool does not provide all the guarantees. It may still read stale data. It is better to use the underlying OS synchronisation mechanism, as it provides much stronger guarantees. Source: https://mcmap.net/q/858653/-autoresetevent-vs-boolean-to-stop-a-thread
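For the stop signal to be noticed while the queue is empty, the receive call itself has to return from time to time. A minimal sketch of one way to do that, assuming the synchronous MessageQueue.Receive(TimeSpan) overload is acceptable in place of the BeginReceive/EndReceive pair from the question. The TimedReceiver class, the one-second poll interval and the Process placeholder are assumptions made for illustration, not part of the original code:

```csharp
using System;
using System.Messaging; // .NET Framework; requires a reference to System.Messaging.dll
using System.Threading;

public class TimedReceiver
{
    private readonly ManualResetEvent _stopRequested = new ManualResetEvent(false);
    private readonly MessageQueue _queue;

    public TimedReceiver(MessageQueue queue)
    {
        _queue = queue;
    }

    public void RequestStop()
    {
        _stopRequested.Set();
    }

    // Runs on a listener thread until RequestStop() is called.
    public void Listen()
    {
        TimeSpan pollInterval = TimeSpan.FromSeconds(1); // assumed value; tune as needed

        while (!_stopRequested.WaitOne(0))
        {
            Message message;
            try
            {
                // Returns a message, or throws once pollInterval elapses with an empty queue
                message = _queue.Receive(pollInterval);
            }
            catch (MessageQueueException ex)
            {
                if (ex.MessageQueueErrorCode == MessageQueueErrorCode.IOTimeout)
                {
                    continue; // no message arrived; loop around and re-check the stop signal
                }
                throw; // any other queue error is a real failure
            }

            Process(message);
        }
    }

    // Hypothetical placeholder for the work that takes a couple of seconds.
    private void Process(Message message)
    {
    }
}
```

With this shape, Stop() should take at most roughly one poll interval plus the time needed to finish the message currently being processed. The trade-off is one timeout exception per idle interval per thread, so a very short interval is probably not a good idea.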

Skepticism answered 10/11, 2014 at 16:59 Comment(7)
I can see how the AutoResetEvent would work with the while loop, but I can't see how it stops the ts.Wait() that blocks the shutdown. Am I missing something? - Pursuit
There is no benefit to using .WaitOne(0) as opposed to just setting and checking a (volatile) boolean. And using AutoResetEvent in this case is certainly wrong, as multiple threads are executing the .WaitOne() -- you actually want ManualResetEvent if you want an event at all. - Reata
@JeroenMostert C# volatile does not provide all the guarantees. It may still read stale data. It is better to use the underlying OS synchronisation mechanism, as it provides much stronger guarantees. - Skepticism
@YairNevet Unless you can provide a source for "it may still read stale data", I'm not inclined to believe that. Even so, if you don't like volatile, you can simply read and write the boolean inside the lock, which certainly does guarantee proper access. - Reata
@JeroenMostert You're right regarding the ManualResetEvent and I updated the answer code, but I still don't agree about the volatile Boolean. - Skepticism
@YairNevet Thanks for the source! Eric Lippert knows his stuff, so if he says it's not safe, it's safe to assume it's not. However, my point about the lock stands -- lock certainly does have the strong guarantees we want without creating an event. .WaitOne(0) is a smell because it shows you're only using a wait handle as a condition variable, and .NET already has support for that in the form of monitors (lock). It's OK for one event, but not for lots of events, since each of them is a kernel object. - Reata
About the only thing this is missing is that, in order not to sit on BeginReceive() until a message enters the queue, you need a timeout or an interrupt on that method. Thanks for the answer. - Pursuit
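The lock-based alternative suggested in the comments (reading and writing the boolean inside a lock instead of creating an event) could look something like this. It is only a sketch: StopFlag and StopFlagDemo are hypothetical names, and the Thread.Sleep call merely stands in for a receive call that returns after a short timeout.

```csharp
using System;
using System.Threading;

public class StopFlag
{
    private readonly object _locker = new object();
    private bool _stopRequested;

    // Both the read and the write happen under the same lock,
    // so a reader never sees a stale value.
    public bool IsStopRequested
    {
        get { lock (_locker) { return _stopRequested; } }
    }

    public void RequestStop()
    {
        lock (_locker) { _stopRequested = true; }
    }
}

public static class StopFlagDemo
{
    // Starts one worker that polls the flag, requests a stop, and reports
    // whether the worker exited within five seconds.
    public static bool WorkerStopsAfterRequest()
    {
        var flag = new StopFlag();
        var worker = new Thread(() =>
        {
            while (!flag.IsStopRequested)
            {
                Thread.Sleep(10); // stands in for a receive call with a short timeout
            }
        });

        worker.Start();
        flag.RequestStop();
        return worker.Join(TimeSpan.FromSeconds(5));
    }
}
```

Either way, the flag or event only helps once the blocking receive returns, so it still needs to be combined with a timeout or some other way of unblocking the receiver.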
