Let's say I have two sequences returning integers 1 to 5.
The first returns 1, 2 and 3 very fast, but 4 and 5 take 200ms each.
public static IEnumerable<int> FastFirst()
{
    for (int i = 1; i < 6; i++)
    {
        // Only 4 and 5 are delayed.
        if (i > 3) Thread.Sleep(200);
        yield return i;
    }
}
The second returns 1, 2 and 3 with a 200ms delay, but 4 and 5 are returned fast.
public static IEnumerable<int> SlowFirst()
{
    for (int i = 1; i < 6; i++)
    {
        // Only 1, 2 and 3 are delayed.
        if (i < 4) Thread.Sleep(200);
        yield return i;
    }
}
Taking the union of these two sequences gives me just the numbers 1 to 5.
FastFirst().Union(SlowFirst());
I cannot know in advance which of the two methods is delayed at which point, so no fixed order of execution solves this for me. Therefore, I would like to parallelise the union in order to minimise the (artificial) delay in my example.
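To make the cost of the sequential version concrete, here is a minimal timing sketch. The class name `SequentialUnionTiming` is made up for illustration. As far as I understand it, `Union` enumerates the first sequence completely before it touches the second, so the delays of both sequences add up (five sleeps of 200ms, so about one second in total) whichever one I put first.

```csharp
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Threading;

// Hypothetical demo class, not part of any library.
public static class SequentialUnionTiming
{
    public static IEnumerable<int> FastFirst()
    {
        for (int i = 1; i < 6; i++)
        {
            if (i > 3) Thread.Sleep(200);
            yield return i;
        }
    }

    public static IEnumerable<int> SlowFirst()
    {
        for (int i = 1; i < 6; i++)
        {
            if (i < 4) Thread.Sleep(200);
            yield return i;
        }
    }

    public static void Main()
    {
        var stopwatch = Stopwatch.StartNew();

        // Union is lazy: each distinct value is printed as soon as it is produced.
        foreach (var value in FastFirst().Union(SlowFirst()))
        {
            Console.WriteLine("{0} after about {1} ms", value, stopwatch.ElapsedMilliseconds);
        }

        // The loop only ends once SlowFirst() has also been drained,
        // even though it contributes no new values.
        Console.WriteLine("Finished after about {0} ms", stopwatch.ElapsedMilliseconds);
    }
}
```

Swapping the two calls changes when the individual values appear, but not the total time spent waiting.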
A real-world scenario: I have a cache that returns some entities, and a datasource that returns all entities. I'd like to be able to return an iterator from a method that internally parallelises the request to both the cache and the datasource so that the cached results yield as fast as possible.
Note 1: I realise this still wastes CPU cycles; I'm not asking how I can prevent the sequences from iterating over their slow elements, just how I can union them as fast as possible.
Update 1: I've adapted achitaka-san's great response to accept multiple producers, and to use ContinueWhenAll so that CompleteAdding is called on the BlockingCollection just once. I'm posting it here because the code would be unreadable in a comment, which has no formatting. Any further feedback would be great!
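public static IEnumerable<TResult> SelectAsync<TResult>(
    params IEnumerable<TResult>[] producer)
{
    var resultsQueue = new BlockingCollection<TResult>();
    var taskList = new HashSet<Task>();
    foreach (var result in producer)
    {
        // Copy the loop variable: before C# 5 the lambda would capture the
        // shared foreach variable, so every task could see the last producer.
        var source = result;
        taskList.Add(
            Task.Factory.StartNew(
                () =>
                {
                    foreach (var product in source)
                    {
                        resultsQueue.Add(product);
                    }
                }));
    }
    // Mark the queue as complete once every producer task has finished.
    // Note: ContinueWhenAll throws if no producers are passed.
    Task.Factory.ContinueWhenAll(taskList.ToArray(), x => resultsQueue.CompleteAdding());
    return resultsQueue.GetConsumingEnumerable();
}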
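One thing I noticed: the method above merges the producers but does not remove duplicates, so on its own it is not a union. Below is a self-contained sketch of how I think the two could be combined. The names `ParallelUnionSketch` and `ParallelUnion` are made up for illustration, and the use of `TaskCreationOptions.LongRunning` is my own assumption (the producers block in `Thread.Sleep`, so I would rather not tie up ordinary thread-pool threads).

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical demo class, not part of any library.
public static class ParallelUnionSketch
{
    public static IEnumerable<T> ParallelUnion<T>(params IEnumerable<T>[] producers)
    {
        // ContinueWhenAll rejects an empty task array, so handle that case first.
        if (producers.Length == 0) return Enumerable.Empty<T>();

        var queue = new BlockingCollection<T>();

        // One task per producer; the lambda parameter is fresh for each
        // producer, so there is no shared loop variable to capture.
        var tasks = producers
            .Select(p => Task.Factory.StartNew(() =>
            {
                foreach (var item in p) queue.Add(item);
            }, TaskCreationOptions.LongRunning))
            .ToArray();

        // Runs once all producer tasks have completed.
        Task.Factory.ContinueWhenAll(tasks, _ => queue.CompleteAdding());

        // Distinct streams each value the first time it is seen,
        // which gives set-union semantics over the merged output.
        return queue.GetConsumingEnumerable().Distinct();
    }

    public static IEnumerable<int> FastFirst()
    {
        for (int i = 1; i < 6; i++)
        {
            if (i > 3) Thread.Sleep(200);
            yield return i;
        }
    }

    public static IEnumerable<int> SlowFirst()
    {
        for (int i = 1; i < 6; i++)
        {
            if (i < 4) Thread.Sleep(200);
            yield return i;
        }
    }

    public static void Main()
    {
        // The order of the values depends on scheduling and is not guaranteed.
        foreach (var value in ParallelUnion(FastFirst(), SlowFirst()))
        {
            Console.WriteLine(value);
        }
    }
}
```

Two caveats with this sketch: the producer tasks start as soon as `ParallelUnion` is called rather than when enumeration begins, and they keep running even if the consumer stops enumerating early.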
Union()
? – Ethanethane