Here is a short code sample to quickly show what my question is about:
using System;
using System.Linq;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

namespace DataflowTest
{
    class Program
    {
        static void Main(string[] args)
        {
            var firstBlock = new TransformBlock<int, int>(x => x, new ExecutionDataflowBlockOptions
            {
                MaxDegreeOfParallelism = 4
            });
            var secondBlock = new TransformBlock<int, string>(async x =>
            {
                // Message 12 is artificially delayed to simulate a slow item.
                if (x == 12)
                {
                    await Task.Delay(5000);
                    return $"{DateTime.Now}: Message is {x} (This is delayed message!) ";
                }
                return $"{DateTime.Now}: Message is {x}";
            }, new ExecutionDataflowBlockOptions
            {
                MaxDegreeOfParallelism = 4
            });
            var thirdBlock = new ActionBlock<string>(s => Console.WriteLine(s), new ExecutionDataflowBlockOptions
            {
                MaxDegreeOfParallelism = 4
            });

            // Propagate completion so that waiting on the last block can return.
            var linkOptions = new DataflowLinkOptions { PropagateCompletion = true };
            firstBlock.LinkTo(secondBlock, linkOptions);
            secondBlock.LinkTo(thirdBlock, linkOptions);

            var populateTask = Task.Run(async () =>
            {
                foreach (var x in Enumerable.Range(1, 15))
                {
                    await firstBlock.SendAsync(x);
                }
            });
            populateTask.Wait();

            // Signal that no more input is coming, then wait for the pipeline to drain.
            firstBlock.Complete();
            thirdBlock.Completion.Wait();
        }
    }
}
The output is:
09.08.2016 15:03:08: Message is 1
09.08.2016 15:03:08: Message is 5
09.08.2016 15:03:08: Message is 6
09.08.2016 15:03:08: Message is 7
09.08.2016 15:03:08: Message is 8
09.08.2016 15:03:08: Message is 9
09.08.2016 15:03:08: Message is 10
09.08.2016 15:03:08: Message is 11
09.08.2016 15:03:08: Message is 3
09.08.2016 15:03:08: Message is 2
09.08.2016 15:03:08: Message is 4
09.08.2016 15:03:13: Message is 12 (This is delayed message!)
09.08.2016 15:03:08: Message is 15
09.08.2016 15:03:08: Message is 13
09.08.2016 15:03:08: Message is 14
Why does the output come in this order, and how can I change the network to get the output below?
09.08.2016 15:03:08: Message is 1
09.08.2016 15:03:08: Message is 5
09.08.2016 15:03:08: Message is 6
09.08.2016 15:03:08: Message is 7
09.08.2016 15:03:08: Message is 8
09.08.2016 15:03:08: Message is 9
09.08.2016 15:03:08: Message is 10
09.08.2016 15:03:08: Message is 11
09.08.2016 15:03:08: Message is 3
09.08.2016 15:03:08: Message is 2
09.08.2016 15:03:08: Message is 4
09.08.2016 15:03:08: Message is 15
09.08.2016 15:03:08: Message is 13
09.08.2016 15:03:08: Message is 14
09.08.2016 15:03:13: Message is 12 (This is delayed message!)
So I am wondering why all the other blocks (or tasks here) should wait for the delayed block.
UPDATE
Since you guys asked me to explain my problem in more detail, I made this sample, which is closer to the real pipeline I am working on. Let's say the application downloads some data and computes a hash based on the returned response.
using System;
using System.Diagnostics;
using System.Linq;
using System.Net.Http;
using System.Security.Cryptography;
using System.Text;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

namespace DataflowTest
{
    class Program
    {
        static void Main(string[] args)
        {
            var firstBlock = new TransformBlock<int, string>(x => x.ToString(), new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 4 });

            // Downloads the page for each element; element "4" is artificially delayed.
            var secondBlock = new TransformBlock<string, Tuple<string, string>>(async x =>
            {
                using (var httpClient = new HttpClient())
                {
                    if (x == "4") await Task.Delay(5000);
                    var result = await httpClient.GetStringAsync($"http://scooterlabs.com/echo/{x}");
                    return new Tuple<string, string>(x, result);
                }
            }, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 4 });

            // Computes the SHA-256 hash of the downloaded response.
            var thirdBlock = new TransformBlock<Tuple<string, string>, Tuple<string, byte[]>>(x =>
            {
                using (var algorithm = SHA256.Create())
                {
                    var bytes = Encoding.UTF8.GetBytes(x.Item2);
                    var hash = algorithm.ComputeHash(bytes);
                    return new Tuple<string, byte[]>(x.Item1, hash);
                }
            }, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 4 });

            var fourthBlock = new ActionBlock<Tuple<string, byte[]>>(x =>
            {
                var output = $"{DateTime.Now}: Hash for element #{x.Item1}: {GetHashAsString(x.Item2)}";
                Console.WriteLine(output);
            }, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 4 });

            // Propagate completion so that waiting on the last block can return.
            var linkOptions = new DataflowLinkOptions { PropagateCompletion = true };
            firstBlock.LinkTo(secondBlock, linkOptions);
            secondBlock.LinkTo(thirdBlock, linkOptions);
            thirdBlock.LinkTo(fourthBlock, linkOptions);

            var populateTasks = Enumerable.Range(1, 10).Select(x => firstBlock.SendAsync(x));
            Task.WhenAll(populateTasks).ContinueWith(x => firstBlock.Complete()).Wait();
            fourthBlock.Completion.Wait();
        }

        // Formats the hash as hex, with a space after every four bytes.
        private static string GetHashAsString(byte[] bytes)
        {
            var sb = new StringBuilder();
            int i;
            for (i = 0; i < bytes.Length; i++)
            {
                sb.AppendFormat("{0:X2}", bytes[i]);
                if (i % 4 == 3) sb.Append(" ");
            }
            return sb.ToString();
        }
    }
}
Let's take a look at the order of the requests:
This definitely makes sense. All the requests are made as soon as possible. The slow fourth request is at the end of the list.
Now let us see what output we have:
09.08.2016 20:44:53: Hash for element #3: 4D0AB933 EE521204 CA784F3E 248EC698 F9E4D5F3 8F23A78F 3A00E069 29E73E32
09.08.2016 20:44:53: Hash for element #2: 4D0AB933 EE521204 CA784F3E 248EC698 F9E4D5F3 8F23A78F 3A00E069 29E73E32
09.08.2016 20:44:53: Hash for element #1: 4D0AB933 EE521204 CA784F3E 248EC698 F9E4D5F3 8F23A78F 3A00E069 29E73E32
09.08.2016 20:44:58: Hash for element #6: FC86E4F8 A83036BA 365BC7EE F9371778 59A11186 ED12A43C 3885D686 5004E6B3
09.08.2016 20:44:58: Hash for element #8: FC86E4F8 A83036BA 365BC7EE F9371778 59A11186 ED12A43C 3885D686 5004E6B3
09.08.2016 20:44:58: Hash for element #9: FC86E4F8 A83036BA 365BC7EE F9371778 59A11186 ED12A43C 3885D686 5004E6B3
09.08.2016 20:44:58: Hash for element #10: FC86E4F8 A83036BA 365BC7EE F9371778 59A11186 ED12A43C 3885D686 5004E6B3
09.08.2016 20:44:58: Hash for element #4: 44A63CBF 8E27D0DD AFE5A761 AADA4E49 AA52FE8E E3D7DC82 AFEAAF1D 72A9BC7F
09.08.2016 20:44:58: Hash for element #5: FC86E4F8 A83036BA 365BC7EE F9371778 59A11186 ED12A43C 3885D686 5004E6B3
09.08.2016 20:44:58: Hash for element #7: FC86E4F8 A83036BA 365BC7EE F9371778 59A11186 ED12A43C 3885D686 5004E6B3
You can see that all the hashes after the third were computed only after the fourth response came back.
So based on these two facts we can say that all the downloaded pages were waiting for the slow fourth request to finish. It would be better not to wait for the fourth request and to compute each hash as soon as its data is downloaded. Is there any way I can achieve this?
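The waiting described above can be reproduced without any network access. By default a TransformBlock hands its results to the next block in the same order the inputs arrived, so a slow item holds back every item queued behind it. The following is only a minimal sketch, not my real pipeline: it assumes the installed System.Threading.Tasks.Dataflow package is recent enough to expose the EnsureOrdered option on ExecutionDataflowBlockOptions, and the names UnorderedDemo, RunAsync, slowStep and sink are made up for illustration.

```csharp
using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Hypothetical demo class; none of these names come from the real pipeline.
static class UnorderedDemo
{
    // Runs a two-block pipeline and returns the order in which items reached the sink.
    public static async Task<int[]> RunAsync(bool ensureOrdered)
    {
        var arrivals = new ConcurrentQueue<int>();

        var options = new ExecutionDataflowBlockOptions
        {
            MaxDegreeOfParallelism = 4,
            // Assumption: this property is available in the referenced package version.
            EnsureOrdered = ensureOrdered
        };

        // Element 4 is artificially slow, like the delayed request in the question.
        var slowStep = new TransformBlock<int, int>(async x =>
        {
            if (x == 4) await Task.Delay(1000);
            return x;
        }, options);

        // Default parallelism of 1, so arrivals are recorded one at a time.
        var sink = new ActionBlock<int>(x => arrivals.Enqueue(x));

        slowStep.LinkTo(sink, new DataflowLinkOptions { PropagateCompletion = true });

        foreach (var x in Enumerable.Range(1, 10))
        {
            await slowStep.SendAsync(x);
        }

        slowStep.Complete();
        await sink.Completion;
        return arrivals.ToArray();
    }

    static async Task Main()
    {
        Console.WriteLine("ordered:   " + string.Join(", ", await RunAsync(true)));
        Console.WriteLine("unordered: " + string.Join(", ", await RunAsync(false)));
    }
}
```

With ensureOrdered set to true the sink sees the items in input order, so everything after element 4 is held back until the delay ends. With false, the fast items should normally reach the sink first and element 4 last, although the exact interleaving depends on scheduling.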
HttpClient and gets data from web server and in some cases it stops the entire network waiting for some page to finish. – Victim
"there were more messages that you couldnt process before the delayed message appeared". I don't want the whole pipeline to wait for this one delayed message, so that the output would be the same as the last output I illustrated in the question. Why can't it process messages while the delayed message is being worked on? Can I change this behavior somehow? – Victim
MaxDegreeOfParallelism = 4 an ActionBlock that just writes messages to the Console makes no sense at all. The console is inherently synchronized, and so all parallel workflows that are trying to write to the console are contending for the same exclusive lock. I suggest that you set the parallelism to 1 for this block (the default value). And by doing so you may get lucky and watch your problem disappear. – Starla