How do I split and merge this dataflow pipeline?

I am trying to create a dataflow using tpl with the following form:

                    -> LoadDataBlock1 -> ProcessDataBlock1 ->  
GetInputPathsBlock  -> LoadDataBlock2 -> ProcessDataBlock2 -> MergeDataBlock -> SaveDataBlock
                    -> LoadDataBlock3 -> ProcessDataBlock3 ->
                    ...                             
                    -> LoadDataBlockN -> ProcessDataBlockN ->

The idea is that GetInputPathsBlock is a block which finds the paths to the input data that is to be loaded, and then sends each path to a LoadDataBlock. The LoadDataBlocks are all identical (except that each has received a unique inputPath string from GetInputPaths). The loaded data is then sent to the ProcessDataBlock, which does some simple processing. Then the data from each ProcessDataBlock is sent to MergeDataBlock, which merges it and sends it to SaveDataBlock, which then saves it to a file.

Think of it as a dataflow that needs to run for each month. First the paths are found for the data for each day. Each day's data is loaded and processed, and then merged together for the entire month and saved. The months can run in parallel, the data for each day in a month can be loaded and processed in parallel (once that day's data has been loaded), and once everything for the month has been loaded and processed, it can be merged and saved.

What I tried

As far as I can tell, TransformManyBlock<TInput,string> can be used to do the splitting (GetInputPathsBlock), and can be linked to a normal TransformBlock<string,InputData> (LoadDataBlock), and from there to another TransformBlock<InputData,ProcessedData> (ProcessDataBlock), but I don't know how to then merge everything back into a single block.
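
A rough sketch of the split half I have in mind (InputData, ProcessedData and the helper methods are placeholders for my real types and code):

using System.Threading.Tasks.Dataflow;

var linkOptions = new DataflowLinkOptions { PropagateCompletion = true };

var getInputPathsBlock = new TransformManyBlock<(int year, int month), string>(
    date => GetInputPaths(date.year, date.month)); // yields one path per day
var loadDataBlock = new TransformBlock<string, InputData>(path => LoadData(path));
var processDataBlock = new TransformBlock<InputData, ProcessedData>(data => ProcessData(data));

getInputPathsBlock.LinkTo(loadDataBlock, linkOptions);
loadDataBlock.LinkTo(processDataBlock, linkOptions);
// ...but what goes between processDataBlock and a single MergeDataBlock?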

What I looked at

I found this answer, which uses TransformManyBlock to go from an IEnumerable<item> to item, but I don't fully understand it, and I can't link a TransformBlock<InputData,ProcessedData> (ProcessDataBlock) to a TransformBlock<IEnumerable<ProcessedData>,ProcessedData>, so I don't know how to use it.

I have also seen answers like this, which suggests using JoinBlock, but the number of input files N varies, and the files are all loaded in the same way anyway.

There is also this answer, which seems to do what I want, but I don't fully understand it, and I don't know how the setup with the dictionary would be transferred to my case.

How do I split and merge my dataflow?

  • Is there a block type I am missing?
  • Can I somehow use TransformManyBlock twice?
  • Does TPL Dataflow make sense for the split/merge, or is there a simpler async/await way?
Factual answered 5/11, 2019 at 15:6
So each of the LoadDataBlock1, LoadDataBlock2 etc. will receive only one item to process? One inputPath for each block? – Antiparallel
Exactly. Each of the inputPaths will be sent to its own LoadDataBlock. – Factual
I would question the need for multiple LoadDataBlocks (one seems enough to me), but regarding the essence of your question, and specifically the merging part, have you tried to simply LinkTo all ProcessDataBlocks to the single MergeDataBlock? – Antiparallel
I think we are misunderstanding each other. When I say each inputPath will be sent to its own LoadDataBlock, I mean that the block is defined once, but should be used with each of the inputPaths. The line var LoadDataBlock = Transform.... will only occur once (and likewise with ProcessDataBlock), but it should be run individually for each of the input paths. Does that make sense? – Factual

I would use a nested block to avoid splitting my monthly data and then having to merge it again. Here is an example of two nested TransformBlocks that process all the days of the year 2020:

var monthlyBlock = new TransformBlock<int, List<string>>(async (month) =>
{
    // Inner block: processes the days of a single month, up to 4 at a time
    var dailyBlock = new TransformBlock<int, string>(async (day) =>
    {
        await Task.Delay(100); // Simulate async work
        return day.ToString();
    }, new ExecutionDataflowBlockOptions() { MaxDegreeOfParallelism = 4 });

    // Feed every day of the month to the inner block, then mark it complete
    foreach (var day in Enumerable.Range(1, DateTime.DaysInMonth(2020, month)))
        await dailyBlock.SendAsync(day);
    dailyBlock.Complete();

    // Collect the inner block's output before returning the month's results
    var dailyResults = await dailyBlock.ToListAsync();
    return dailyResults;
}, new ExecutionDataflowBlockOptions() { MaxDegreeOfParallelism = 1 }); // Outer block: one month at a time

foreach (var month in Enumerable.Range(1, 12))
    await monthlyBlock.SendAsync(month);
monthlyBlock.Complete();

For collecting the daily results of the inner block I used the extension method ToListAsync that is shown below:

public static async Task<List<T>> ToListAsync<T>(this IReceivableSourceBlock<T> block,
    CancellationToken cancellationToken = default)
{
    var list = new List<T>();
    while (await block.OutputAvailableAsync(cancellationToken).ConfigureAwait(false))
    {
        while (block.TryReceive(out var item))
        {
            list.Add(item);
        }
    }
    await block.Completion.ConfigureAwait(false); // Propagate possible exception
    return list;
}
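
If the monthly results themselves are also needed afterwards, the same helper can drain the outer block too, since TransformBlock implements IReceivableSourceBlock. A sketch (awaiting after the monthlyBlock.Complete() call above):

// Collect one List<string> per month from the outer block
List<List<string>> monthlyResults = await monthlyBlock.ToListAsync();
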
Antiparallel answered 7/11, 2019 at 15:25
Thank you, I think this is what I am looking for, I will try it out. Is there a particular reason you are using MaxDegreeOfParallelism = 1 for the monthlyBlock? – Factual
The reason is to avoid parallelizing too much. If, for example, each daily job is reading a large file from a folder, then reading 4 files concurrently from the same physical disk drive may make the process slower rather than faster, since the heads of the disk cannot be in 4 places at once. If the drive is an SSD, more parallelism is possible. If on the other hand each job is primarily calculations, then the ideal degree of parallelism is the number of cores of the machine. It depends very much on what you are doing, and on the capabilities of the hardware that is used the most. – Antiparallel
The total degree of parallelism is the product of the inner and outer block's MaxDegreeOfParallelism. So if the outer is 4 and the inner is 4, the total is 16. – Antiparallel
@Littlegeek I posted a more sophisticated answer to a similar question, here. – Antiparallel

The answers to your questions are: no, you don't need another block type; yes, you can use TransformManyBlock twice; and yes, it does make sense. I wrote some code to prove it, which is at the bottom, and some notes on how it works, which come after that.

The code uses a split-then-merge pipeline as you describe. As for the bit you were struggling with: merging the data for the individual files back together can be done by adding processed items to a list as they become available, and only passing the list on to the next block once it has the expected final number of items. This can be done with a fairly simple TransformManyBlock returning zero or one items. This block can't be parallelized because the list isn't threadsafe.

Once you've got a pipeline like this, you can test the parallelization and ordering just by changing the options passed to the blocks. The code below sets parallelization to unbounded for every block it can, and lets the Dataflow code sort it out. On my machine it maxes out all the cores/logical processors and is CPU-bound, which is what we want. Ordering is enabled, but turning that off doesn't make much difference: again, we are CPU-bound.
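
For example, here is how ordering could be switched off, via the EnsureOrdered property that DataflowBlockOptions already provides:

var options = new ExecutionDataflowBlockOptions
{
    MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded,
    EnsureOrdered = false  // results may flow downstream out of order
};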

Finally, I have to say this is a very cool tech, but you can actually solve this problem much more simply using PLINQ, where it's just a few lines of code to get something just as fast. The big drawback is that you can't easily add fast-arriving messages to a pipeline incrementally if you do that: PLINQ is better suited to one big batch process. Still, PLINQ may well be a better solution for your use case.
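
For illustration, a PLINQ version might look something like this. This is just a sketch, reusing the same dummy helpers (GetInputPaths, LoadData, ProcessData, MergeData, SaveData) from the full code below:

var dates = from y in Enumerable.Range(2015, 5) from m in Enumerable.Range(1, 12) select (y, m);

dates.AsParallel().ForAll(date =>
{
    // Split: one input path per day, loaded and processed in parallel
    var processed = GetInputPaths(date.y, date.m)
        .AsParallel().AsOrdered()
        .Select(path => ProcessData(LoadData(path)))
        .ToList();

    // Merge sequentially, then save the month's results
    string merged = processed.Aggregate("", MergeData);
    SaveData(merged, date.y, date.m);
});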

using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.IO;
using System.Linq;
using System.Threading;
using System.Threading.Tasks.Dataflow;

namespace ParallelDataFlow
{
    class Program
    {
        static void Main(string[] args)
        {
            new Program().Run();
            Console.ReadLine();
        }

        private void Run()
        {
            Stopwatch s = new Stopwatch();
            s.Start();

            // Can experiment with parallelization of blocks by changing MaxDegreeOfParallelism
            var options = new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded };
            var getInputPathsBlock = new TransformManyBlock<(int, int), WorkItem>(date => GetWorkItemWithInputPath(date), options);
            var loadDataBlock = new TransformBlock<WorkItem, WorkItem>(workItem => LoadDataIntoWorkItem(workItem), options);
            var processDataBlock = new TransformBlock<WorkItem, WorkItem>(workItem => ProcessDataForWorkItem(workItem), options);
            var waitForProcessedDataBlock = new TransformManyBlock<WorkItem, List<WorkItem>>(workItem => WaitForWorkItems(workItem));  // Can't parallelize this block: the shared list isn't threadsafe
            var mergeDataBlock = new TransformBlock<List<WorkItem>, List<WorkItem>>(list => MergeWorkItemData(list), options);
            var saveDataBlock = new ActionBlock<List<WorkItem>>(list => SaveWorkItemData(list), options);

            var linkOptions = new DataflowLinkOptions { PropagateCompletion = true };
            getInputPathsBlock.LinkTo(loadDataBlock, linkOptions);
            loadDataBlock.LinkTo(processDataBlock, linkOptions);
            processDataBlock.LinkTo(waitForProcessedDataBlock, linkOptions);
            waitForProcessedDataBlock.LinkTo(mergeDataBlock, linkOptions);
            mergeDataBlock.LinkTo(saveDataBlock, linkOptions);

            // We post individual tuples of (year, month) to our pipeline, as many as we want
            getInputPathsBlock.Post((1903, 2));  // Post a single (year, month)
            var dates = from y in Enumerable.Range(2015, 5) from m in Enumerable.Range(1, 12) select (y, m);
            foreach (var date in dates) getInputPathsBlock.Post(date);  // Post a big sequence

            getInputPathsBlock.Complete();
            saveDataBlock.Completion.Wait();
            s.Stop();
            Console.WriteLine($"Completed in {s.ElapsedMilliseconds}ms on {ThreadAndTime()}");
        }

        private IEnumerable<WorkItem> GetWorkItemWithInputPath((int year, int month) date)
        {
            List<WorkItem> processedWorkItems = new List<WorkItem>();  // Will store merged results
            return GetInputPaths(date.year, date.month).Select(
                path => new WorkItem
                {
                    Year = date.year,
                    Month = date.month,
                    FilePath = path,
                    ProcessedWorkItems = processedWorkItems
                });
        }

        // Get filepaths of form e.g. Files/20191101.txt  These aren't real files, they just show how it could work.
        private IEnumerable<string> GetInputPaths(int year, int month) =>
            Enumerable.Range(0, GetNumberOfFiles(year, month)).Select(i => $@"Files/{year}{Pad(month)}{Pad(i + 1)}.txt");

        private int GetNumberOfFiles(int year, int month) => DateTime.DaysInMonth(year, month);

        private WorkItem LoadDataIntoWorkItem(WorkItem workItem) {
            workItem.RawData = LoadData(workItem.FilePath);
            return workItem;
        }

        // Simulate loading by just concatenating to path: in real code this could open a real file and return the contents
        private string LoadData(string path) => "This is content from file " + path;

        private WorkItem ProcessDataForWorkItem(WorkItem workItem)
        {
            workItem.ProcessedData = ProcessData(workItem.RawData);
            return workItem;
        }

        private string ProcessData(string contents)
        {
            Thread.SpinWait(11000000); // Use 11,000,000 for ~50ms on Windows .NET Framework.  1,100,000 on Windows .NET Core.
            return $"Results of processing file with contents '{contents}' on {ThreadAndTime()}";
        }

        // Adds a processed WorkItem to its ProcessedWorkItems list.  Then checks if the list has as many processed WorkItems as we 
        // expect to see overall.  If so the list is returned to the next block, if not we return an empty array, which passes nothing on.
        // This isn't threadsafe for the list, so it has to be called with MaxDegreeOfParallelism = 1
        private IEnumerable<List<WorkItem>> WaitForWorkItems(WorkItem workItem)
        {
            List<WorkItem> itemList = workItem.ProcessedWorkItems;
            itemList.Add(workItem);
            return itemList.Count == GetNumberOfFiles(workItem.Year, workItem.Month) ? new[] { itemList } : new List<WorkItem>[0];
        }

        private List<WorkItem> MergeWorkItemData(List<WorkItem> processedWorkItems)
        {
            string finalContents = "";
            foreach (WorkItem workItem in processedWorkItems)
            {
                finalContents = MergeData(finalContents, workItem.ProcessedData);
            }
            // Should really create a new data structure and return that, but let's cheat a bit
            processedWorkItems[0].MergedData = finalContents;
            return processedWorkItems;
        }

        // Just concatenate the output strings, separated by newlines, to merge our data
        private string MergeData(string output1, string output2) => output1 != "" ? output1 + "\n" + output2 : output2;

        private void SaveWorkItemData(List<WorkItem> workItems)
        {
            WorkItem result = workItems[0];
            SaveData(result.MergedData, result.Year, result.Month);
            // Code to show it's worked...
            Console.WriteLine($"Saved data block for {DateToString((result.Year, result.Month))} on {ThreadAndTime()}." +
                              $"  File contents:\n{result.MergedData}\n");
        }
        private void SaveData(string finalContents, int year, int month)
        {
            // Actually save, although don't really need to in this test code
            new DirectoryInfo("Results").Create();
            File.WriteAllText(Path.Combine("Results", $"results{year}{Pad(month)}.txt"), finalContents);
        }

        // Helper methods
        private string DateToString((int year, int month) date) => date.year + Pad(date.month);
        private string Pad(int number) => number < 10 ? "0" + number : number.ToString();
        private string ThreadAndTime() => $"thread {Pad(Thread.CurrentThread.ManagedThreadId)} at {DateTime.Now.ToString("hh:mm:ss.fff")}";
    }

    public class WorkItem
    {
        public int Year { get; set; }
        public int Month { get; set; }
        public string FilePath { get; set; }
        public string RawData { get; set; }
        public string ProcessedData { get; set; }
        public List<WorkItem> ProcessedWorkItems { get; set; }
        public string MergedData { get; set; }
    }
}

This code passes a WorkItem object from each block to the next and enriches it at each stage.  It then creates a final list with all the WorkItems for a month in it, before running an aggregation process on that and saving the results.

This code is based on dummy methods for each stage, using the names you use. These don't do much but hopefully demonstrate the solution. For example, LoadData is handed a file path and just adds some text to it and passes the string on, but obviously it could load a real file and pass the contents on if there actually were a file on disk.

Similarly, to simulate doing work in ProcessData we do a Thread.SpinWait and then again just add some text to the string. This is where the delay comes from, so change the number if you want it to run faster or slower. The code was written on .NET Framework, but it runs on .NET Core 3.0, and on Ubuntu and OSX. The only difference is that a SpinWait cycle can be significantly longer or shorter, so you may want to play with the delay.

Note that we could have done the merging inside the waitForProcessedDataBlock and had exactly the pipeline you were asking for; it just would have been a bit more confusing (see the sketch below).
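
For what it's worth, a sketch of what that combined block's delegate might look like (a hypothetical WaitForAndMergeWorkItems, reusing MergeWorkItemData from the code above):

private IEnumerable<List<WorkItem>> WaitForAndMergeWorkItems(WorkItem workItem)
{
    List<WorkItem> itemList = workItem.ProcessedWorkItems;
    itemList.Add(workItem);
    if (itemList.Count != GetNumberOfFiles(workItem.Year, workItem.Month))
        return new List<WorkItem>[0];  // Not everything has arrived yet: pass nothing on
    return new[] { MergeWorkItemData(itemList) };  // All items arrived: merge and pass the list on
}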

The code does create files on disk at the end, but also dumps the results to the screen, so it doesn't really need to.

If you set parallelization to 1 you'll find it slows down by about the amount you'd expect. My Windows machine is four-core, and it ends up slightly worse than four times slower.

Spermatogonium answered 14/11, 2019 at 22:1
