Basic design pattern for using TPL inside a Windows service in C#
I am trying to build a Windows service that needs some kind of parallelism for polling files from different FTP sources. To start multiple FTP downloads, I am looking at the TPL so that I can run foreach loops in parallel easily. But when I searched for how to start or stop my service(s), the best approach I found was to create new threads inside the OnStart() method, as described here: https://mcmap.net/q/245531/-windows-service-to-run-constantly

Everything I read about the TPL notes that it is more advanced than creating and stopping threads manually.

I did not find any sample post that describes how to run a TPL loop inside a Windows service.

My code:

protected override void OnStart(string[] args)
{
     _thread = new Thread(WorkerThreadFunc);
     _thread.Name = "My Worker Thread";
     _thread.IsBackground = true;
     _thread.Start();
}

And inside WorkerThreadFunc I want to do some kind of TPL work:

private void WorkerThreadFunc()
{
    foreach (string path in paths)
    {
        string pathCopy = path;
        var task = Task.Factory.StartNew(() =>
            {
                Boolean taskResult = ProcessPicture(pathCopy);
                return taskResult;
            });
        task.ContinueWith(t => result &= t.Result);
        tasks.Add(task);
    }
}

Or should I start WorkerThreadFunc as a Task as well?

Thom answered 9/12, 2014 at 16:33 Comment(0)

The caveat here is not only how you start your worker, but also how you stop it.

Besides Task itself, the TPL also gives you a very convenient way to cancel tasks through the CancellationToken and CancellationTokenSource objects.

Starting and stopping a Windows service with TPL

To apply this technique to your Windows Service, you basically need to do the following:

    private CancellationTokenSource tokenSource;
    private Task backgroundTask;

    protected override void OnStart(string[] args)
    {
        tokenSource = new CancellationTokenSource();
        var cancellation = tokenSource.Token;
        backgroundTask = Task.Factory.StartNew(() => WorkerThreadFunc(cancellation),
                                    cancellation,
                                    TaskCreationOptions.LongRunning,
                                    TaskScheduler.Default);
    }

Notes:

  • TaskCreationOptions.LongRunning hints to the task scheduler that this task may need a dedicated thread, so the default scheduler avoids running it on a ThreadPool thread (and it won't block other work items in the pool)
  • CancellationToken is passed to WorkerThreadFunc because this is how it will be notified to stop its work; see the Cancellation section below

And in your OnStop method, you trigger cancellation and wait for the task to complete:

    protected override void OnStop()
    {
        try
        {
            tokenSource.Cancel();
            var timeout = TimeSpan.FromSeconds(3);
            bool finishedInTime = backgroundTask.Wait(timeout);
            if (!finishedInTime)
            {
                // Task didn't complete within a reasonable amount of time
                // Fix your cancellation handling in WorkerThreadFunc or ProcessPicture
            }
        }
        catch (AggregateException)
        {
            // Wait throws if the task faulted or was cancelled before it started running.
            // In both cases the task has ended, so log the exception and let the service stop.
        }
    }

Cancellation

Calling tokenSource.Cancel() puts every CancellationToken issued by this tokenSource into the cancelled state, and every method that accepted such a token (like your WorkerThreadFunc) should then stop its work.

Handling cancellation is specific to the implementation, but the general rule is that your method should monitor the cancellation token's state and be able to stop its work within a reasonable amount of time. This approach requires you to split your work logically into smaller parts, so that you don't get stuck in a piece of work that takes a long time to complete and don't start any new work once cancellation has been requested.
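As a minimal sketch of this rule, assume the work can be split into small independent items. The names CancellationDemo and ProcessItems are hypothetical, and the 50 ms sleep only stands in for one unit of real work:

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

static class CancellationDemo
{
    // Processes items one at a time and checks the token between items,
    // so a cancellation request is honoured within roughly one item's duration.
    // Returns the number of items that were fully processed.
    public static int ProcessItems(int itemCount, CancellationToken token)
    {
        int processed = 0;
        for (int i = 0; i < itemCount; i++)
        {
            if (token.IsCancellationRequested)
            {
                break; // stop before starting a new unit of work
            }

            Thread.Sleep(50); // placeholder for one small unit of real work
            processed++;
        }
        return processed;
    }

    static void Main()
    {
        using (var source = new CancellationTokenSource())
        {
            Task<int> worker = Task.Run(() => ProcessItems(1000, source.Token));

            Thread.Sleep(200);   // let a few items complete
            source.Cancel();     // request a stop, as OnStop would

            Console.WriteLine("Processed {0} of 1000 items before stopping", worker.Result);
        }
    }
}
```

The smaller each unit of work is, the sooner the loop notices the request, which matters when OnStop only waits a few seconds.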

Looking at your WorkerThreadFunc code, consider checking for cancellation before you start each new ProcessPicture task, for example:

private List<Task> tasks = new List<Task>();

private void WorkerThreadFunc(CancellationToken token)
{
    foreach (string path in paths)
    {
        if (token.IsCancellationRequested)
        {
            // you may also want to pass a timeout value here to handle 'stuck' processing
            Task.WaitAll(tasks.ToArray());

            // no more new tasks
            break;
        }

        string pathCopy = path;
        var task = Task.Factory.StartNew(() =>
            {
                Boolean taskResult = ProcessPicture(pathCopy, token); // <-- consider a cancellation here
                return taskResult;
            }, token); // <<--- using overload with cancellation token
        task.ContinueWith(t => result &= t.Result);
        tasks.Add(task);
    }
}
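The timeout mentioned in the code comment above could be handled along these lines. This is only a sketch: WaitAllDemo and WaitForAll are hypothetical names, and the sleeping loop stands in for ProcessPicture. It relies on the Task.WaitAll(Task[], TimeSpan) overload, which returns false on timeout and throws an AggregateException when a task was cancelled or faulted:

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

static class WaitAllDemo
{
    // Waits up to 'timeout' for all tasks. Returns true if every task ended
    // in time (completed or cancelled), false if the timeout elapsed first.
    public static bool WaitForAll(List<Task> tasks, TimeSpan timeout)
    {
        try
        {
            return Task.WaitAll(tasks.ToArray(), timeout);
        }
        catch (AggregateException ex)
        {
            // WaitAll throws when at least one task was cancelled or faulted.
            // Swallow cancellations only; Handle rethrows anything else.
            ex.Handle(inner => inner is OperationCanceledException);
            return true;
        }
    }

    static void Main()
    {
        using (var source = new CancellationTokenSource())
        {
            CancellationToken token = source.Token;
            var tasks = new List<Task>();
            for (int i = 0; i < 4; i++)
            {
                tasks.Add(Task.Factory.StartNew(() =>
                {
                    // Placeholder work: sleep in short slices so the token is noticed quickly
                    for (int step = 0; step < 100; step++)
                    {
                        token.ThrowIfCancellationRequested();
                        Thread.Sleep(20);
                    }
                }, token));
            }

            source.Cancel();
            bool finished = WaitForAll(tasks, TimeSpan.FromSeconds(3));
            Console.WriteLine("All tasks stopped in time: " + finished);
        }
    }
}
```

Treating cancellation as an expected outcome keeps a normal service stop from being reported as an error, while genuine failures still surface.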

If ProcessPicture takes a very long time to complete, you may also want to add cancellation support there. As with WorkerThreadFunc, how you do this depends on the ProcessPicture implementation. The key idea is to find a place where you can safely stop doing work and return from the method. By safely I mean without leaving the system or data in a broken state.

In addition to monitoring IsCancellationRequested in WorkerThreadFunc, you can also Register a callback that will be executed when cancellation is requested, to do other things such as cleanup:

token.Register(CancellationCallback);

And

private void CancellationCallback()
{
    // wait for all tasks to finish

    // cleanup
}
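A small sketch of how Register behaves, under the assumption that no SynchronizationContext is involved (RegisterDemo and CancelAndCountCallbacks are hypothetical names used only for illustration):

```csharp
using System;
using System.Threading;

static class RegisterDemo
{
    // Registers a callback on the token, requests cancellation twice,
    // and returns how many times the callback actually ran.
    public static int CancelAndCountCallbacks()
    {
        int callbackRuns = 0;
        using (var source = new CancellationTokenSource())
        {
            // Register returns a registration; disposing it removes the callback
            using (source.Token.Register(() => callbackRuns++))
            {
                source.Cancel(); // the callback runs synchronously inside Cancel()
                source.Cancel(); // a second Cancel() does not run it again
            }
        }
        return callbackRuns;
    }

    static void Main()
    {
        Console.WriteLine("Callback ran {0} time(s)", CancelAndCountCallbacks());
    }
}
```

Because callbacks registered this way run inside the call to Cancel(), a callback that waits for all tasks to finish also blocks whoever called Cancel(), which in this design is OnStop. Keeping the callback short, or giving any wait inside it a timeout, avoids stalling the service shutdown.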
Erepsin answered 9/12, 2014 at 17:12 Comment(8)
+1000, thanks a lot. One more thing: should I use the same CancellationToken for child tasks to safely stop processing pictures? Should the OnStop() method call backgroundTask.Wait(3000)? Thom
Yes, usually it is safe to re-use the same cancellation token for child tasks (unless you need some different cancellation behavior for them). For OnStop: yes, I meant the instance Wait method. Updated the code for clarity. Erepsin
Sorry, what is the tasks object in Task.WaitAll(tasks)? And how would I pass a timeout value here? Aletheaalethia
Also, what does task.ContinueWith(t => result &= t.Result); do? Does it simply accumulate whether all ProcessPicture tasks have been successful? And lastly, what is the purpose of Task.Factory.StartNew inside a foreach loop (in WorkerThreadFunc)? Why not just use Parallel.ForEach straight away? Aletheaalethia
The object you pass into WaitAll should be a Task[]; the tasks property can be of type List<Task>. To pass a timeout, use this overload. Erepsin
The code in ContinueWith defines something that should be done with the result after the task has completed its execution. I took that code from the original question just to show a complete example; your implementation may not need it. Erepsin
Depending on which parts of this code are copied and pasted, there is a potentially nasty bug here. Task.Factory.StartNew does not return the Task from WorkerThreadFunc, and in fact does not support async delegates at all. This means that backgroundTask will have "completed" after the first use of await inside WorkerThreadFunc (contrary to most people's expectations). The solution is to use the Unwrap() extension method on the result of Task.Factory.StartNew, which lifts out the Task you're really interested in. Addictive
Two additional warnings: 1) I've found that OnStop is never called if the backgroundThread exits, and consequently added backgroundThread.ContinueWith that calls base.Stop(). 2) It's debatable whether "LongRunning" should be used here: blog.stephencleary.com/2013/08/startnew-is-dangerous.html Dele
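
The Unwrap() point raised in the comments only applies if WorkerThreadFunc is turned into an async method, which the answer's code does not do. A minimal sketch of the effect, where UnwrapDemo and WorkerAsync are hypothetical names and the 500 ms delay stands in for real asynchronous work:

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

static class UnwrapDemo
{
    // Hypothetical async worker: returns to its caller at the first await
    static async Task WorkerAsync(CancellationToken token)
    {
        await Task.Delay(TimeSpan.FromMilliseconds(500), token);
    }

    // Returns true if the outer task finished while the real work was still running.
    public static bool OuterFinishesBeforeInner()
    {
        // With an async delegate, StartNew returns Task<Task>: the outer task
        // completes as soon as WorkerAsync hands back its unfinished inner task.
        Task<Task> outer = Task.Factory.StartNew(
            () => WorkerAsync(CancellationToken.None),
            CancellationToken.None,
            TaskCreationOptions.DenyChildAttach,
            TaskScheduler.Default);

        Task inner = outer.Unwrap(); // represents the whole of WorkerAsync

        outer.Wait();
        bool innerStillRunning = !inner.IsCompleted;

        inner.Wait(); // this is the task a service should wait on when stopping
        return innerStillRunning;
    }

    static void Main()
    {
        Console.WriteLine("Outer finished before inner: " + OuterFinishesBeforeInner());
    }
}
```

Task.Run unwraps async delegates on its own, so it is a common alternative when the extra StartNew options are not needed.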
