Why do blocks run in this order?

Here is a short code sample to quickly introduce what my question is about:

using System;
using System.Linq;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

namespace DataflowTest
{
    class Program
    {
        static void Main(string[] args)
        {
            var firstBlock = new TransformBlock<int, int>(x => x, new ExecutionDataflowBlockOptions
            {
                MaxDegreeOfParallelism = 4
            });
            var secondBlock = new TransformBlock<int,string>(async x =>
            {
                if (x == 12)
                {
                    await Task.Delay(5000);
                    return $"{DateTime.Now}: Message is {x} (This is delayed message!) ";
                }

                return $"{DateTime.Now}: Message is {x}";
            }, new ExecutionDataflowBlockOptions
            {
                MaxDegreeOfParallelism = 4
            });
            var thirdBlock = new ActionBlock<string>(s => Console.WriteLine(s), new ExecutionDataflowBlockOptions
            {
                MaxDegreeOfParallelism = 4
            });

            firstBlock.LinkTo(secondBlock);
            secondBlock.LinkTo(thirdBlock);

            var populateTask = Task.Run(async () =>
            {
                foreach (var x in Enumerable.Range(1, 15))
                {
                    await firstBlock.SendAsync(x);
                }
            });

            populateTask.Wait();
            secondBlock.Completion.Wait();
        }
    }
}

The output is:

09.08.2016 15:03:08: Message is 1
09.08.2016 15:03:08: Message is 5
09.08.2016 15:03:08: Message is 6
09.08.2016 15:03:08: Message is 7
09.08.2016 15:03:08: Message is 8
09.08.2016 15:03:08: Message is 9
09.08.2016 15:03:08: Message is 10
09.08.2016 15:03:08: Message is 11
09.08.2016 15:03:08: Message is 3
09.08.2016 15:03:08: Message is 2
09.08.2016 15:03:08: Message is 4
09.08.2016 15:03:13: Message is 12 (This is delayed message!)
09.08.2016 15:03:08: Message is 15
09.08.2016 15:03:08: Message is 13
09.08.2016 15:03:08: Message is 14

Why does the output come in this order, and how can I change the network to get the output below?

09.08.2016 15:03:08: Message is 1
09.08.2016 15:03:08: Message is 5
09.08.2016 15:03:08: Message is 6
09.08.2016 15:03:08: Message is 7
09.08.2016 15:03:08: Message is 8
09.08.2016 15:03:08: Message is 9
09.08.2016 15:03:08: Message is 10
09.08.2016 15:03:08: Message is 11
09.08.2016 15:03:08: Message is 3
09.08.2016 15:03:08: Message is 2
09.08.2016 15:03:08: Message is 4
09.08.2016 15:03:08: Message is 15
09.08.2016 15:03:08: Message is 13
09.08.2016 15:03:08: Message is 14
09.08.2016 15:03:13: Message is 12 (This is delayed message!)

So I am wondering: why should all the other blocks (or tasks here) wait for the delayed one?


UPDATE

Since you asked me to explain my problem in more detail, I made this sample, which is closer to the real pipeline I am working on. Let's say the application downloads some data and computes a hash of each returned response.

using System;
using System.Diagnostics;
using System.Linq;
using System.Net.Http;
using System.Security.Cryptography;
using System.Text;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

namespace DataflowTest
{
    class Program
    {
        static void Main(string[] args)
        {
            var firstBlock = new TransformBlock<int, string>(x => x.ToString(), new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 4 });

            var secondBlock = new TransformBlock<string, Tuple<string, string>>(async x =>
            {
                using (var httpClient = new HttpClient())
                {
                    if (x == "4") await Task.Delay(5000);

                    var result = await httpClient.GetStringAsync($"http://scooterlabs.com/echo/{x}");
                    return new Tuple<string, string>(x, result);
                }
            }, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 4 });

            var thirdBlock = new TransformBlock<Tuple<string, string>, Tuple<string, byte[]>>(x =>
             {
                 using (var algorithm = SHA256.Create())
                 {
                     var bytes = Encoding.UTF8.GetBytes(x.Item2);
                     var hash = algorithm.ComputeHash(bytes);

                     return new Tuple<string, byte[]>(x.Item1, hash);
                 }
             }, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 4 });

            var fourthBlock = new ActionBlock<Tuple<string, byte[]>>(x =>
            {
                var output = $"{DateTime.Now}: Hash for element #{x.Item1}: {GetHashAsString(x.Item2)}";

                Console.WriteLine(output);
            }, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 4 });

            firstBlock.LinkTo(secondBlock);
            secondBlock.LinkTo(thirdBlock);
            thirdBlock.LinkTo(fourthBlock);

            var populateTasks = Enumerable.Range(1, 10).Select(x => firstBlock.SendAsync(x));
            Task.WhenAll(populateTasks).ContinueWith(x => firstBlock.Complete()).Wait();

            fourthBlock.Completion.Wait();
        }

        private static string GetHashAsString(byte[] bytes)
        {
            var sb = new StringBuilder();
            int i;
            for (i = 0; i < bytes.Length; i++)
            {
                sb.AppendFormat("{0:X2}", bytes[i]);
                if (i % 4 == 3) sb.Append(" ");
            }

            return sb.ToString();
        }
    }
}

Let's take a look at the order of the requests:

[screenshot of the outgoing HTTP requests]

This definitely makes sense. All the requests are made as soon as possible. The slow fourth request is at the end of the list.

Now let us see what output we have:

09.08.2016 20:44:53: Hash for element #3: 4D0AB933 EE521204 CA784F3E 248EC698 F9E4D5F3 8F23A78F 3A00E069 29E73E32
09.08.2016 20:44:53: Hash for element #2: 4D0AB933 EE521204 CA784F3E 248EC698 F9E4D5F3 8F23A78F 3A00E069 29E73E32
09.08.2016 20:44:53: Hash for element #1: 4D0AB933 EE521204 CA784F3E 248EC698 F9E4D5F3 8F23A78F 3A00E069 29E73E32
09.08.2016 20:44:58: Hash for element #6: FC86E4F8 A83036BA 365BC7EE F9371778 59A11186 ED12A43C 3885D686 5004E6B3
09.08.2016 20:44:58: Hash for element #8: FC86E4F8 A83036BA 365BC7EE F9371778 59A11186 ED12A43C 3885D686 5004E6B3
09.08.2016 20:44:58: Hash for element #9: FC86E4F8 A83036BA 365BC7EE F9371778 59A11186 ED12A43C 3885D686 5004E6B3
09.08.2016 20:44:58: Hash for element #10: FC86E4F8 A83036BA 365BC7EE F9371778 59A11186 ED12A43C 3885D686 5004E6B3
09.08.2016 20:44:58: Hash for element #4: 44A63CBF 8E27D0DD AFE5A761 AADA4E49 AA52FE8E E3D7DC82 AFEAAF1D 72A9BC7F
09.08.2016 20:44:58: Hash for element #5: FC86E4F8 A83036BA 365BC7EE F9371778 59A11186 ED12A43C 3885D686 5004E6B3
09.08.2016 20:44:58: Hash for element #7: FC86E4F8 A83036BA 365BC7EE F9371778 59A11186 ED12A43C 3885D686 5004E6B3

You can see that all the hashes after the third one were computed only after the fourth response came in.

So based on these two facts we can say that all the downloaded pages were waiting for the slow fourth request to finish. It would be better not to wait for the fourth request and instead compute hashes as soon as each page is downloaded. Is there any way I can achieve this?

Victim answered 9/8, 2016 at 10:7 Comment(7)
Please specifically describe your problem.Overboard
In your list your timestamps don't tally - it would suggest that this is fake data... but let's pretend the timestamps all make sense, and that somehow 3 messages came in after your 5-second delay - that suggests there were more messages that you couldn't process before the delayed message appeared... so it looks like everything is working.Kratz
@progy_rock My issue is that all processing stops until the delayed item is processed. In my other network there is a block that uses HttpClient to get data from a web server, and in some cases it stops the entire network while waiting for some page to finish.Victim
@Kratz Don't pay attention to the timestamps, but to the order in which the messages appear. You described exactly how it works: there were more messages that couldn't be processed before the delayed message appeared. I don't want the whole pipeline to wait for this single delayed message; I want the output to be the same as the last output I illustrated in the question. Why can't it process messages while the delayed message is being worked on? Can I change this behavior somehow?Victim
As I see it though, your delayed message is holding up the queue for one of your threads, so you're down to 3... While I haven't tested it, rather than the foreach with await in it, would you not do better with something like foreach (var i in Enumerable.Range(1, 15)) numlist.Add(i); Task.WhenAll(numlist.Select(i => firstBlock.SendAsync(i)));Kratz
@Kratz That didn't help. Please see my question update.Victim
Configuring with MaxDegreeOfParallelism = 4 an ActionBlock that just writes messages to the Console makes no sense at all. The console is inherently synchronized, and so all parallel workflows that are trying to write to the console are contending for the same exclusive lock. I suggest that you set the parallelism to 1 for this block (the default value). And by doing so you may get lucky and watch your problem disappear.Starla

Okay, prompted by @SirRufo's comment I started thinking about implementing my own TransformBlock that would fit my needs and process incoming items without regard to ordering. That way it wouldn't break the network apart by introducing a gap between blocks in the downloading part, and it would be the elegant way to go.

So I started looking at what I could do and how. Looking at the source of TransformBlock itself seemed like a good starting point, so I opened the TransformBlock source on GitHub and started analyzing it.

Right at the start of the class I found this interesting thing:

// If parallelism is employed, we will need to support reordering messages that complete out-of-order.
// However, a developer can override this with EnsureOrdered == false.
if (dataflowBlockOptions.SupportsParallelExecution && dataflowBlockOptions.EnsureOrdered)
{
    _reorderingBuffer = new ReorderingBuffer<TOutput>(this, (owningSource, message) => ((TransformBlock<TInput, TOutput>)owningSource)._source.AddMessage(message));
}

Looks like exactly the thing we want! Let's look at this EnsureOrdered option in the DataflowBlockOptions class on GitHub:

/// <summary>Gets or sets whether ordered processing should be enforced on a block's handling of messages.</summary>
/// <remarks>
/// By default, dataflow blocks enforce ordering on the processing of messages. This means that a
/// block like <see cref="TransformBlock{TInput, TOutput}"/> will ensure that messages are output in the same
/// order they were input, even if parallelism is employed by the block and the processing of a message N finishes 
/// after the processing of a subsequent message N+1 (the block will reorder the results to maintain the input
/// ordering prior to making those results available to a consumer).  Some blocks may allow this to be relaxed,
/// however.  Setting <see cref="EnsureOrdered"/> to false tells a block that it may relax this ordering if
/// it's able to do so.  This can be beneficial if the immediacy of a processed result being made available
/// is more important than the input-to-output ordering being maintained.
/// </remarks>
public bool EnsureOrdered
{
    get { return _ensureOrdered; }
    set { _ensureOrdered = value; }
}

This looked really promising, so I instantly switched to the IDE to set it. Unfortunately, there was no such setting:

[screenshot: no EnsureOrdered property available in IntelliSense]

I kept searching and found this note:

4.5.25-beta-23019

Package has been renamed to System.Threading.Tasks.Dataflow

Then I Googled and found this package, System.Threading.Tasks.Dataflow! So I uninstalled the Microsoft.Tpl.Dataflow package and installed System.Threading.Tasks.Dataflow by issuing:

Install-Package System.Threading.Tasks.Dataflow

And there was the EnsureOrdered option. I updated the code, setting EnsureOrdered to false:

using System;
using System.Diagnostics;
using System.Linq;
using System.Net.Http;
using System.Security.Cryptography;
using System.Text;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

namespace DataflowTest
{
    class Program
    {
        static void Main(string[] args)
        {
            var options = new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 4, EnsureOrdered = false };
            var firstBlock = new TransformBlock<int, string>(x => x.ToString(), options);

            var secondBlock = new TransformBlock<string, Tuple<string, string>>(async x =>
            {
                using (var httpClient = new HttpClient())
                {
                    if (x == "4") await Task.Delay(5000);

                    var result = await httpClient.GetStringAsync($"http://scooterlabs.com/echo/{x}");
                    return new Tuple<string, string>(x, result);
                }
            }, options);

            var thirdBlock = new TransformBlock<Tuple<string, string>, Tuple<string, byte[]>>(x =>
             {
                 using (var algorithm = SHA256.Create())
                 {
                     var bytes = Encoding.UTF8.GetBytes(x.Item2);
                     var hash = algorithm.ComputeHash(bytes);

                     return new Tuple<string, byte[]>(x.Item1, hash);
                 }
             }, options);

            var fourthBlock = new ActionBlock<Tuple<string, byte[]>>(x =>
            {
                var output = $"{DateTime.Now}: Hash for element #{x.Item1}: {GetHashAsString(x.Item2)}";

                Console.WriteLine(output);
            }, options);

            firstBlock.LinkTo(secondBlock);
            secondBlock.LinkTo(thirdBlock);
            thirdBlock.LinkTo(fourthBlock);

            var populateTasks = Enumerable.Range(1, 10).Select(x => firstBlock.SendAsync(x));
            Task.WhenAll(populateTasks).ContinueWith(x => firstBlock.Complete()).Wait();

            fourthBlock.Completion.Wait();
        }

        private static string GetHashAsString(byte[] bytes)
        {
            var sb = new StringBuilder();
            int i;
            for (i = 0; i < bytes.Length; i++)
            {
                sb.AppendFormat("{0:X2}", bytes[i]);
                if (i % 4 == 3) sb.Append(" ");
            }

            return sb.ToString();
        }
    }
}

And the resulting output was exactly what I wanted:

10.08.2016 11:03:23: Hash for element #3: 8BA8A86D F25E058E 180F7AA9 1EE996B0 8D721C84 AEE8AA19 0A3F7C44 9FFED481
10.08.2016 11:03:23: Hash for element #1: 8BA8A86D F25E058E 180F7AA9 1EE996B0 8D721C84 AEE8AA19 0A3F7C44 9FFED481
10.08.2016 11:03:23: Hash for element #2: 8BA8A86D F25E058E 180F7AA9 1EE996B0 8D721C84 AEE8AA19 0A3F7C44 9FFED481
10.08.2016 11:03:23: Hash for element #10: C8C87B26 B17A6329 3F6213CD 4F9AFE0D 4F90127B AAE49EB0 7D8C15DF 74F020B1
10.08.2016 11:03:23: Hash for element #8: C8C87B26 B17A6329 3F6213CD 4F9AFE0D 4F90127B AAE49EB0 7D8C15DF 74F020B1
10.08.2016 11:03:23: Hash for element #9: C8C87B26 B17A6329 3F6213CD 4F9AFE0D 4F90127B AAE49EB0 7D8C15DF 74F020B1
10.08.2016 11:03:23: Hash for element #5: C8C87B26 B17A6329 3F6213CD 4F9AFE0D 4F90127B AAE49EB0 7D8C15DF 74F020B1
10.08.2016 11:03:23: Hash for element #7: C8C87B26 B17A6329 3F6213CD 4F9AFE0D 4F90127B AAE49EB0 7D8C15DF 74F020B1
10.08.2016 11:03:23: Hash for element #6: C8C87B26 B17A6329 3F6213CD 4F9AFE0D 4F90127B AAE49EB0 7D8C15DF 74F020B1
10.08.2016 11:03:27: Hash for element #4: FD25E52B FCD8DE81 BD38E11B 13C20B96 09473283 F25346B2 04593B70 E4357BDA
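
A note on scope: EnsureOrdered is a per-block option, so in principle only the block whose items can actually complete out of order (the downloading block here) needs it relaxed; the downstream blocks just receive results in whatever order that block emits them. A minimal sketch of that variant, where DownloadAsync and ComputeHash are placeholder names standing in for the lambdas used above:

var unordered = new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 4, EnsureOrdered = false };
var ordered = new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 4 };

// Relax ordering only where out-of-order completion actually happens (the downloads).
var downloadBlock = new TransformBlock<string, Tuple<string, string>>(DownloadAsync, unordered);

// The hashing block keeps the default (ordered) behavior; it simply preserves
// whatever order the download block hands it.
var hashBlock = new TransformBlock<Tuple<string, string>, Tuple<string, byte[]>>(ComputeHash, ordered);
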
Victim answered 10/8, 2016 at 6:4 Comment(0)

This is by design and documented

Because each predefined source dataflow block type guarantees that messages are propagated out in the order in which they are received, ...

Proof:

var ts = Environment.TickCount;

var firstBlock = new TransformBlock<int, int>(
    x => x,
    new ExecutionDataflowBlockOptions
    {
        MaxDegreeOfParallelism = 4,
    } );

var secondBlock = new TransformBlock<int, string>(
    x =>
    {
        var start = Environment.TickCount;

        if ( x == 3 )
        {
            Thread.Sleep( 5000 );
            return $"Start {start-ts} Finished {Environment.TickCount - ts}: Message is {x} (This is delayed message!) ";
        }

        return $"Start {start - ts} Finished {Environment.TickCount - ts}: Message is {x}";
    },
    new ExecutionDataflowBlockOptions
    {
        MaxDegreeOfParallelism = 4,
        // limit the internal queue to 10 items
        BoundedCapacity = 10,
    } );

var thirdBlock = new ActionBlock<string>(
    s =>
    {
        Console.WriteLine( s );
    },
    new ExecutionDataflowBlockOptions
    {
        // limit to a single task to watch the order
        MaxDegreeOfParallelism = 1,
    } );

firstBlock.LinkTo( secondBlock, new DataflowLinkOptions { PropagateCompletion = true, } );
secondBlock.LinkTo( thirdBlock, new DataflowLinkOptions { PropagateCompletion = true, } );

foreach ( var x in Enumerable.Range( 1, 15 ) )
{
    // to ensure order of items
    firstBlock.Post( x );
}

firstBlock.Complete();
thirdBlock.Completion.Wait();

Output:

Start 31 Finished 31: Message is 1
Start 31 Finished 31: Message is 2
Start 31 Finished 5031: Message is 3 (This is delayed message!)
Start 31 Finished 31: Message is 4
Start 31 Finished 31: Message is 5
Start 31 Finished 31: Message is 6
Start 31 Finished 31: Message is 7
Start 31 Finished 31: Message is 8
Start 31 Finished 31: Message is 9
Start 31 Finished 31: Message is 10
Start 31 Finished 31: Message is 11
Start 31 Finished 31: Message is 12
Start 5031 Finished 5031: Message is 13
Start 5031 Finished 5031: Message is 14
Start 5031 Finished 5031: Message is 15

Solution 1

Do not use DataFlow for the downloading part because the order guarantee will block the processing you are looking for.

var ts = Environment.TickCount;

var thirdBlock = new ActionBlock<string>(
    s =>
    {
        Console.WriteLine( s );
    },
    new ExecutionDataflowBlockOptions
    {
        // allow up to four concurrent writers
        MaxDegreeOfParallelism = 4,
    } );

Parallel.ForEach(
    Enumerable.Range( 1, 15 ),
    new ParallelOptions { MaxDegreeOfParallelism = 4, },
    x =>
    {
        var start = Environment.TickCount;
        string result;

        if ( x == 12 )
        {
            Thread.Sleep( 5000 );
            result = $"Start {start - ts} Finished {Environment.TickCount - ts}: Message is {x} (This is delayed message!) ";
        }
        else
            result = $"Start {start - ts} Finished {Environment.TickCount - ts}: Message is {x}";
        thirdBlock.Post( result );
    } );

thirdBlock.Complete();
thirdBlock.Completion.Wait();

Output:

Start 32 Finished 32: Message is 2
Start 32 Finished 32: Message is 6
Start 32 Finished 32: Message is 5
Start 32 Finished 32: Message is 8
Start 32 Finished 32: Message is 9
Start 32 Finished 32: Message is 10
Start 32 Finished 32: Message is 11
Start 32 Finished 32: Message is 7
Start 32 Finished 32: Message is 13
Start 32 Finished 32: Message is 14
Start 32 Finished 32: Message is 15
Start 32 Finished 32: Message is 3
Start 32 Finished 32: Message is 4
Start 32 Finished 32: Message is 1
Start 32 Finished 5032: Message is 12 (This is delayed message!)

Solution 2

Of course you could also implement IPropagatorBlock<TInput,TOutput> in a custom class that does not guarantee the order of the items.
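
For example, one way to sketch such a block (CreateUnorderedTransformBlock is just an illustrative name, not a framework API) is to pair a parallel ActionBlock with a BufferBlock via DataflowBlock.Encapsulate and push each result into the buffer as soon as it is ready:

public static IPropagatorBlock<TInput, TOutput> CreateUnorderedTransformBlock<TInput, TOutput>(
    Func<TInput, Task<TOutput>> transform, ExecutionDataflowBlockOptions options)
{
    var output = new BufferBlock<TOutput>();

    // The worker runs up to MaxDegreeOfParallelism transforms at once and emits
    // each result as soon as it completes, so a slow item never holds back the rest.
    var worker = new ActionBlock<TInput>(async input =>
    {
        var result = await transform(input);
        await output.SendAsync(result);
    }, options);

    // Forward completion (or failure) of the worker to the output buffer.
    worker.Completion.ContinueWith(t =>
    {
        if (t.IsFaulted) ((IDataflowBlock)output).Fault(t.Exception);
        else output.Complete();
    });

    return DataflowBlock.Encapsulate(worker, output);
}

Linked into the pipeline in place of an ordered TransformBlock, this should give the "emit as soon as finished" behavior while keeping everything inside the Dataflow network.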

Mou answered 9/8, 2016 at 12:18 Comment(11)
Is there any way to make them not wait for the delayed block and go ahead?Victim
There is no way to change that basic feature (preserve order)Mou
Did you read the docs? There is a guarantee for the order, and you want to work around that guarantee. The int type guarantees to contain an int value; if you want to store a string value, use a different type than int. There is no workaround.Mou
But maybe you want to use Parallel.For for the download and post the data into the DataFlow network.Mou
Please see my question update. I hope I made it a little bit more clear.Victim
The main thing to notice there is that all the hash-computing blocks were just wasting their time waiting for the slow fourth request to load (although other pages were already loaded and hashes could have been computed for them).Victim
The problem is and was already well described - we do not need any more or deeper information about the problem itself and what it causes. The solution is: do not use DataFlow for the download part.Mou
Have a look at the two solutionsMou
What are the solutions you are talking about?Victim
The text below the bold lines named Solution 1 and Solution 2 in my answer?Mou
Sorry, I didn't notice you updated your answer. Anyway, I found a better solution. Please see my own answer to this question.Victim

Looking at the timestamps, the output of the second block is working as you expect it to - the delayed TransformBlock is being run after all other TransformBlocks. It seems to be the Console.WriteLine in the ActionBlock which is not being called in the order you expect.

Is your code secondBlock.Completion.Wait(); incorrect - should it be thirdBlock.Completion.Wait(); in order to get the results you are expecting?
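
For what it's worth, waiting on the last block only helps if completion actually flows down the chain. A minimal sketch of what that would look like, assuming the same three blocks as in the question:

var linkOptions = new DataflowLinkOptions { PropagateCompletion = true };
firstBlock.LinkTo(secondBlock, linkOptions);
secondBlock.LinkTo(thirdBlock, linkOptions);

// ... post all 15 items ...

firstBlock.Complete();            // signal that no more input is coming
thirdBlock.Completion.Wait();     // returns once every message has been printed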

Galling answered 9/8, 2016 at 10:44 Comment(6)
Please don't pay attention to these timestamps. The desired output is illustrated at the end of the question. How can I achieve this? I tried waiting on the completion of the third block and it doesn't work that way.Victim
The timestamps tell you important things - that the problem lies in the order that the ActionBlocks in the thirdBlock get executed - the TransformBlocks are being executed in your expected order, and this is evidenced by the timestamps. Although Console.WriteLine is thread safe, that doesn't mean it is predictable in the order in which it handles incoming requests. The problem is in the ActionBlock and ActionBlock is dependent on Console.WriteLine. I'd eliminate Console and write to a threadsafe collection + use stopwatch to record what order the ActionBlocks are actually called.Galling
I have another network where HttpClients get the pages instead of writing to the console, and it works the same way as here: further requests don't go out until the delayed request is finished. Right after the delayed request is done (with an error) everything works fine: requests run concurrently.Victim
Please see my question update. I posted a slightly more detailed code sample.Victim
I can't give an answer as to why the framework works that way. You also ask "Is there any way I can achieve this?". Surely a simpler set of 10 Tasks, each of which is whole and complete - i.e. gets the string, computes the hash and outputs the result - would be far easier to understand and would avoid any framework oddities. Each of those individual steps could use async/await programming techniques but would avoid the chaining problem you are seeing.Galling
The thing is, the real Dataflow mesh I have is about 50 blocks, and I had this issue in one particular part of the mesh. I just made the app for this question for the sake of simplicity. Believe me, the Dataflow approach is quite suitable for the real app I am working on; there is no way to rewrite it to pure async/await. Moreover, the issue is resolved and I posted my answer with the solution to the question.Victim
