Usage of Task.WhenAll with infinite Tasks produced by BlockingCollection

I am adding background tasks to a BlockingCollection (the adding itself also happens in the background).

I am waiting with Task.WhenAll on the enumerable returned by GetConsumingEnumerable.

My question is: Is the overload of Task.WhenAll which receives an IEnumerable "prepared" to potentially receive an endless number of tasks?

I am simply not sure whether I can do it this way, or whether it was meant to be used this way.

private async Task RunAsync(TimeSpan delay, CancellationToken cancellationToken)
{
    using (BlockingCollection<Task> jobcollection = new BlockingCollection<Task>())
    {
        Task addingTask = Task.Run(async () =>
        {
            while (true)
            {
                DateTime utcNow = DateTime.UtcNow;
                var jobs = Repository.GetAllJobs();
                foreach (var job in GetRootJobsDue(jobs, utcNow))
                {
                    jobcollection.Add(Task.Run(() => RunJob(job, jobs, cancellationToken, utcNow), cancellationToken), cancellationToken);
                }

                await Task.Delay(delay, cancellationToken);
            }
        }, cancellationToken);

        await Task.WhenAll(jobcollection.GetConsumingEnumerable(cancellationToken));
    }
}
Imputation answered 25/8, 2014 at 14:48 Comment(10)
Seems like using Parallel.ForEach(Repository.GetAllJobs, ...) would be a simpler solution. - Blanche
Task.WhenAll will block the calling thread until GetConsumingEnumerable finishes. - Creel
Well, if there are an infinite number of tasks they'll never all finish, because you'll never get the whole set. - Blatt
@SriramSakthivel GetConsumingEnumerable immediately returns an IEnumerable, but it only gets iterated when data is available. That is the intended behaviour. - Imputation
@Imputation I edited my comment before you replied. See if that makes sense? - Creel
@Blatt That's what I want: to return only when the CancellationToken is fired (deep in some other code). - Imputation
@Imputation But WhenAll doesn't yield control until it has been able to enumerate all of the items in the sequence it's given. - Blatt
@Imputation Then don't use WhenAll at all, just wait on that cancellation token. - Blatt
@Blanche No, because you still need to add the tasks to the enumerable. But yes, as a replacement for WhenAll. - Imputation
It's not even going to execute that line until the token has been cancelled. If you want to return when it gets cancelled, just remove that line altogether. - Aerate
V
5

Task.WhenAll will not work with an infinite number of tasks. It will first (synchronously) wait for the enumerable to finish, and then (asynchronously) wait for them all to complete.
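
The ordering described above can be observed with a small finite sequence. This is only a sketch: the class name, the log list, and the ProduceTasks iterator are made up for illustration and are not part of the question's code. It records when each task is pulled from a lazy enumerable and when Task.WhenAll hands back its task.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

public static class WhenAllEnumerationDemo
{
    // Records the order in which things happen (hypothetical helper).
    private static readonly List<string> Log = new List<string>();

    // A lazy sequence of tasks; each step is logged as it is pulled.
    private static IEnumerable<Task> ProduceTasks(int count)
    {
        for (int i = 0; i < count; i++)
        {
            Log.Add("yielded task " + i);
            yield return Task.CompletedTask;
        }
        Log.Add("enumeration finished");
    }

    public static List<string> Run()
    {
        Log.Clear();
        // WhenAll walks the whole sequence before it returns its task.
        Task all = Task.WhenAll(ProduceTasks(3));
        Log.Add("WhenAll returned");
        all.Wait();
        return new List<string>(Log);
    }
}
```

Calling WhenAllEnumerationDemo.Run() should show "enumeration finished" logged before "WhenAll returned". With GetConsumingEnumerable on a collection that is never marked complete, that first entry would never be reached, so the call would not return until the token is cancelled.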

If you want to react to a sequence in an asynchronous way, then you need to use IObservable<Task> (Reactive Extensions). You can use a TPL Dataflow BufferBlock as a "queue" that can work with either synchronous or asynchronous code, and is easily convertible to IObservable<Task>.
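
A rough sketch of the BufferBlock idea, without the IObservable conversion, might look like the following. It assumes the System.Threading.Tasks.Dataflow library is available (on .NET Framework that means the NuGet package). RunJobAsync is a hypothetical stand-in for the question's RunJob, and the producer here is finite so that the example terminates.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class BufferBlockQueueDemo
{
    // Hypothetical stand-in for the question's RunJob.
    private static async Task<int> RunJobAsync(int id)
    {
        await Task.Delay(10);
        return id;
    }

    public static async Task<List<int>> RunAsync(int jobCount, CancellationToken token)
    {
        var queue = new BufferBlock<Task<int>>();
        var results = new List<int>();

        // Producer: posts started jobs, then signals that no more will come.
        Task producer = Task.Run(() =>
        {
            for (int i = 0; i < jobCount; i++)
            {
                queue.Post(RunJobAsync(i));
            }
            queue.Complete();
        });

        // Consumer: waits asynchronously for items instead of blocking a thread.
        while (await queue.OutputAvailableAsync(token))
        {
            while (queue.TryReceive(out Task<int> job))
            {
                results.Add(await job);
            }
        }

        await producer;
        return results;
    }
}
```

In the endless scenario from the question, the producer would never call Complete, and the consumer loop would only end when the token is cancelled, at which point OutputAvailableAsync throws an OperationCanceledException.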

Vieira answered 25/8, 2014 at 14:58 Comment(3)
Does the same apply to Parallel.ForEach? Will it also wait until the enumeration is finished? - Imputation
@Imputation Parallel.ForEach is an inherently synchronous operation, not an asynchronous one. Not only will it synchronously wait for the sequence to finish, it'll synchronously wait to finish processing all of the items. - Blatt
I'm pretty sure that Parallel will not immediately wait for the enumeration before starting processing, but you will "lose" a thread to the enumerating. - Vieira
B
7

Since your goal is merely to wait until the cancellation token is cancelled, you should do that. For reasons others have explained, using WhenAll on an infinite sequence of tasks isn't the way to go about that. There are easier ways to get a task that will never complete.

await new TaskCompletionSource<bool>().Task
    .ContinueWith(t => { }, cancellationToken);
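
One detail worth spelling out: when the token is cancelled, the continuation ends up in the Canceled state, so the await throws an OperationCanceledException rather than returning normally. A minimal sketch of handling that follows; the class and method names are invented for illustration.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class WaitForCancellationDemo
{
    // The pattern from the answer: a continuation on a task that never completes.
    public static Task NeverEndingUntilCancelled(CancellationToken token)
    {
        return new TaskCompletionSource<bool>().Task.ContinueWith(t => { }, token);
    }

    // Returns true once the token has been cancelled.
    public static async Task<bool> WaitAsync(CancellationToken token)
    {
        try
        {
            await NeverEndingUntilCancelled(token);
            return false; // not expected: the antecedent never completes
        }
        catch (OperationCanceledException)
        {
            // Cancelling the token cancels the continuation, which surfaces here.
            return true;
        }
    }
}
```

Task.Delay(Timeout.Infinite, cancellationToken) is another way to get a task that only ends on cancellation; it is not what the answer uses, but it behaves the same way when awaited.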
Blatt answered 25/8, 2014 at 15:3 Comment(6)
It seems his goal is to wait until all the tasks have been completed, not until they've all been created. - Aerate
How does that work? new TaskCompletionSource<bool>().Task is never going to complete, which means ContinueWith is never going to execute. I assume you mean to call SetCanceled on the TaskCompletionSource? - Creel
@KyleW No, it's not. - Blatt
@SriramSakthivel Nope, this will work as is. When the token is cancelled it'll stop waiting for the never-ending task it is a continuation of to finish. Your approach would work too, it'd just be more lines of code. - Blatt
I see. That's beautiful. I overlooked it. +1 - Creel
This would also render the BlockingCollection useless. I could simply run the tasks in the loop in the background and just await addingTask. - Imputation
A
0

I assume that Task.WhenAll will try to enumerate the collection, which means that it itself will block until the collection is completed or cancelled. If it didn't, then the code could theoretically finish awaiting before the tasks have been created. So there's going to be an extra block in there: it will block waiting for the tasks to be created, and then block again until the tasks are done. I don't think that's a bad thing for your code, as it's still going to block until the same point in time.

Aerate answered 25/8, 2014 at 14:58 Comment(0)
