How can I make sure a dataflow block only creates threads on an on-demand basis?

I've written a small pipeline using the TPL Dataflow API which receives data from multiple threads and performs handling on them.

Setup 1

When I configure each block with MaxDegreeOfParallelism = Environment.ProcessorCount (8 in my case), I notice that it fills up buffers on multiple threads, and processing in the second block doesn't start until roughly 1700 elements have been received across all threads. You can see this in action here.

Setup 2

When I set MaxDegreeOfParallelism = 1, I notice that all elements are received on a single thread and the sending step already starts after roughly 40 elements have been received. Data here.

Setup 3

When I set MaxDegreeOfParallelism = 1 and I introduce a delay of 1000ms before sending each input, I notice elements get sent as soon as they are received and every received element is put on a separate thread. Data here.


So far the setup. My questions are the following:

  1. When I compare setups 1 & 2 I notice that processing elements starts much faster when done in serial compared to parallel (even after accounting for the fact that parallel has 8x as many threads). What causes this difference?

  2. Since this will run in an ASP.NET environment, I don't want to spawn unnecessary threads, since they all come from a single thread pool. As shown in setup 3, the work still spreads over multiple threads even when there is only a handful of data. This is also surprising because from setup 1 I would assume that data is spread sequentially over threads (notice how the first 50 elements all go to thread 16). Can I make sure it only creates new threads on an on-demand basis?

  3. There is another concept called the BufferBlock<T>. If the TransformBlock<T> already queues input, what would be the practical difference of swapping the first step in my pipeline (ReceiveElement) for a BufferBlock?


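using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

class Program
{
    static void Main(string[] args)
    {
        var dataflowProcessor = new DataflowProcessor<string>();
        var amountOfTasks = 5;
        var tasks = new Task[amountOfTasks];

        for (var i = 0; i < amountOfTasks; i++)
        {
            tasks[i] = SpawnThread(dataflowProcessor, $"Task {i + 1}");
        }

        // Tasks returned by Task.Run are already started, so no Start() call is needed.
        // Each one completes only when its FeedData loop has finished.
        Task.WaitAll(tasks);
        Console.WriteLine("Finished feeding threads");
        Console.Read();
    }

    private static Task SpawnThread(DataflowProcessor<string> dataflowProcessor, string taskName)
    {
        // new Task(async () => ...) would wrap an async void lambda that counts as
        // finished at its first await; Task.Run returns a task that tracks FeedData.
        return Task.Run(() => FeedData(dataflowProcessor, taskName));
    }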

    private static async Task FeedData(DataflowProcessor<string> dataflowProcessor, string threadName)
    {
        foreach (var i in Enumerable.Range(0, short.MaxValue))
        {
            await Task.Delay(1000); // Only used for the delayedSerialProcessing test
            dataflowProcessor.Process($"Thread name: {threadName}\t Thread ID:{Thread.CurrentThread.ManagedThreadId}\t Value:{i}");
        }
    }
}


public class DataflowProcessor<T>
{
    private static readonly ExecutionDataflowBlockOptions ExecutionOptions = new ExecutionDataflowBlockOptions
    {
        MaxDegreeOfParallelism = Environment.ProcessorCount
    };

    private static readonly TransformBlock<T, T> ReceiveElement = new TransformBlock<T, T>(element =>
    {
        Console.WriteLine($"Processing received element in thread {Thread.CurrentThread.ManagedThreadId}");
        return element;
    }, ExecutionOptions);

    private static readonly ActionBlock<T> SendElement = new ActionBlock<T>(element =>
    {
        Console.WriteLine($"Processing sent element in thread {Thread.CurrentThread.ManagedThreadId}");
        Console.WriteLine(element);
    }, ExecutionOptions);

    static DataflowProcessor()
    {
        // Propagate completion and faults from ReceiveElement to SendElement.
        // The hand-written continuation targeted ReceiveElement itself, so SendElement was never completed.
        ReceiveElement.LinkTo(SendElement, new DataflowLinkOptions { PropagateCompletion = true });
    }


    public void Process(T newElement)
    {      
        ReceiveElement.Post(newElement);
    }
}
Fallal answered 15/6, 2016 at 11:8 Comment(9)
TPL Dataflow has no knowledge of threads, it uses tasks. The ThreadPool matches tasks with threads. You can change that by setting your own TaskScheduler. – Atrioventricular
@Atrioventricular The real reason I'm writing this is to offload work that is done on the request thread to other threads so the request thread is freed up and can move on. Perhaps I'm approaching it the wrong way? – Fallal
@JeroenVannevel as l3arnon already mentioned, TPL Dataflow uses tasks, so it does free the posting thread. Your code is overcomplicated. A simple Post to the start block and awaiting the Completion property of the last block should be enough. – Astromancy
@PanagiotisKanavos I don't want to create an abundance of tasks that all request threads from the IIS thread pool. If I understood things correctly, by passing it through dataflow I can essentially limit the number of tasks (or threads?) created (MaxDegreeOfParallelism) while automatically queuing other incoming data. Have I understood this incorrectly? – Fallal
Probably, in several ways. First, tasks are the abstraction, not the thread itself. The TaskScheduler is the thing that assigns tasks to the already created threads in the thread pool. In any case, MaxDOP=1 by default, with each block using its own task. You have to intervene to change the DOP to a greater number. And even MaxDOP=10 doesn't mean you get 10 threads. If processing is fast enough, you may only use 1 or 5 or whatever is needed to process the input buffer. – Astromancy
Finally, bounding the buffers allows you to throttle the entire pipeline by forcing upstream blocks to pause if a downstream block's input buffer is full. – Astromancy
E.g., I need to call a GDS to request a couple of thousand air ticket records. I use a large DOP for the request step because it's just slow HTTP requests. In fact, the limit depends on how many requests/sec the GDS can process. The record parsing step, though, has stricter limits because I don't want to download tickets faster than I can parse them. – Astromancy
@PanagiotisKanavos That sounds like exactly what I'm after, though. Right now every request that comes in stores some data at several points throughout the request lifetime, and at the very end of the request it sends all that data off to the database. I want to do this on another thread (or at least free up the request thread), so I would use tasks. However, I don't want to create too many tasks because they all use the ThreadPool, and thread exhaustion is a real worry when you have thousands of requests with 30+ db calls each. Hence I want to throttle it and queue the other db calls. – Fallal
It seems that you don't start one pipeline, but five, and all of them are configured with parallelism equal to the number of processors. This doesn't make much sense. By oversubscribing the system resources you are reducing the efficiency of the whole system. In ASP.NET you rarely need to use more than one processor for a single request, and if you do, it would be for some special request of a privileged user, not for everyone. – Convince
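
The suggestions in the comments above (a plain Post or SendAsync to the first block, awaiting the Completion of the last block, and bounding the buffers to throttle producers) can be sketched roughly as follows. This is a minimal illustration and not the asker's code: the class name ThrottledPipelineSketch, the limits of 2 and 10, and the Trim step are assumptions chosen for the example, and async Main assumes C# 7.1 or later.

```csharp
using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Hypothetical sketch: names and limits are illustrative assumptions.
public static class ThrottledPipelineSketch
{
    public static async Task Main()
    {
        var options = new ExecutionDataflowBlockOptions
        {
            MaxDegreeOfParallelism = 2, // an upper limit, not a guaranteed number of threads
            BoundedCapacity = 10        // a full input buffer pushes back on whoever feeds the block
        };

        var receive = new TransformBlock<string, string>(element => element.Trim(), options);
        var send = new ActionBlock<string>(element => Console.WriteLine(element), options);

        // Completion (and faults) of the first block flow on to the second block.
        receive.LinkTo(send, new DataflowLinkOptions { PropagateCompletion = true });

        for (var i = 0; i < 100; i++)
        {
            // SendAsync waits asynchronously while the bounded buffer is full.
            // Post would return false right away and the item would not be accepted.
            await receive.SendAsync($" Value:{i} ");
        }

        receive.Complete();    // no more input
        await send.Completion; // finishes once every item has passed through both blocks
    }
}
```

Because the send block runs with a parallelism of 2 here, the lines may not be printed in input order.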

Before you deploy your solution to the ASP.NET environment, I suggest you change your architecture: IIS can suspend threads in ASP.NET for its own use after the request has been handled, so your task could be left unfinished. A better approach is to create a separate Windows service that handles your dataflow.

Now back to the TPL Dataflow.

I love the TPL Dataflow library, but its documentation is a real mess.
The only useful document I've found is Introduction to TPL Dataflow.

There are some clues in it that can be helpful, especially the ones about Configuration Settings. If you need them, I suggest you look into implementing your own TaskScheduler backed by your own ThreadPool implementation, and into the MaxMessagesPerTask option:

The built-in dataflow blocks are configurable, with a wealth of control provided over how and where blocks perform their work. Here are some key knobs available to the developer, all of which are exposed through the DataflowBlockOptions class and its derived types (ExecutionDataflowBlockOptions and GroupingDataflowBlockOptions), instances of which may be provided to blocks at construction time.

  • TaskScheduler customization, as @i3arnon mentioned:

    By default, dataflow blocks schedule work to TaskScheduler.Default, which targets the internal workings of the .NET ThreadPool.

  • MaxDegreeOfParallelism

    It defaults to 1, meaning only one thing may happen in a block at a time. If set to a value higher than 1, that number of messages may be processed concurrently by the block. If set to DataflowBlockOptions.Unbounded (-1), any number of messages may be processed concurrently, with the maximum automatically managed by the underlying scheduler targeted by the dataflow block. Note that MaxDegreeOfParallelism is a maximum, not a requirement.

  • MaxMessagesPerTask

    TPL Dataflow is focused on both efficiency and control. Where there are necessary trade-offs between the two, the system strives to provide a quality default but also enable the developer to customize behavior according to a particular situation. One such example is the trade-off between performance and fairness. By default, dataflow blocks try to minimize the number of task objects that are necessary to process all of their data. This provides for very efficient execution; as long as a block has data available to be processed, that block’s tasks will remain to process the available data, only retiring when no more data is available (until data is available again, at which point more tasks will be spun up). However, this can lead to problems of fairness. If the system is currently saturated processing data from a given set of blocks, and then data arrives at other blocks, those latter blocks will either need to wait for the first blocks to finish processing before they’re able to begin, or alternatively risk oversubscribing the system. This may or may not be the correct behavior for a given situation. To address this, the MaxMessagesPerTask option exists. It defaults to DataflowBlockOptions.Unbounded (-1), meaning that there is no maximum. However, if set to a positive number, that number will represent the maximum number of messages a given block may use a single task to process. Once that limit is reached, the block must retire the task and replace it with a replica to continue processing. These replicas are treated fairly with regards to all other tasks scheduled to the scheduler, allowing blocks to achieve a modicum of fairness between them. In the extreme, if MaxMessagesPerTask is set to 1, a single task will be used per message, achieving ultimate fairness at the potential expense of more tasks than may otherwise have been necessary.

  • MaxNumberOfGroups

    The grouping blocks are capable of tracking how many groups they’ve produced, and automatically complete themselves (declining further offered messages) after that number of groups has been generated. By default, the number of groups is DataflowBlockOptions.Unbounded (-1), but it may be explicitly set to a value greater than one.

  • CancellationToken

    This token is monitored during the dataflow block’s lifetime. If a cancellation request arrives prior to the block’s completion, the block will cease operation as politely and quickly as possible.

  • Greedy

    By default, target blocks are greedy and want all data offered to them.

  • BoundedCapacity

    This is the limit on the number of items the block may be storing and have in flight at any one time.
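
As a rough illustration of how the knobs quoted above fit together, here is a minimal sketch. It does not implement a custom TaskScheduler or thread pool; instead it swaps in the built-in ConcurrentExclusiveSchedulerPair as a stand-in for a concurrency-limiting scheduler. The class name BlockOptionsSketch and all numeric limits are assumptions picked for the example, and async Main assumes C# 7.1 or later.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Hypothetical sketch: names and limits are illustrative assumptions.
public static class BlockOptionsSketch
{
    public static async Task Main()
    {
        // Built-in scheduler wrapper: at most 2 tasks scheduled through it run
        // at the same time. The work still executes on ThreadPool threads.
        var limited = new ConcurrentExclusiveSchedulerPair(TaskScheduler.Default, 2);

        var options = new ExecutionDataflowBlockOptions
        {
            TaskScheduler = limited.ConcurrentScheduler,
            MaxDegreeOfParallelism = 4, // the block may ask for up to 4 tasks; the scheduler caps them at 2
            MaxMessagesPerTask = 16,    // a task is retired and replaced after 16 messages (fairness)
            BoundedCapacity = 100       // at most 100 items buffered or in flight
        };

        var block = new ActionBlock<int>(
            n => Console.WriteLine($"Item {n} on thread {Thread.CurrentThread.ManagedThreadId}"),
            options);

        for (var i = 0; i < 50; i++)
        {
            await block.SendAsync(i); // waits asynchronously if the bounded buffer is full
        }

        block.Complete();
        await block.Completion;
    }
}
```

Sharing one such scheduler between several blocks would cap their combined concurrency, which is one way to keep a pipeline from claiming too many pool threads at once.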

Amblygonite answered 15/6, 2016 at 13:7 Comment(4)
The problem isn't the documentation, because Dataflow is really simple. The problem is actually accepting that it is that simple and doesn't require any special tricks or settings. Stephen Cleary has written a series of introductory blog posts as well. – Astromancy
MSDN also has an entire section on Dataflow with how-to guides and walkthroughs. – Astromancy
@PanagiotisKanavos Disagree: the how-to guides don't say much about the settings for the blocks, while the document I've linked does. Stephen Cleary wrote great introductory posts, but the OP needs customization beyond the basics. – Amblygonite
@PanagiotisKanavos More about the question: I think the most important thing is that the OP tried to use TPL inside ASP.NET, which can be error-prone as threads can be suspended by IIS. – Amblygonite
