Multi threaded Windows service to process Windows Message Queue
Asked Answered
A

2

1

This is my first attempt at writing a Windows service.

This windows service has to process 2 windows message Queues.

Each Message Queue should have there own thread, but I can't seem to get the Architecture in Place.

I followed this Windows Service to run constantly which allowed me to create one thread in which I am Processing one Queue.

So this is my service class:

    protected override void OnStart(string[] args)
    {
        _thread = new Thread(WorkerThreadFunc) { Name = "Address Calculator Thread", IsBackground = true };
        _thread.Start();
    }

    private void WorkerThreadFunc()
    {
        _addressCalculator = new GACAddressCalculator();

        while (!_shutdownEvent.WaitOne(0))
        {
            _addressCalculator.StartAddressCalculation();
        }
    }



    protected override void OnStop()
    {
        _shutdownEvent.Set();
        if (!_thread.Join(5000))
        { // give the thread 5 seconds to stop
            _thread.Abort();
        }
    }

In My GACAddressCalculator.StartAddressCalculation() I am creating a Queue Processor Object which looks like this:

    public void StartAddressCalculation()
    {
        try
        {
            var googleQueue = new GISGoogleQueue("VehMonLogGISGoogle", 1, _gacLogger, 1);
            googleQueue.ProccessMessageQueue();

        }
        catch (Exception ex)
        {

        }

    }

And this is GISGoogleQueue:

public class GISGoogleQueue : BaseMessageQueue
{


    public GISGoogleQueue(string queueName, int threadCount, GACLogger logger, int messagesPerThread)
        : base(queueName, threadCount, logger, messagesPerThread)
    {
    }

    public override void ProccessMessageQueue()
    {
        if (!MessageQueue.Exists(base.QueueName))
        {
            _logger.LogMessage(MessageType.Information, string.Format("Queue '{0}' doesn't exist", this.QueueName));
            return;
        }

        var messageQueue = new MessageQueue(QueueName);
        var myVehMonLog = new VehMonLog();
        var o = new Object();
        var arrTypes = new Type[2];
        arrTypes[0] = myVehMonLog.GetType();
        arrTypes[1] = o.GetType();
        messageQueue.Formatter = new XmlMessageFormatter(arrTypes);

        using (var pool = new Pool(ThreadCount))
        {

            // Infinite loop to process all messages in Queue
            for (; ; )
            {
                for (var i = 0; i < MessagesPerThread; i++)
                {
                    try
                    {
                        while (pool.TaskCount() >= MessagesPerThread) ; // Stop execution until Tasks in pool have been executed


                        var message = messageQueue.Receive(new TimeSpan(0, 0, 5, 0)); // TimeOut for message reading from Queue, set to 5 minutes, Will throw exception after 5 mins

                        if (message != null) // Check if message is not Null
                        {
                            var monLog = (VehMonLog)message.Body;
                            pool.QueueTask(() => ProcessMessageFromQueue(monLog)); // Add to Tasks list in Pool
                        }

                    }
                    catch (Exception ex)
                    {

                    }

                }
            }
        }
    }

}

Now this works fine for 1 Message queue, but if I want to process another message Queue it won't happen as I have an infinite loop in the ProccessMessageQueue method.

I want to execute each Queue in a separate thread.

I think I am making a mistake in WorkerThreadFunc(), I have to somehow start two threads from there or in the OnStart().

Also if you have any tips on how to improve this service would be great.

By the way I am using the Pool Class from this Answer https://mcmap.net/q/245639/-code-for-a-simple-thread-pool-in-c-closed for the thread Pool inside ProccessMessageQueue

Aspidistra answered 14/11, 2015 at 6:15 Comment(0)
S
3

I would suggest changing your service class as follows (comments below):

protected override void OnStart(string[] args)
{
    _thread = new Thread(WorkerThreadFunc)
              {
                  Name = "Run Constantly Thread",
                  IsBackground = true
              };
    _thread.Start();
}

GISGoogleQueue _googleQueue1;
GISGoogleQueue _googleQueue2;
private void WorkerThreadFunc()
{
    // This thread is exclusively used to keep the service running.
    // As such, there's no real need for a while loop here.  Create
    // the necessary objects, start them, wait for shutdown, and
    // cleanup.
    _googleQueue1 = new GISGoogleQueue(...);
    _googleQueue1.Start();
    _googleQueue2 = new GISGoogleQueue(...);
    _googleQueue2.Start();

    _shutdownEvent.WaitOne();  // infinite wait

    _googleQueue1.Shutdown();
    _googleQueue2.Shutdown();
}

protected override void OnStop()
{
    _shutdownEvent.Set();
    if (!_thread.Join(5000))
    {
        // give the thread 5 seconds to stop
        _thread.Abort();
    }
}

I'm ignoring your GACAddressCalculator. From what you showed, it appeared to be a thin wrapper around GISGoogleQueue. Obviously, if it actually does something that you didn't show, it'll need to be factored back in.

Notice that two GISGoogleQueue objects were created in the WorkerThreadFunc(). So let's next look at how to create those objects to achieve the appropriate threading model.

public class GISGoogleQueue : BaseMessageQueue
{
    System.Threading.Thread _thread;
    System.Threading.ManualResetEvent _shutdownEvent;

    public GISGoogleQueue(string queueName, int threadCount, GACLogger logger, int messagesPerThread)
         : base(queueName, threadCount, logger, messagesPerThread)
    {
        // Let this class wrap a thread object.  Create it here.
        _thread = new Thread(RunMessageQueueFunc()
                  {
                      Name = "Run Message Queue Thread " + Guid.NewGuid().ToString(),
                      IsBackground = true
                  };
        _shutdownEvent = new ManualResetEvent(false);
    }

    public Start()
    {
        _thread.Start();
    }

    public Shutdown()
    {
        _shutdownEvent.Set();
        if (!_thread.Join(5000))
        {
            // give the thread 5 seconds to stop
            _thread.Abort();
        }
    }

    private void RunMessageQueueFunc()
    {
        if (!MessageQueue.Exists(base.QueueName))
        {
            _logger.LogMessage(MessageType.Information, string.Format("Queue '{0}' doesn't exist", this.QueueName));
            return;
        }

        var messageQueue = new MessageQueue(QueueName);
        var myVehMonLog = new VehMonLog();
        var o = new Object();
        var arrTypes = new Type[2];
        arrTypes[0] = myVehMonLog.GetType();
        arrTypes[1] = o.GetType();
        messageQueue.Formatter = new XmlMessageFormatter(arrTypes);

        using (var pool = new Pool(ThreadCount))
        {
            // Here's where we'll wait for the shutdown event to occur.
            while (!_shutdownEvent.WaitOne(0))
            {
                for (var i = 0; i < MessagesPerThread; i++)
                {
                    try
                    {
                        // Stop execution until Tasks in pool have been executed
                        while (pool.TaskCount() >= MessagesPerThread) ;

                        // TimeOut for message reading from Queue, set to 5 minutes, Will throw exception after 5 mins
                        var message = messageQueue.Receive(new TimeSpan(0, 0, 5, 0));

                        if (message != null) // Check if message is not Null
                        {
                            var monLog = (VehMonLog)message.Body;
                            pool.QueueTask(() => ProcessMessageFromQueue(monLog)); // Add to Tasks list in Pool
                        }
                    }
                    catch (Exception ex)
                    {
                    }
                }
            }
        }
    }
}

This approach centers around using a Thread object wrapped by the GISGoogleQueue class. For each GISGoogleQueue object you create, you get a wrapped thread that will do the work once Start() is called on the GISGoogleQueue object.

A couple of points. In the RunMessageQueueFunc(), you're checking to see if the name of the queue exists. If it doesn't, the function exits. IF that happens, the thread exits, too. The point is that you may wish to do that check earlier in the process. Just a thought.

Second, note that your infinite loop has been replaced by a check against the _shutdownEvent object. That way, the loop will stop when the service shuts down. For timeliness, you'll need to make sure that a complete pass through the loop doesn't take too long. Otherwise, you may end up aborting the thread 5 seconds after shutdown. The abort is only there to make sure things are torn down, but should be avoided if possible.

I know a lot of people will prefer using the Task class to do things like this. It appears that you are inside RunMessageQueueFunc(). But for threads that run for the duration of the process, I think the Task class is the wrong choice because it ties up threads in the thread pool. To me, that what the Thread class is build for.

HTH

Saliferous answered 14/11, 2015 at 17:44 Comment(1)
Thanks for the great answer. I think you have a typo in RunMessageQueueFunc(): while (!_shutdownEvent.Wait(0)) I think it should be: !_shutdownEvent.WaitOne(0)Aspidistra
K
0

You can use Parallel.ForEach like this;

 Parallel.ForEach(queueItems, ProcessQueue); //this will process each queue item in a separate thread


 private void ProcessQueue(QueueItem queue)
 {
     //your processing logic       
 }
Kazmirci answered 14/11, 2015 at 7:3 Comment(0)

© 2022 - 2024 — McMap. All rights reserved.