How to force an IAsyncEnumerable to respect a CancellationToken

I have an async iterator method that produces an IAsyncEnumerable<int> (a stream of numbers), one number every 200 msec. The caller of this method consumes the stream, but wants to stop the enumeration after 1000 msec. So a CancellationTokenSource is used, and the token is passed as an argument to the WithCancellation extension method. But the token is not respected. The enumeration continues until all the numbers are consumed:

static async IAsyncEnumerable<int> GetSequence()
{
    for (int i = 1; i <= 10; i++)
    {
        await Task.Delay(200);
        yield return i;
    }
}

var cts = new CancellationTokenSource(1000);
await foreach (var i in GetSequence().WithCancellation(cts.Token))
{
    Console.WriteLine($"{DateTime.Now:HH:mm:ss.fff} > {i}");
}

Output:

12:55:17.506 > 1
12:55:17.739 > 2
12:55:17.941 > 3
12:55:18.155 > 4
12:55:18.367 > 5
12:55:18.570 > 6
12:55:18.772 > 7
12:55:18.973 > 8
12:55:19.174 > 9
12:55:19.376 > 10

I expected a TaskCanceledException to be thrown after number 5. It seems that I misunderstood what WithCancellation actually does: it just passes the supplied token to the iterator method, if that method accepts one. Otherwise, as with GetSequence() in my example, the token is ignored. I suppose the solution in my case is to check the token manually inside the body of the enumeration:

var cts = new CancellationTokenSource(1000);
await foreach (var i in GetSequence())
{
    cts.Token.ThrowIfCancellationRequested();
    Console.WriteLine($"{DateTime.Now:HH:mm:ss.fff} > {i}");
}

This is simple and works well. Still, I wonder whether it is possible to create an extension method that does what I expected WithCancellation to do, that is, to bake the token into the resulting enumeration. This is the signature of the method I have in mind:

public static IAsyncEnumerable<T> WithEnforcedCancellation<T>(
    this IAsyncEnumerable<T> source, CancellationToken cancellationToken)
{
    // Is it possible?
}
Phosphorite answered 4/10, 2019 at 10:39 Comment(5)
If the code is not written in a manner that allows it to abort early, you cannot force it to abort early. Well, you can, but you really shouldn't. - Imaginary
@LasseVågsætherKarlsen this is like saying that you shouldn't break from a loop early. This is a very strong claim to make! - Phosphorite
The situations are not analogous -- breaking from a synchronous loop is always safe, but "cancelling" asynchronous enumeration only between iterations means that we may be adding considerable overhead and delays (not an issue for Task.Delay, but definitely an issue for real work). The situation is not as dire as for general async work (where we may have to accept work has not been cancelled at all and is still going on in the background, albeit ignored), since async enumeration implicitly includes disposing resources, but still not optimal. Compare running this with Task.Delay(10000). - Adore
@JeroenMostert breaking from synchronous loops is safe because the compiler-generated iterators properly dispose all disposable resources, and the same is true for compiler-generated async iterators. Breaking inside an await foreach means that you are breaking after the completion of the previous MoveNextAsync, when nothing special is going on. - Phosphorite
@JeroenMostert regarding the case of ignored background work, I have asked a related question here. The feedback I got is that I should transfer the responsibility to the caller to provide an extra cancellation notification, in addition to breaking the loop. - Phosphorite
A
43

IAsyncEnumerable explicitly provides for this mechanism with the EnumeratorCancellation attribute:

static async IAsyncEnumerable<int> GetSequence([EnumeratorCancellation] CancellationToken ct = default) {
    for (int i = 1; i <= 10; i++) {
        ct.ThrowIfCancellationRequested();
        await Task.Delay(200);    // or `Task.Delay(200, ct)` if this wasn't an example
        yield return i;
    }
}

In fact, the compiler is helpful enough to issue a warning if you give the method a CancellationToken parameter, but do not add the attribute.

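Note that, according to the draft specs, the token passed to .WithCancellation overrides any token passed directly to the method. In practice the compiler-generated code combines the two tokens, so cancellation of either one is honored (see the comments below).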
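The comments below report that a token passed directly and a token passed through .WithCancellation are both honored. The following is a minimal sketch for checking that locally; the names TokenCombinationDemo and CountUntilCanceled are made up for this illustration, and the timings are arbitrary.

```csharp
using System;
using System.Collections.Generic;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;

public static class TokenCombinationDemo
{
    // Cooperative iterator: whichever token(s) the caller supplies arrive through `ct`.
    public static async IAsyncEnumerable<int> GetSequence(
        [EnumeratorCancellation] CancellationToken ct = default)
    {
        for (int i = 1; i <= 10; i++)
        {
            await Task.Delay(200, ct);
            yield return i;
        }
    }

    // Hypothetical helper: returns the number of items received before
    // cancellation, or -1 if the sequence ran to completion.
    public static async Task<int> CountUntilCanceled(
        CancellationToken direct, CancellationToken viaWithCancellation)
    {
        int count = 0;
        try
        {
            await foreach (var item in GetSequence(direct).WithCancellation(viaWithCancellation))
            {
                count++;
            }
        }
        catch (OperationCanceledException)
        {
            return count;
        }
        return -1;
    }

    public static async Task Main()
    {
        using var shortCts1 = new CancellationTokenSource(500);
        using var longCts1 = new CancellationTokenSource(60_000);
        // Short-lived token passed directly, long-lived one through WithCancellation.
        Console.WriteLine(await CountUntilCanceled(shortCts1.Token, longCts1.Token));

        using var shortCts2 = new CancellationTokenSource(500);
        using var longCts2 = new CancellationTokenSource(60_000);
        // The other way around.
        Console.WriteLine(await CountUntilCanceled(longCts2.Token, shortCts2.Token));
    }
}
```

If the tokens are combined, neither call should print -1, because the short-lived token stops the enumeration regardless of how it was supplied.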

Of course, this will still only work if the enumeration actually accepts a CancellationToken -- but the fact that cancellation only really works if done cooperatively is true of any async work. Yeldar's answer is good for "forcing" some measure of cancellation into an enumerable that doesn't support it, but the preferred solution should be to modify the enumeration to support cancellation by itself -- the compiler does everything to help you out.

Adore answered 4/10, 2019 at 13:12 Comment(11)
Thanks Jeroen for the answer! The information you provide is important, although it was already known to me. My question is about the case of non-cooperative cancellation, basically breaking from a loop that consumes the async enumerable, but it's important that you pointed to the distinction between the two modes. - Phosphorite
Btw according to my tests the information provided in the proposal specs is inaccurate. If two tokens are passed, one directly and one through WithCancellation, both will be respected. Maybe they changed their mind about this in the meanwhile. - Phosphorite
As async streams are new in C# 8, I find it hard to come up with a scenario where you'd have an enumerable that you couldn't write to support cooperative cancellation -- there's no legacy code yet that we have to fix! You could conceivably have an async enumeration that's wrapping an existing sync enumeration (as bad of an idea as that is) but even then you could (and arguably should) insert the cancellation logic there. - Adore
From my understanding the cancellation is useful to allow cancelling the awaiting between two loops. So in your example you should probably uncomment the Task.Delay(200, ct) code, because this is where the awaiting happens. In general I agree that every IEnumerable that deserves to become an IAsyncEnumerable deserves to support cancellation too. :-) - Phosphorite
Yes, if Task.Delay represented real (cancellable) work, we would simply pass the token and we wouldn't need to call ThrowIfCancellationRequested at all (assuming the rest of the loop did nothing interesting either). - Adore
Yeap. But we may be unlucky and the real work may not be cancelable. We may have to call a legacy API for example. Then it would be debatable if there is any value in accepting a cancellation token that doesn't actually cancel the real job, but only cancels the awaiting while the real job continues running. - Phosphorite
It is debatable whether to even have an AsyncEnumerable at all in that case, but if you must have one like that, I would use neither WithCancellation nor the proposed WithEnforcedCancellation, and I would use your original approach of pulling the check into the outer code. That's the only way that makes it clear that we're pretending to have asynchronous work, when we really don't (which is really good to know). Note that asynchronous code can consume plain old synchronous Enumerables just fine, with cancellation in the iterations -- you could make an extension method for that. - Adore
When I talked about legacy API I was thinking about async methods that don't support cancellation. This is the new legacy now. The sync methods are the legacy of the legacy. :-) - Phosphorite
Fair enough -- but note that the approach in this answer works perfectly fine for that (i.e. it's using a Task.Delay that can't be cancelled to represent that work). This is a lot cleaner than having an extension method on every IAsyncEnumerable, as that already presumes we have IAsyncEnumerables that are written "wrong"... which is unnecessarily pessimistic, and also depressing. :-P - Adore
Ha ha! First world problems. Feeling depressed after handling an enumerable that lacks cancellation. :-P Seriously though, the extension method WithCancellation is pretty easy to misunderstand initially. I expect that lots of developers are going to misuse it, having made incorrect assumptions about what this method is actually intended to do. Noticing the WithEnforcedCancellation method immediately below it in IntelliSense could, maybe, prevent them from making these incorrect assumptions. - Phosphorite
Beware: This statement, taken from the draft specs, turns out to be incorrect: "Note that the token passed to .WithCancellation will override any local token passed to the method." In actuality, the cancellation tokens are combined, with cancellation of either being honored, as the above fiddle demonstrates. I have been unable to find documentation of this fact. - Mcknight
R
4

You can just extract your logic into an extension method like this:

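public static async IAsyncEnumerable<T> WithEnforcedCancellation<T>(
    this IAsyncEnumerable<T> source,
    [EnumeratorCancellation] CancellationToken cancellationToken)
{
    // [EnumeratorCancellation] requires: using System.Runtime.CompilerServices;
    if (source == null)
        throw new ArgumentNullException(nameof(source));

    cancellationToken.ThrowIfCancellationRequested();

    // Forward the token too, so that a source that does support
    // cancellation can stop its pending work right away.
    await foreach (var item in source.WithCancellation(cancellationToken))
    {
        // Enforce cancellation between items, even if the source ignores the token.
        cancellationToken.ThrowIfCancellationRequested();
        yield return item;
    }
}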
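As the comments under the question point out, a wrapper of this kind can only react between items. The sketch below is one way to observe that; SlowSequence and MeasureCancellationLatency are hypothetical helpers introduced for the illustration, and the wrapper is repeated so that the snippet compiles on its own.

```csharp
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;

public static class EnforcedCancellationDemo
{
    // Same idea as the extension method above, repeated to keep the sketch self-contained.
    public static async IAsyncEnumerable<T> WithEnforcedCancellation<T>(
        this IAsyncEnumerable<T> source,
        [EnumeratorCancellation] CancellationToken cancellationToken)
    {
        await foreach (var item in source.WithCancellation(cancellationToken))
        {
            cancellationToken.ThrowIfCancellationRequested();
            yield return item;
        }
    }

    // Hypothetical non-cooperative source: one item per `delayMs`, no token accepted.
    public static async IAsyncEnumerable<int> SlowSequence(int delayMs)
    {
        for (int i = 1; i <= 10; i++)
        {
            await Task.Delay(delayMs);
            yield return i;
        }
    }

    // Returns how long the consumer waited until the cancellation surfaced.
    public static async Task<TimeSpan> MeasureCancellationLatency(int delayMs, int cancelAfterMs)
    {
        using var cts = new CancellationTokenSource(cancelAfterMs);
        var stopwatch = Stopwatch.StartNew();
        try
        {
            await foreach (var item in SlowSequence(delayMs).WithEnforcedCancellation(cts.Token))
            {
                Console.WriteLine(item);
            }
        }
        catch (OperationCanceledException)
        {
            // Expected: thrown by the wrapper once the next item arrives.
        }
        return stopwatch.Elapsed;
    }

    public static async Task Main()
    {
        // The token is canceled after 100 ms, but the source needs about 1000 ms per item.
        var elapsed = await MeasureCancellationLatency(delayMs: 1000, cancelAfterMs: 100);
        Console.WriteLine($"Cancellation surfaced after {elapsed.TotalMilliseconds:F0} ms");
    }
}
```

The exception should surface only when the pending item finally arrives, roughly one full delay after the start, which is the latency that cooperative cancellation inside the source would avoid.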
Revenue answered 4/10, 2019 at 10:50 Comment(3)
It was that simple! I wonder why it didn't cross my mind. I was too obsessed with the idea of creating an async enumerable wrapper, and missed the simple solution. :-) - Phosphorite
@TheodorZoulias Yes, indeed, fixed :) - Revenue
You still want WithCancellation in your await foreach, otherwise the task won't be cleaned up immediately upon cancellation, which could leak memory. You need to ensure the await on the sequence itself is cancelled, which WithCancellation does for you. - Tammy
C
2

I think it's important to reiterate that you're not supposed to do this. It's always better to make the async method support cancellation tokens; then cancellation is immediate, as you would expect. If that's impossible, I still recommend trying one of the other answers before trying this one.

With that said, if you can't add cancellation support to the async method, and you absolutely do need immediate termination of the foreach, then you can hack your way around it.

One trick is to use Task.WhenAny with two arguments:

  1. the task you get from IAsyncEnumerator.MoveNextAsync()
  2. another task that does support cancellation

Here's the short version:

// Start the 'await foreach' without the new syntax
// because we need access to the ValueTask returned by MoveNextAsync()
var enumerator = source.GetAsyncEnumerator(cancellationToken);

// Combine MoveNextAsync() with another Task that can be awaited indefinitely,
// until it throws OperationCanceledException
var untilCanceled = UntilCanceled(cancellationToken);
while (await await Task.WhenAny(enumerator.MoveNextAsync().AsTask(), untilCanceled))
{
    yield return enumerator.Current;
}

The long version adds ConfigureAwait(false) and DisposeAsync() for completeness; it should work if you run it locally.

using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;

public static class AsyncStreamHelper
{
    public static async IAsyncEnumerable<T> WithEnforcedCancellation<T>(this IAsyncEnumerable<T> source, [EnumeratorCancellation] CancellationToken cancellationToken)
    {
        if (source == null)
            throw new ArgumentNullException(nameof(source));
        cancellationToken.ThrowIfCancellationRequested();

        // Start the 'await foreach' without the new syntax
        // because we need access to the ValueTask returned by MoveNextAsync()
        var enumerator = source.GetAsyncEnumerator(cancellationToken);
        Task<bool> moveNext = null;

        // Combine MoveNextAsync() with another Task that can be awaited indefinitely,
        // until it throws OperationCanceledException
        var untilCanceled = UntilCanceled(cancellationToken);
        try
        {
            while (
                await (
                    await Task.WhenAny(
                        (
                            moveNext = enumerator.MoveNextAsync().AsTask()
                        ),
                        untilCanceled
                    ).ConfigureAwait(false)
                )
            )
            {
                yield return enumerator.Current;
            }
        }
        finally
        {
            if (moveNext != null && !moveNext.IsCompleted)
            {
                // Disable warning CS4014 "Because this call is not awaited, execution of the current method continues before the call is completed"
#pragma warning disable 4014 // This is the behavior we want!

                moveNext.ContinueWith(async _ =>
                {
                    await enumerator.DisposeAsync();
                }, TaskScheduler.Default);
#pragma warning restore 4014
            }
            else if (enumerator != null)
            {
                await enumerator.DisposeAsync();
            }
        }
    }

    private static Task<bool> UntilCanceled(CancellationToken cancellationToken)
    {
        // This is just one possible implementation... feel free to swap out for something else
        return new Task<bool>(() => true, cancellationToken);
    }
}

public class Program
{
    public static async Task Main()
    {
        var cts = new CancellationTokenSource(500);
        var stopwatch = Stopwatch.StartNew();
        try
        {
            await foreach (var i in GetSequence().WithEnforcedCancellation(cts.Token))
            {
                Console.WriteLine($"{stopwatch.Elapsed:m':'ss'.'fff} > {i}");
            }
        }
        catch (OperationCanceledException)
        {
            Console.WriteLine($"{stopwatch.Elapsed:m':'ss'.'fff} > Canceled");
        }
    }

    static async IAsyncEnumerable<int> GetSequence()
    {
        for (int i = 1; i <= 10; i++)
        {
            await Task.Delay(200);
            yield return i;
        }
    }
}

Caveats

The enumerator returns a ValueTask for improved performance (uses fewer allocations than regular Tasks), but a ValueTask cannot be used with Task.WhenAny(), so AsTask() is used which degrades performance by introducing allocation overhead.

The enumerator can only be disposed if the most recent MoveNextAsync() is completed. It's more likely that the Task is still running when cancellation is requested. That's why I added another call to DisposeAsync in a continuation task.

In this scenario, the enumerator is not yet disposed when the WithEnforcedCancellation() method exits. It will be disposed some indeterminate amount of time after the enumeration is abandoned. If DisposeAsync() throws an exception, the exception will be lost. It cannot bubble up the call stack, because there is no call stack.
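The UntilCanceled helper relies on a Task<bool> that is created with a token but never started, so it can only ever complete as canceled. A small sketch for confirming that behavior in isolation follows; UntilCanceledDemo and CompletesAsCanceled are names invented for this example.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class UntilCanceledDemo
{
    // Same helper as in the long version: the task is never started,
    // so its delegate never runs and it never produces a result.
    public static Task<bool> UntilCanceled(CancellationToken cancellationToken)
    {
        return new Task<bool>(() => true, cancellationToken);
    }

    // Hypothetical check: awaits the helper task and reports whether it ended up canceled.
    public static async Task<bool> CompletesAsCanceled(int cancelAfterMs)
    {
        using var cts = new CancellationTokenSource(cancelAfterMs);
        Task<bool> task = UntilCanceled(cts.Token);
        try
        {
            await task;
        }
        catch (OperationCanceledException)
        {
            // Awaiting a canceled task throws; that is the signal Task.WhenAny relies on.
        }
        return task.IsCanceled;
    }

    public static async Task Main()
    {
        Console.WriteLine(await CompletesAsCanceled(cancelAfterMs: 100));
    }
}
```

Note that with a token that can never be canceled (such as CancellationToken.None) the helper task never completes, so awaiting it alone would wait forever; inside Task.WhenAny that is harmless because the MoveNextAsync task wins every time.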

Cassity answered 2/9, 2021 at 0:49 Comment(10)
Thanks Steven for the answer. AFAICS your WithEnforcedCancellation implementation works perfectly. Two minor improvements that I could suggest are to dispose the enumerator before exiting the method, and to configure the ContinueWith with TaskScheduler.Default, to avoid the problems associated with relying on the ambient TaskScheduler.Current. - Phosphorite
Also an alternative way to create the Task<bool> cancellationRequested is this: new Task<bool>(() => true, cancellationToken). This will never return true, because the Task<bool> is not started. It may only complete as Canceled. This makes the cancellationToken.ThrowIfCancellationRequested() check inside the loop redundant. - Phosphorite
Also configuring the Task.WhenAny with ConfigureAwait(false) might be desirable (although this is debatable). - Phosphorite
I updated my answer with your improvements, except for disposing the enumerator, which throws NotSupportedException for some reason. I don't really understand why. - Cassity
Ah, yes. Probably it cannot be disposed at this moment because there is a pending MoveNextAsync operation in-flight. That's the consequence of doing things that we shouldn't really be doing. 😃 - Phosphorite
^yes, this, see: github.com/dotnet/runtime/issues/51176#issuecomment-818866190 - Cassity
I added the call to DisposeAsync in a continuation task for the most recent pending MoveNextAsync task, I also added a new note to the caveats. - Cassity
Now the implementation seems to be perfect in all aspects. Thanks Steven! - Phosphorite
I found one more improvement, the enumerator can be disposed synchronously when the most recent task is already completed. I also added a note about exception handling (or lack thereof). - Cassity
That's a nice final touch! Btw, not only will a possible exception thrown by DisposeAsync be lost, but also one thrown by the last incomplete moveNext task. No one will ever know what happened to them! - Phosphorite
