Using Parallel Linq Extensions to union two sequences, how can one yield the fastest results first?

Let's say I have two sequences returning integers 1 to 5.

The first returns 1, 2 and 3 very fast, but 4 and 5 take 200ms each.

public static IEnumerable<int> FastFirst()
{
    for (int i = 1; i < 6; i++)
    {
        if (i > 3) Thread.Sleep(200);
        yield return i;
    }
}

The second returns 1, 2 and 3 with a 200ms delay, but 4 and 5 are returned fast.

public static IEnumerable<int> SlowFirst()
{
    for (int i = 1; i < 6; i++)
    {
        if (i < 4) Thread.Sleep(200);
        yield return i;
    }
}

Unioning both of these sequences gives me just the numbers 1 to 5.

FastFirst().Union(SlowFirst());

I cannot guarantee which of the two methods has delays at what point, so the order of execution cannot guarantee a solution for me. Therefore, I would like to parallelise the union in order to minimise the (artificial) delay in my example.
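To sketch the problem: Enumerable.Union streams lazily, so the time-to-first-result depends entirely on which operand comes first. A rough timing harness (the Stopwatch here is just for illustration):

```csharp
var sw = System.Diagnostics.Stopwatch.StartNew();
foreach (var n in FastFirst().Union(SlowFirst()))
{
    Console.WriteLine("{0} after {1}ms", n, sw.ElapsedMilliseconds);
}
// 1, 2 and 3 should appear almost immediately, because Union streams the
// first operand as it goes. With the operands swapped
// (SlowFirst().Union(FastFirst())), even the first element takes ~200ms,
// which is exactly the situation I can't rule out.
```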

A real-world scenario: I have a cache that returns some entities, and a datasource that returns all entities. I'd like to be able to return an iterator from a method that internally parallelises the request to both the cache and the datasource so that the cached results yield as fast as possible.

Note 1: I realise this is still wasting CPU cycles; I'm not asking how I can prevent the sequences from iterating over their slow elements, just how I can union them as fast as possible.

Update 1: I've tailored achitaka-san's great response to accept multiple producers, and to use ContinueWhenAll to call the BlockingCollection's CompleteAdding just once. I'm putting it here because it would get lost in the comments, which lack code formatting. Any further feedback would be great!

public static IEnumerable<TResult> SelectAsync<TResult>(
    params IEnumerable<TResult>[] producers)
{
    var resultsQueue = new BlockingCollection<TResult>();

    var taskList = new HashSet<Task>();
    foreach (var producer in producers)
    {
        // Capture a local copy: before C# 5, the foreach variable is a single
        // variable shared across iterations, so closing over it directly would
        // make every task enumerate the last producer.
        var localProducer = producer;
        taskList.Add(
            Task.Factory.StartNew(
                () =>
                    {
                        foreach (var product in localProducer)
                        {
                            resultsQueue.Add(product);
                        }
                    }));
    }

    Task.Factory.ContinueWhenAll(taskList.ToArray(), x => resultsQueue.CompleteAdding());

    return resultsQueue.GetConsumingEnumerable();
}
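This gives concatenation-as-it-arrives rather than a union, so the caller still needs to de-duplicate. A minimal usage sketch, assuming the FastFirst/SlowFirst sequences from the question:

```csharp
// Elements stream out of the BlockingCollection as soon as either producer
// yields them; Distinct() streams too, dropping whichever duplicate of a
// value arrives second.
foreach (var n in SelectAsync(FastFirst(), SlowFirst()).Distinct())
{
    Console.WriteLine(n);
}
```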
Metallo answered 9/11, 2011 at 13:25 Comment(6)
how can you make it any faster if you still have to look at all elements in the slow enumerable, i.e. in the case of Union()? – Ethanethane
Because in the artificial example, 1, 2 & 3 should be immediately available to return. I'm not saying the total execution time will be any faster, but the time-to-first-result will be. – Metallo
As I understand it, one entry might be contained in both caches. You want to ask both of them and return the one which was quicker? – Seasonseasonable
@achitaka-san yep – I guess I'm asking if it's possible to yield from a parallel enumeration as soon as elements are available (and ignore any that took longer, since it's a union) – Metallo
I would implement the requests using the Asynchronous Pattern. That way you can cancel the unneeded second request. Need to think about how to yield results back into the enumerator in an elegant way. – Seasonseasonable
Isn't the "result" variable collapsing to the last element in the list before any of the tasks get started? If that is the case, then you are actually only taking the last producer's data multiple times. That's an annoying behaviour of .NET: the iteration variable of a foreach is reused between iterations instead of being reallocated: https://mcmap.net/q/162728/-access-to-modified-closure-2 – Shortstop

Take a look at this. The first method just returns everything in the order results arrive. The second checks uniqueness. If you chain them, you will get the result you want, I think.

public static class Class1
{
    public static IEnumerable<TResult> SelectAsync<TResult>(
        IEnumerable<TResult> producer1,
        IEnumerable<TResult> producer2,
        int capacity)
    {
        var resultsQueue = new BlockingCollection<TResult>(capacity);
        // Count the producers still running; the last one to finish completes
        // the collection. (A pair of plain bool flags would race: without a
        // memory barrier, both tasks could read a stale value and neither
        // would ever call CompleteAdding.)
        int pendingProducers = 2;

        Task.Factory.StartNew(() =>
        {
            foreach (var product in producer1)
            {
                resultsQueue.Add(product);
            }
            if (Interlocked.Decrement(ref pendingProducers) == 0) { resultsQueue.CompleteAdding(); }
        });

        Task.Factory.StartNew(() =>
        {
            foreach (var product in producer2)
            {
                resultsQueue.Add(product);
            }
            if (Interlocked.Decrement(ref pendingProducers) == 0) { resultsQueue.CompleteAdding(); }
        });

        return resultsQueue.GetConsumingEnumerable();
    }


    public static IEnumerable<TResult> SelectAsyncUnique<TResult>(this IEnumerable<TResult> source)
    {
        var knownResults = new HashSet<TResult>();
        foreach (TResult result in source)
        {
            // HashSet<T>.Add returns false when the value is already present,
            // so this both records and filters in one call.
            if (knownResults.Add(result))
            {
                yield return result;
            }
        }
    }
}
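Chaining the two methods as described might look like this, using the FastFirst/SlowFirst sequences from the question and an arbitrary queue capacity:

```csharp
// SelectAsync merges both producers as their elements arrive;
// SelectAsyncUnique then filters out whichever duplicates come second,
// giving union semantics overall.
foreach (var n in Class1.SelectAsync(FastFirst(), SlowFirst(), 10)
                        .SelectAsyncUnique())
{
    Console.WriteLine(n);
}
```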
Seasonseasonable answered 9/11, 2011 at 14:04 Comment(6)
This is arguably worse since you are not yielding the results but have to wait until your full result set is ready – Ethanethane
Calling CompleteAdding twice is definitely an error in design. You should return resultsQueue.GetConsumingEnumerable() and only call CompleteAdding() when the last of the two tasks has finished. – Purpurin
Are you sure? I am using a BlockingCollection. I am starting two threads and returning it immediately. The consumer thread is able to read from it; if it's empty, the consumer will wait until an entry arrives or the end is signalled using CompleteAdding. This is a Producer-Consumer pattern. See msdn.microsoft.com/en-us/library/dd997371.aspx – Seasonseasonable
Where is knownResults being filled? I would replace if (!knownResults.Contains(result)) with if (knownResults.Add(result)) (Add() only returns true if the value did not already exist). – Purpurin
Thanks for your answer, this looks pretty good. Do you know if there's a similar method using the Parallel Extensions already? I have a number of providers only known at runtime, so it would be great to benefit from its existing propensity to manage threads rather than the Task.Factory call – unless I'm missing something there about how it behaves (is it safe to call in a loop rather than the example's hardcoded 2?). The number of providers is never likely to reach double digits. – Metallo
I've accepted the answer and posted a modified version in an edit – thanks! – Metallo

The cache would be nearly instant compared to fetching from the database, so you could read from the cache first and return those items, then read from the database and return the items except those that were found in the cache.

If you try to parallelise this, you will add a lot of complexity but get quite a small gain.
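A sequential cache-first version of that idea might look like the sketch below. Entity, its Id key, GetFromCache and GetFromDatabase are all hypothetical stand-ins, not anything from the question:

```csharp
public static IEnumerable<Entity> GetAll()
{
    var seen = new HashSet<int>();
    foreach (var e in GetFromCache())        // near-instant
    {
        seen.Add(e.Id);
        yield return e;
    }
    foreach (var e in GetFromDatabase())     // slow; yield only unseen entities
    {
        if (!seen.Contains(e.Id))
            yield return e;
    }
}
```

The drawback, as the question notes, is that no database item can appear before the cache has been fully enumerated.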

Edit:

If there is no predictable difference in the speed of the sources, you could run them in threads and use a synchronised hash set to keep track of which items you have already got, put the new items in a queue, and let the main thread read from the queue:

public static IEnumerable<TItem> GetParallel<TItem, TKey>(Func<TItem, TKey> getKey, params IEnumerable<TItem>[] sources) {
  HashSet<TKey> found = new HashSet<TKey>();
  List<TItem> queue = new List<TItem>();
  object sync = new object();
  int alive = 0;
  object aliveSync = new object();
  foreach (IEnumerable<TItem> source in sources) {
    lock (aliveSync) {
      alive++;
    }
    new Thread(s => {
      foreach (TItem item in s as IEnumerable<TItem>) {
        TKey key = getKey(item);
        lock (sync) {
          if (found.Add(key)) {
            queue.Add(item);
          }
        }
      }
      lock (aliveSync) {
        alive--;
      }
    }).Start(source);
  }
  while (true) {
    lock (sync) {
      if (queue.Count > 0) {
        foreach (TItem item in queue) {
          yield return item;
        }
        queue.Clear();
      }
    }
    lock (aliveSync) {
      if (alive == 0) break;
    }
    Thread.Sleep(100);
  }
}

Test stream:

public static IEnumerable<int> SlowRandomFeed(Random rnd) {
  int[] values = new int[100];
  for (int i = 0; i < 100; i++) {
    int pos = rnd.Next(i + 1);
    values[i] = i;
    int temp = values[pos];
    values[pos] = values[i];
    values[i] = temp;
  }
  foreach (int value in values) {
    yield return value;
    Thread.Sleep(rnd.Next(200));
  }
}

Test:

Random rnd = new Random();
foreach (int item in GetParallel(n => n, SlowRandomFeed(rnd), SlowRandomFeed(rnd), SlowRandomFeed(rnd), SlowRandomFeed(rnd))) {
  Console.Write("{0:0000 }", item);
}
Airman answered 9/11, 2011 at 13:41 Comment(3)
As I mentioned in the question, though, I can't guarantee which sequence has "delays" at what point. Imagine two caches: I don't know which one is faster, because they were written by a third party and are loaded in via a plugin system. – Metallo
With this solution you have to wait at least as long as a full loop through the slowest collection. – Catheryncatheter
@Airman Take a look at BlockingCollection: msdn.microsoft.com/en-us/library/dd997371.aspx. What you do in the last part of the method is just a poor implementation of Producer-Consumer. – Seasonseasonable

© 2022 - 2024 — McMap. All rights reserved.