The answers to your questions are: no, you don't need another block type; yes, you can use TransformManyBlock twice; and yes, it does make sense. I wrote some code to prove it, which is at the bottom, with some notes on how it works after that.
The code uses a split-then-merge pipeline, as you describe. As for the bit you were struggling with: merging the data for the individual files back together can be done by adding processed items to a list as they become available, and only passing the list on to the next block once it has the expected final number of items. This can be done with a fairly simple TransformManyBlock that returns zero or one items. This block can't be parallelized because the list isn't thread-safe.
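To make the gating idea concrete before the full listing, here's a minimal sketch of that block, distilled from the WaitForWorkItems method in the code below (WorkItem, ProcessedWorkItems and GetNumberOfFiles are all defined there):

// Gate block: adds each finished item to the shared per-month list, and emits
// the list only once it contains every item for the month. No options are
// passed, so MaxDegreeOfParallelism defaults to 1.
var waitForProcessedDataBlock = new TransformManyBlock<WorkItem, List<WorkItem>>(workItem =>
{
    workItem.ProcessedWorkItems.Add(workItem);
    bool complete = workItem.ProcessedWorkItems.Count == GetNumberOfFiles(workItem.Year, workItem.Month);
    return complete ? new[] { workItem.ProcessedWorkItems } : Enumerable.Empty<List<WorkItem>>();
});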
Once you've got a pipeline like this, you can test the parallelization and ordering just by changing the options passed to the blocks. The code below sets parallelization to unbounded for every block it can and lets the Dataflow code sort it out. On my machine it maxes out all the cores/logical processors and is CPU-bound, which is what we want. Ordering is enabled, but turning it off doesn't make much difference: again, we are CPU-bound.
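For example, the knobs are just properties on the block options; a sketch of the variations you might try (EnsureOrdered defaults to true):

var options = new ExecutionDataflowBlockOptions
{
    MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded,  // or 1, 2, 4... to limit parallelism
    EnsureOrdered = true  // set to false to let results flow out of order
};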
Finally, I have to say this is very cool tech, but you can actually solve this problem much more simply using PLINQ, where it's just a few lines of code to get something just as fast. The big drawback is that you can't easily add fast-arriving messages to a pipeline incrementally if you do that: PLINQ is better suited to one big batch process. Still, PLINQ may be a better solution for your use case.
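For comparison, a rough PLINQ version of the same batch job might look like the sketch below. It reuses the helper methods (GetInputPaths, LoadData, ProcessData, MergeData, SaveData) defined in the full code that follows, so it's a sketch rather than a drop-in replacement:

var dates = from y in Enumerable.Range(2015, 5) from m in Enumerable.Range(1, 12) select (y, m);
dates.AsParallel().ForAll(date =>
{
    // Load, process and merge every file for the month, then save the result
    string merged = GetInputPaths(date.y, date.m)
        .Select(path => ProcessData(LoadData(path)))
        .Aggregate("", MergeData);
    SaveData(merged, date.y, date.m);
});

Note this version only parallelizes across months; you could also AsParallel() the inner query to parallelize across the files within a month.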
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.IO;
using System.Linq;
using System.Threading;
using System.Threading.Tasks.Dataflow;

namespace ParallelDataFlow
{
    class Program
    {
        static void Main(string[] args)
        {
            new Program().Run();
            Console.ReadLine();
        }

        private void Run()
        {
            Stopwatch s = new Stopwatch();
            s.Start();

            // Can experiment with parallelization of blocks by changing MaxDegreeOfParallelism
            var options = new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded };
            var getInputPathsBlock = new TransformManyBlock<(int, int), WorkItem>(date => GetWorkItemWithInputPath(date), options);
            var loadDataBlock = new TransformBlock<WorkItem, WorkItem>(workItem => LoadDataIntoWorkItem(workItem), options);
            var processDataBlock = new TransformBlock<WorkItem, WorkItem>(workItem => ProcessDataForWorkItem(workItem), options);
            var waitForProcessedDataBlock = new TransformManyBlock<WorkItem, List<WorkItem>>(workItem => WaitForWorkItems(workItem));  // Can't parallelize this block
            var mergeDataBlock = new TransformBlock<List<WorkItem>, List<WorkItem>>(list => MergeWorkItemData(list), options);
            var saveDataBlock = new ActionBlock<List<WorkItem>>(list => SaveWorkItemData(list), options);

            var linkOptions = new DataflowLinkOptions { PropagateCompletion = true };
            getInputPathsBlock.LinkTo(loadDataBlock, linkOptions);
            loadDataBlock.LinkTo(processDataBlock, linkOptions);
            processDataBlock.LinkTo(waitForProcessedDataBlock, linkOptions);
            waitForProcessedDataBlock.LinkTo(mergeDataBlock, linkOptions);
            mergeDataBlock.LinkTo(saveDataBlock, linkOptions);

            // We post individual tuples of (year, month) to our pipeline, as many as we want
            getInputPathsBlock.Post((1903, 2));  // Post a single (year, month) tuple
            var dates = from y in Enumerable.Range(2015, 5) from m in Enumerable.Range(1, 12) select (y, m);
            foreach (var date in dates) getInputPathsBlock.Post(date);  // Post a big sequence

            getInputPathsBlock.Complete();
            saveDataBlock.Completion.Wait();
            s.Stop();
            Console.WriteLine($"Completed in {s.ElapsedMilliseconds}ms on {ThreadAndTime()}");
        }

        private IEnumerable<WorkItem> GetWorkItemWithInputPath((int year, int month) date)
        {
            List<WorkItem> processedWorkItems = new List<WorkItem>();  // Will store merged results
            return GetInputPaths(date.year, date.month).Select(
                path => new WorkItem
                {
                    Year = date.year,
                    Month = date.month,
                    FilePath = path,
                    ProcessedWorkItems = processedWorkItems
                });
        }

        // Get file paths of the form e.g. Files/20191101.txt. These aren't real files, they just show how it could work.
        private IEnumerable<string> GetInputPaths(int year, int month) =>
            Enumerable.Range(0, GetNumberOfFiles(year, month)).Select(i => $@"Files/{year}{Pad(month)}{Pad(i + 1)}.txt");

        private int GetNumberOfFiles(int year, int month) => DateTime.DaysInMonth(year, month);

        private WorkItem LoadDataIntoWorkItem(WorkItem workItem)
        {
            workItem.RawData = LoadData(workItem.FilePath);
            return workItem;
        }

        // Simulate loading by just concatenating to the path: in real code this could open a real file and return the contents
        private string LoadData(string path) => "This is content from file " + path;

        private WorkItem ProcessDataForWorkItem(WorkItem workItem)
        {
            workItem.ProcessedData = ProcessData(workItem.RawData);
            return workItem;
        }

        private string ProcessData(string contents)
        {
            Thread.SpinWait(11000000);  // Use 11,000,000 for ~50ms on Windows .NET Framework, 1,100,000 on Windows .NET Core
            return $"Results of processing file with contents '{contents}' on {ThreadAndTime()}";
        }

        // Adds a processed WorkItem to its ProcessedWorkItems list. Then checks if the list has as many processed WorkItems as we
        // expect to see overall. If so the list is returned to the next block; if not we return an empty array, which passes nothing on.
        // This isn't thread-safe for the list, so it has to be called with MaxDegreeOfParallelism = 1
        private IEnumerable<List<WorkItem>> WaitForWorkItems(WorkItem workItem)
        {
            List<WorkItem> itemList = workItem.ProcessedWorkItems;
            itemList.Add(workItem);
            return itemList.Count == GetNumberOfFiles(workItem.Year, workItem.Month) ? new[] { itemList } : new List<WorkItem>[0];
        }

        private List<WorkItem> MergeWorkItemData(List<WorkItem> processedWorkItems)
        {
            string finalContents = "";
            foreach (WorkItem workItem in processedWorkItems)
            {
                finalContents = MergeData(finalContents, workItem.ProcessedData);
            }
            // Should really create a new data structure and return that, but let's cheat a bit
            processedWorkItems[0].MergedData = finalContents;
            return processedWorkItems;
        }

        // Just concatenate the output strings, separated by newlines, to merge our data
        private string MergeData(string output1, string output2) => output1 != "" ? output1 + "\n" + output2 : output2;

        private void SaveWorkItemData(List<WorkItem> workItems)
        {
            WorkItem result = workItems[0];
            SaveData(result.MergedData, result.Year, result.Month);
            // Code to show it's worked...
            Console.WriteLine($"Saved data block for {DateToString((result.Year, result.Month))} on {ThreadAndTime()}." +
                              $" File contents:\n{result.MergedData}\n");
        }

        private void SaveData(string finalContents, int year, int month)
        {
            // Actually save, although we don't really need to in this test code
            new DirectoryInfo("Results").Create();
            File.WriteAllText(Path.Combine("Results", $"results{year}{Pad(month)}.txt"), finalContents);
        }

        // Helper methods
        private string DateToString((int year, int month) date) => date.year + Pad(date.month);

        private string Pad(int number) => number < 10 ? "0" + number : number.ToString();

        private string ThreadAndTime() => $"thread {Pad(Thread.CurrentThread.ManagedThreadId)} at {DateTime.Now.ToString("hh:mm:ss.fff")}";
    }

    public class WorkItem
    {
        public int Year { get; set; }
        public int Month { get; set; }
        public string FilePath { get; set; }
        public string RawData { get; set; }
        public string ProcessedData { get; set; }
        public List<WorkItem> ProcessedWorkItems { get; set; }
        public string MergedData { get; set; }
    }
}
This code passes a WorkItem object from each block to the next and enriches it at each stage. It then creates a final list containing all the WorkItems for a month, runs an aggregation over it, and saves the results.
This code is based on dummy methods for each stage, using the names you used. These don't do much, but hopefully they demonstrate the solution. For example, LoadData is handed a file path and just adds some text to it and passes the string on, but it could obviously load a real file and pass the contents on if there actually were a file on disk.
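If there really were files on disk, that method might look something like this instead (a sketch, not part of the code above):

// Load the real file contents instead of fabricating them from the path
private string LoadData(string path) => File.ReadAllText(path);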
Similarly, to simulate doing work in ProcessData we do a Thread.SpinWait and then just add some text to the string again. This is where the delay comes from, so change the number there if you want it to run faster or slower. The code was written on .NET Framework, but it runs on .NET Core 3.0, and on Ubuntu and OS X. The only difference is that a SpinWait cycle can be significantly longer or shorter, so you may want to play with the delay.
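One way to calibrate the delay on your machine is simply to time a single call:

var sw = Stopwatch.StartNew();
Thread.SpinWait(11000000);
sw.Stop();
Console.WriteLine($"SpinWait(11000000) took {sw.ElapsedMilliseconds}ms");  // aim for ~50ms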
Note that we could have done the merging in the waitForProcessedDataBlock itself and had exactly the pipeline you were asking for; it just would have been a bit more confusing.
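In sketch form, that merged version would gate and aggregate in one TransformManyBlock (again unparallelized, and reusing MergeWorkItemData and GetNumberOfFiles from the code above):

// Hypothetical combined block replacing waitForProcessedDataBlock and mergeDataBlock
var waitAndMergeBlock = new TransformManyBlock<WorkItem, List<WorkItem>>(workItem =>
{
    var itemList = workItem.ProcessedWorkItems;
    itemList.Add(workItem);
    if (itemList.Count != GetNumberOfFiles(workItem.Year, workItem.Month))
        return Enumerable.Empty<List<WorkItem>>();  // Not everything has arrived yet: pass nothing on
    return new[] { MergeWorkItemData(itemList) };   // Complete: merge and pass the list on
});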
The code does create files on disk at the end, but also dumps the results to the screen, so it doesn't really need to.
If you set parallelization to 1 you'll find it slows down by about the amount you'd expect. My Windows machine is four-core, and it ends up slightly worse than four times slower.
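That test is a one-line change to the options at the top of Run:

var options = new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 1 };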