I think it's important to reiterate that you're not supposed to do this. It's always better to make the async method support cancellation tokens; cancellation is then immediate, as you would expect. If that's impossible, I still recommend trying one of the other answers before trying this one.
With that said, if you can't add cancellation support to the async method, and you absolutely do need immediate termination of the foreach, then you can hack your way around it.
One trick is to use Task.WhenAny with two arguments:
- the task you get from IAsyncEnumerator.MoveNextAsync()
- another task that does support cancellation
Here's the short version:
// Start the 'await foreach' without the new syntax
// because we need access to the ValueTask returned by MoveNextAsync()
var enumerator = source.GetAsyncEnumerator(cancellationToken);
// Combine MoveNextAsync() with another Task that can be awaited indefinitely,
// until it throws OperationCanceledException
var untilCanceled = UntilCanceled(cancellationToken);
while (await await Task.WhenAny(enumerator.MoveNextAsync().AsTask(), untilCanceled))
{
    yield return enumerator.Current;
}
Long version with ConfigureAwait(false) and DisposeAsync() for completeness; it should work if you run it locally:
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;

public static class AsyncStreamHelper
{
    public static async IAsyncEnumerable<T> WithEnforcedCancellation<T>(this IAsyncEnumerable<T> source, [EnumeratorCancellation] CancellationToken cancellationToken)
    {
        if (source == null)
            throw new ArgumentNullException(nameof(source));
        cancellationToken.ThrowIfCancellationRequested();

        // Start the 'await foreach' without the new syntax
        // because we need access to the ValueTask returned by MoveNextAsync()
        var enumerator = source.GetAsyncEnumerator(cancellationToken);
        Task<bool> moveNext = null;

        // Combine MoveNextAsync() with another Task that can be awaited indefinitely,
        // until it throws OperationCanceledException
        var untilCanceled = UntilCanceled(cancellationToken);
        try
        {
            while (
                await (
                    await Task.WhenAny(
                        (
                            moveNext = enumerator.MoveNextAsync().AsTask()
                        ),
                        untilCanceled
                    ).ConfigureAwait(false)
                )
            )
            {
                yield return enumerator.Current;
            }
        }
        finally
        {
            if (moveNext != null && !moveNext.IsCompleted)
            {
                // Disable warning CS4014 "Because this call is not awaited, execution of the current method continues before the call is completed"
                #pragma warning disable 4014 // This is the behavior we want!
                moveNext.ContinueWith(async _ =>
                {
                    await enumerator.DisposeAsync();
                }, TaskScheduler.Default);
                #pragma warning restore 4014
            }
            else if (enumerator != null)
            {
                await enumerator.DisposeAsync();
            }
        }
    }

    private static Task<bool> UntilCanceled(CancellationToken cancellationToken)
    {
        // This is just one possible implementation... feel free to swap out for something else
        return new Task<bool>(() => true, cancellationToken);
    }
}
public class Program
{
    public static async Task Main()
    {
        var cts = new CancellationTokenSource(500);
        var stopwatch = Stopwatch.StartNew();
        try
        {
            await foreach (var i in GetSequence().WithEnforcedCancellation(cts.Token))
            {
                Console.WriteLine($"{stopwatch.Elapsed:m':'ss'.'fff} > {i}");
            }
        }
        catch (OperationCanceledException)
        {
            Console.WriteLine($"{stopwatch.Elapsed:m':'ss'.'fff} > Canceled");
        }
    }

    static async IAsyncEnumerable<int> GetSequence()
    {
        for (int i = 1; i <= 10; i++)
        {
            await Task.Delay(200);
            yield return i;
        }
    }
}
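The UntilCanceled helper in the long version relies on a Task that is never started. If you prefer something more explicit, here is a rough sketch of one alternative built on TaskCompletionSource. The class name CancellationTasks is made up for illustration, and I'm assuming it is acceptable that the token registration is never disposed (for a long-lived token you may want to keep the registration and dispose it yourself).

```csharp
using System.Threading;
using System.Threading.Tasks;

// Hypothetical helper class, not part of the original answer
public static class CancellationTasks
{
    // Returns a task that never completes on its own and moves to the
    // Canceled state as soon as the token is canceled
    public static Task<bool> UntilCanceled(CancellationToken cancellationToken)
    {
        // Already canceled: hand back a canceled task right away
        if (cancellationToken.IsCancellationRequested)
            return Task.FromCanceled<bool>(cancellationToken);

        var tcs = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);

        // Assumption: the registration is left in place for the token's lifetime
        cancellationToken.Register(() => tcs.TrySetCanceled(cancellationToken));
        return tcs.Task;
    }
}
```

Awaiting the returned task after cancellation throws an OperationCanceledException (a TaskCanceledException, to be exact), so it should slot into the Task.WhenAny call the same way as the original helper.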
Caveats
The enumerator returns a ValueTask for improved performance (uses fewer allocations than regular Tasks), but a ValueTask cannot be used with Task.WhenAny(), so AsTask() is used, which degrades performance by introducing allocation overhead.
The enumerator can only be disposed if the most recent MoveNextAsync() is completed. It's more likely that the Task is still running when cancellation is requested. That's why I added another call to DisposeAsync in a continuation task.
In this scenario, the enumerator is not yet disposed when the WithEnforcedCancellation() method exits. It will be disposed some indeterminate amount of time after the enumeration is abandoned. If DisposeAsync() throws an exception, the exception will be lost. It cannot bubble up the call stack, because there is no call stack.
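If losing that exception bothers you, one option is to route it to a callback instead of dropping it. This is only a sketch under my own assumptions: DeferredDisposal, DisposeWhenCompletedAsync and the onError parameter are hypothetical names, and the outcome of the abandoned MoveNextAsync() is deliberately ignored.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

// Hypothetical helper class, not part of the original answer
public static class DeferredDisposal
{
    // Waits for the pending MoveNextAsync() task, then disposes the enumerator
    // and reports a DisposeAsync() failure to the caller-supplied callback
    public static async Task DisposeWhenCompletedAsync<T>(
        IAsyncEnumerator<T> enumerator, Task pendingMoveNext, Action<Exception> onError)
    {
        try
        {
            await pendingMoveNext.ConfigureAwait(false);
        }
        catch
        {
            // The enumeration was abandoned, so its result or failure is ignored here
        }

        try
        {
            await enumerator.DisposeAsync().ConfigureAwait(false);
        }
        catch (Exception ex)
        {
            // There is still no call stack to bubble up, so hand the exception to the callback
            onError(ex);
        }
    }
}
```

In the finally block you could then write something like `_ = DeferredDisposal.DisposeWhenCompletedAsync(enumerator, moveNext, ex => Console.Error.WriteLine(ex));` in place of the ContinueWith call. Disposal is still delayed by an indeterminate amount of time; the only difference is that a failure gets reported somewhere.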
[…] Task.Delay, but definitely an issue for real work). The situation is not as dire as for general async work (where we may have to accept work has not been cancelled at all and is still going on in the background, albeit ignored), since async enumeration implicitly includes disposing resources, but still not optimal. Compare running this with Task.Delay(10000). – Adore
[…] await foreach means that you are breaking after the completion of the previous MoveNextAsync, when nothing special is going on. – Phosphorite