TPL Dataflow - Parallel&Async processing, while keeping order
Asked Answered
I

3

5

I created a TPL Dataflow pipeline which consist of 3 TransformBlock's and an ActionBlock at the end.

var loadXml = new TransformBlock<Job, Job>(job => { ... }); // I/O
var validateData = new TransformBlock<Job, Job>(job => { ... }); // Parsing&Validating&Calculations
var importJob = new TransformBlock<Job, Job>(job => { ... }); // Saving to database

var loadingFailed = new ActionBlock<Job>(job => CreateResponse(job));
var validationFailed = new ActionBlock<Job>(job => CreateResponse(job));
var reportImport = new ActionBlock<Job>(job => CreateResponse(job));

loadXml.LinkTo(validateData, job => job.ReturnCode == 100);
loadXml.LinkTo(loadingFailed);

validateData.LinkTo(importJob, Job => Job.ReturnCode == 100);
validateData.LinkTo(validationFailed);

importJob.LinkTo(reportImport);

Each block will fill the Job-object with the processed data, since i not only need the data itself but also general information, that i need for the response messages. I pretty much add a path to an XML and get a Response-object with information if everything went right.

How can i achieve so that if two or more files come in that take some time to read from HDD, it reads both files parallel and async, while keeping the order they came in? If file1 takes much more time, file 2 would need to wait for file1 to finish before i pass the data over to the next Block and then it will also start validating the data parallel and async, but also here keeps the order for the next block?

Right now it looks like even if i call SendAsync to the headblock, it will sequentially process all files.

EDIT: So i wrote a little test class for my purpose of the pipeline. It has 3 stages. What i want to achieve is the first TransformBlock to keep reading in files as they come in (SendAsync from a FileSystemWatcher) and output it when done in order they came in. Means if File1 is a large file and File2+3 comes in, both will be read in, while File1 is still being processed, but File2+3 will have to wait until it can get send to the second TransformBlock, because File1 is still being read in. Stage2 should work just the same. Stage3 on the other hand needs to take objects generated from File1 and save into to database, which can be done parallel and async. However, objects from file1 need to be processed before file2 and file3. So the file contents as a whole need to be processed sequantially in order they came in. I tried doing that by limiting the 3rd TransformBlock with MaxDegreeOfParallelism and BoundedCapacity both set to 1, but this seems to fail and not really keep the order in the Console.WriteLine's

using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;
using System.Xml;
using System.Linq;

namespace OrderProcessing
{
    public class Job
    {
        public string Path { get; set; }

        public XmlDocument Document { get; set; }

        public List<Object> BusinessObjects { get; set; }

        public int ReturnCode { get; set; }

        public int ID { get; set; }
    }

    public class Test
    {
        ITargetBlock<Job> pathBlock = null;

        CancellationTokenSource cancellationTokenSource;

        Random rnd = new Random();

        private bool ReadDocument(Job job)
        {
            Console.WriteLine($"ReadDocument {DateTime.Now.TimeOfDay} | Thread {Thread.CurrentThread.ManagedThreadId} is processing Job Id: {job.ID}");
            Task.Delay(rnd.Next(1000, 3000)).Wait();

            // Throw OperationCanceledException if cancellation is requested.
            cancellationTokenSource.Token.ThrowIfCancellationRequested();

            // Read the document
            job.Document = new XmlDocument();

            // Some checking
            return true;
        }

        private bool ValidateXml(Job job)
        {
            Console.WriteLine($"ValidateXml {DateTime.Now.TimeOfDay} | Thread {Thread.CurrentThread.ManagedThreadId} is processing Job Id: {job.ID}");
            Task.Delay(rnd.Next(1000, 3000)).Wait();

            // Throw OperationCanceledException if cancellation is requested.
            cancellationTokenSource.Token.ThrowIfCancellationRequested();

            // Check XML against XSD and perform remaining checks
            job.BusinessObjects = new List<object>();

            // Just for tests
            job.BusinessObjects.Add(new object());
            job.BusinessObjects.Add(new object());

            // Parse Xml and create business objects
            return true;
        }

        private bool ProcessJob(Job job)
        {
            Console.WriteLine($"ProcessJob {DateTime.Now.TimeOfDay} | Thread {Thread.CurrentThread.ManagedThreadId} is processing Job Id: {job.ID}");

            // Throw OperationCanceledException if cancellation is requested.
            cancellationTokenSource.Token.ThrowIfCancellationRequested();

            Parallel.ForEach(job.BusinessObjects, bO =>
            {
                ImportObject(bO);
            });


            // Import the job
            return true;
        }

        private object ImportObject(object o)
        {
            Task.Delay(rnd.Next(1000, 3000)).Wait();

            return new object();
        }

        private void CreateResponse(Job job)
        {
            if(job.ReturnCode == 100)
            {
                Console.WriteLine("ID {0} was successfully imported.", job.ID);

            }
            else
            {
                Console.WriteLine("ID {0} failed to import.", job.ID);
            }

            // Create response XML with returncodes
        }

        ITargetBlock<Job> CreateJobProcessingPipeline()
        {
            var loadXml = new TransformBlock<Job, Job>(job =>
            {
                try
                {
                    if(ReadDocument(job))
                    {
                        // For later error handling
                        job.ReturnCode = 100; // success
                    }
                    else
                    {
                        job.ReturnCode = 200;
                    }

                    return job;
                }
                catch(OperationCanceledException)
                {
                    job.ReturnCode = 300;
                    return job;
                }
            }, TransformBlockOptions());

            var validateXml = new TransformBlock<Job, Job>(job =>
            {
                try
                {
                    if(ValidateXml(job))
                    {
                        // For later error handling
                        job.ReturnCode = 100;
                    }
                    else
                    {
                        job.ReturnCode = 200;
                    }

                    return job;
                }
                catch(OperationCanceledException)
                {
                    job.ReturnCode = 300;
                    return job;
                }
            }, TransformBlockOptions());


            var importJob = new TransformBlock<Job, Job>(job =>
            {
                try
                {
                    if(ProcessJob(job))
                    {
                        // For later error handling
                        job.ReturnCode = 100; // success
                    }
                    else
                    {
                        job.ReturnCode = 200;
                    }

                    return job;
                }
                catch(OperationCanceledException)
                {
                    job.ReturnCode = 300;
                    return job;
                }
            }, ActionBlockOptions());

            var loadingFailed = new ActionBlock<Job>(job => CreateResponse(job), ActionBlockOptions());
            var validationFailed = new ActionBlock<Job>(job => CreateResponse(job), ActionBlockOptions());
            var reportImport = new ActionBlock<Job>(job => CreateResponse(job), ActionBlockOptions());

            //
            // Connect the pipeline
            //
            loadXml.LinkTo(validateXml, job => job.ReturnCode == 100);
            loadXml.LinkTo(loadingFailed);

            validateXml.LinkTo(importJob, Job => Job.ReturnCode == 100);
            validateXml.LinkTo(validationFailed);

            importJob.LinkTo(reportImport);

            // Return the head of the network.
            return loadXml;
        }

        public void Start()
        {
            cancellationTokenSource = new CancellationTokenSource();

            pathBlock = CreateJobProcessingPipeline();
        }

        public async void AddJob(string path, int id)
        {
            Job j = new Job();
            j.Path = path;
            j.ID = id;

            await pathBlock.SendAsync(j);
        }

        static ExecutionDataflowBlockOptions TransformBlockOptions()
        {
            return new ExecutionDataflowBlockOptions
            {
                MaxDegreeOfParallelism = 8,
                BoundedCapacity = 32
            };
        }

        private static ExecutionDataflowBlockOptions ActionBlockOptions()
        {
            return new ExecutionDataflowBlockOptions
            {
                MaxDegreeOfParallelism = 1,
                BoundedCapacity = 1
            };
        }

        public void Cancel()
        {
            if(cancellationTokenSource != null)
                cancellationTokenSource.Cancel();
        }
    }

    class Program
    {
        private static String InputXml = @"C:\XML\Part.xml";
        private static Test _Pipeline;

        static void Main(string[] args)
        {
            _Pipeline = new Test();
            _Pipeline.Start();


            var data = Enumerable.Range(1, 100);

            foreach(var d in data)
                _Pipeline.AddJob(InputXml, d);

            //Wait before closing the application so we can see the results.
            Console.ReadLine();
        }
    }
}

EDIT2: After i made one change by setting BoundedCapacity to Unbounded, i got everything in the order it was send into the pipe. So it wasn't really out of order before, but messages where discarded i guess?

If i make sure that EnsureOrdered is true as well as using MaxDegreeOfParallelism of 8 in the last TransformBlock, items are not in order anymore if you check the piece of the output below. But this is where it needs to be in order, since im saving data to the database, which needs to be in the order it came in. It's not really important if its not in order when it leaves the last TransformBlock, so i guess i can't keep parallelism here?

ValidateXml 08:27:24.2855461 | Thread 21 is processing Job Id: 36
ValidateXml 08:27:24.2855461 | Thread 28 is processing Job Id: 37
+++ ProcessJob 08:27:24.2880490 | Thread 33 is processing Job Id: 9
ReadDocument 08:27:24.2855461 | Thread 6 is processing Job Id: 56
ValidateXml 08:27:25.2853094 | Thread 19 is processing Job Id: 38
ReadDocument 08:27:25.2853094 | Thread 13 is processing Job Id: 58
+++ ProcessJob 08:27:25.2868091 | Thread 34 is processing Job Id: 13
ReadDocument 08:27:25.2858087 | Thread 16 is processing Job Id: 59
+++ ProcessJob 08:27:25.2858087 | Thread 25 is processing Job Id: 10
+++ ProcessJob 08:27:25.2858087 | Thread 29 is processing Job Id: 12
ReadDocument 08:27:25.2853094 | Thread 11 is processing Job Id: 57
ReadDocument 08:27:25.2873097 | Thread 15 is processing Job Id: 60
ValidateXml 08:27:25.2853094 | Thread 22 is processing Job Id: 40
ValidateXml 08:27:25.2853094 | Thread 23 is processing Job Id: 39
+++ ProcessJob 08:27:25.2858087 | Thread 30 is processing Job Id: 11
ValidateXml 08:27:26.2865381 | Thread 21 is processing Job Id: 41
ReadDocument 08:27:26.2865381 | Thread 14 is processing Job Id: 61
ValidateXml 08:27:26.2865381 | Thread 20 is processing Job Id: 42
ValidateXml 08:27:26.2865381 | Thread 26 is processing Job Id: 43
ReadDocument 08:27:26.2865381 | Thread 17 is processing Job Id: 62
ReadDocument 08:27:26.2870374 | Thread 12 is processing Job Id: 63
+++ ProcessJob 08:27:26.2870374 | Thread 24 is processing Job Id: 14

EDIT3: The output after using @JSteward latest code.

ReadDocument 09:01:03.9363340 JobId: 1
ReadDocument 09:01:03.9368357 JobId: 5
ReadDocument 09:01:03.9373347 JobId: 6
ReadDocument 09:01:03.9368357 JobId: 8
ReadDocument 09:01:03.9363340 JobId: 4
ReadDocument 09:01:03.9373347 JobId: 3
ReadDocument 09:01:03.9373347 JobId: 7
ReadDocument 09:01:03.9368357 JobId: 2
ReadDocument 09:01:05.2037570 JobId: 9
ReadDocument 09:01:05.3108413 JobId: 10
ReadDocument 09:01:05.5678177 JobId: 11
ReadDocument 09:01:05.6308763 JobId: 12
ValidateXml 09:01:05.6338782 JobId: 1
ValidateXml 09:01:06.3754174 JobId: 2
ReadDocument 09:01:06.3764184 JobId: 13
ReadDocument 09:01:06.3764184 JobId: 14
ReadDocument 09:01:07.3756634 JobId: 15
ReadDocument 09:01:07.3756634 JobId: 18
ValidateXml 09:01:07.3756634 JobId: 3
ValidateXml 09:01:07.3756634 JobId: 4
ReadDocument 09:01:07.3756634 JobId: 17
ReadDocument 09:01:07.3756634 JobId: 16
ReadDocument 09:01:08.3753887 JobId: 19
ReadDocument 09:01:08.3753887 JobId: 20
ValidateXml 09:01:08.3753887 JobId: 5
ProcessJob 09:01:08.3763906 JobId: 1
ReadDocument 09:01:09.3744411 JobId: 21
ReadDocument 09:01:09.3749410 JobId: 24
ProcessJob 09:01:09.3749410 JobId: 2
ReadDocument 09:01:09.3749410 JobId: 22
ReadDocument 09:01:09.3749410 JobId: 23
ReadDocument 09:01:10.3752061 JobId: 25
ReadDocument 09:01:10.3752061 JobId: 27
ValidateXml 09:01:10.3752061 JobId: 6
ValidateXml 09:01:10.3752061 JobId: 7
ValidateXml 09:01:10.3752061 JobId: 8
ReadDocument 09:01:10.3752061 JobId: 26
ReadDocument 09:01:11.3759294 JobId: 29
ReadDocument 09:01:11.3759294 JobId: 28
ValidateXml 09:01:11.3764278 JobId: 10
ReadDocument 09:01:11.3759294 JobId: 31
ValidateXml 09:01:11.3759294 JobId: 9
ReadDocument 09:01:11.3759294 JobId: 30
ValidateXml 09:01:12.3751553 JobId: 11
ReadDocument 09:01:12.3751553 JobId: 33
ValidateXml 09:01:12.3751553 JobId: 12
ReadDocument 09:01:12.3751553 JobId: 34
ReadDocument 09:01:12.3751553 JobId: 32
ValidateXml 09:01:13.3753842 JobId: 13
ValidateXml 09:01:13.3753842 JobId: 14
ValidateXml 09:01:13.3753842 JobId: 16
ReadDocument 09:01:13.3753842 JobId: 35
ReadDocument 09:01:13.3753842 JobId: 36
ValidateXml 09:01:13.3753842 JobId: 15
ReadDocument 09:01:14.3756414 JobId: 37
ValidateXml 09:01:14.3756414 JobId: 19
ValidateXml 09:01:14.3756414 JobId: 18
ValidateXml 09:01:14.3756414 JobId: 17
ReadDocument 09:01:14.3756414 JobId: 40
ReadDocument 09:01:14.3756414 JobId: 38
ReadDocument 09:01:14.3756414 JobId: 39
ProcessJob 09:01:14.3761419 JobId: 3
SendToDataBase 09:01:14.3806453 JobId: 1
SendToDataBase 09:01:14.3821472 JobId: 2
ProcessJob 09:01:14.3821472 JobId: 4
ValidateXml 09:01:15.3763758 JobId: 20
ReadDocument 09:01:15.3763758 JobId: 42
ValidateXml 09:01:15.3763758 JobId: 21
ReadDocument 09:01:15.3773772 JobId: 43
ReadDocument 09:01:15.3763758 JobId: 41
ValidateXml 09:01:15.3768800 JobId: 22
ReadDocument 09:01:15.3773772 JobId: 44
ValidateXml 09:01:16.3761117 JobId: 23
ValidateXml 09:01:16.3761117 JobId: 26
ValidateXml 09:01:16.3761117 JobId: 24
ValidateXml 09:01:16.3761117 JobId: 25
ReadDocument 09:01:16.3761117 JobId: 45
ReadDocument 09:01:16.3761117 JobId: 46
ProcessJob 09:01:16.3761117 JobId: 5
ReadDocument 09:01:17.3758334 JobId: 47
ValidateXml 09:01:17.3763315 JobId: 28
ValidateXml 09:01:17.3763315 JobId: 27
ReadDocument 09:01:17.3763315 JobId: 49
ReadDocument 09:01:17.3763315 JobId: 48
ProcessJob 09:01:17.3763315 JobId: 6
ValidateXml 09:01:17.3763315 JobId: 29
ReadDocument 09:01:17.3763315 JobId: 50
ReadDocument 09:01:18.3755786 JobId: 51
ReadDocument 09:01:18.3755786 JobId: 52
<<<
ProcessJob 09:01:18.3770792 JobId: 10
ProcessJob 09:01:18.3770792 JobId: 9
ProcessJob 09:01:18.3755786 JobId: 7
>>>
ReadDocument 09:01:18.3755786 JobId: 53
ValidateXml 09:01:18.3755786 JobId: 32
ValidateXml 09:01:18.3755786 JobId: 31
ValidateXml 09:01:18.3755786 JobId: 30
ReadDocument 09:01:18.3760794 JobId: 54
ProcessJob 09:01:18.3755786 JobId: 8
ValidateXml 09:01:19.3753274 JobId: 34
ValidateXml 09:01:19.3753274 JobId: 33
ReadDocument 09:01:19.3758261 JobId: 56
ReadDocument 09:01:19.3758261 JobId: 55
ValidateXml 09:01:19.3758261 JobId: 35
ValidateXml 09:01:20.3752782 JobId: 36
ValidateXml 09:01:20.3752782 JobId: 37
ProcessJob 09:01:20.3757709 JobId: 11
ReadDocument 09:01:20.3752782 JobId: 57
ValidateXml 09:01:20.3752782 JobId: 38
ReadDocument 09:01:20.3757709 JobId: 58
ReadDocument 09:01:20.3757709 JobId: 59
ProcessJob 09:01:21.3757202 JobId: 12
ValidateXml 09:01:21.3757202 JobId: 39
ReadDocument 09:01:21.3757202 JobId: 62
ReadDocument 09:01:21.3757202 JobId: 61
ReadDocument 09:01:21.3757202 JobId: 60
ReadDocument 09:01:22.3764154 JobId: 63
ReadDocument 09:01:22.3764154 JobId: 64
ReadDocument 09:01:22.3764154 JobId: 65
ProcessJob 09:01:22.3794167 JobId: 16
ValidateXml 09:01:22.3764154 JobId: 40
ValidateXml 09:01:22.3764154 JobId: 42
ReadDocument 09:01:22.3764154 JobId: 66
ValidateXml 09:01:22.3774149 JobId: 43
ProcessJob 09:01:22.3764154 JobId: 13
ValidateXml 09:01:22.3764154 JobId: 41
ProcessJob 09:01:22.3779160 JobId: 15
SendToDataBase 09:01:22.3784159 JobId: 3
ProcessJob 09:01:22.3764154 JobId: 14
ValidateXml 09:01:22.3859209 JobId: 44
SendToDataBase 09:01:22.4309993 JobId: 4
SendToDataBase 09:01:22.4460051 JobId: 5
SendToDataBase 09:01:22.4465047 JobId: 6
ReadDocument 09:01:23.3760112 JobId: 67
ValidateXml 09:01:23.3760112 JobId: 46
ValidateXml 09:01:23.3760112 JobId: 47
ReadDocument 09:01:23.3760112 JobId: 68
ValidateXml 09:01:23.3760112 JobId: 45
ProcessJob 09:01:23.3760112 JobId: 17
ValidateXml 09:01:24.3762581 JobId: 48
ReadDocument 09:01:24.3762581 JobId: 69
ProcessJob 09:01:24.3762581 JobId: 18
ProcessJob 09:01:24.3762581 JobId: 19
ReadDocument 09:01:24.3762581 JobId: 70
CreateResponse 09:01:24.3777606 JobId: 58
CreateResponse 09:01:24.3994684 JobId: 59
CreateResponse 09:01:24.4059908 JobId: 60
CreateResponse 09:01:24.4114777 JobId: 61
CreateResponse 09:01:24.4134789 JobId: 62
ValidateXml 09:01:25.3759607 JobId: 49
ValidateXml 09:01:25.3759607 JobId: 51
ProcessJob 09:01:25.3784627 JobId: 22
ValidateXml 09:01:25.3759607 JobId: 52
ProcessJob 09:01:25.3759607 JobId: 20
ValidateXml 09:01:25.3774629 JobId: 53
ValidateXml 09:01:25.3759607 JobId: 50
ValidateXml 09:01:25.3774629 JobId: 54
ReadDocument 09:01:25.3759607 JobId: 72
ReadDocument 09:01:25.3774629 JobId: 73
ReadDocument 09:01:25.3759607 JobId: 71
ReadDocument 09:01:25.3779625 JobId: 74
ProcessJob 09:01:25.3759607 JobId: 21
SendToDataBase 09:01:25.3774629 JobId: 7
CreateResponse 09:01:25.3759607 JobId: 39
SendToDataBase 09:01:25.4398495 JobId: 8
SendToDataBase 09:01:25.4448555 JobId: 9
SendToDataBase 09:01:25.4478565 JobId: 10
SendToDataBase 09:01:25.4483570 JobId: 11
CreateResponse 09:01:25.4448555 JobId: 42
CreateResponse 09:01:25.4608868 JobId: 43
SendToDataBase 09:01:25.4553682 JobId: 12
CreateResponse 09:01:25.4613665 JobId: 44
CreateResponse 09:01:25.4698849 JobId: 45
ReadDocument 09:01:26.3754874 JobId: 75
ReadDocument 09:01:26.3754874 JobId: 76
ReadDocument 09:01:26.3754874 JobId: 78
ValidateXml 09:01:26.3754874 JobId: 55
ProcessJob 09:01:26.3759876 JobId: 24
ProcessJob 09:01:26.3754874 JobId: 23
ReadDocument 09:01:26.3754874 JobId: 77
SendToDataBase 09:01:26.3759876 JobId: 13
SendToDataBase 09:01:26.3980055 JobId: 14
SendToDataBase 09:01:26.3985045 JobId: 15
SendToDataBase 09:01:26.4020099 JobId: 16
ReadDocument 09:01:27.3762164 JobId: 79
ValidateXml 09:01:27.3762164 JobId: 56
ProcessJob 09:01:27.3762164 JobId: 26
ReadDocument 09:01:27.3762164 JobId: 82
ProcessJob 09:01:27.3762164 JobId: 25
ReadDocument 09:01:27.3762164 JobId: 81
ReadDocument 09:01:27.3762164 JobId: 80
ValidateXml 09:01:27.3762164 JobId: 63
ValidateXml 09:01:27.3777165 JobId: 64
ProcessJob 09:01:27.3767157 JobId: 27
ValidateXml 09:01:27.3762164 JobId: 57
SendToDataBase 09:01:27.3777165 JobId: 17
SendToDataBase 09:01:27.4327571 JobId: 18
SendToDataBase 09:01:27.4357587 JobId: 19
ReadDocument 09:01:28.3761410 JobId: 83
ProcessJob 09:01:28.3761410 JobId: 28
ProcessJob 09:01:28.3761410 JobId: 29
ValidateXml 09:01:28.3761410 JobId: 66
SendToDataBase 09:01:28.3761410 JobId: 20
ProcessJob 09:01:28.3761410 JobId: 30
ValidateXml 09:01:28.3761410 JobId: 67
ValidateXml 09:01:28.3761410 JobId: 65
SendToDataBase 09:01:28.3861483 JobId: 21
SendToDataBase 09:01:28.4141687 JobId: 22
ReadDocument 09:01:28.6079764 JobId: 84
ReadDocument 09:01:28.6552491 JobId: 85
ReadDocument 09:01:28.7047606 JobId: 86
ValidateXml 09:01:28.7327861 JobId: 68
ProcessJob 09:01:28.7327861 JobId: 31
ReadDocument 09:01:29.1285484 JobId: 87
ProcessJob 09:01:29.1894672 JobId: 32
SendToDataBase 09:01:29.1894672 JobId: 23
SendToDataBase 09:01:29.1944706 JobId: 24
ReadDocument 09:01:29.3910070 JobId: 88
ValidateXml 09:01:29.5569691 JobId: 69
ReadDocument 09:01:29.5995036 JobId: 89
ValidateXml 09:01:29.6085095 JobId: 70
ReadDocument 09:01:29.6581266 JobId: 90
ValidateXml 09:01:29.8797899 JobId: 71
ValidateXml 09:01:30.1244519 JobId: 72
ValidateXml 09:01:30.1584763 JobId: 73
ReadDocument 09:01:30.2100312 JobId: 91
ProcessJob 09:01:30.2490536 JobId: 33
ProcessJob 09:01:30.2950865 JobId: 34
ReadDocument 09:01:30.3290995 JobId: 92
ProcessJob 09:01:30.3636350 JobId: 35
SendToDataBase 09:01:30.3636350 JobId: 25
SendToDataBase 09:01:30.3701300 JobId: 26
SendToDataBase 09:01:30.3706299 JobId: 27
ProcessJob 09:01:30.4987430 JobId: 36
ReadDocument 09:01:30.5642707 JobId: 93
ReadDocument 09:01:30.6088035 JobId: 94
ValidateXml 09:01:30.7213868 JobId: 74
ReadDocument 09:01:30.7544106 JobId: 95
ProcessJob 09:01:30.7544106 JobId: 37
SendToDataBase 09:01:30.7544106 JobId: 28
ProcessJob 09:01:31.1091681 JobId: 38
SendToDataBase 09:01:31.1091681 JobId: 29
SendToDataBase 09:01:31.1151730 JobId: 30
ValidateXml 09:01:31.2012468 JobId: 75
ValidateXml 09:01:31.2827940 JobId: 76
ValidateXml 09:01:31.3143168 JobId: 77
ValidateXml 09:01:31.4073842 JobId: 78
ReadDocument 09:01:31.4369059 JobId: 96
ReadDocument 09:01:31.4699302 JobId: 97
ProcessJob 09:01:31.7201123 JobId: 40
SendToDataBase 09:01:31.7201123 JobId: 31
ProcessJob 09:01:32.1569310 JobId: 41
SendToDataBase 09:01:32.1569310 JobId: 32
ValidateXml 09:01:32.3650822 JobId: 79
ValidateXml 09:01:32.3650822 JobId: 80
ProcessJob 09:01:32.3966047 JobId: 46
ReadDocument 09:01:32.4236247 JobId: 98
ReadDocument 09:01:32.4831869 JobId: 99
ValidateXml 09:01:32.5607342 JobId: 81
ReadDocument 09:01:32.5777363 JobId: 100
ProcessJob 09:01:33.1461630 JobId: 47
ProcessJob 09:01:33.2081967 JobId: 48
SendToDataBase 09:01:33.2081967 JobId: 33
SendToDataBase 09:01:33.2137015 JobId: 34
SendToDataBase 09:01:33.2172021 JobId: 35
ValidateXml 09:01:33.2347146 JobId: 82
ValidateXml 09:01:33.4228519 JobId: 83
ProcessJob 09:01:33.4228519 JobId: 49
ValidateXml 09:01:33.4373638 JobId: 84
ProcessJob 09:01:33.4878995 JobId: 50
SendToDataBase 09:01:33.4878995 JobId: 36
ProcessJob 09:01:33.5819674 JobId: 51
ValidateXml 09:01:33.6239992 JobId: 85
ProcessJob 09:01:33.6239992 JobId: 52
SendToDataBase 09:01:33.6239992 JobId: 37
SendToDataBase 09:01:33.6295082 JobId: 38
ValidateXml 09:01:33.6870563 JobId: 86
ValidateXml 09:01:33.7125626 JobId: 87
ProcessJob 09:01:34.1238635 JobId: 53
ProcessJob 09:01:34.5796949 JobId: 54
<<<
SendToDataBase 09:01:34.5796949 JobId: 40
SendToDataBase 09:01:34.5856995 JobId: 41
SendToDataBase 09:01:34.5887008 JobId: 46
>>>
ValidateXml 09:01:34.7951688 JobId: 88
ValidateXml 09:01:34.9162007 JobId: 89
ProcessJob 09:01:34.9541705 JobId: 55
ValidateXml 09:01:35.0464443 JobId: 90
ProcessJob 09:01:35.3634898 JobId: 56
ProcessJob 09:01:35.3795024 JobId: 57
ValidateXml 09:01:35.5165095 JobId: 91
ValidateXml 09:01:35.8614345 JobId: 92
ProcessJob 09:01:35.9985415 JobId: 63
ValidateXml 09:01:36.0481807 JobId: 93
ProcessJob 09:01:36.0763064 JobId: 64
ProcessJob 09:01:36.0993229 JobId: 65
SendToDataBase 09:01:36.0993229 JobId: 47
SendToDataBase 09:01:36.1048270 JobId: 48
ValidateXml 09:01:36.1572079 JobId: 94
ValidateXml 09:01:36.3791015 JobId: 95
ProcessJob 09:01:36.4212607 JobId: 66
SendToDataBase 09:01:36.4212607 JobId: 49
SendToDataBase 09:01:36.4267655 JobId: 50
SendToDataBase 09:01:36.4272654 JobId: 51
SendToDataBase 09:01:36.4322913 JobId: 52
SendToDataBase 09:01:36.4327837 JobId: 53
ProcessJob 09:01:36.5149796 JobId: 67
SendToDataBase 09:01:36.5149796 JobId: 54
ValidateXml 09:01:36.6861048 JobId: 96
ValidateXml 09:01:36.7845716 JobId: 97
ValidateXml 09:01:37.0175979 JobId: 98
ValidateXml 09:01:37.3788835 JobId: 99
ValidateXml 09:01:37.6477046 JobId: 100
ProcessJob 09:01:37.8269808 JobId: 68
SendToDataBase 09:01:37.8269808 JobId: 55
ProcessJob 09:01:37.8940108 JobId: 69
ProcessJob 09:01:38.2955556 JobId: 70
ProcessJob 09:01:38.3110583 JobId: 71
SendToDataBase 09:01:38.3110583 JobId: 56
SendToDataBase 09:01:38.3125586 JobId: 57
CreateResponse 09:01:38.4551538 JobId: 95
CreateResponse 09:01:38.4925304 JobId: 96
ProcessJob 09:01:38.5382532 JobId: 72
ProcessJob 09:01:38.9129894 JobId: 73
SendToDataBase 09:01:38.9129894 JobId: 63
SendToDataBase 09:01:38.9185062 JobId: 64
SendToDataBase 09:01:38.9189949 JobId: 65
ProcessJob 09:01:38.9852121 JobId: 74
ProcessJob 09:01:39.0317458 JobId: 75
SendToDataBase 09:01:39.0317458 JobId: 66
SendToDataBase 09:01:39.0377511 JobId: 67
ProcessJob 09:01:39.6129381 JobId: 76
SendToDataBase 09:01:39.6129381 JobId: 68
ProcessJob 09:01:39.7833004 JobId: 77
SendToDataBase 09:01:39.7833004 JobId: 69
ProcessJob 09:01:39.8740443 JobId: 78
ProcessJob 09:01:40.3145731 JobId: 79
SendToDataBase 09:01:40.3145731 JobId: 70
SendToDataBase 09:01:40.3205708 JobId: 71
ProcessJob 09:01:40.4912084 JobId: 80
ProcessJob 09:01:40.5307205 JobId: 81
SendToDataBase 09:01:40.5317212 JobId: 72
ProcessJob 09:01:40.5652454 JobId: 82
ProcessJob 09:01:41.2902736 JobId: 83
ProcessJob 09:01:41.2902736 JobId: 84
ProcessJob 09:01:41.3598244 JobId: 85
SendToDataBase 09:01:41.3598244 JobId: 73
SendToDataBase 09:01:41.3663284 JobId: 74
SendToDataBase 09:01:41.3713317 JobId: 75
SendToDataBase 09:01:41.3718392 JobId: 76
SendToDataBase 09:01:41.3723328 JobId: 77
ProcessJob 09:01:42.2677493 JobId: 86
SendToDataBase 09:01:42.2677493 JobId: 78
ProcessJob 09:01:42.6466081 JobId: 87
ProcessJob 09:01:42.8947969 JobId: 88
SendToDataBase 09:01:42.8947969 JobId: 79
ProcessJob 09:01:43.0012509 JobId: 89
ProcessJob 09:01:43.1513589 JobId: 90
ProcessJob 09:01:43.4545800 JobId: 91
SendToDataBase 09:01:43.4545800 JobId: 80
SendToDataBase 09:01:43.4600832 JobId: 81
SendToDataBase 09:01:43.4605919 JobId: 82
ProcessJob 09:01:43.5946813 JobId: 92
ProcessJob 09:01:44.1731027 JobId: 93
SendToDataBase 09:01:44.1731027 JobId: 83
SendToDataBase 09:01:44.1786068 JobId: 84
SendToDataBase 09:01:44.1816090 JobId: 85
ProcessJob 09:01:44.4678171 JobId: 94
SendToDataBase 09:01:44.4678171 JobId: 86
ProcessJob 09:01:45.3426043 JobId: 97
SendToDataBase 09:01:45.3426043 JobId: 87
ProcessJob 09:01:45.3751270 JobId: 98
ProcessJob 09:01:45.7363757 JobId: 99
ProcessJob 09:01:45.7809216 JobId: 100
SendToDataBase 09:01:45.7809216 JobId: 88
SendToDataBase 09:01:45.7879270 JobId: 89
SendToDataBase 09:01:45.7925566 JobId: 90
SendToDataBase 09:01:45.8776726 JobId: 91
SendToDataBase 09:01:45.8776726 JobId: 92
SendToDataBase 09:01:46.5813640 JobId: 93
SendToDataBase 09:01:46.5813640 JobId: 94
SendToDataBase 09:01:47.7407165 JobId: 97
SendToDataBase 09:01:47.7407165 JobId: 98
SendToDataBase 09:01:48.4382058 JobId: 99
SendToDataBase 09:01:48.7357557 JobId: 100
Indign answered 13/1, 2017 at 11:35 Comment(0)
S
2

Original answer body became too long

Edit4: response to OP Edit2 I'm not certain exactly what changes were made to produce the provided output but here is your modified source and the results displaying ordered behavior for all 100 inputs.

using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;
using System.Xml;
using System.Linq;

namespace OrderProcessing {
    public class Job {
        public string Path { get; set; }

        public XmlDocument Document { get; set; }

        public List<Object> BusinessObjects { get; set; }

        public int ReturnCode { get; set; }

        public int ID { get; set; }
    }

    public class Test {
        ITargetBlock<Job> pathBlock = null;

        CancellationTokenSource cancellationTokenSource;

        Random rnd = new Random();

        private bool ReadDocument(Job job) {
            Console.WriteLine($"ReadDocument {DateTime.Now.TimeOfDay} JobId: {job.ID}");
            Task.Delay(rnd.Next(1000, 3000)).Wait();

            // Throw OperationCanceledException if cancellation is requested.
            cancellationTokenSource.Token.ThrowIfCancellationRequested();

            // Read the document
            job.Document = new XmlDocument();

            // Some checking
            return true;
        }

        private bool ValidateXml(Job job) {
            Console.WriteLine($"ValidateXml {DateTime.Now.TimeOfDay} JobId: {job.ID}");
            Task.Delay(rnd.Next(1000, 3000)).Wait();

            // Throw OperationCanceledException if cancellation is requested.
            cancellationTokenSource.Token.ThrowIfCancellationRequested();

            // Check XML against XSD and perform remaining checks
            job.BusinessObjects = new List<object>();

            // Just for tests
            job.BusinessObjects.Add(new object());
            job.BusinessObjects.Add(new object());

            // Parse Xml and create business objects
            return true;
        }

        private bool ProcessJob(Job job) {
            Console.WriteLine($"ProcessJob {DateTime.Now.TimeOfDay} JobId: {job.ID}");

            // Throw OperationCanceledException if cancellation is requested.
            cancellationTokenSource.Token.ThrowIfCancellationRequested();

            Parallel.ForEach(job.BusinessObjects, bO => {
                ImportObject(bO);
            });


            // Import the job
            return true;
        }

        private object ImportObject(object o) {
            Task.Delay(rnd.Next(1000, 3000)).Wait();

            return new object();
        }

        private void CreateResponse(Job job) {
            if (job.ReturnCode == 100) {
                Console.WriteLine($"CreateResponse {DateTime.Now.TimeOfDay} JobId: {job.ID}");

            }
            else {
                Console.WriteLine("ID {0} failed to import.", job.ID);
            }

            // Create response XML with returncodes
        }

        ITargetBlock<Job> CreateJobProcessingPipeline() {
            var loadXml = new TransformBlock<Job, Job>(job => {
                try {
                    if (ReadDocument(job)) {
                        // For later error handling
                        job.ReturnCode = 100; // success
                    }
                    else {
                        job.ReturnCode = 200;
                    }

                    return job;
                }
                catch (OperationCanceledException) {
                    job.ReturnCode = 300;
                    return job;
                }
            }, TransformBlockOptions());

            var validateXml = new TransformBlock<Job, Job>(job => {
                try {
                    if (ValidateXml(job)) {
                        // For later error handling
                        job.ReturnCode = 100;
                    }
                    else {
                        job.ReturnCode = 200;
                    }

                    return job;
                }
                catch (OperationCanceledException) {
                    job.ReturnCode = 300;
                    return job;
                }
            }, TransformBlockOptions());


            var importJob = new TransformBlock<Job, Job>(job => {
                try {
                    if (ProcessJob(job)) {
                        // For later error handling
                        job.ReturnCode = 100; // success
                    }
                    else {
                        job.ReturnCode = 200;
                    }

                    return job;
                }
                catch (OperationCanceledException) {
                    job.ReturnCode = 300;
                    return job;
                }
            }, TransformBlockOptions());

            var loadingFailed = new ActionBlock<Job>(job => CreateResponse(job), ActionBlockOptions());
            var validationFailed = new ActionBlock<Job>(job => CreateResponse(job), ActionBlockOptions());
            var reportImport = new ActionBlock<Job>(job => CreateResponse(job), ActionBlockOptions());

            //
            // Connect the pipeline
            //
            loadXml.LinkTo(validateXml, job => job.ReturnCode == 100);
            loadXml.LinkTo(loadingFailed);

            validateXml.LinkTo(importJob, Job => Job.ReturnCode == 100);
            validateXml.LinkTo(validationFailed);

            //importJob.LinkTo(reportImport);

            var output = importJob.AsObservable();
            var subscription = output.Subscribe(x => {
            if (x.ReturnCode == 100) {
                //job success
                Console.WriteLine($"SendToDataBase {DateTime.Now.TimeOfDay} JobId: {x.ID}");
            }
            else {
                //handle fault
                Console.WriteLine($"Job Failed {DateTime.Now.TimeOfDay} JobId: {x.ID}");
            }                
        });

            // Return the head of the network.
            return loadXml;
        }

        public void Start() {
            cancellationTokenSource = new CancellationTokenSource();

            pathBlock = CreateJobProcessingPipeline();
        }

        public async void AddJob(string path, int id) {
            Job j = new Job();
            j.Path = path;
            j.ID = id;

            await pathBlock.SendAsync(j);
        }

        static ExecutionDataflowBlockOptions TransformBlockOptions() {
            return new ExecutionDataflowBlockOptions {
                MaxDegreeOfParallelism = 8,
                BoundedCapacity = 32
            };
        }

        private static ExecutionDataflowBlockOptions ActionBlockOptions() {
            return new ExecutionDataflowBlockOptions {
                MaxDegreeOfParallelism = 1,
                BoundedCapacity = 1
            };
        }

        public void Cancel() {
            if (cancellationTokenSource != null)
                cancellationTokenSource.Cancel();
        }
    }

    class Program {
        private static String InputXml = @"C:\XML\Part.xml";
        private static Test _Pipeline;

        static void Main(string[] args) {
            _Pipeline = new Test();
            _Pipeline.Start();


            var data = Enumerable.Range(1, 100);

            foreach (var d in data)
                _Pipeline.AddJob(InputXml, d);

            //Wait before closing the application so we can see the results.
            Console.ReadLine();
        }
    }
}

Results

Function,Timestamp,Other,JobId
ReadDocument,08:11:27.2200011,JobId:,1
ReadDocument,08:11:27.2240007,JobId:,2
ReadDocument,08:11:29.7562763,JobId:,3
ReadDocument,08:11:29.7662792,JobId:,4
ReadDocument,08:11:30.7013793,JobId:,5
ReadDocument,08:11:31.7024931,JobId:,6
ReadDocument,08:11:31.7034925,JobId:,7
ReadDocument,08:11:32.7306060,JobId:,9
ReadDocument,08:11:32.7306060,JobId:,8
ReadDocument,08:11:33.7027033,JobId:,10
ReadDocument,08:11:33.7027033,JobId:,11
ReadDocument,08:11:34.7018217,JobId:,12
ReadDocument,08:11:34.7028153,JobId:,13
ReadDocument,08:11:35.7019214,JobId:,14
ReadDocument,08:11:35.7069235,JobId:,15
ReadDocument,08:11:35.7069235,JobId:,16
ReadDocument,08:11:35.7069235,JobId:,17
ReadDocument,08:11:35.7079221,JobId:,18
ValidateXml,08:11:35.7119363,JobId:,1
ValidateXml,08:11:36.7060334,JobId:,2
ReadDocument,08:11:36.7060334,JobId:,19
ReadDocument,08:11:36.7070332,JobId:,20
ReadDocument,08:11:37.7071383,JobId:,21
ReadDocument,08:11:37.7071383,JobId:,22
ReadDocument,08:11:37.7081392,JobId:,23
ValidateXml,08:11:37.7091421,JobId:,3
ReadDocument,08:11:38.7032496,JobId:,24
ValidateXml,08:11:38.7052496,JobId:,6
ValidateXml,08:11:38.7042513,JobId:,4
ReadDocument,08:11:38.7052496,JobId:,27
ValidateXml,08:11:38.7042513,JobId:,5
ReadDocument,08:11:38.7052496,JobId:,28
ReadDocument,08:11:38.7042513,JobId:,26
ReadDocument,08:11:38.7032496,JobId:,25
ValidateXml,08:11:39.7023545,JobId:,7
ReadDocument,08:11:39.7023545,JobId:,29
ValidateXml,08:11:39.7023545,JobId:,8
ReadDocument,08:11:40.7064634,JobId:,30
ReadDocument,08:11:40.7064634,JobId:,31
ValidateXml,08:11:40.7084642,JobId:,9
ValidateXml,08:11:41.7045755,JobId:,10
ReadDocument,08:11:41.7085762,JobId:,33
ValidateXml,08:11:41.7105750,JobId:,11
ValidateXml,08:11:41.7115767,JobId:,12
ValidateXml,08:11:41.7135740,JobId:,13
ValidateXml,08:11:41.7155790,JobId:,14
ReadDocument,08:11:41.7085762,JobId:,34
ReadDocument,08:11:41.7045755,JobId:,32
ReadDocument,08:11:41.7105750,JobId:,35
ReadDocument,08:11:41.7135740,JobId:,36
ReadDocument,08:11:42.7086844,JobId:,37
ValidateXml,08:11:42.7116926,JobId:,15
ValidateXml,08:11:42.7126878,JobId:,16
ReadDocument,08:11:42.7116926,JobId:,38
ValidateXml,08:11:43.7027911,JobId:,17
ValidateXml,08:11:43.7027911,JobId:,18
ValidateXml,08:11:43.7068030,JobId:,20
ProcessJob,08:11:43.7097908,JobId:,1
ValidateXml,08:11:43.7057897,JobId:,19
ReadDocument,08:11:43.7057897,JobId:,39
ReadDocument,08:11:43.7077893,JobId:,40
ReadDocument,08:11:44.7038990,JobId:,41
ProcessJob,08:11:44.7059002,JobId:,2
ValidateXml,08:11:44.7049004,JobId:,21
ReadDocument,08:11:44.7038990,JobId:,42
ValidateXml,08:11:44.7059002,JobId:,22
ReadDocument,08:11:44.7089023,JobId:,44
ReadDocument,08:11:44.7049004,JobId:,43
ReadDocument,08:11:45.7030090,JobId:,45
ValidateXml,08:11:45.7030090,JobId:,23
ValidateXml,08:11:45.7120179,JobId:,24
ValidateXml,08:11:45.7120179,JobId:,25
ReadDocument,08:11:45.7140087,JobId:,46
ValidateXml,08:11:45.7170104,JobId:,26
ReadDocument,08:11:45.7190107,JobId:,47
ProcessJob,08:11:45.7200086,JobId:,3
ValidateXml,08:11:45.7170104,JobId:,27
ReadDocument,08:11:46.7071167,JobId:,48
ReadDocument,08:11:46.7101161,JobId:,50
ProcessJob,08:11:46.7111152,JobId:,4
ValidateXml,08:11:46.7111152,JobId:,28
ReadDocument,08:11:46.7071167,JobId:,49
ValidateXml,08:11:47.7032249,JobId:,29
ReadDocument,08:11:47.7062243,JobId:,51
ReadDocument,08:11:47.7072261,JobId:,52
ReadDocument,08:11:47.7092253,JobId:,53
ProcessJob,08:11:47.7102243,JobId:,5
ProcessJob,08:11:47.7112241,JobId:,7
ReadDocument,08:11:47.7102243,JobId:,55
ValidateXml,08:11:47.7062243,JobId:,30
ProcessJob,08:11:47.7102243,JobId:,6
ValidateXml,08:11:47.7072261,JobId:,31
ReadDocument,08:11:47.7092253,JobId:,54
ReadDocument,08:11:48.7063329,JobId:,56
ProcessJob,08:11:48.7073331,JobId:,8
ValidateXml,08:11:48.7063329,JobId:,32
ValidateXml,08:11:48.7063329,JobId:,33
ValidateXml,08:11:49.7074443,JobId:,34
ReadDocument,08:11:49.7104422,JobId:,59
ReadDocument,08:11:49.7124418,JobId:,60
ProcessJob,08:11:49.7124418,JobId:,9
ValidateXml,08:11:49.7144433,JobId:,36
ValidateXml,08:11:49.7114420,JobId:,35
ReadDocument,08:11:49.7074443,JobId:,57
ReadDocument,08:11:49.7084468,JobId:,58
ValidateXml,08:11:50.7065604,JobId:,37
ReadDocument,08:11:50.7095502,JobId:,61
ProcessJob,08:11:50.7105504,JobId:,10
ReadDocument,08:11:50.7115502,JobId:,63
ValidateXml,08:11:50.7125515,JobId:,40
ReadDocument,08:11:50.7105504,JobId:,62
ValidateXml,08:11:50.7095502,JobId:,39
ValidateXml,08:11:50.7075518,JobId:,38
ReadDocument,08:11:50.7115502,JobId:,64
ReadDocument,08:11:51.7076596,JobId:,65
ReadDocument,08:11:51.7086597,JobId:,66
ProcessJob,08:11:51.7116603,JobId:,13
ProcessJob,08:11:51.7106605,JobId:,12
ProcessJob,08:11:51.7086597,JobId:,11
ValidateXml,08:11:51.7076596,JobId:,41
SendToDataBase,08:11:51.7366672,JobId:,1
SendToDataBase,08:11:51.7416631,JobId:,2
SendToDataBase,08:11:51.7496646,JobId:,3
CreateResponse,08:11:51.7546639,JobId:,56
ValidateXml,08:11:52.7037712,JobId:,42
ValidateXml,08:11:52.7037712,JobId:,43
ValidateXml,08:11:52.7077662,JobId:,44
ReadDocument,08:11:52.7107675,JobId:,69
ProcessJob,08:11:52.7077662,JobId:,14
ProcessJob,08:11:52.7077662,JobId:,15
ProcessJob,08:11:52.7087683,JobId:,16
ProcessJob,08:11:52.7087683,JobId:,17
ValidateXml,08:11:52.7097669,JobId:,45
ReadDocument,08:11:52.7097669,JobId:,67
ValidateXml,08:11:52.7097669,JobId:,46
ReadDocument,08:11:52.7107675,JobId:,68
ValidateXml,08:11:53.7069300,JobId:,47
ReadDocument,08:11:53.7078801,JobId:,70
ValidateXml,08:11:53.7108792,JobId:,48
SendToDataBase,08:11:53.7118774,JobId:,4
SendToDataBase,08:11:53.7208818,JobId:,5
SendToDataBase,08:11:53.7228802,JobId:,6
SendToDataBase,08:11:53.7238781,JobId:,7
SendToDataBase,08:11:53.7258800,JobId:,8
ReadDocument,08:11:53.7118774,JobId:,73
ReadDocument,08:11:53.7098805,JobId:,71
ReadDocument,08:11:53.7118774,JobId:,72
ValidateXml,08:11:54.7059933,JobId:,49
ValidateXml,08:11:54.7069847,JobId:,50
ValidateXml,08:11:54.7089874,JobId:,51
CreateResponse,08:11:54.7109862,JobId:,41
CreateResponse,08:11:54.7169842,JobId:,42
SendToDataBase,08:11:54.7149888,JobId:,9
SendToDataBase,08:11:54.7259874,JobId:,10
SendToDataBase,08:11:54.7269883,JobId:,11
ProcessJob,08:11:54.7119868,JobId:,18
ReadDocument,08:11:54.7059933,JobId:,74
ValidateXml,08:11:54.7109862,JobId:,53
ProcessJob,08:11:54.7119868,JobId:,19
ProcessJob,08:11:54.7129854,JobId:,20
ValidateXml,08:11:54.7099852,JobId:,52
ReadDocument,08:11:54.7129854,JobId:,76
ReadDocument,08:11:54.7069847,JobId:,75
ReadDocument,08:11:55.7090940,JobId:,77
ReadDocument,08:11:55.7140926,JobId:,78
ValidateXml,08:11:55.7140926,JobId:,54
SendToDataBase,08:11:55.7180953,JobId:,12
CreateResponse,08:11:55.7180953,JobId:,43
ProcessJob,08:11:55.7180953,JobId:,21
SendToDataBase,08:11:55.7230962,JobId:,13
ValidateXml,08:11:55.7170947,JobId:,55
ReadDocument,08:11:55.7160937,JobId:,79
ReadDocument,08:11:55.7170947,JobId:,80
ValidateXml,08:11:55.8111031,JobId:,57
ReadDocument,08:11:55.8111031,JobId:,81
ProcessJob,08:11:55.8451120,JobId:,22
ProcessJob,08:11:56.1251577,JobId:,23
ReadDocument,08:11:56.2531569,JobId:,82
ReadDocument,08:11:56.3441756,JobId:,83
ProcessJob,08:11:56.3571695,JobId:,24
ValidateXml,08:11:56.3851785,JobId:,58
ReadDocument,08:11:56.4061804,JobId:,84
ValidateXml,08:11:56.6222012,JobId:,59
CreateResponse,08:11:56.6222012,JobId:,49
ProcessJob,08:11:56.9112320,JobId:,25
ValidateXml,08:11:56.9412405,JobId:,60
ProcessJob,08:11:57.0002533,JobId:,26
ValidateXml,08:11:57.2352587,JobId:,61
ProcessJob,08:11:57.4852908,JobId:,27
ReadDocument,08:11:58.2093656,JobId:,85
SendToDataBase,08:11:58.2163692,JobId:,14
ReadDocument,08:11:58.2113664,JobId:,87
SendToDataBase,08:11:58.2203645,JobId:,15
SendToDataBase,08:11:58.2293743,JobId:,16
SendToDataBase,08:11:58.2303706,JobId:,17
SendToDataBase,08:11:58.2313662,JobId:,18
SendToDataBase,08:11:58.2333692,JobId:,19
SendToDataBase,08:11:58.2353681,JobId:,20
SendToDataBase,08:11:58.2373688,JobId:,21
SendToDataBase,08:11:58.2383671,JobId:,22
SendToDataBase,08:11:58.2393673,JobId:,23
ValidateXml,08:11:58.2123658,JobId:,63
CreateResponse,08:11:58.2163692,JobId:,50
CreateResponse,08:11:58.2543716,JobId:,51
CreateResponse,08:11:58.2643699,JobId:,52
CreateResponse,08:11:58.2663730,JobId:,53
ProcessJob,08:11:58.2143646,JobId:,31
ProcessJob,08:11:58.2123658,JobId:,29
ReadDocument,08:11:58.2093656,JobId:,86
ReadDocument,08:11:58.2123658,JobId:,88
ProcessJob,08:11:58.2133656,JobId:,30
ProcessJob,08:11:58.2103650,JobId:,28
ValidateXml,08:11:58.2113664,JobId:,62
ReadDocument,08:11:58.2123658,JobId:,89
ValidateXml,08:11:58.2133656,JobId:,64
ValidateXml,08:11:59.7055294,JobId:,65
ReadDocument,08:11:59.7065300,JobId:,91
ValidateXml,08:11:59.7065300,JobId:,66
SendToDataBase,08:11:59.7115275,JobId:,24
SendToDataBase,08:11:59.7195324,JobId:,25
SendToDataBase,08:11:59.7205330,JobId:,26
ProcessJob,08:11:59.7085277,JobId:,33
ValidateXml,08:11:59.7085277,JobId:,68
ReadDocument,08:11:59.7095263,JobId:,93
ValidateXml,08:11:59.7085277,JobId:,67
ReadDocument,08:11:59.7095263,JobId:,92
ProcessJob,08:11:59.7095263,JobId:,34
ProcessJob,08:11:59.7075275,JobId:,32
ReadDocument,08:11:59.7055294,JobId:,90
ValidateXml,08:11:59.7105265,JobId:,70
ValidateXml,08:11:59.7095263,JobId:,69
ReadDocument,08:11:59.7105265,JobId:,94
ValidateXml,08:12:00.7146358,JobId:,71
SendToDataBase,08:12:00.7176364,JobId:,27
ReadDocument,08:12:00.7156372,JobId:,97
ProcessJob,08:12:00.7146358,JobId:,35
ProcessJob,08:12:00.7156372,JobId:,36
ReadDocument,08:12:00.7146358,JobId:,95
ReadDocument,08:12:00.7156372,JobId:,96
ReadDocument,08:12:00.8616797,JobId:,98
ValidateXml,08:12:00.8796565,JobId:,72
ReadDocument,08:12:00.9066595,JobId:,99
ReadDocument,08:12:00.9786697,JobId:,100
ValidateXml,08:12:00.9866692,JobId:,73
ProcessJob,08:12:01.0766830,JobId:,37
ValidateXml,08:12:01.1176829,JobId:,74
ProcessJob,08:12:01.1176829,JobId:,38
ProcessJob,08:12:01.2167037,JobId:,39
SendToDataBase,08:12:01.2167037,JobId:,28
SendToDataBase,08:12:01.2216970,JobId:,29
SendToDataBase,08:12:01.2236923,JobId:,30
SendToDataBase,08:12:01.2246914,JobId:,31
ValidateXml,08:12:01.2327001,JobId:,75
ValidateXml,08:12:01.5447286,JobId:,76
ProcessJob,08:12:01.6567738,JobId:,40
ValidateXml,08:12:01.9347686,JobId:,77
ProcessJob,08:12:02.2498041,JobId:,44
ProcessJob,08:12:02.4448257,JobId:,45
SendToDataBase,08:12:02.4458286,JobId:,32
ValidateXml,08:12:02.5469861,JobId:,78
ProcessJob,08:12:02.6268456,JobId:,46
SendToDataBase,08:12:02.6278997,JobId:,33
SendToDataBase,08:12:02.6378977,JobId:,34
SendToDataBase,08:12:02.6398461,JobId:,35
ValidateXml,08:12:02.6538506,JobId:,79
ProcessJob,08:12:03.1399063,JobId:,47
SendToDataBase,08:12:03.1489053,JobId:,36
ValidateXml,08:12:03.2979184,JobId:,80
ProcessJob,08:12:03.4959402,JobId:,48
ValidateXml,08:12:03.6259629,JobId:,81
ValidateXml,08:12:03.6769676,JobId:,82
ProcessJob,08:12:03.7719693,JobId:,54
ProcessJob,08:12:03.8519797,JobId:,55
ProcessJob,08:12:03.9689901,JobId:,57
SendToDataBase,08:12:04.0079945,JobId:,37
SendToDataBase,08:12:04.0099953,JobId:,38
SendToDataBase,08:12:04.0109931,JobId:,39
SendToDataBase,08:12:04.0119941,JobId:,40
ValidateXml,08:12:04.0299989,JobId:,84
ValidateXml,08:12:04.0089966,JobId:,83
ProcessJob,08:12:04.3350372,JobId:,58
ValidateXml,08:12:04.6541474,JobId:,85
ProcessJob,08:12:04.8791864,JobId:,59
SendToDataBase,08:12:04.8791864,JobId:,44
SendToDataBase,08:12:05.0252098,JobId:,45
SendToDataBase,08:12:05.0757198,JobId:,46
ProcessJob,08:12:05.0757198,JobId:,60
ValidateXml,08:12:05.1527328,JobId:,86
ProcessJob,08:12:05.1532325,JobId:,61
ValidateXml,08:12:05.2762716,JobId:,87
ValidateXml,08:12:05.3793706,JobId:,88
ValidateXml,08:12:05.5953056,JobId:,89
ValidateXml,08:12:05.6453136,JobId:,90
ProcessJob,08:12:05.8313378,JobId:,62
SendToDataBase,08:12:05.8313378,JobId:,47
ValidateXml,08:12:06.1573930,JobId:,91
ValidateXml,08:12:06.2043839,JobId:,92
ProcessJob,08:12:06.4384015,JobId:,63
SendToDataBase,08:12:06.4384015,JobId:,48
ProcessJob,08:12:06.6554190,JobId:,64
ProcessJob,08:12:06.7494355,JobId:,65
SendToDataBase,08:12:06.7494355,JobId:,54
SendToDataBase,08:12:06.7594308,JobId:,55
SendToDataBase,08:12:06.7624294,JobId:,57
ProcessJob,08:12:06.9254482,JobId:,66
SendToDataBase,08:12:06.9254482,JobId:,58
ValidateXml,08:12:07.0154624,JobId:,93
ValidateXml,08:12:07.0975086,JobId:,94
ProcessJob,08:12:07.1925138,JobId:,67
ValidateXml,08:12:07.2724877,JobId:,95
ProcessJob,08:12:07.6385268,JobId:,68
ProcessJob,08:12:07.7705429,JobId:,69
ValidateXml,08:12:07.8315476,JobId:,96
ProcessJob,08:12:07.8905526,JobId:,70
SendToDataBase,08:12:07.8905526,JobId:,59
SendToDataBase,08:12:07.8965534,JobId:,60
SendToDataBase,08:12:07.8975535,JobId:,61
ValidateXml,08:12:08.1306009,JobId:,97
ValidateXml,08:12:08.2065895,JobId:,98
ValidateXml,08:12:08.3106332,JobId:,99
ProcessJob,08:12:08.3296082,JobId:,71
ValidateXml,08:12:08.4406159,JobId:,100
ProcessJob,08:12:08.8396557,JobId:,72
SendToDataBase,08:12:08.8446570,JobId:,62
SendToDataBase,08:12:08.8806613,JobId:,63
SendToDataBase,08:12:08.8946619,JobId:,64
ProcessJob,08:12:09.0076746,JobId:,73
SendToDataBase,08:12:09.0086763,JobId:,65
ProcessJob,08:12:09.0996850,JobId:,74
ProcessJob,08:12:09.1106847,JobId:,75
SendToDataBase,08:12:09.1106847,JobId:,66
SendToDataBase,08:12:09.1136860,JobId:,67
ProcessJob,08:12:09.6547630,JobId:,76
SendToDataBase,08:12:09.6557462,JobId:,68
ProcessJob,08:12:09.9218032,JobId:,77
ProcessJob,08:12:10.2218075,JobId:,78
ProcessJob,08:12:10.4288308,JobId:,79
SendToDataBase,08:12:10.4288308,JobId:,69
SendToDataBase,08:12:10.4408307,JobId:,70
SendToDataBase,08:12:10.4448318,JobId:,71
ProcessJob,08:12:10.6858596,JobId:,80
SendToDataBase,08:12:10.6858596,JobId:,72
ProcessJob,08:12:11.4049481,JobId:,81
ProcessJob,08:12:11.7039814,JobId:,82
ProcessJob,08:12:11.8272054,JobId:,83
ProcessJob,08:12:11.9930072,JobId:,84
SendToDataBase,08:12:11.9930072,JobId:,73
SendToDataBase,08:12:11.9979988,JobId:,74
SendToDataBase,08:12:11.9989983,JobId:,75
SendToDataBase,08:12:11.9989983,JobId:,76
ProcessJob,08:12:12.3460366,JobId:,85
ProcessJob,08:12:12.4520491,JobId:,86
SendToDataBase,08:12:12.4520491,JobId:,77
ProcessJob,08:12:12.8810952,JobId:,87
ProcessJob,08:12:13.1443167,JobId:,88
SendToDataBase,08:12:13.1443167,JobId:,78
SendToDataBase,08:12:13.1471282,JobId:,79
ProcessJob,08:12:13.2041414,JobId:,89
SendToDataBase,08:12:13.2081302,JobId:,80
SendToDataBase,08:12:13.2101309,JobId:,81
ProcessJob,08:12:13.4381566,JobId:,90
SendToDataBase,08:12:13.4392215,JobId:,82
ProcessJob,08:12:13.6411889,JobId:,91
SendToDataBase,08:12:13.6411889,JobId:,83
ProcessJob,08:12:13.9472212,JobId:,92
SendToDataBase,08:12:13.9472212,JobId:,84
ProcessJob,08:12:14.3122494,JobId:,93
ProcessJob,08:12:14.7053031,JobId:,94
SendToDataBase,08:12:14.7053031,JobId:,85
SendToDataBase,08:12:14.7092946,JobId:,86
ProcessJob,08:12:14.9393634,JobId:,95
ProcessJob,08:12:15.4103709,JobId:,96
SendToDataBase,08:12:15.4113707,JobId:,87
ProcessJob,08:12:15.9355263,JobId:,97
ProcessJob,08:12:15.9724349,JobId:,98
SendToDataBase,08:12:15.9724349,JobId:,88
SendToDataBase,08:12:15.9774350,JobId:,89
ProcessJob,08:12:15.9724349,JobId:,99
SendToDataBase,08:12:15.9784371,JobId:,90
SendToDataBase,08:12:15.9834330,JobId:,91
ProcessJob,08:12:16.6175125,JobId:,100
SendToDataBase,08:12:16.6175125,JobId:,92
SendToDataBase,08:12:16.6555160,JobId:,93
SendToDataBase,08:12:17.5005984,JobId:,94
SendToDataBase,08:12:17.8846409,JobId:,95
SendToDataBase,08:12:17.8886408,JobId:,96
SendToDataBase,08:12:18.1186677,JobId:,97
SendToDataBase,08:12:18.7557365,JobId:,98
SendToDataBase,08:12:18.7567394,JobId:,99
SendToDataBase,08:12:19.5488221,JobId:,100

Edit The new subscription will either send your items to the Db or handle a faulted job in a way you choose.

Further resources:

Stack Exchange Code Review

Dataflow Source

Syrinx answered 19/1, 2017 at 15:51 Comment(8)
So i used your code and i see two problems. One being that it will pass jobs over to CreateResponse where it shouldn't because of BoundedCapacity being locked to 1 or 32 in the options. Jobs should only go there if the returnCode is not 100. If i set it to Unbounded the outputs for "ProcessJob" are not in order, but SendToDataBase is, which makes me think that it reorders the currently worked on element before returning it from the TransformBlock or waiting until the next element in the order is finished with processing, but the processing itself inside the Block is not in order?Indign
@Indign In your modified code I have redirected your final output to a stream each job is processed in order in the handler of that stream which can be seen in the subscription. Cancellation and faults were not the focus of this question, which is all your return code mapping does. I cannot tell what exactly you are setting to "Unbounded" but if you are setting an ActionBlock with greater than one MaxDegreeOfParallelism to an unbounded capacity it will process your items out of order. Please see the quick mod which handles the return code in the stream.Syrinx
Also: Outputs for ProcessJob are not equivalent to the WriteLine ProcessJob. The TransformBlock will reorder the outputs before passing them to the stream and/or next block. The WriteLine in process job is correctly out of order because the items are processed in that step out of order i.e. in parallel, they are then reordered by that TransformBlock and sent to the next stream/block.Syrinx
So i copied your code 1:1 and pasted the output into another edit. As you can see CreateResponse is being called a few times even though i have no error handling yet and just return true in every function (no errors can happen yet). Still some jobs are passed to the Error-ActionBlock's (CreateResponse). I guess this happens because the TransformBlock has reached the BoundedCapacity of 32 and denies incoming jobs after that. I need to ensure everything gets processed, that is why i put BoundedCapacity for all Blocks to Unbounded.Indign
If you also check the SendtoDataBase line for JobId 40, you will see that the next one is 41 and then 46. All in between are missing. The second issue is, that as you said items are not processed in order inside the ProcessJob-TransformBlock, but i need to make sure they are, because this is where i wanted to save into the database. I could either use the subscription to save to the database or just use the last TransformBlock with MaxDegreeOfParallelism = 1, so everything is processed sequentially?Indign
I guess this happens because the TransformBlock has reached the BoundedCapacity of 32 Correct. I could either use the subscription to save to the database or just use the last ActionBlock with MaxDegreeOfParallelism = 1, so everything is processed sequentially? Should work fine. Note as you've seen you will need a more robust way to handle faults/cancellation than the ReturnCode based predicate.Syrinx
Is there a problem with simply sticking to Unbounded? Can recommend a good source for best practice to handle faults and cancellation?Indign
Designing the system such that it forces you to use unbounded limits your ability to throttle the flow if it becomes necessary. It may be acceptable, in isolated situations though. Here's a couple decent resources to check out Stephen Cleary - Async Producer/Consumer Stephen Toub - InterviewSyrinx
B
5

It is possible to do this if you link a TransformBlock to an ActionBlock.

This is easiest to demonstrate with a compilable console app.

This app processes a sequence of integers, but you could replace the integers with a custom work unit class.

(I modified this code from a utility I wrote which does multithreaded file compression, using a relatively slow LZMA compression algorithm. This utility has to read the input data sequentially from a file, then hand it off in blocks to a queue which processes the data using multiple threads in any order, and finally outputs the compressed blocks it to a queue which has to preserve the original order of the data blocks.)

The sample code:

using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

namespace Demo
{
    class Program
    {
        public static void Main()
        {
            var data = Enumerable.Range(1, 100);
            var task = Process(data);

            Console.WriteLine("Waiting for task to complete");
            task.Wait();
            Console.WriteLine("Task complete.");
        }

        public static async Task Process(IEnumerable<int> data)
        {
            var queue = new TransformBlock<int, int>(block => process(block), transformBlockOptions());
            var writer = new ActionBlock<int>(block => write(block), actionBlockOptions());

            queue.LinkTo(writer, new DataflowLinkOptions { PropagateCompletion = true });

            await enqueDataToProcessAndAwaitCompletion(data, queue);

            await writer.Completion;
        }

        static int process(int block)
        {
            Console.WriteLine($"Thread {Thread.CurrentThread.ManagedThreadId} is processing block {block}");
            emulateWorkload();
            return -block;
        }

        static void write(int block)
        {
            Console.WriteLine("Output: " + block);
        }

        static async Task enqueDataToProcessAndAwaitCompletion(IEnumerable<int> data, TransformBlock<int, int> queue)
        {
            await enqueueDataToProcess(data, queue);
            queue.Complete();
        }

        static async Task enqueueDataToProcess(IEnumerable<int> data, ITargetBlock<int> queue)
        {
            foreach (var item in data)
                await queue.SendAsync(item);
        }


        static ExecutionDataflowBlockOptions transformBlockOptions()
        {
            return new ExecutionDataflowBlockOptions
            {
                MaxDegreeOfParallelism = 8,
                BoundedCapacity = 32
            };
        }

        private static ExecutionDataflowBlockOptions actionBlockOptions()
        {
            return new ExecutionDataflowBlockOptions
            {
                MaxDegreeOfParallelism = 1,
                BoundedCapacity = 1
            };
        }

        static Random rng = new Random();
        static object locker = new object();

        static void emulateWorkload()
        {
            int delay;

            lock (locker)
            {
                delay = rng.Next(250, 750);
            }

            Thread.Sleep(delay);
        }
    }
}

The output:

Waiting for task to complete
Thread 8 is processing block 8
Thread 5 is processing block 2
Thread 6 is processing block 6
Thread 4 is processing block 5
Thread 7 is processing block 7
Thread 10 is processing block 4
Thread 9 is processing block 1
Thread 3 is processing block 3
Thread 3 is processing block 9
Thread 8 is processing block 10
Thread 5 is processing block 11
Thread 6 is processing block 12
Thread 9 is processing block 13
Thread 10 is processing block 14
Thread 7 is processing block 15
Thread 8 is processing block 16
Thread 4 is processing block 17
Thread 5 is processing block 18
Thread 3 is processing block 19
Thread 9 is processing block 20
Thread 8 is processing block 21
Output: -1
Output: -2
Output: -3
Output: -4
Output: -5
Output: -6
Output: -7
Output: -8
Output: -9
Output: -10
Output: -11
Output: -12
Output: -13
Thread 6 is processing block 22
Thread 10 is processing block 23
Output: -14
Thread 7 is processing block 24
Output: -15
Output: -16
Thread 6 is processing block 25
Output: -17
Thread 4 is processing block 26
Thread 5 is processing block 27
----------------->SNIP<-----------------
Thread 10 is processing block 93
Thread 8 is processing block 94
Output: -83
Thread 4 is processing block 95
Output: -84
Output: -85
Output: -86
Output: -87
Thread 3 is processing block 96
Output: -88
Thread 6 is processing block 97
Thread 5 is processing block 98
Thread 10 is processing block 99
Thread 9 is processing block 100
Output: -89
Output: -90
Output: -91
Output: -92
Output: -93
Output: -94
Output: -95
Output: -96
Output: -97
Output: -98
Output: -99
Output: -100
Task complete.
Press any key to continue . . .

Note how the "blocks" are processed in any order by multiple threads, but the output order is the same as the input order.

It is very important to set the output action block options as per the actionBlockOptions() method, with MaxDegreeOfParallelism and BoundedCapacity both set to 1.

This is what causes the output to be serialised in the correct order. If you set BoundedCapacity and MaxDegreeOfParallelism to more than 1 for the output, then it could be output in the wrong order.

Blacklist answered 13/1, 2017 at 13:43 Comment(2)
Thank you @Matthew Watson. As far as i understood, i could simply use the ActionBlock to output all files that were read in (in the order they came in) and post it to the same BufferBlock/TransformBlock/ActionBlock combination or do you see a problem in chaining those blocks?Indign
@Indign I think that should work OK. It's similar to how my file compresser code works.Blacklist
S
5

@Matthew Watson has good suggestion I just want to throw in that it's not necessary to limit the final action block with MaxDegreeOfParallelism and BoundedCapacity to 1 unless you're using the Microsoft.Tpl.Dataflow package. The newer and correct one, System.Threading.Tasks.Dataflow adds the property EnsureOrdered to the execution block options. Although this does not seem to be documented in MSDN you can find this property and its use in TPL Dataflow Source.

Here is a sample and test that demonstrates this behavior, changing EnsureOrdered in the execution options to false will cause the test to fail. The default value is true and does not need to be explicitly set for ordered behavior.

Edit: As was pointed out below by @Matthew Watson, while EnsureOrdered keeps things ordered between Propagator Blocks, once in the action block messages can be handled in any order.

Edit2: Note: if the ActionBlock has MaxDegreeOfParllelism and BoundedCapacity set to one, but EnsureOrdered is false, the test will fail and the results will be out of order.

[TestFixture]
public class TestRunner {

    [Test]
    public void TestPipeline() {
        var data = Enumerable.Range(0, 30).Select(x => new Message(x, x)).ToList();

        var target = new MyDataflow();
        target.PostData(data).Wait();

        Assert.IsTrue(data.SequenceEqual(target.OutputMessages));
    }
}

public class MyDataflow {

    private static Random rnd = new Random();

    private BufferBlock<Message> buffer;
    private TransformBlock<Message, Message> xForm1;
    private ActionBlock<Message> action;
    public IList<Message> OutputMessages { get; set; }

    public MyDataflow() {
        OutputMessages = new List<Message>();
        CreatePipeline();
        LinkPipeline();
    }

    public void CreatePipeline() {
        var options = new ExecutionDataflowBlockOptions() {
            BoundedCapacity = 2,
            MaxDegreeOfParallelism = 10,
            EnsureOrdered = true
        };

        buffer = new BufferBlock<Message>();

        xForm1 = new TransformBlock<Message, Message>(x => {
            Console.WriteLine($"{DateTime.Now.TimeOfDay} - Started Id: {x.Id}");
            Task.Delay(rnd.Next(1000, 3000)).Wait();
            Console.WriteLine($"{DateTime.Now.TimeOfDay} - Finished Id: {x.Id}");
            return x;
        }, options);

        action = new ActionBlock<Message>(x => {
            Console.WriteLine($"{DateTime.Now.TimeOfDay} - Output  Id: {x.Id} Value: {x.Value}");

            //this delay will cause the messages to be unordered
            Task.Delay(rnd.Next(1000, 3000)).Wait();

            OutputMessages.Add(x);
        }, options);
    }

    public void LinkPipeline() {
        var options = new DataflowLinkOptions() {
            PropagateCompletion = true
        };

        buffer.LinkTo(xForm1, options);
        xForm1.LinkTo(action, options);
    }

    public Task PostData(IEnumerable<Message> data) {

        foreach (var item in data) {
            buffer.Post(item);
        }
        buffer.Complete();
        return action.Completion;
    }
}

public class Message {
    public Message(int id, int value) {
        this.Id = id;
        this.Value = value;
    }
    public int Id { get; set; }
    public int Value { get; set; }
}

Edit: Unfortunately we can't directly access the internal ReorderingBuffer. So an alternative to an ActionBlock with BoundedCapacity and MaxDegreeOfParallelism equal to one would be to link the TransformBlock ordered output to a stream. Note in the code above the delay in the parallel enabled ActionBlock causes the result to be out of ordered but in the code below the delay in the processing of the stream will not disturb the order. Essentially, providing the same behavior as a synchronous ActionBlock and could feed another section of mesh etc.

[TestFixture]
public class TestRunner {

    [Test]
    public void TestPipeline() {
        var data = Enumerable.Range(0, 30).Select(x => new Message(x, x)).ToList();

        var target = new MyDataflow();
        target.PostData(data).Wait();

        Assert.IsTrue(data.SequenceEqual(target.OutputMessages));
    }
}

public class MyDataflow {

    private static Random rnd = new Random();

    private BufferBlock<Message> buffer;
    private TransformBlock<Message, Message> xForm1;
    private IObservable<Message> output;
    private TaskCompletionSource<bool> areWeDoneYet;
    public IList<Message> OutputMessages { get; set; }

    public MyDataflow() {
        OutputMessages = new List<Message>();
        CreatePipeline();
        LinkPipeline();
    }

    public void CreatePipeline() {
        var options = new ExecutionDataflowBlockOptions() {
            BoundedCapacity = 13,
            MaxDegreeOfParallelism = 10,
            EnsureOrdered = true
        };

        buffer = new BufferBlock<Message>();

        xForm1 = new TransformBlock<Message, Message>(x => {
            Console.WriteLine($"{DateTime.Now.TimeOfDay} - Started Id: {x.Id}");
            Task.Delay(rnd.Next(1000, 3000)).Wait();
            Console.WriteLine($"{DateTime.Now.TimeOfDay} - Finished Id: {x.Id}");
            return x;
        }, options);

        output = xForm1.AsObservable<Message>();

        areWeDoneYet = new TaskCompletionSource<bool>();
    }

    public void LinkPipeline() {
        var options = new DataflowLinkOptions() {
            PropagateCompletion = true
        };

        buffer.LinkTo(xForm1, options);

        var subscription = output.Subscribe(msg => {
            Task.Delay(rnd.Next(1000, 3000)).Wait();
            OutputMessages.Add(msg);
        }, () => areWeDoneYet.SetResult(true));            
    }

    public Task<bool> PostData(IEnumerable<Message> data) {            
        foreach (var item in data) {
            buffer.Post(item);
        }
        buffer.Complete();
        return areWeDoneYet.Task;
    }
}

public class Message {
    public Message(int id, int value) {
        this.Id = id;
        this.Value = value;
    }
    public int Id { get; set; }
    public int Value { get; set; }
}

Edit2: Also, my pipeline should have 3 of those stages, how could i link those? So when the first block is done with the first file, it starts putting data out to the next block, which will work parallel and async again.

This is not driven by how they are linked but in the ExecutionDataflowBlockOptions. With the options shown below, the first block will spool up tasks according to the number of files posted and their given processing time, as they finish they will be output to either the next stage of processing or your failure handling ActionBlock based on your Job.ReturnCode predicate, and the same will follow for the next stages. You can also modify your ActionBlock options to handle multiple success/failures from your TransformBlocks.

var options = new ExecutionDataflowBlockOptions() {
            BoundedCapacity = 10,
            MaxDegreeOfParallelism = 10,
            EnsureOrdered = true
        };
var loadXml = new TransformBlock<Job, Job>(job => { ... }, options); // I/O
var validateData = new TransformBlock<Job, Job>(job => { ... }, options); // Parsing&Validating&Calculations
var importJob = new TransformBlock<Job, Job>(job => { ... }, options); // Saving to database

var loadingFailed = new ActionBlock<Job>(job => CreateResponse(job));
var validationFailed = new ActionBlock<Job>(job => CreateResponse(job));
var reportImport = new ActionBlock<Job>(job => CreateResponse(job));

loadXml.LinkTo(validateData, job => job.ReturnCode == 100);
loadXml.LinkTo(loadingFailed);

validateData.LinkTo(importJob, Job => Job.ReturnCode == 100);
validateData.LinkTo(validationFailed);

importJob.LinkTo(reportImport);

Edit3 In response to the OP's added source code: Your loosing the ordered behavior in your last transform block by setting the MaxDegreeOfParallelism and BoundedCapcity to 1. Let me be clear do not do that to "ensure order" it's only fighting the library. Here is the relevant snippet from the TransformBlock:

            // If parallelism is employed, we will need to support reordering messages that complete out-of-order.
            // However, a developer can override this with EnsureOrdered == false.
            if (dataflowBlockOptions.SupportsParallelExecution && dataflowBlockOptions.EnsureOrdered)
            {
                _reorderingBuffer = new ReorderingBuffer<TOutput>(this, (owningSource, message) => ((TransformBlock<TInput, TOutput>)owningSource)._source.AddMessage(message));
            }

Here's a run with 20 data points done with your code modified to use parallelism in final TBlock. Modified to basic csv to review in Excel, i.e. Replace " " => "," :)

Function,TimeStamp/Inserted JobId,Other,Other,Other,Other,Other,Other,Other,JobId From functions
ReadDocument,04:54.0,|,Thread,6,is,processing,Job,Id:,1
ReadDocument,04:54.0,|,Thread,11,is,processing,Job,Id:,2
ReadDocument,04:56.0,|,Thread,13,is,processing,Job,Id:,3
ReadDocument,04:56.0,|,Thread,6,is,processing,Job,Id:,4
ReadDocument,04:57.0,|,Thread,11,is,processing,Job,Id:,5
ReadDocument,04:57.0,|,Thread,14,is,processing,Job,Id:,6
ReadDocument,04:58.0,|,Thread,15,is,processing,Job,Id:,7
ReadDocument,04:58.0,|,Thread,6,is,processing,Job,Id:,8
ReadDocument,04:59.0,|,Thread,11,is,processing,Job,Id:,9
ReadDocument,04:59.0,|,Thread,16,is,processing,Job,Id:,10
ReadDocument,05:00.0,|,Thread,17,is,processing,Job,Id:,12
ReadDocument,05:00.0,|,Thread,15,is,processing,Job,Id:,11
ReadDocument,05:01.0,|,Thread,16,is,processing,Job,Id:,13
ReadDocument,05:01.0,|,Thread,18,is,processing,Job,Id:,14
ReadDocument,05:02.0,|,Thread,15,is,processing,Job,Id:,15
ReadDocument,05:02.0,|,Thread,17,is,processing,Job,Id:,20
ValidateXml,05:02.0,|,Thread,19,is,processing,Job,Id:,1
ReadDocument,05:02.0,|,Thread,14,is,processing,Job,Id:,17
ReadDocument,05:02.0,|,Thread,13,is,processing,Job,Id:,16
ReadDocument,05:02.0,|,Thread,11,is,processing,Job,Id:,18
ReadDocument,05:02.0,|,Thread,6,is,processing,Job,Id:,19
ValidateXml,05:03.0,|,Thread,16,is,processing,Job,Id:,2
ValidateXml,05:03.0,|,Thread,20,is,processing,Job,Id:,3
ValidateXml,05:04.0,|,Thread,11,is,processing,Job,Id:,4
ValidateXml,05:04.0,|,Thread,21,is,processing,Job,Id:,7
ValidateXml,05:04.0,|,Thread,18,is,processing,Job,Id:,5
ValidateXml,05:04.0,|,Thread,15,is,processing,Job,Id:,6
ValidateXml,05:04.5,|,Thread,16,is,processing,Job,Id:,8
ValidateXml,05:04.5,|,Thread,6,is,processing,Job,Id:,9
ValidateXml,05:04.6,|,Thread,19,is,processing,Job,Id:,10
ProcessJob,05:04.6,|,Thread,14,is,processing,Job,Id:,2
ProcessJob,05:04.6,|,Thread,22,is,processing,Job,Id:,1
ValidateXml,05:05.5,|,Thread,18,is,processing,Job,Id:,11
ValidateXml,05:05.6,|,Thread,20,is,processing,Job,Id:,12
ProcessJob,05:05.6,|,Thread,23,is,processing,Job,Id:,3
ValidateXml,05:06.5,|,Thread,6,is,processing,Job,Id:,13
ValidateXml,05:06.5,|,Thread,21,is,processing,Job,Id:,15
ID,1,was,successfully,imported.,,,,,
ValidateXml,05:06.5,|,Thread,16,is,processing,Job,Id:,14
ValidateXml,05:06.8,|,Thread,15,is,processing,Job,Id:,17
ProcessJob,05:06.8,|,Thread,24,is,processing,Job,Id:,4
ValidateXml,05:06.8,|,Thread,11,is,processing,Job,Id:,16
ProcessJob,05:06.8,|,Thread,22,is,processing,Job,Id:,5
ProcessJob,05:07.5,|,Thread,17,is,processing,Job,Id:,6
ProcessJob,05:07.5,|,Thread,25,is,processing,Job,Id:,8
ValidateXml,05:07.5,|,Thread,19,is,processing,Job,Id:,18
ProcessJob,05:07.5,|,Thread,14,is,processing,Job,Id:,7
ValidateXml,05:08.5,|,Thread,16,is,processing,Job,Id:,19
ProcessJob,05:08.5,|,Thread,23,is,processing,Job,Id:,9
ValidateXml,05:08.5,|,Thread,18,is,processing,Job,Id:,20
ProcessJob,05:09.5,|,Thread,19,is,processing,Job,Id:,10
ID,2,was,successfully,imported.,,,,,
ProcessJob,05:09.5,|,Thread,15,is,processing,Job,Id:,11
ID,3,was,successfully,imported.,,,,,
ProcessJob,05:10.6,|,Thread,14,is,processing,Job,Id:,12
ProcessJob,05:10.9,|,Thread,25,is,processing,Job,Id:,13
ProcessJob,05:11.0,|,Thread,24,is,processing,Job,Id:,14
ID,4,was,successfully,imported.,,,,,
ProcessJob,05:11.1,|,Thread,17,is,processing,Job,Id:,15
ProcessJob,05:11.3,|,Thread,22,is,processing,Job,Id:,16
ID,5,was,successfully,imported.,,,,,
ID,6,was,successfully,imported.,,,,,
ID,7,was,successfully,imported.,,,,,
ID,8,was,successfully,imported.,,,,,
ProcessJob,05:11.6,|,Thread,19,is,processing,Job,Id:,17
ProcessJob,05:11.7,|,Thread,23,is,processing,Job,Id:,18
ID,9,was,successfully,imported.,,,,,
ID,10,was,successfully,imported.,,,,,
ProcessJob,05:12.0,|,Thread,14,is,processing,Job,Id:,19
ProcessJob,05:12.4,|,Thread,15,is,processing,Job,Id:,20
ID,11,was,successfully,imported.,,,,,
ID,12,was,successfully,imported.,,,,,
ID,13,was,successfully,imported.,,,,,
ID,14,was,successfully,imported.,,,,,
ID,15,was,successfully,imported.,,,,,
ID,16,was,successfully,imported.,,,,,
ID,17,was,successfully,imported.,,,,,
ID,18,was,successfully,imported.,,,,,
ID,19,was,successfully,imported.,,,,,
ID,20,was,successfully,imported.,,,,,

One final note: functions returning bool for success and mapping exceptions to return codes can be problematic, that is however out of the scope of this question. You can get a lot of good advice on best practices by posting code at Stack Exchange Code Review

Syrinx answered 14/1, 2017 at 18:53 Comment(9)
I tried this but it doesn't actually work (i.e. the results are not in the correct order) if you increase the BoundedCapacity to, say, 8. It seems ok for smaller values of BoundedCapacity but the fact it doesn't work properly for higher values seems to indicate that EnsureOrdered doesn't do exactly what we think it does...Blacklist
@Matthew Watson Great spot, on further inspection it appears EnsureOrdered is only relevant between blocks. The final ActionBlock, or any ActionBlock, ignores the option. A couple of notes: I got the unordered behavior to appear for small values of BoundedCapacity by inserting a delay in the action block. So, although messages leave the TransformBlock in correct order they can be processed by an ActionBlock in any order. Maybe, we could just open up the internal ReorderingBuffer on the tBlock and we'd be all set ;)Syrinx
Thank you @JSteward. Trying to understand right now. 3 questions though. By adding TPL Dataflow from the nuget manager i should select "System.Threading.Tasks.Dataflow" instead of "Microsoft.Tpl.Dataflow", which is adviced to be used in the MSDN, why is that? Also, my pipeline should have 3 of those stages, how could i link those? So when the first block is done with the first file, it starts putting data out to the next block, which will work parallel and async again, while still reading the other files? One last thing, the "Subscribe"-function only takes one argument according to my VS?Indign
@Indign (1) Which package; I see the System package was last updated Nov. 2016, while the Microsoft package was updated in 2014. Where is the recommendation to use the Microsoft package? link link (2) Rx Subscribe; you'll ned to install the relevant Rx packages to use the correct overload for the subscribe. I'll try to answer your other questions in an editSyrinx
(1) I was refering to this link. It says you need Microsoft.Tpl.Dataflow, which confused me. (2) Got it, thanks!Indign
@Indign check out this SO question on dataflow packagesSyrinx
So regarding your edit, wouldn't i need to actually Post/SendAsync to a second and third BufferBlock/TransformBlock-construct (For stage 2 and 3) from within the Subscribe method and not use the LinkTo-function? Because if i would just link i would lose the order?Indign
@Indign No, between Propagator blocks order would be maintained as long as it is never broken. Each propagator block assigns an internal id as each item enters and reorders the items as they complete processing. So given a pipeline of three TransformBlocks; the order that items are posted to the input of the first one will be the order they are posted to the output of the last one. All that's needed to ensure order beyond the end of the pipeline is ordered processing, either by a synchronous ActionBlock, i.e. capcity & parllelism =1, or a stream handling the output value in turn.Syrinx
Let us continue this discussion in chat.Indign
S
2

Original answer body became too long

Edit4: response to OP Edit2 I'm not certain exactly what changes were made to produce the provided output but here is your modified source and the results displaying ordered behavior for all 100 inputs.

using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;
using System.Xml;
using System.Linq;

namespace OrderProcessing {
    public class Job {
        public string Path { get; set; }

        public XmlDocument Document { get; set; }

        public List<Object> BusinessObjects { get; set; }

        public int ReturnCode { get; set; }

        public int ID { get; set; }
    }

    public class Test {
        ITargetBlock<Job> pathBlock = null;

        CancellationTokenSource cancellationTokenSource;

        Random rnd = new Random();

        private bool ReadDocument(Job job) {
            Console.WriteLine($"ReadDocument {DateTime.Now.TimeOfDay} JobId: {job.ID}");
            Task.Delay(rnd.Next(1000, 3000)).Wait();

            // Throw OperationCanceledException if cancellation is requested.
            cancellationTokenSource.Token.ThrowIfCancellationRequested();

            // Read the document
            job.Document = new XmlDocument();

            // Some checking
            return true;
        }

        private bool ValidateXml(Job job) {
            Console.WriteLine($"ValidateXml {DateTime.Now.TimeOfDay} JobId: {job.ID}");
            Task.Delay(rnd.Next(1000, 3000)).Wait();

            // Throw OperationCanceledException if cancellation is requested.
            cancellationTokenSource.Token.ThrowIfCancellationRequested();

            // Check XML against XSD and perform remaining checks
            job.BusinessObjects = new List<object>();

            // Just for tests
            job.BusinessObjects.Add(new object());
            job.BusinessObjects.Add(new object());

            // Parse Xml and create business objects
            return true;
        }

        private bool ProcessJob(Job job) {
            Console.WriteLine($"ProcessJob {DateTime.Now.TimeOfDay} JobId: {job.ID}");

            // Throw OperationCanceledException if cancellation is requested.
            cancellationTokenSource.Token.ThrowIfCancellationRequested();

            Parallel.ForEach(job.BusinessObjects, bO => {
                ImportObject(bO);
            });


            // Import the job
            return true;
        }

        private object ImportObject(object o) {
            Task.Delay(rnd.Next(1000, 3000)).Wait();

            return new object();
        }

        private void CreateResponse(Job job) {
            if (job.ReturnCode == 100) {
                Console.WriteLine($"CreateResponse {DateTime.Now.TimeOfDay} JobId: {job.ID}");

            }
            else {
                Console.WriteLine("ID {0} failed to import.", job.ID);
            }

            // Create response XML with returncodes
        }

        ITargetBlock<Job> CreateJobProcessingPipeline() {
            var loadXml = new TransformBlock<Job, Job>(job => {
                try {
                    if (ReadDocument(job)) {
                        // For later error handling
                        job.ReturnCode = 100; // success
                    }
                    else {
                        job.ReturnCode = 200;
                    }

                    return job;
                }
                catch (OperationCanceledException) {
                    job.ReturnCode = 300;
                    return job;
                }
            }, TransformBlockOptions());

            var validateXml = new TransformBlock<Job, Job>(job => {
                try {
                    if (ValidateXml(job)) {
                        // For later error handling
                        job.ReturnCode = 100;
                    }
                    else {
                        job.ReturnCode = 200;
                    }

                    return job;
                }
                catch (OperationCanceledException) {
                    job.ReturnCode = 300;
                    return job;
                }
            }, TransformBlockOptions());


            var importJob = new TransformBlock<Job, Job>(job => {
                try {
                    if (ProcessJob(job)) {
                        // For later error handling
                        job.ReturnCode = 100; // success
                    }
                    else {
                        job.ReturnCode = 200;
                    }

                    return job;
                }
                catch (OperationCanceledException) {
                    job.ReturnCode = 300;
                    return job;
                }
            }, TransformBlockOptions());

            var loadingFailed = new ActionBlock<Job>(job => CreateResponse(job), ActionBlockOptions());
            var validationFailed = new ActionBlock<Job>(job => CreateResponse(job), ActionBlockOptions());
            var reportImport = new ActionBlock<Job>(job => CreateResponse(job), ActionBlockOptions());

            //
            // Connect the pipeline
            //
            loadXml.LinkTo(validateXml, job => job.ReturnCode == 100);
            loadXml.LinkTo(loadingFailed);

            validateXml.LinkTo(importJob, Job => Job.ReturnCode == 100);
            validateXml.LinkTo(validationFailed);

            //importJob.LinkTo(reportImport);

            var output = importJob.AsObservable();
            var subscription = output.Subscribe(x => {
            if (x.ReturnCode == 100) {
                //job success
                Console.WriteLine($"SendToDataBase {DateTime.Now.TimeOfDay} JobId: {x.ID}");
            }
            else {
                //handle fault
                Console.WriteLine($"Job Failed {DateTime.Now.TimeOfDay} JobId: {x.ID}");
            }                
        });

            // Return the head of the network.
            return loadXml;
        }

        public void Start() {
            cancellationTokenSource = new CancellationTokenSource();

            pathBlock = CreateJobProcessingPipeline();
        }

        public async void AddJob(string path, int id) {
            Job j = new Job();
            j.Path = path;
            j.ID = id;

            await pathBlock.SendAsync(j);
        }

        static ExecutionDataflowBlockOptions TransformBlockOptions() {
            return new ExecutionDataflowBlockOptions {
                MaxDegreeOfParallelism = 8,
                BoundedCapacity = 32
            };
        }

        private static ExecutionDataflowBlockOptions ActionBlockOptions() {
            return new ExecutionDataflowBlockOptions {
                MaxDegreeOfParallelism = 1,
                BoundedCapacity = 1
            };
        }

        public void Cancel() {
            if (cancellationTokenSource != null)
                cancellationTokenSource.Cancel();
        }
    }

    class Program {
        private static String InputXml = @"C:\XML\Part.xml";
        private static Test _Pipeline;

        static void Main(string[] args) {
            _Pipeline = new Test();
            _Pipeline.Start();


            var data = Enumerable.Range(1, 100);

            foreach (var d in data)
                _Pipeline.AddJob(InputXml, d);

            //Wait before closing the application so we can see the results.
            Console.ReadLine();
        }
    }
}

Results

Function,Timestamp,Other,JobId
ReadDocument,08:11:27.2200011,JobId:,1
ReadDocument,08:11:27.2240007,JobId:,2
ReadDocument,08:11:29.7562763,JobId:,3
ReadDocument,08:11:29.7662792,JobId:,4
ReadDocument,08:11:30.7013793,JobId:,5
ReadDocument,08:11:31.7024931,JobId:,6
ReadDocument,08:11:31.7034925,JobId:,7
ReadDocument,08:11:32.7306060,JobId:,9
ReadDocument,08:11:32.7306060,JobId:,8
ReadDocument,08:11:33.7027033,JobId:,10
ReadDocument,08:11:33.7027033,JobId:,11
ReadDocument,08:11:34.7018217,JobId:,12
ReadDocument,08:11:34.7028153,JobId:,13
ReadDocument,08:11:35.7019214,JobId:,14
ReadDocument,08:11:35.7069235,JobId:,15
ReadDocument,08:11:35.7069235,JobId:,16
ReadDocument,08:11:35.7069235,JobId:,17
ReadDocument,08:11:35.7079221,JobId:,18
ValidateXml,08:11:35.7119363,JobId:,1
ValidateXml,08:11:36.7060334,JobId:,2
ReadDocument,08:11:36.7060334,JobId:,19
ReadDocument,08:11:36.7070332,JobId:,20
ReadDocument,08:11:37.7071383,JobId:,21
ReadDocument,08:11:37.7071383,JobId:,22
ReadDocument,08:11:37.7081392,JobId:,23
ValidateXml,08:11:37.7091421,JobId:,3
ReadDocument,08:11:38.7032496,JobId:,24
ValidateXml,08:11:38.7052496,JobId:,6
ValidateXml,08:11:38.7042513,JobId:,4
ReadDocument,08:11:38.7052496,JobId:,27
ValidateXml,08:11:38.7042513,JobId:,5
ReadDocument,08:11:38.7052496,JobId:,28
ReadDocument,08:11:38.7042513,JobId:,26
ReadDocument,08:11:38.7032496,JobId:,25
ValidateXml,08:11:39.7023545,JobId:,7
ReadDocument,08:11:39.7023545,JobId:,29
ValidateXml,08:11:39.7023545,JobId:,8
ReadDocument,08:11:40.7064634,JobId:,30
ReadDocument,08:11:40.7064634,JobId:,31
ValidateXml,08:11:40.7084642,JobId:,9
ValidateXml,08:11:41.7045755,JobId:,10
ReadDocument,08:11:41.7085762,JobId:,33
ValidateXml,08:11:41.7105750,JobId:,11
ValidateXml,08:11:41.7115767,JobId:,12
ValidateXml,08:11:41.7135740,JobId:,13
ValidateXml,08:11:41.7155790,JobId:,14
ReadDocument,08:11:41.7085762,JobId:,34
ReadDocument,08:11:41.7045755,JobId:,32
ReadDocument,08:11:41.7105750,JobId:,35
ReadDocument,08:11:41.7135740,JobId:,36
ReadDocument,08:11:42.7086844,JobId:,37
ValidateXml,08:11:42.7116926,JobId:,15
ValidateXml,08:11:42.7126878,JobId:,16
ReadDocument,08:11:42.7116926,JobId:,38
ValidateXml,08:11:43.7027911,JobId:,17
ValidateXml,08:11:43.7027911,JobId:,18
ValidateXml,08:11:43.7068030,JobId:,20
ProcessJob,08:11:43.7097908,JobId:,1
ValidateXml,08:11:43.7057897,JobId:,19
ReadDocument,08:11:43.7057897,JobId:,39
ReadDocument,08:11:43.7077893,JobId:,40
ReadDocument,08:11:44.7038990,JobId:,41
ProcessJob,08:11:44.7059002,JobId:,2
ValidateXml,08:11:44.7049004,JobId:,21
ReadDocument,08:11:44.7038990,JobId:,42
ValidateXml,08:11:44.7059002,JobId:,22
ReadDocument,08:11:44.7089023,JobId:,44
ReadDocument,08:11:44.7049004,JobId:,43
ReadDocument,08:11:45.7030090,JobId:,45
ValidateXml,08:11:45.7030090,JobId:,23
ValidateXml,08:11:45.7120179,JobId:,24
ValidateXml,08:11:45.7120179,JobId:,25
ReadDocument,08:11:45.7140087,JobId:,46
ValidateXml,08:11:45.7170104,JobId:,26
ReadDocument,08:11:45.7190107,JobId:,47
ProcessJob,08:11:45.7200086,JobId:,3
ValidateXml,08:11:45.7170104,JobId:,27
ReadDocument,08:11:46.7071167,JobId:,48
ReadDocument,08:11:46.7101161,JobId:,50
ProcessJob,08:11:46.7111152,JobId:,4
ValidateXml,08:11:46.7111152,JobId:,28
ReadDocument,08:11:46.7071167,JobId:,49
ValidateXml,08:11:47.7032249,JobId:,29
ReadDocument,08:11:47.7062243,JobId:,51
ReadDocument,08:11:47.7072261,JobId:,52
ReadDocument,08:11:47.7092253,JobId:,53
ProcessJob,08:11:47.7102243,JobId:,5
ProcessJob,08:11:47.7112241,JobId:,7
ReadDocument,08:11:47.7102243,JobId:,55
ValidateXml,08:11:47.7062243,JobId:,30
ProcessJob,08:11:47.7102243,JobId:,6
ValidateXml,08:11:47.7072261,JobId:,31
ReadDocument,08:11:47.7092253,JobId:,54
ReadDocument,08:11:48.7063329,JobId:,56
ProcessJob,08:11:48.7073331,JobId:,8
ValidateXml,08:11:48.7063329,JobId:,32
ValidateXml,08:11:48.7063329,JobId:,33
ValidateXml,08:11:49.7074443,JobId:,34
ReadDocument,08:11:49.7104422,JobId:,59
ReadDocument,08:11:49.7124418,JobId:,60
ProcessJob,08:11:49.7124418,JobId:,9
ValidateXml,08:11:49.7144433,JobId:,36
ValidateXml,08:11:49.7114420,JobId:,35
ReadDocument,08:11:49.7074443,JobId:,57
ReadDocument,08:11:49.7084468,JobId:,58
ValidateXml,08:11:50.7065604,JobId:,37
ReadDocument,08:11:50.7095502,JobId:,61
ProcessJob,08:11:50.7105504,JobId:,10
ReadDocument,08:11:50.7115502,JobId:,63
ValidateXml,08:11:50.7125515,JobId:,40
ReadDocument,08:11:50.7105504,JobId:,62
ValidateXml,08:11:50.7095502,JobId:,39
ValidateXml,08:11:50.7075518,JobId:,38
ReadDocument,08:11:50.7115502,JobId:,64
ReadDocument,08:11:51.7076596,JobId:,65
ReadDocument,08:11:51.7086597,JobId:,66
ProcessJob,08:11:51.7116603,JobId:,13
ProcessJob,08:11:51.7106605,JobId:,12
ProcessJob,08:11:51.7086597,JobId:,11
ValidateXml,08:11:51.7076596,JobId:,41
SendToDataBase,08:11:51.7366672,JobId:,1
SendToDataBase,08:11:51.7416631,JobId:,2
SendToDataBase,08:11:51.7496646,JobId:,3
CreateResponse,08:11:51.7546639,JobId:,56
ValidateXml,08:11:52.7037712,JobId:,42
ValidateXml,08:11:52.7037712,JobId:,43
ValidateXml,08:11:52.7077662,JobId:,44
ReadDocument,08:11:52.7107675,JobId:,69
ProcessJob,08:11:52.7077662,JobId:,14
ProcessJob,08:11:52.7077662,JobId:,15
ProcessJob,08:11:52.7087683,JobId:,16
ProcessJob,08:11:52.7087683,JobId:,17
ValidateXml,08:11:52.7097669,JobId:,45
ReadDocument,08:11:52.7097669,JobId:,67
ValidateXml,08:11:52.7097669,JobId:,46
ReadDocument,08:11:52.7107675,JobId:,68
ValidateXml,08:11:53.7069300,JobId:,47
ReadDocument,08:11:53.7078801,JobId:,70
ValidateXml,08:11:53.7108792,JobId:,48
SendToDataBase,08:11:53.7118774,JobId:,4
SendToDataBase,08:11:53.7208818,JobId:,5
SendToDataBase,08:11:53.7228802,JobId:,6
SendToDataBase,08:11:53.7238781,JobId:,7
SendToDataBase,08:11:53.7258800,JobId:,8
ReadDocument,08:11:53.7118774,JobId:,73
ReadDocument,08:11:53.7098805,JobId:,71
ReadDocument,08:11:53.7118774,JobId:,72
ValidateXml,08:11:54.7059933,JobId:,49
ValidateXml,08:11:54.7069847,JobId:,50
ValidateXml,08:11:54.7089874,JobId:,51
CreateResponse,08:11:54.7109862,JobId:,41
CreateResponse,08:11:54.7169842,JobId:,42
SendToDataBase,08:11:54.7149888,JobId:,9
SendToDataBase,08:11:54.7259874,JobId:,10
SendToDataBase,08:11:54.7269883,JobId:,11
ProcessJob,08:11:54.7119868,JobId:,18
ReadDocument,08:11:54.7059933,JobId:,74
ValidateXml,08:11:54.7109862,JobId:,53
ProcessJob,08:11:54.7119868,JobId:,19
ProcessJob,08:11:54.7129854,JobId:,20
ValidateXml,08:11:54.7099852,JobId:,52
ReadDocument,08:11:54.7129854,JobId:,76
ReadDocument,08:11:54.7069847,JobId:,75
ReadDocument,08:11:55.7090940,JobId:,77
ReadDocument,08:11:55.7140926,JobId:,78
ValidateXml,08:11:55.7140926,JobId:,54
SendToDataBase,08:11:55.7180953,JobId:,12
CreateResponse,08:11:55.7180953,JobId:,43
ProcessJob,08:11:55.7180953,JobId:,21
SendToDataBase,08:11:55.7230962,JobId:,13
ValidateXml,08:11:55.7170947,JobId:,55
ReadDocument,08:11:55.7160937,JobId:,79
ReadDocument,08:11:55.7170947,JobId:,80
ValidateXml,08:11:55.8111031,JobId:,57
ReadDocument,08:11:55.8111031,JobId:,81
ProcessJob,08:11:55.8451120,JobId:,22
ProcessJob,08:11:56.1251577,JobId:,23
ReadDocument,08:11:56.2531569,JobId:,82
ReadDocument,08:11:56.3441756,JobId:,83
ProcessJob,08:11:56.3571695,JobId:,24
ValidateXml,08:11:56.3851785,JobId:,58
ReadDocument,08:11:56.4061804,JobId:,84
ValidateXml,08:11:56.6222012,JobId:,59
CreateResponse,08:11:56.6222012,JobId:,49
ProcessJob,08:11:56.9112320,JobId:,25
ValidateXml,08:11:56.9412405,JobId:,60
ProcessJob,08:11:57.0002533,JobId:,26
ValidateXml,08:11:57.2352587,JobId:,61
ProcessJob,08:11:57.4852908,JobId:,27
ReadDocument,08:11:58.2093656,JobId:,85
SendToDataBase,08:11:58.2163692,JobId:,14
ReadDocument,08:11:58.2113664,JobId:,87
SendToDataBase,08:11:58.2203645,JobId:,15
SendToDataBase,08:11:58.2293743,JobId:,16
SendToDataBase,08:11:58.2303706,JobId:,17
SendToDataBase,08:11:58.2313662,JobId:,18
SendToDataBase,08:11:58.2333692,JobId:,19
SendToDataBase,08:11:58.2353681,JobId:,20
SendToDataBase,08:11:58.2373688,JobId:,21
SendToDataBase,08:11:58.2383671,JobId:,22
SendToDataBase,08:11:58.2393673,JobId:,23
ValidateXml,08:11:58.2123658,JobId:,63
CreateResponse,08:11:58.2163692,JobId:,50
CreateResponse,08:11:58.2543716,JobId:,51
CreateResponse,08:11:58.2643699,JobId:,52
CreateResponse,08:11:58.2663730,JobId:,53
ProcessJob,08:11:58.2143646,JobId:,31
ProcessJob,08:11:58.2123658,JobId:,29
ReadDocument,08:11:58.2093656,JobId:,86
ReadDocument,08:11:58.2123658,JobId:,88
ProcessJob,08:11:58.2133656,JobId:,30
ProcessJob,08:11:58.2103650,JobId:,28
ValidateXml,08:11:58.2113664,JobId:,62
ReadDocument,08:11:58.2123658,JobId:,89
ValidateXml,08:11:58.2133656,JobId:,64
ValidateXml,08:11:59.7055294,JobId:,65
ReadDocument,08:11:59.7065300,JobId:,91
ValidateXml,08:11:59.7065300,JobId:,66
SendToDataBase,08:11:59.7115275,JobId:,24
SendToDataBase,08:11:59.7195324,JobId:,25
SendToDataBase,08:11:59.7205330,JobId:,26
ProcessJob,08:11:59.7085277,JobId:,33
ValidateXml,08:11:59.7085277,JobId:,68
ReadDocument,08:11:59.7095263,JobId:,93
ValidateXml,08:11:59.7085277,JobId:,67
ReadDocument,08:11:59.7095263,JobId:,92
ProcessJob,08:11:59.7095263,JobId:,34
ProcessJob,08:11:59.7075275,JobId:,32
ReadDocument,08:11:59.7055294,JobId:,90
ValidateXml,08:11:59.7105265,JobId:,70
ValidateXml,08:11:59.7095263,JobId:,69
ReadDocument,08:11:59.7105265,JobId:,94
ValidateXml,08:12:00.7146358,JobId:,71
SendToDataBase,08:12:00.7176364,JobId:,27
ReadDocument,08:12:00.7156372,JobId:,97
ProcessJob,08:12:00.7146358,JobId:,35
ProcessJob,08:12:00.7156372,JobId:,36
ReadDocument,08:12:00.7146358,JobId:,95
ReadDocument,08:12:00.7156372,JobId:,96
ReadDocument,08:12:00.8616797,JobId:,98
ValidateXml,08:12:00.8796565,JobId:,72
ReadDocument,08:12:00.9066595,JobId:,99
ReadDocument,08:12:00.9786697,JobId:,100
ValidateXml,08:12:00.9866692,JobId:,73
ProcessJob,08:12:01.0766830,JobId:,37
ValidateXml,08:12:01.1176829,JobId:,74
ProcessJob,08:12:01.1176829,JobId:,38
ProcessJob,08:12:01.2167037,JobId:,39
SendToDataBase,08:12:01.2167037,JobId:,28
SendToDataBase,08:12:01.2216970,JobId:,29
SendToDataBase,08:12:01.2236923,JobId:,30
SendToDataBase,08:12:01.2246914,JobId:,31
ValidateXml,08:12:01.2327001,JobId:,75
ValidateXml,08:12:01.5447286,JobId:,76
ProcessJob,08:12:01.6567738,JobId:,40
ValidateXml,08:12:01.9347686,JobId:,77
ProcessJob,08:12:02.2498041,JobId:,44
ProcessJob,08:12:02.4448257,JobId:,45
SendToDataBase,08:12:02.4458286,JobId:,32
ValidateXml,08:12:02.5469861,JobId:,78
ProcessJob,08:12:02.6268456,JobId:,46
SendToDataBase,08:12:02.6278997,JobId:,33
SendToDataBase,08:12:02.6378977,JobId:,34
SendToDataBase,08:12:02.6398461,JobId:,35
ValidateXml,08:12:02.6538506,JobId:,79
ProcessJob,08:12:03.1399063,JobId:,47
SendToDataBase,08:12:03.1489053,JobId:,36
ValidateXml,08:12:03.2979184,JobId:,80
ProcessJob,08:12:03.4959402,JobId:,48
ValidateXml,08:12:03.6259629,JobId:,81
ValidateXml,08:12:03.6769676,JobId:,82
ProcessJob,08:12:03.7719693,JobId:,54
ProcessJob,08:12:03.8519797,JobId:,55
ProcessJob,08:12:03.9689901,JobId:,57
SendToDataBase,08:12:04.0079945,JobId:,37
SendToDataBase,08:12:04.0099953,JobId:,38
SendToDataBase,08:12:04.0109931,JobId:,39
SendToDataBase,08:12:04.0119941,JobId:,40
ValidateXml,08:12:04.0299989,JobId:,84
ValidateXml,08:12:04.0089966,JobId:,83
ProcessJob,08:12:04.3350372,JobId:,58
ValidateXml,08:12:04.6541474,JobId:,85
ProcessJob,08:12:04.8791864,JobId:,59
SendToDataBase,08:12:04.8791864,JobId:,44
SendToDataBase,08:12:05.0252098,JobId:,45
SendToDataBase,08:12:05.0757198,JobId:,46
ProcessJob,08:12:05.0757198,JobId:,60
ValidateXml,08:12:05.1527328,JobId:,86
ProcessJob,08:12:05.1532325,JobId:,61
ValidateXml,08:12:05.2762716,JobId:,87
ValidateXml,08:12:05.3793706,JobId:,88
ValidateXml,08:12:05.5953056,JobId:,89
ValidateXml,08:12:05.6453136,JobId:,90
ProcessJob,08:12:05.8313378,JobId:,62
SendToDataBase,08:12:05.8313378,JobId:,47
ValidateXml,08:12:06.1573930,JobId:,91
ValidateXml,08:12:06.2043839,JobId:,92
ProcessJob,08:12:06.4384015,JobId:,63
SendToDataBase,08:12:06.4384015,JobId:,48
ProcessJob,08:12:06.6554190,JobId:,64
ProcessJob,08:12:06.7494355,JobId:,65
SendToDataBase,08:12:06.7494355,JobId:,54
SendToDataBase,08:12:06.7594308,JobId:,55
SendToDataBase,08:12:06.7624294,JobId:,57
ProcessJob,08:12:06.9254482,JobId:,66
SendToDataBase,08:12:06.9254482,JobId:,58
ValidateXml,08:12:07.0154624,JobId:,93
ValidateXml,08:12:07.0975086,JobId:,94
ProcessJob,08:12:07.1925138,JobId:,67
ValidateXml,08:12:07.2724877,JobId:,95
ProcessJob,08:12:07.6385268,JobId:,68
ProcessJob,08:12:07.7705429,JobId:,69
ValidateXml,08:12:07.8315476,JobId:,96
ProcessJob,08:12:07.8905526,JobId:,70
SendToDataBase,08:12:07.8905526,JobId:,59
SendToDataBase,08:12:07.8965534,JobId:,60
SendToDataBase,08:12:07.8975535,JobId:,61
ValidateXml,08:12:08.1306009,JobId:,97
ValidateXml,08:12:08.2065895,JobId:,98
ValidateXml,08:12:08.3106332,JobId:,99
ProcessJob,08:12:08.3296082,JobId:,71
ValidateXml,08:12:08.4406159,JobId:,100
ProcessJob,08:12:08.8396557,JobId:,72
SendToDataBase,08:12:08.8446570,JobId:,62
SendToDataBase,08:12:08.8806613,JobId:,63
SendToDataBase,08:12:08.8946619,JobId:,64
ProcessJob,08:12:09.0076746,JobId:,73
SendToDataBase,08:12:09.0086763,JobId:,65
ProcessJob,08:12:09.0996850,JobId:,74
ProcessJob,08:12:09.1106847,JobId:,75
SendToDataBase,08:12:09.1106847,JobId:,66
SendToDataBase,08:12:09.1136860,JobId:,67
ProcessJob,08:12:09.6547630,JobId:,76
SendToDataBase,08:12:09.6557462,JobId:,68
ProcessJob,08:12:09.9218032,JobId:,77
ProcessJob,08:12:10.2218075,JobId:,78
ProcessJob,08:12:10.4288308,JobId:,79
SendToDataBase,08:12:10.4288308,JobId:,69
SendToDataBase,08:12:10.4408307,JobId:,70
SendToDataBase,08:12:10.4448318,JobId:,71
ProcessJob,08:12:10.6858596,JobId:,80
SendToDataBase,08:12:10.6858596,JobId:,72
ProcessJob,08:12:11.4049481,JobId:,81
ProcessJob,08:12:11.7039814,JobId:,82
ProcessJob,08:12:11.8272054,JobId:,83
ProcessJob,08:12:11.9930072,JobId:,84
SendToDataBase,08:12:11.9930072,JobId:,73
SendToDataBase,08:12:11.9979988,JobId:,74
SendToDataBase,08:12:11.9989983,JobId:,75
SendToDataBase,08:12:11.9989983,JobId:,76
ProcessJob,08:12:12.3460366,JobId:,85
ProcessJob,08:12:12.4520491,JobId:,86
SendToDataBase,08:12:12.4520491,JobId:,77
ProcessJob,08:12:12.8810952,JobId:,87
ProcessJob,08:12:13.1443167,JobId:,88
SendToDataBase,08:12:13.1443167,JobId:,78
SendToDataBase,08:12:13.1471282,JobId:,79
ProcessJob,08:12:13.2041414,JobId:,89
SendToDataBase,08:12:13.2081302,JobId:,80
SendToDataBase,08:12:13.2101309,JobId:,81
ProcessJob,08:12:13.4381566,JobId:,90
SendToDataBase,08:12:13.4392215,JobId:,82
ProcessJob,08:12:13.6411889,JobId:,91
SendToDataBase,08:12:13.6411889,JobId:,83
ProcessJob,08:12:13.9472212,JobId:,92
SendToDataBase,08:12:13.9472212,JobId:,84
ProcessJob,08:12:14.3122494,JobId:,93
ProcessJob,08:12:14.7053031,JobId:,94
SendToDataBase,08:12:14.7053031,JobId:,85
SendToDataBase,08:12:14.7092946,JobId:,86
ProcessJob,08:12:14.9393634,JobId:,95
ProcessJob,08:12:15.4103709,JobId:,96
SendToDataBase,08:12:15.4113707,JobId:,87
ProcessJob,08:12:15.9355263,JobId:,97
ProcessJob,08:12:15.9724349,JobId:,98
SendToDataBase,08:12:15.9724349,JobId:,88
SendToDataBase,08:12:15.9774350,JobId:,89
ProcessJob,08:12:15.9724349,JobId:,99
SendToDataBase,08:12:15.9784371,JobId:,90
SendToDataBase,08:12:15.9834330,JobId:,91
ProcessJob,08:12:16.6175125,JobId:,100
SendToDataBase,08:12:16.6175125,JobId:,92
SendToDataBase,08:12:16.6555160,JobId:,93
SendToDataBase,08:12:17.5005984,JobId:,94
SendToDataBase,08:12:17.8846409,JobId:,95
SendToDataBase,08:12:17.8886408,JobId:,96
SendToDataBase,08:12:18.1186677,JobId:,97
SendToDataBase,08:12:18.7557365,JobId:,98
SendToDataBase,08:12:18.7567394,JobId:,99
SendToDataBase,08:12:19.5488221,JobId:,100

Edit The new subscription will either send your items to the Db or handle a faulted job in a way you choose.

Further resources:

Stack Exchange Code Review

Dataflow Source

Syrinx answered 19/1, 2017 at 15:51 Comment(8)
So i used your code and i see two problems. One being that it will pass jobs over to CreateResponse where it shouldn't because of BoundedCapacity being locked to 1 or 32 in the options. Jobs should only go there if the returnCode is not 100. If i set it to Unbounded the outputs for "ProcessJob" are not in order, but SendToDataBase is, which makes me think that it reorders the currently worked on element before returning it from the TransformBlock or waiting until the next element in the order is finished with processing, but the processing itself inside the Block is not in order?Indign
@Indign In your modified code I have redirected your final output to a stream each job is processed in order in the handler of that stream which can be seen in the subscription. Cancellation and faults were not the focus of this question, which is all your return code mapping does. I cannot tell what exactly you are setting to "Unbounded" but if you are setting an ActionBlock with greater than one MaxDegreeOfParallelism to an unbounded capacity it will process your items out of order. Please see the quick mod which handles the return code in the stream.Syrinx
Also: Outputs for ProcessJob are not equivalent to the WriteLine ProcessJob. The TransformBlock will reorder the outputs before passing them to the stream and/or next block. The WriteLine in process job is correctly out of order because the items are processed in that step out of order i.e. in parallel, they are then reordered by that TransformBlock and sent to the next stream/block.Syrinx
So i copied your code 1:1 and pasted the output into another edit. As you can see CreateResponse is being called a few times even though i have no error handling yet and just return true in every function (no errors can happen yet). Still some jobs are passed to the Error-ActionBlock's (CreateResponse). I guess this happens because the TransformBlock has reached the BoundedCapacity of 32 and denies incoming jobs after that. I need to ensure everything gets processed, that is why i put BoundedCapacity for all Blocks to Unbounded.Indign
If you also check the SendtoDataBase line for JobId 40, you will see that the next one is 41 and then 46. All in between are missing. The second issue is, that as you said items are not processed in order inside the ProcessJob-TransformBlock, but i need to make sure they are, because this is where i wanted to save into the database. I could either use the subscription to save to the database or just use the last TransformBlock with MaxDegreeOfParallelism = 1, so everything is processed sequentially?Indign
I guess this happens because the TransformBlock has reached the BoundedCapacity of 32 Correct. I could either use the subscription to save to the database or just use the last ActionBlock with MaxDegreeOfParallelism = 1, so everything is processed sequentially? Should work fine. Note as you've seen you will need a more robust way to handle faults/cancellation than the ReturnCode based predicate.Syrinx
Is there a problem with simply sticking to Unbounded? Can recommend a good source for best practice to handle faults and cancellation?Indign
Designing the system such that it forces you to use unbounded limits your ability to throttle the flow if it becomes necessary. It may be acceptable, in isolated situations though. Here's a couple decent resources to check out Stephen Cleary - Async Producer/Consumer Stephen Toub - InterviewSyrinx

© 2022 - 2024 — McMap. All rights reserved.