A pattern for self-cancelling and restarting task
Asked Answered
E

4

23

Is there a recommended established pattern for self-cancelling and restarting tasks?

E.g., I'm working on the API for background spellchecker. The spellcheck session is wrapped as Task. Every new session should cancel the previous one and wait for its termination (to properly re-use the resources like spellcheck service provider, etc).

I've come up with something like this:

class Spellchecker
{
    Task pendingTask = null; // pending session
    CancellationTokenSource cts = null; // CTS for pending session

    // SpellcheckAsync is called by the client app
    public async Task<bool> SpellcheckAsync(CancellationToken token)
    {
        // SpellcheckAsync can be re-entered
        var previousCts = this.cts;
        var newCts = CancellationTokenSource.CreateLinkedTokenSource(token);
        this.cts = newCts;

        if (IsPendingSession())
        {
            // cancel the previous session and wait for its termination
            if (!previousCts.IsCancellationRequested)
                previousCts.Cancel();
            // this is not expected to throw
            // as the task is wrapped with ContinueWith
            await this.pendingTask; 
        }

        newCts.Token.ThrowIfCancellationRequested();
        var newTask = SpellcheckAsyncHelper(newCts.Token);

        this.pendingTask = newTask.ContinueWith((t) => {
            this.pendingTask = null;
            // we don't need to know the result here, just log the status
            Debug.Print(((object)t.Exception ?? (object)t.Status).ToString());
        }, TaskContinuationOptions.ExecuteSynchronously);

        return await newTask;
    }

    // the actual task logic
    async Task<bool> SpellcheckAsyncHelper(CancellationToken token)
    {
        // do not start a new session if the the previous one still pending
        if (IsPendingSession())
            throw new ApplicationException("Cancel the previous session first.");

        // do the work (pretty much IO-bound)
        try
        {
            bool doMore = true;
            while (doMore)
            {
                token.ThrowIfCancellationRequested();
                await Task.Delay(500); // placeholder to call the provider
            }
            return doMore;
        }
        finally
        {
            // clean-up the resources
        }
    }

    public bool IsPendingSession()
    {
        return this.pendingTask != null &&
            !this.pendingTask.IsCompleted &&
            !this.pendingTask.IsCanceled &&
            !this.pendingTask.IsFaulted;
    }
}

The client app (the UI) should just be able to call SpellcheckAsync as many times as desired, without worrying about cancelling a pending session. The main doMore loop runs on the UI thread (as it involves the UI, while all spellcheck service provider calls are IO-bound).

I feel a bit uncomfortable about the fact that I had to split the API into two peices, SpellcheckAsync and SpellcheckAsyncHelper, but I can't think of a better way of doing this, and it's yet to be tested.

Electrum answered 25/9, 2013 at 8:26 Comment(0)
E
25

I think the general concept is pretty good, though I recommend you not use ContinueWith.

I'd just write it using regular await, and a lot of the "am I already running" logic is not necessary:

Task pendingTask = null; // pending session
CancellationTokenSource cts = null; // CTS for pending session

// SpellcheckAsync is called by the client app on the UI thread
public async Task<bool> SpellcheckAsync(CancellationToken token)
{
    // SpellcheckAsync can be re-entered
    var previousCts = this.cts;
    var newCts = CancellationTokenSource.CreateLinkedTokenSource(token);
    this.cts = newCts;

    if (previousCts != null)
    {
        // cancel the previous session and wait for its termination
        previousCts.Cancel();
        try { await this.pendingTask; } catch { }
    }

    newCts.Token.ThrowIfCancellationRequested();
    this.pendingTask = SpellcheckAsyncHelper(newCts.Token);
    return await this.pendingTask;
}

// the actual task logic
async Task<bool> SpellcheckAsyncHelper(CancellationToken token)
{
    // do the work (pretty much IO-bound)
    using (...)
    {
        bool doMore = true;
        while (doMore)
        {
            token.ThrowIfCancellationRequested();
            await Task.Delay(500); // placeholder to call the provider
        }
        return doMore;
    }
}
Ebro answered 25/9, 2013 at 12:34 Comment(10)
@Stephen Cleary, I have immense respect for your work on all things async, so please don't take this the wrong way: I am merely being curious. I am somewhat surprised that you didn't rewrite the await this.pendingTask part using a SemaphoreSlim or your own AsyncLock or similar. Do you generally believe that improving thread-safety in "synchronous" portions of async methods is a premature optimisation?Vein
@KirillShlenskiy: There's nothing wrong with using SemaphoreSlim or the like for a one-at-a-time restriction.Ebro
None of the CancellationTokenSources are getting disposed. Is this an issue? I believe var newCts = ... could be put inside a using which wraps the whole rest of the method. But then previousCts.Cancel() would regularly throw ObjectDisposedException so it should probably be moved down a bit, inside the try block. The question is: is catching ObjectDisposedException unavoidable in this case and is disposing a CTS really worth it? It's probably just about avoiding the performance penalty of finalization in this specific snippet (if I'm not mistaken), but exceptions are also not speedy.Knickerbockers
@relatively_random: I don't normally dispose my CancellationTokenSource instances, unless I know nothing is listening and they won't be canceled. I find the disposal exceptions too problematic.Ebro
For linked sources like this, it seems like it's required. I just tried running a loop to test it: while(true) cts = CancellationTokenSource.CreateLinkedTokenSource(cts.Token); It leaks.Knickerbockers
(EDIT: Forgot about exceptions.) (EDIT 2: Forgot about finally. Turned out this is actually hard.) I think I finally managed to solve it without raising exceptions: pastebin.com/2tPLEZp3 The idea is to exchange the cts field before disposing it. That way, the would-be disposers can check if someone else already took up the mantle. Declaring the API thread-safe would require exchanging cts and pendingTask atomically inside a lock, but everything else is the same.Knickerbockers
@relatively_random: When a CTS is cancelled, that's almost the same as a dispose. If you (eventually) cancel a CTS, then I don't think disposal is necessary.Ebro
You're right. I tried that same loop with canceling and it doesn't leak anymore. That's good to know, thanks.Knickerbockers
@StephenCleary If i call SpellcheckAsync, what is the value for the CancellationToken parameter ?Yoshieyoshiko
@Saftpresse99: If you have a CancellationToken to pass in, then pass it in; otherwise, you can create one that is cancelled at your desired time (e.g., when the user navigates to another view), or just pass default to not cancel it.Ebro
E
6

Here's the most recent version of the cancel-and-restart pattern that I use:

class AsyncWorker
{
    Task _pendingTask;
    CancellationTokenSource _pendingTaskCts;

    // the actual worker task
    async Task DoWorkAsync(CancellationToken token)
    {
        token.ThrowIfCancellationRequested();
        Debug.WriteLine("Start.");
        await Task.Delay(100, token);
        Debug.WriteLine("Done.");
    }

    // start/restart
    public void Start(CancellationToken token)
    {
        var previousTask = _pendingTask;
        var previousTaskCts = _pendingTaskCts;

        var thisTaskCts = CancellationTokenSource.CreateLinkedTokenSource(token);

        _pendingTask = null;
        _pendingTaskCts = thisTaskCts;

        // cancel the previous task
        if (previousTask != null && !previousTask.IsCompleted)
            previousTaskCts.Cancel();

        Func<Task> runAsync = async () =>
        {
            // await the previous task (cancellation requested)
            if (previousTask != null)
                await previousTask.WaitObservingCancellationAsync();

            // if there's a newer task started with Start, this one should be cancelled
            thisTaskCts.Token.ThrowIfCancellationRequested();

            await DoWorkAsync(thisTaskCts.Token).WaitObservingCancellationAsync();
        };

        _pendingTask = Task.Factory.StartNew(
            runAsync,
            CancellationToken.None,
            TaskCreationOptions.None,
            TaskScheduler.FromCurrentSynchronizationContext()).Unwrap();
    }

    // stop
    public void Stop()
    {
        if (_pendingTask == null)
            return;

        if (_pendingTask.IsCanceled)
            return;

        if (_pendingTask.IsFaulted)
            _pendingTask.Wait(); // instantly throw an exception

        if (!_pendingTask.IsCompleted)
        {
            // still running, request cancellation 
            if (!_pendingTaskCts.IsCancellationRequested)
                _pendingTaskCts.Cancel();

            // wait for completion
            if (System.Threading.Thread.CurrentThread.GetApartmentState() == ApartmentState.MTA)
            {
                // MTA, blocking wait
                _pendingTask.WaitObservingCancellation();
            }
            else
            {
                // TODO: STA, async to sync wait bridge with DoEvents,
                // similarly to Thread.Join
            }
        }
    }
}

// useful extensions
public static class Extras
{
    // check if exception is OperationCanceledException
    public static bool IsOperationCanceledException(this Exception ex)
    {
        if (ex is OperationCanceledException)
            return true;

        var aggEx = ex as AggregateException;
        return aggEx != null && aggEx.InnerException is OperationCanceledException;
    }

    // wait asynchrnously for the task to complete and observe exceptions
    public static async Task WaitObservingCancellationAsync(this Task task)
    {
        try
        {
            await task;
        }
        catch (Exception ex)
        {
            // rethrow if anything but OperationCanceledException
            if (!ex.IsOperationCanceledException())
                throw;
        }
    }

    // wait for the task to complete and observe exceptions
    public static void WaitObservingCancellation(this Task task)
    {
        try
        {
            task.Wait();
        }
        catch (Exception ex)
        {
            // rethrow if anything but OperationCanceledException
            if (!ex.IsOperationCanceledException())
                throw;
        }
    }
}

Test use (producing only a single "Start/Done" output for DoWorkAsync):

private void MainForm_Load(object sender, EventArgs e)
{
    var worker = new AsyncWorker();
    for (var i = 0; i < 10; i++)
        worker.Start(CancellationToken.None);
}
Electrum answered 2/12, 2013 at 4:31 Comment(1)
A more recent and functional version of this pattern is here: https://mcmap.net/q/24369/-task-sequencing-and-re-entracyElectrum
M
0

Hope this will be useful - tried to create Helper class which can be re-used:

class SelfCancelRestartTask
{
    private Task _task = null;
    public CancellationTokenSource TokenSource { get; set; } = null;

    public SelfCancelRestartTask()
    {
    }

    public async Task Run(Action operation)
    {
        if (this._task != null &&
            !this._task.IsCanceled &&
            !this._task.IsCompleted &&
            !this._task.IsFaulted)
        {
            TokenSource?.Cancel();
            await this._task;
            TokenSource = new CancellationTokenSource();
        }
        else
        {
            TokenSource = new CancellationTokenSource();
        }
        this._task = Task.Run(operation, TokenSource.Token);
    }
Manilla answered 14/9, 2017 at 20:45 Comment(0)
B
0

The examples above seem to have problems when the asynchronous method is called multiple times quickly after each other, for example four times. Then all subsequent calls of this method cancel the first task and in the end three new tasks are generated which run at the same time. So I came up with this:

    private List<Tuple<Task, CancellationTokenSource>> _parameterExtractionTasks = new List<Tuple<Task, CancellationTokenSource>>();

    /// <remarks>This method is asynchronous, i.e. it runs partly in the background. As this method might be called multiple times 
    /// quickly after each other, a mechanism has been implemented that <b>all</b> tasks from previous method calls are first canceled before the task is started anew.</remarks>
    public async void ParameterExtraction() {

        CancellationTokenSource newCancellationTokenSource = new CancellationTokenSource();

        // Define the task which shall run in the background.
        Task newTask = new Task(() => {
            // do some work here
                }
            }
        }, newCancellationTokenSource.Token);

        _parameterExtractionTasks.Add(new Tuple<Task, CancellationTokenSource>(newTask, newCancellationTokenSource));

        /* Convert the list to arrays as an exception is thrown if the number of entries in a list changes while 
         * we are in a for loop. This can happen if this method is called again while we are waiting for a task. */
        Task[] taskArray = _parameterExtractionTasks.ConvertAll(item => item.Item1).ToArray();
        CancellationTokenSource[] tokenSourceArray = _parameterExtractionTasks.ConvertAll(item => item.Item2).ToArray();

        for (int i = 0; i < taskArray.Length - 1; i++) { // -1: the last task, i.e. the most recent task, shall be run and not canceled. 
            // Cancel all running tasks which were started by previous calls of this method
            if (taskArray[i].Status == TaskStatus.Running) {
                tokenSourceArray[i].Cancel();
                await taskArray[i]; // wait till the canceling completed
            }
        }

        // Get the most recent task
        Task currentThreadToRun = taskArray[taskArray.Length - 1];

        // Start this task if, but only if it has not been started before (i.e. if it is still in Created state). 
        if (currentThreadToRun.Status == TaskStatus.Created) {
            currentThreadToRun.Start();
            await currentThreadToRun; // wait till this task is completed.
        }

        // Now the task has been completed once. Thus we can recent the list of tasks to cancel or maybe run.
        _parameterExtractionTasks = new List<Tuple<Task, CancellationTokenSource>>();
    }
Balalaika answered 24/10, 2017 at 7:31 Comment(0)

© 2022 - 2024 — McMap. All rights reserved.