How to limit the number of items that are passing concurrently through an entire Dataflow pipeline?

I want to limit the number of items posted in a Dataflow pipeline. The limit depends on the production environment. These objects (images) consume a large amount of memory, so I would like to post a new one only when the last block of the pipeline has finished with a previous one.

I tried to use a SemaphoreSlim to throttle the producer and release it in the last block of the pipeline. It works, but if an exception is raised during the process, the program waits forever and the exception is not intercepted.

Here is a sample that resembles our code. How can I do this?

static void Main(string[] args)
{
    SemaphoreSlim semaphore = new SemaphoreSlim(1, 2);

    var downloadString = new TransformBlock<string, string>(uri =>
    {
        Console.WriteLine("Downloading '{0}'...", uri);
        return new WebClient().DownloadString(uri);
    });

    var createWordList = new TransformBlock<string, string[]>(text =>
    {
        Console.WriteLine("Creating word list...");

        char[] tokens = text.ToArray();
        for (int i = 0; i < tokens.Length; i++)
        {
            if (!char.IsLetter(tokens[i]))
                tokens[i] = ' ';
        }
        text = new string(tokens);

        return text.Split(new char[] { ' ' },
           StringSplitOptions.RemoveEmptyEntries);
    });

    var filterWordList = new TransformBlock<string[], string[]>(words =>
    {
        Console.WriteLine("Filtering word list...");
        throw new InvalidOperationException("ouch !"); // explicit for test
        return words.Where(word => word.Length > 3).OrderBy(word => word)
           .Distinct().ToArray();
    });

    var findPalindromes = new TransformBlock<string[], string[]>(words =>
    {
        Console.WriteLine("Finding palindromes...");

        var palindromes = new ConcurrentQueue<string>();

        Parallel.ForEach(words, word =>
        {
            string reverse = new string(word.Reverse().ToArray());

            if (Array.BinarySearch<string>(words, reverse) >= 0 &&
                word != reverse)
            {
                palindromes.Enqueue(word);
            }
        });

        return palindromes.ToArray();
    });

    var printPalindrome = new ActionBlock<string[]>(palindromes =>
    {
        try
        {
            foreach (string palindrome in palindromes)
            {
                Console.WriteLine("Found palindrome {0}/{1}",
                   palindrome, new string(palindrome.Reverse().ToArray()));
            }
        }
        finally
        {
            semaphore.Release();
        }
    });

    downloadString.LinkTo(createWordList);
    createWordList.LinkTo(filterWordList);
    filterWordList.LinkTo(findPalindromes);
    findPalindromes.LinkTo(printPalindrome);


    downloadString.Completion.ContinueWith(t =>
    {
        if (t.IsFaulted) 
            ((IDataflowBlock)createWordList).Fault(t.Exception);
        else createWordList.Complete();
    });
    createWordList.Completion.ContinueWith(t =>
    {
        if (t.IsFaulted) 
            ((IDataflowBlock)filterWordList).Fault(t.Exception);
        else filterWordList.Complete();
    });
    filterWordList.Completion.ContinueWith(t =>
    {
        if (t.IsFaulted) 
            ((IDataflowBlock)findPalindromes).Fault(t.Exception);
                // enter here when an exception throws
        else findPalindromes.Complete();
    });
    findPalindromes.Completion.ContinueWith(t =>
    {
        if (t.IsFaulted)
            ((IDataflowBlock)printPalindrome).Fault(t.Exception);
                // the fault is propagated here but not caught
        else printPalindrome.Complete();
    });

    try
    {
        for (int i = 0; i < 10; i++)
        {
            Console.WriteLine(i);
            
            downloadString.Post("http://www.google.com");
            semaphore.Wait(); // waits here when an exception throws
        }

        downloadString.Complete();

        printPalindrome.Completion.Wait();
    }
    catch (AggregateException agg)
    {
        Console.WriteLine("An error has occurred: " + agg);
    }
    Console.WriteLine("Done");
    Console.ReadKey();
}
Batholomew answered 3/3, 2015 at 17:17 Comment(0)

You should simply wait on both the semaphore and the completion task together. If the last block ends prematurely (through an exception or cancellation), the wait returns immediately and the loop can stop, so the exception surfaces when you wait on the completion task afterwards. Otherwise, you wait on the semaphore until there is room to post more.

You can do that with Task.WhenAny and SemaphoreSlim.WaitAsync:

for (int i = 0; i < 10; i++)
{
    Console.WriteLine(i);
    downloadString.Post("http://www.google.com");

    if (printPalindrome.Completion.IsCompleted)
    {
        break;
    }

    Task.WhenAny(semaphore.WaitAsync(), printPalindrome.Completion).Wait();
}

Note: using Task.Wait is only appropriate in this case as it's Main. Usually this should be an async method and you should await the task returned from Task.WhenAny.
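
As a rough illustration of the async variant mentioned in the note, here is a minimal sketch. It is not the asker's pipeline: the two-block pipeline, the `ThrottledProducer` class, and the `failOn` parameter are hypothetical stand-ins, and it assumes the `System.Threading.Tasks.Dataflow` package is referenced. Unlike the loop above, it acquires a semaphore slot before posting, so at most `maxInFlight` items are in the pipeline at once.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

static class ThrottledProducer
{
    // Posts up to `count` items while keeping at most `maxInFlight` of them
    // in the pipeline. `failOn` (hypothetical) makes the last block throw
    // for that item so the failure path can be observed.
    public static async Task<int> RunAsync(int count, int maxInFlight, int failOn = -1)
    {
        var semaphore = new SemaphoreSlim(maxInFlight, maxInFlight);

        var first = new TransformBlock<int, int>(i => i);
        var last = new ActionBlock<int>(i =>
        {
            try
            {
                if (i == failOn) throw new InvalidOperationException("ouch!");
            }
            finally
            {
                semaphore.Release(); // free the slot even when the item fails
            }
        });
        first.LinkTo(last, new DataflowLinkOptions { PropagateCompletion = true });

        int posted = 0;
        for (int i = 0; i < count; i++)
        {
            // Wait for a free slot OR for the last block to end, whichever comes first.
            Task slot = semaphore.WaitAsync();
            Task finished = await Task.WhenAny(slot, last.Completion);
            if (finished == last.Completion || last.Completion.IsCompleted)
                break; // the pipeline ended early: stop producing

            first.Post(i);
            posted++;
        }

        first.Complete();
        await last.Completion; // rethrows the exception if the last block faulted
        return posted;
    }
}
```

Because the block only becomes faulted shortly after its delegate throws, the producer may still post one more item before it notices, so the number of posted items in the failure case is not deterministic.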

Gluttonize answered 3/3, 2015 at 18:12 Comment(8)
Thanks, this part works great. However, the loop continues to produce items for the first two blocks, which are not flagged as faulted. It works if I modify this part of the code:
findPalindromes.Completion.ContinueWith(t =>
{
    if (t.IsFaulted)
    {
        ((IDataflowBlock)printPalindrome).Fault(t.Exception);
        ((IDataflowBlock)downloadString).Fault(t.Exception); // mark the first block as faulted
    }
    else printPalindrome.Complete();
});
But I am not sure it's the best way to do that. - Batholomew
Won't going this route just make the code run synchronously, since it's only telling the main thread to wait? - Impudent
@Batholomew You can just check whether the completion task completed. Look at my update. - Gluttonize
@Impudent Yes. The main thread will run and wait synchronously. - Gluttonize
@Gluttonize OK, this solution stops the producer. Is there a way to stop the producer and also prevent the first blocks from consuming their buffers? To reproduce this issue, initialize the semaphore like this: SemaphoreSlim semaphore = new SemaphoreSlim(4, 5); - Batholomew
@Batholomew You should probably pass a CancellationToken to all your blocks through the options and cancel it when you want to stop. - Gluttonize
@Gluttonize Indeed, it works with a CancellationToken. Thank you for your advice and your patience. - Batholomew
@Batholomew Sure, anytime. - Gluttonize
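
The CancellationToken suggestion from the comments could look roughly like the sketch below. The three-block pipeline and the `CancellablePipeline` class are hypothetical stand-ins rather than the asker's code; the assumption is that every block shares one token through its options and that the token is cancelled as soon as the last block faults, so upstream blocks stop processing and drop what they have buffered.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

static class CancellablePipeline
{
    // Returns true when the last block ended in the Faulted state.
    public static async Task<bool> RunAsync()
    {
        using (var cts = new CancellationTokenSource())
        {
            // Every block observes the same token through its options.
            var options = new ExecutionDataflowBlockOptions { CancellationToken = cts.Token };
            var link = new DataflowLinkOptions { PropagateCompletion = true };

            var first = new TransformBlock<int, int>(i => i + 1, options);
            var second = new TransformBlock<int, int>(i =>
            {
                if (i == 3) throw new InvalidOperationException("ouch!"); // explicit for test
                return i;
            }, options);
            var last = new ActionBlock<int>(i => Console.WriteLine(i), options);

            first.LinkTo(second, link);
            second.LinkTo(last, link);

            // When the fault reaches the last block, cancel the shared token so
            // the blocks that are still running stop and discard their buffers.
            Task watcher = last.Completion.ContinueWith(t =>
            {
                if (t.IsFaulted) cts.Cancel();
            });

            for (int i = 0; i < 10; i++) first.Post(i);
            first.Complete();

            await watcher; // completes after the last block ends; does not rethrow
            return last.Completion.IsFaulted;
        }
    }
}
```

In this small example the first block may already have finished before the token is cancelled; with slow stages and large buffers, the cancellation is what keeps the upstream blocks from working through their backlog.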

This is how I handled throttling, allowing only 10 items in the source block at any one time. You could change the capacity to 1. Make sure that you also throttle the other blocks in the pipeline; otherwise the source block could hold 1 item while the next block holds many more.

var sourceBlock = new BufferBlock<string>(
    // BufferBlock accepts the base DataflowBlockOptions;
    // BoundedCapacity is the setting that throttles the producer.
    new DataflowBlockOptions { BoundedCapacity = 10 });

Then the producer does this:

sourceBlock.SendAsync("value", shutdownToken).Wait(shutdownToken);

If you're using async / await, just await the SendAsync call.
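
To illustrate throttling every block rather than only the source, here is a minimal sketch under stated assumptions: the two-block pipeline, the `BoundedPipeline` class, and the 10 ms delay are hypothetical, and each block gets the same `BoundedCapacity`. The producer awaits `SendAsync`, which only completes once the first block has room.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

static class BoundedPipeline
{
    // Sends `count` items through a pipeline in which every block holds
    // at most `capacity` items, and returns how many were processed.
    public static async Task<int> RunAsync(int count, int capacity)
    {
        var options = new ExecutionDataflowBlockOptions { BoundedCapacity = capacity };
        var link = new DataflowLinkOptions { PropagateCompletion = true };

        int processed = 0;
        var first = new TransformBlock<int, int>(i => i * 2, options);
        var last = new ActionBlock<int>(async i =>
        {
            await Task.Delay(10); // simulate slow work so back-pressure builds up
            Interlocked.Increment(ref processed);
        }, options);
        first.LinkTo(last, link);

        for (int i = 0; i < count; i++)
        {
            // Waits asynchronously while the first block is full;
            // returns false if the block has stopped accepting items.
            bool accepted = await first.SendAsync(i);
            if (!accepted) break;
        }

        first.Complete();
        await last.Completion;
        return processed;
    }
}
```

With this layout the number of items in flight is bounded by roughly the sum of the block capacities, so no semaphore is needed.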

Commingle answered 8/4, 2015 at 14:28 Comment(0)
