I have a method that accepts an IAsyncEnumerable as an argument and also returns an IAsyncEnumerable. It calls a web method for each item in the input stream and propagates the result to the output stream. My question is: how can I be notified when the caller of my method has stopped enumerating the output stream, so that I can stop enumerating the input stream inside my method? It seems that this should be possible, because the caller by default disposes the IAsyncEnumerator that it gets from my method. Is there any built-in mechanism that generates such a notification for compiler-generated async methods? If not, what is the easiest alternative to implement?
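To illustrate the disposal I am referring to: as far as I understand, await foreach expands roughly to the pattern in ConsumeThree below, with DisposeAsync called in a finally block even when the loop exits through break. This is only a sketch, and Numbers, ConsumeThree and the log list are hypothetical names made up for the illustration. It also shows that a finally block inside a compiler-generated iterator runs when the enumerator is disposed while suspended at a yield.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

public static class DisposeDemo
{
    // Hypothetical never-ending iterator, used only to observe disposal.
    public static async IAsyncEnumerable<int> Numbers(List<string> log)
    {
        try
        {
            for (int i = 1; ; i++)
            {
                await Task.Yield();
                yield return i;
            }
        }
        finally
        {
            // Runs when the enumerator is disposed while suspended at the yield.
            log.Add("finally");
        }
    }

    // Roughly what this loop does behind the scenes:
    //   await foreach (var n in Numbers(log)) { ...; if (n == 3) break; }
    public static async Task<List<string>> ConsumeThree()
    {
        var log = new List<string>();
        IAsyncEnumerator<int> enumerator = Numbers(log).GetAsyncEnumerator();
        try
        {
            while (await enumerator.MoveNextAsync())
            {
                log.Add(enumerator.Current.ToString());
                if (enumerator.Current == 3) break;
            }
        }
        finally
        {
            // The call that the producing method could, in principle, react to.
            await enumerator.DisposeAsync();
        }
        return log;
    }
}
```

In this sketch the log ends with the "finally" entry right after the third item, which suggests that a try/finally around the yield is one place where the producer can observe that the consumer has stopped.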
Example. The web method checks whether a url is valid or not. A never-ending stream of urls is provided, but the caller stops enumerating the results when more than 2 invalid urls are found:
var invalidCount = 0;
await foreach (var result in ValidateUrls(GetMockUrls()))
{
    Console.WriteLine($"Url {result.Url} is "
        + (result.IsValid ? "OK" : "Invalid!"));
    if (!result.IsValid) invalidCount++;
    if (invalidCount > 2) break;
}
Console.WriteLine($"--Async enumeration finished--");
await Task.Delay(2000);
The generator of the urls. One url is generated every 300 msec.
private static async IAsyncEnumerable<string> GetMockUrls()
{
    int index = 0;
    while (true)
    {
        await Task.Delay(300);
        yield return $"https://mock.com/{++index:0000}";
    }
}
The validator of the urls. There is a requirement that the input stream is enumerated eagerly, so two asynchronous workflows are running in parallel. The first workflow inserts the urls in a queue, and the second workflow picks the urls one by one and validates them. A BufferBlock is used as the async queue.
private static async IAsyncEnumerable<(string Url, bool IsValid)> ValidateUrls(
    this IAsyncEnumerable<string> urls)
{
    var buffer = new System.Threading.Tasks.Dataflow.BufferBlock<string>();
    _ = Task.Run(async () =>
    {
        await foreach (var url in urls)
        {
            Console.WriteLine($"Url {url} received");
            await buffer.SendAsync(url);
        }
        buffer.Complete();
    });
    while (await buffer.OutputAvailableAsync() && buffer.TryReceive(out var url))
    {
        yield return (url, await MockValidateUrl(url));
    }
}
Clarification: the queue is mandatory, and removing it is not an option. It is an essential component of this problem.
The validator of a single url. The validation process lasts 300 msec on average.
private static Random _random = new Random();
private static async Task<bool> MockValidateUrl(string url)
{
    await Task.Delay(_random.Next(100, 600));
    return _random.Next(0, 2) != 0;
}
Output:
Url https://mock.com/0001 received
Url https://mock.com/0001 is Invalid!
Url https://mock.com/0002 received
Url https://mock.com/0003 received
Url https://mock.com/0002 is OK
Url https://mock.com/0004 received
Url https://mock.com/0003 is Invalid!
Url https://mock.com/0005 received
Url https://mock.com/0004 is OK
Url https://mock.com/0005 is OK
Url https://mock.com/0006 received
Url https://mock.com/0006 is Invalid!
--Async enumeration finished--
Url https://mock.com/0007 received
Url https://mock.com/0008 received
Url https://mock.com/0009 received
Url https://mock.com/0010 received
Url https://mock.com/0011 received
Url https://mock.com/0012 received
...
The problem is that urls are still generated and received after the caller/client has finished the asynchronous enumeration. I would like to fix this, so that no more messages appear in the console after --Async enumeration finished--.
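One direction that seems workable is sketched below, under stated assumptions. It keeps the BufferBlock queue, wraps the consuming loop in try/finally, and cancels a CancellationTokenSource from the finally block, which runs when the caller disposes the enumerator after break. The assumptions: the url generator can be changed to accept a token through the EnumeratorCancellation attribute, and the validation is passed in as a hypothetical validate delegate (standing in for MockValidateUrl) so that the block is self-contained. It is a sketch, not a definitive implementation.

```csharp
using System;
using System.Collections.Generic;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class UrlValidation
{
    // Assumption: the source is modified to honor the token that arrives
    // through WithCancellation.
    public static async IAsyncEnumerable<string> GetMockUrls(
        [EnumeratorCancellation] CancellationToken cancellationToken = default)
    {
        int index = 0;
        while (true)
        {
            await Task.Delay(300, cancellationToken);
            yield return $"https://mock.com/{++index:0000}";
        }
    }

    public static async IAsyncEnumerable<(string Url, bool IsValid)> ValidateUrls(
        this IAsyncEnumerable<string> urls,
        Func<string, Task<bool>> validate) // hypothetical parameter
    {
        var buffer = new BufferBlock<string>();
        using var cts = new CancellationTokenSource();
        var token = cts.Token;

        // Feeder workflow: eagerly moves urls from the input stream to the queue.
        var feeder = Task.Run(async () =>
        {
            try
            {
                await foreach (var url in urls.WithCancellation(token))
                {
                    // Fallback for sources that ignore the token.
                    if (token.IsCancellationRequested) break;
                    await buffer.SendAsync(url);
                }
            }
            catch (OperationCanceledException) { /* the consumer stopped */ }
            finally { buffer.Complete(); }
        });

        try
        {
            while (await buffer.OutputAvailableAsync() && buffer.TryReceive(out var url))
            {
                yield return (url, await validate(url));
            }
        }
        finally
        {
            // Reached on normal completion and when the caller disposes the
            // enumerator (for example after break in await foreach).
            cts.Cancel();
            await feeder;
        }
    }
}
```

With this shape the caller does not need to pass or cancel anything. If the source ignores the token, the feeder stops only after the next url arrives, because the fallback check runs once per item.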
[...] CancellationToken shared between both enumerables to exit, instead of using a manual break statement? – Hydro
[...] IAsyncEnumerator. – Teplica
[...] break out of the loop. – Hydro
[...] await foreach. It's like driving a car with two brake pedals, and having the responsibility to press the one after the other (if you forget to press the second pedal the brake fluid will boil). – Teplica
[...] yield proceeds only if the output is actually consumed. – Mezzorelievo
[...] BufferBlock, that's trying to emulate a Channel. The Task.Run isn't really needed as the only non async operation is Console.WriteLine($"Url {url} received");. First, do you need a buffer there? Without it, a simple await foreach(var url in urls){ yield return MockValidateUrl(url);} would work just fine. With any kind of buffer you end up having to wait for the buffer to fill – Mezzorelievo
[...] Task.Run is kind of redundant here. Maybe I over-simplified the example. My actual problem is more complex though, and I can't get away without the buffer. I am still searching for a way to pass a notification from DisposeAsync, so that I can cancel the buffer-feeding task without resorting to adding burden to the caller. – Teplica
[...] DisposeAsync or any task that completes when the work completes – Mezzorelievo
[...] IAsyncEnumerator.DisposeAsync after each await foreach. The enumerator is disposed automatically by the infrastructure, and I am trying to take advantage of that. The reason I need the buffer is because instead of the urls I actually buffer the tasks that validate the urls, and propagate the results as they become available. The urls are not propagated in the same order they came in. Surely there are other ways to do this, but I am trying to familiarize myself with the new asynchronous enumeration capabilities of C# 8. – Teplica
[...] WithCancellation method and the EnumeratorCancellation attribute. – Teplica
[...] IAsyncEnumerable that is driven by multiple concurrent tasks. This is a solved problem now. – Teplica
[...] LastStep method can be converted to a generic method easily – Mezzorelievo
[...] CancellationToken. It is a very ugly solution though. It involves duplicating the ValidateUrls iterator method, one private with token and one public without token, then enumerating the first from inside the second, after wrapping it into an AsyncEnumerableWrapper that adds a hook at DisposeAsync that cancels the token. If this is the only solution then there is no solution, because I can't live with that. – Teplica