How to stop propagating an asynchronous stream (IAsyncEnumerable)

I have a method that accepts an IAsyncEnumerable as an argument and also returns an IAsyncEnumerable. It calls a web method for each item in the input stream and propagates the result to the output stream. My question is: how can I be notified if the caller of my method has stopped enumerating the output stream, so that I can stop enumerating the input stream inside my method? It seems that I should be able to get such a notification, because by default the caller disposes the IAsyncEnumerator it gets from my method. Is there any built-in mechanism that generates such a notification for compiler-generated async methods? If not, what is the easiest alternative to implement?

Example. The web method checks whether a url is valid or not. A never-ending stream of urls is provided, but the caller stops enumerating the results when more than 2 invalid urls are found:

var invalidCount = 0;
await foreach (var result in ValidateUrls(GetMockUrls()))
{
    Console.WriteLine($"Url {result.Url} is "
        + (result.IsValid ? "OK" : "Invalid!"));
    if (!result.IsValid) invalidCount++;
    if (invalidCount > 2) break;
}
Console.WriteLine($"--Async enumeration finished--");
await Task.Delay(2000);

The generator of the urls. One url is generated every 300 msec.

private static async IAsyncEnumerable<string> GetMockUrls()
{
    int index = 0;
    while (true)
    {
        await Task.Delay(300);
        yield return $"https://mock.com/{++index:0000}";
    }
}

The validator of the urls. There is a requirement that the input stream be enumerated eagerly, so two asynchronous workflows run in parallel. The first workflow inserts the urls into a queue, and the second workflow picks the urls one by one and validates them. A BufferBlock is used as an async queue.

private static async IAsyncEnumerable<(string Url, bool IsValid)> ValidateUrls(
    this IAsyncEnumerable<string> urls)
{
    var buffer = new System.Threading.Tasks.Dataflow.BufferBlock<string>();
    _ = Task.Run(async () =>
    {
        await foreach (var url in urls)
        {
            Console.WriteLine($"Url {url} received");
            await buffer.SendAsync(url);
        }
        buffer.Complete();
    });

    while (await buffer.OutputAvailableAsync() && buffer.TryReceive(out var url))
    {
        yield return (url, await MockValidateUrl(url));
    }
}

Clarification: the queue is mandatory, and removing it is not an option. It is an essential component of this problem.

The validator of a single url. The validation process lasts 300 msec on average.

private static Random _random = new Random();
private static async Task<bool> MockValidateUrl(string url)
{
    await Task.Delay(_random.Next(100, 600));
    return _random.Next(0, 2) != 0;
}

Output:

Url https://mock.com/0001 received
Url https://mock.com/0001 is Invalid!
Url https://mock.com/0002 received
Url https://mock.com/0003 received
Url https://mock.com/0002 is OK
Url https://mock.com/0004 received
Url https://mock.com/0003 is Invalid!
Url https://mock.com/0005 received
Url https://mock.com/0004 is OK
Url https://mock.com/0005 is OK
Url https://mock.com/0006 received
Url https://mock.com/0006 is Invalid!
--Async enumeration finished--
Url https://mock.com/0007 received
Url https://mock.com/0008 received
Url https://mock.com/0009 received
Url https://mock.com/0010 received
Url https://mock.com/0011 received
Url https://mock.com/0012 received
...

The problem is that urls are still generated and received after the caller/client has finished the asynchronous enumeration. I would like to fix this, so that no more messages appear in the console after --Async enumeration finished--.

Teplica answered 3/10, 2019 at 9:40 Comment(19)
Could you use a CancellationToken shared between both enumerables to exit, instead of using a manual break statement? – Hydro
@BradleyUffner Yeap, this is definitely doable. But I don't really want to transfer this responsibility to the caller. The only responsibility the caller should have is to dispose the IAsyncEnumerator. – Teplica
It seems that the caller already has responsibility by having to manually break out of the loop. – Hydro
@BradleyUffner breaking the loop is business logic. Cancelling a cancellation token on top of that is a burden that leaks the abstraction of the await foreach. It's like driving a car with two brake pedals, and having the responsibility to press one after the other (if you forget to press the second pedal, the brake fluid will boil). – Teplica
Looks more like you have a truck following another truck. The truck in front is throwing stuff to the truck following it, and you want to stop the front truck by applying the brakes on the back truck. – Clomb
Your generator won't produce anything unless the client asks for it. This is still an iterator, and each yield proceeds only if the output is actually consumed. – Mezzorelievo
As for the BufferBlock, that's trying to emulate a Channel. The Task.Run isn't really needed, as the only non-async operation is Console.WriteLine($"Url {url} received");. First, do you need a buffer there? Without it, a simple await foreach(var url in urls){ yield return (url, await MockValidateUrl(url));} would work just fine. With any kind of buffer you end up having to wait for the buffer to fill. – Mezzorelievo
@PauloMorgado I wish it was as simple as joining the trucks together. I have been fighting with this problem all day, and still can't find a solution. – Teplica
@TheodorZoulias the problem is caused by the buffer. If it had eg a boundary, the worker task would stop once it was full. Is there a reason for the buffer block? Without it you could use a simple iterator method. A cancellation token is required in any case: even if the worker stops, it won't complete. – Mezzorelievo
@PanagiotisKanavos you are right that the Task.Run is kind of redundant here. Maybe I over-simplified the example. My actual problem is more complex, though, and I can't get away without the buffer. I am still searching for a way to pass a notification from DisposeAsync, so that I can cancel the buffer-feeding task without adding a burden to the caller. – Teplica
@TheodorZoulias why do you need the buffer, especially an unbounded one? The size of the buffer determines how long the worker will keep pumping items. There's no DisposeAsync in your code. In any case, what is the actual problem? I suspect you could pass ChannelReaders around instead of IAsyncEnumerables. Or you could signal the "global" CancellationToken for the entire pipeline as a continuation of DisposeAsync or of any task that completes when the work completes. – Mezzorelievo
@TheodorZoulias in any case, it's the caller that decided to end the iteration prematurely. It's the caller's job to tell the upstream tasks they need to end and release their resources. Another option would be to add a timeout that cancels the worker even if the caller "forgets" to cancel. – Mezzorelievo
@PanagiotisKanavos There is always an implicit IAsyncEnumerator.DisposeAsync after each await foreach. The enumerator is disposed automatically by the infrastructure, and I am trying to take advantage of that. The reason I need the buffer is that instead of the urls I actually buffer the tasks that validate the urls, and propagate the results as they become available. The urls are not propagated in the same order they came in. Surely there are other ways to do this, but I am trying to familiarize myself with the new asynchronous enumeration capabilities of C# 8. – Teplica
@TheodorZoulias that changes the problem a lot. You should explain all this in the question. – Mezzorelievo
@PanagiotisKanavos I tried to keep it simple. If there is a solution for the simplified example, the same solution should work for the complex problem too. My objective is to pass the notification from the async disposal into the body of the asynchronous iterator. Maybe the solution involves the WithCancellation method and the EnumeratorCancellation attribute. – Teplica
@TheodorZoulias they are different problems. In any case, to make multiple concurrent calls you'd need to start multiple concurrent tasks. I'm not sure you can consume an IAsyncEnumerable from multiple tasks. That's a fan-out/fan-in problem (I'm stealing the term from Go). Cancellation is a different problem. There are ways to combine those, but no magic. – Mezzorelievo
@PanagiotisKanavos we have concluded from a previous question that it is doable to produce a single IAsyncEnumerable that is driven by multiple concurrent tasks. This is a solved problem now. – Teplica
You want to consume it from multiple tasks. That's a different problem. In any case, you can't signal anything to previous IAsyncEnumerable instances, much less to the worker task behind the buffer one or two calls away. You have to use a CancellationToken. If you want to ensure people use it correctly, you can create an extension method that takes the IAsyncEnumerable and the CTS, applies an Action<T> to each item and signals the CTS at the end. The LastStep method can be converted to a generic method easily. – Mezzorelievo
@PanagiotisKanavos I have already managed to produce a working solution that indeed makes use of a CancellationToken. It is a very ugly solution though. It involves duplicating the ValidateUrls iterator method, one private with a token and one public without a token, then enumerating the first from inside the second, after wrapping it in an AsyncEnumerableWrapper that adds a hook at DisposeAsync that cancels the token. If this is the only solution then there is no solution, because I can't live with that. – Teplica

I suppose I should answer my own question, since I now have a simple enough generic solution.

Update: I am scrapping my previous answer because I discovered a much easier solution. It is embarrassingly simple, actually. All I have to do is enclose the yielding part of the ValidateUrls iterator in a try-finally block. The finally block is executed in every case: when the caller completes the enumeration normally, and when it exits early with a break or an exception (as long as the enumerator is disposed, which await foreach does automatically). So this is how I can get the notification I am looking for, by cancelling a CancellationTokenSource in the finally block:

private static async IAsyncEnumerable<(string Url, bool IsValid)> ValidateUrls(
    this IAsyncEnumerable<string> urls)
{
    var buffer = new System.Threading.Tasks.Dataflow.BufferBlock<string>();
    var completionCTS = new CancellationTokenSource();
    _ = Task.Run(async () =>
    {
        await foreach (var url in urls)
        {
            if (completionCTS.IsCancellationRequested) break;
            Console.WriteLine($"Url {url} received");
            await buffer.SendAsync(url);
        }
        buffer.Complete();
    });

    try
    {
        while (await buffer.OutputAvailableAsync() && buffer.TryReceive(out var url))
        {
            yield return (url, await MockValidateUrl(url));
        }
    }
    finally // Runs when the enumeration completes, and also on early exit (break/exception)
    {
        completionCTS.Cancel();
    }
}
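This solution relies on one guarantee. When the loop exits early, await foreach disposes the enumerator, and the compiler-generated DisposeAsync runs the iterator's pending finally blocks. The following minimal sketch makes that visible. It assumes C# 9+ top-level statements (.NET 5 or later), and the Numbers iterator and the log list are hypothetical:

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

var log = new List<string>(); // records what happens inside the iterator

// Hypothetical infinite async iterator, like GetMockUrls but without delays.
async IAsyncEnumerable<int> Numbers()
{
    try
    {
        for (int i = 1; ; i++)
        {
            await Task.Yield();
            log.Add($"yield {i}");
            yield return i;
        }
    }
    finally
    {
        // Runs when the consumer disposes the enumerator, including after a break.
        log.Add("finally");
    }
}

await foreach (var n in Numbers())
{
    if (n == 3) break; // await foreach calls DisposeAsync on the way out
}

Console.WriteLine(string.Join(", ", log)); // yield 1, yield 2, yield 3, finally
```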

I should probably note that an async iterator that doesn't support cancellation is not good practice. Without cancellation support, the caller has no easy way to stop the awaiting between the consumption of one value and the next. So a better signature for my method would be:

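// requires: using System.Runtime.CompilerServices; (for EnumeratorCancellation)
private static async IAsyncEnumerable<(string Url, bool IsValid)> ValidateUrls(
    this IAsyncEnumerable<string> urls,
    [EnumeratorCancellation] CancellationToken cancellationToken = default)
{
    // ... same body as above, passing cancellationToken to the awaited calls
}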

The token could then be passed to the methods awaited inside the yielding loop: OutputAvailableAsync, and MockValidateUrl (which would need a CancellationToken parameter of its own for that).
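Here is one possible way to combine the finally-based notification with a caller-supplied token. It is a sketch under several assumptions:

- A Channel<string> from System.Threading.Channels replaces the BufferBlock. This swap only keeps the example self-contained; it is not what the answer uses.
- GetUrls and Validate are hypothetical, faster versions of GetMockUrls and MockValidateUrl.
- ValidateUrls is a local function instead of an extension method.
- The code uses C# 9+ top-level statements.

In this sketch the finally block also awaits the producer, so nothing more is received once the enumeration has finished:

```csharp
using System;
using System.Collections.Generic;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;

var received = 0; // number of urls the producer pulled from the source

// Hypothetical fast versions of GetMockUrls / MockValidateUrl.
async IAsyncEnumerable<string> GetUrls()
{
    for (int i = 1; ; i++)
    {
        await Task.Delay(10);
        yield return $"https://mock.com/{i:0000}";
    }
}

async Task<bool> Validate(string url, CancellationToken token)
{
    await Task.Delay(5, token);
    return !url.EndsWith("3"); // deterministic stand-in for the random result
}

async IAsyncEnumerable<(string Url, bool IsValid)> ValidateUrls(
    IAsyncEnumerable<string> urls,
    [EnumeratorCancellation] CancellationToken cancellationToken = default)
{
    // Cancelled either by the caller's token or by our own finally block.
    using var completionCts =
        CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
    var token = completionCts.Token;
    var queue = Channel.CreateUnbounded<string>();

    var producer = Task.Run(async () =>
    {
        try
        {
            // The token reaches GetUrls only if it declares an [EnumeratorCancellation] parameter.
            await foreach (var url in urls.WithCancellation(token))
            {
                token.ThrowIfCancellationRequested();
                Interlocked.Increment(ref received);
                queue.Writer.TryWrite(url); // always succeeds: the channel is unbounded
            }
        }
        catch (OperationCanceledException) { } // expected when the consumer stops
        finally { queue.Writer.TryComplete(); }
    });

    try
    {
        while (await queue.Reader.WaitToReadAsync(token))
        {
            while (queue.Reader.TryRead(out var url))
                yield return (url, await Validate(url, token));
        }
    }
    finally
    {
        completionCts.Cancel(); // notify the producer
        await producer;         // and wait until it has actually stopped
    }
}

var invalid = 0;
await foreach (var (url, isValid) in ValidateUrls(GetUrls()))
{
    if (!isValid && ++invalid == 2) break; // the second invalid url is 0013
}
var atExit = Volatile.Read(ref received);
await Task.Delay(100);
Console.WriteLine(atExit == Volatile.Read(ref received)); // True: the producer has stopped
```

The linked token source is one design choice among several. It lets a single token stop the producer for both reasons: the caller cancelling, or the caller simply walking away.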

From the caller's perspective, the token can be passed either directly or by chaining the WithCancellation extension method:

await foreach (var result in ValidateUrls(GetMockUrls()).WithCancellation(token))
{
    // ...
}
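The following minimal sketch shows how a token supplied through WithCancellation reaches the iterator. Counter is a hypothetical iterator, and C# 9+ top-level statements are assumed. The compiler-generated GetAsyncEnumerator forwards the token to the parameter marked with [EnumeratorCancellation]. The iterator still has to observe the token itself, in this case through Task.Delay:

```csharp
using System;
using System.Collections.Generic;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical cancellable iterator.
async IAsyncEnumerable<int> Counter([EnumeratorCancellation] CancellationToken token = default)
{
    for (int i = 0; ; i++)
    {
        await Task.Delay(10, token); // throws once the token is cancelled
        yield return i;
    }
}

using var cts = new CancellationTokenSource();
var seen = new List<int>();
try
{
    await foreach (var i in Counter().WithCancellation(cts.Token))
    {
        seen.Add(i);
        if (i == 2) cts.Cancel(); // the next MoveNextAsync observes the cancellation
    }
}
catch (OperationCanceledException)
{
    Console.WriteLine($"Cancelled after {seen.Count} items"); // Cancelled after 3 items
}
```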
Teplica answered 4/10, 2019 at 14:5 Comment(0)

Edit

The discussion will be easier with an appropriate example. Validating URLs isn't so expensive. What if you need to hit eg 100 URLs and pick the first 3 responses?

In that case both the worker and the buffer make sense.

Edit 2

One of the comments adds extra complexity - the tasks are executed concurrently and the results need to be emitted as they arrive.


For starters, ValidateUrls could be rewritten as a plain iterator method:

private static async IAsyncEnumerable<(string Url, bool IsValid)> ValidateUrls(
    this IAsyncEnumerable<string> urls)
{
    await foreach (var url in urls)
    {
        Console.WriteLine($"Url {url} received");
        var isValid=await MockValidateUrl(url);
        yield return (url, isValid);
    }
}

There's no need for a worker Task as all methods are asynchronous. The iterator method won't proceed unless a consumer asks for a result. Even if MockValidateUrl does something expensive, it could use a Task.Run itself or get wrapped in a Task.Run. That would generate quite a few tasks though.
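The claim that a plain iterator pulls from its source only on demand can be checked quickly. This sketch uses a hypothetical Source that counts how many urls were requested, and feeds it through a ValidateUrls of the same shape as above. Task.Yield stands in for MockValidateUrl, and C# 9+ top-level statements are assumed:

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

var pulled = 0; // how many urls the source has produced

// Hypothetical infinite source, like GetMockUrls without the delay.
async IAsyncEnumerable<string> Source()
{
    for (int i = 1; ; i++)
    {
        await Task.Yield();
        pulled++;
        yield return $"https://mock.com/{i:0000}";
    }
}

// Same shape as the iterator-based ValidateUrls above.
async IAsyncEnumerable<(string Url, bool IsValid)> ValidateUrls(IAsyncEnumerable<string> urls)
{
    await foreach (var url in urls)
    {
        await Task.Yield(); // stand-in for MockValidateUrl
        yield return (url, true);
    }
}

var consumed = 0;
await foreach (var result in ValidateUrls(Source()))
{
    if (++consumed == 3) break;
}
await Task.Delay(50);
Console.WriteLine(pulled); // 3: nothing is produced after the break
```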

For completeness' sake you can add a CancellationToken and ConfigureAwait(false) :

public static async IAsyncEnumerable<(string Url, bool IsValid)> ValidateUrls(
       IAsyncEnumerable<string> urls, 
       [EnumeratorCancellation]CancellationToken token=default)
{
    await foreach(var url in urls.WithCancellation(token).ConfigureAwait(false))
    {
        var isValid=await MockValidateUrl(url).ConfigureAwait(false);
        yield return (url,isValid);
    }
}

In any case, as soon as the caller stops iterating, ValidateUrls will stop.

Buffering

Buffering is a problem - no matter how it's programmed, the worker won't stop until the buffer fills. The buffer's size is how many iterations the worker will go on before it realizes it needs to stop. This is a great case for a Channel (yes, again!) :

public static IAsyncEnumerable<(string Url, bool IsValid)> ValidateUrls(
        IAsyncEnumerable<string> urls,CancellationToken token=default)
{
    var channel=Channel.CreateBounded<(string Url, bool IsValid)>(2);
    var writer=channel.Writer;
    _ = Task.Run(async ()=>{
                await foreach(var url in urls.WithCancellation(token))
                {
                    var isValid=await MockValidateUrl(url);
                    await writer.WriteAsync((url,isValid));
                }
        },token)
        .ContinueWith(t=>writer.Complete(t.Exception));        
    return channel.Reader.ReadAllAsync(token);
}
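The effect of the buffer size can be observed directly. In this minimal sketch the producer is a plain counter rather than the url validator, and C# 9+ top-level statements are assumed. The consumer reads a single item from a channel with capacity 2 and then stops reading. The producer completes three writes (one consumed plus two buffered) and then stays parked in WriteAsync until its token is cancelled:

```csharp
using System;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;

var channel = Channel.CreateBounded<int>(2); // default FullMode is Wait
using var cts = new CancellationTokenSource();
var written = 0;

var producer = Task.Run(async () =>
{
    for (int i = 0; ; i++)
    {
        // Waits asynchronously while the channel is full.
        await channel.Writer.WriteAsync(i, cts.Token);
        Interlocked.Increment(ref written);
    }
});

_ = await channel.Reader.ReadAsync();                         // consume one item, then stop reading
while (Volatile.Read(ref written) < 3) await Task.Delay(10);  // wait for the buffer to refill
await Task.Delay(100);                                        // the producer cannot go further
Console.WriteLine(Volatile.Read(ref written));                // 3

cts.Cancel(); // the only thing that releases the parked producer
try { await producer; } catch (OperationCanceledException) { }
Console.WriteLine(producer.IsCompleted); // True
```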

It's better to pass around ChannelReaders instead of IAsyncEnumerables though. At the very least, no async enumerator is constructed until someone tries to read from the ChannelReader. It's also easier to construct pipelines as extension methods :

public static ChannelReader<(string Url, bool IsValid)> ValidateUrls(
        this ChannelReader<string> urls,int capacity,CancellationToken token=default)
{
    var channel=Channel.CreateBounded<(string Url, bool IsValid)>(capacity);
    var writer=channel.Writer;
    _ = Task.Run(async ()=>{
                await foreach(var url in urls.ReadAllAsync(token))
                {
                    var isValid=await MockValidateUrl(url);
                    await writer.WriteAsync((url,isValid));
                }
        },token)
        .ContinueWith(t=>writer.Complete(t.Exception));        
    return channel.Reader;
}

This syntax allows constructing pipelines in a fluent manner. Let's say we have this helper method to convert IEnumerables to channels (or IAsyncEnumerables):

public static ChannelReader<T> AsChannel<T>(
         this IEnumerable<T> items)
{
    var channel=Channel.CreateUnbounded<T>();        
    var writer=channel.Writer;
    foreach(var item in items)
    {
        writer.TryWrite(item);  // always succeeds on an unbounded channel
    }
    writer.Complete();          // no more items: lets ReadAllAsync finish
    return channel.Reader;
}

We can write :

var pipeline=urlList.AsChannel()     //takes a list and writes it to a channel
                    .ValidateUrls(capacity: 2);

await foreach(var (url,isValid) in pipeline.ReadAllAsync())
{
   //Use the items here
}

Concurrent calls with immediate propagation

That's easy with channels, although this time the worker needs to fire all of the tasks at once. Essentially, we need multiple workers. That's not something that can be done with just IAsyncEnumerable.

First of all, if we wanted to use eg 5 concurrent tasks to process the inputs we could write

    var tasks = Enumerable.Range(0,5)
                  .Select(_ => Task.Run(async ()=>{
                                 // same worker loop as before
                             },token));
    _ = Task.WhenAll(tasks).ContinueWith(t=>writer.Complete(t.Exception));        

instead of :

    _ = Task.Run(async ()=>{
        // same worker loop as before
        },token)
        .ContinueWith(t=>writer.Complete(t.Exception));        

Using a large number of workers could be enough. I'm not sure if IAsyncEnumerable can be consumed by multiple workers, and I don't really want to find out.
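The following is a minimal sketch of the multi-worker idea. Several workers read from the same ChannelReader, which supports concurrent readers, and write into a shared bounded output channel. The output channel is completed once all workers have finished. Validate is a hypothetical stand-in for MockValidateUrl, and C# 9+ top-level statements are assumed:

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;

// Hypothetical stand-in for MockValidateUrl.
async Task<bool> Validate(string url, CancellationToken token)
{
    await Task.Delay(10, token);
    return !url.EndsWith("3");
}

ChannelReader<(string Url, bool IsValid)> ValidateUrls(
    ChannelReader<string> urls, int workers, CancellationToken token = default)
{
    var output = Channel.CreateBounded<(string Url, bool IsValid)>(workers);
    var tasks = Enumerable.Range(0, workers)
        .Select(_ => Task.Run(async () =>
        {
            // Workers compete for urls; each url is read by exactly one worker.
            await foreach (var url in urls.ReadAllAsync(token))
            {
                var isValid = await Validate(url, token);
                await output.Writer.WriteAsync((url, isValid), token);
            }
        }, token))
        .ToArray();
    // Complete the output when every worker is done, propagating any failure.
    _ = Task.WhenAll(tasks).ContinueWith(
        t => output.Writer.Complete(t.Exception), TaskScheduler.Default);
    return output.Reader;
}

var input = Channel.CreateUnbounded<string>();
for (int i = 1; i <= 10; i++) input.Writer.TryWrite($"https://mock.com/{i:0000}");
input.Writer.Complete();

var results = new List<(string Url, bool IsValid)>();
await foreach (var r in ValidateUrls(input.Reader, workers: 3).ReadAllAsync())
{
    results.Add(r); // results arrive in completion order, not input order
}
Console.WriteLine(results.Count); // 10
```

The bounded output channel, sized to the number of workers, keeps the workers from running far ahead of a slow consumer.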

Premature Cancellation

All of the above works if the client consumes all results. To stop processing after e.g. the first 3 results, though, we need the CancellationToken:

var cts=new CancellationTokenSource();

var pipeline=urlList.AsChannel()     //takes a list and writes it to a channel
                    .ValidateUrls(2, cts.Token);

int i=0;
await foreach(var (url,isValid) in pipeline.ReadAllAsync())
{
    //Break after 3 iterations
    if(i++>2)
    {
        break;
    }
    // ... use the item
}

cts.Cancel();

This code itself could be extracted into a method that receives a ChannelReader and, in this case, the CancellationTokenSource:

public static async Task LastStep(this ChannelReader<(string Url, bool IsValid)> input,CancellationTokenSource cts)
{
    int i=0;
    await foreach(var (url,isValid) in input.ReadAllAsync())
    {
        //Break after 3 iterations
        if(i++>2)
        {
            break;
        }
        // ... use the item
    }

    cts.Cancel();        
}

And the pipeline becomes :

var cts=new CancellationTokenSource();

await urlList.AsChannel()     
             .ValidateUrls(2, cts.Token)
             .LastStep(cts);
Mezzorelievo answered 3/10, 2019 at 14:19 Comment(3)
Thanks Panagiotis for the answer. It is very interesting as always. All this stuff about chaining channels is fascinating! My question, though, has to do specifically with await foreach and the IAsyncEnumerable. It's not about a problem searching for any solution. It is about learning the capabilities (and the limits) of the new language feature of asynchronous enumeration in C# 8. – Teplica
About the line await foreach(var url in urls.WithCancellation(token)), it's not doing what I initially thought it would do. As is, cancelling the token will have no effect. For the enumeration to be cancelled, the method GetMockUrls that produced the urls must accept a CancellationToken argument decorated with the EnumeratorCancellation attribute, and must use this token internally to break the loop. In other words, the existing cancellation pattern is cooperative. You can't cancel the enumeration of an async enumerable that was not designed to be cancelled in the first place! – Teplica
I took the time to read this answer carefully, and played a bit with the code examples. I do agree that the ChannelReader is superior to the IAsyncEnumerable in its ability to be enumerated concurrently by multiple consumers. The IAsyncEnumerator is not thread-safe. In all other aspects it seems to me that the IAsyncEnumerable is superior. 1) It's much more fluent and elegant to produce and consume because of the language support. 2) It ensures the disposal of resources by invoking the finally blocks automatically. No cooperative cancellation patterns like LastStep are required. – Teplica

© 2022 - 2024 — McMap. All rights reserved.