Dataflow: splitting work into small jobs and then grouping the results again

I need to do this kind of work:

  1. Get a Page object from the database
  2. For each page, get all its images and process them (IO-bound work, for example uploading to a CDN)
  3. If all images were processed successfully, mark the Page as processed in the database

Since I need to control how many pages I process in parallel, I've decided to go with TPL Dataflow:

 ____________________________
|         Data pipe          |
|   BufferBlock<Page>        |
|   BoundedCapacity = 1      |
|____________________________|
              |
 ____________________________
|       Process images       |
| TransformBlock<Page, Page> |
| BoundedCapacity = 1        |
| MaxDegreeOfParallelism = 8 |
|____________________________|
              |
 ____________________________
|        Save page           |
| ActionBlock<Page>          |
| BoundedCapacity = 1        |
| MaxDegreeOfParallelism = 5 |
|____________________________|

Now I need "Process images" to process the images of a page in parallel, but I want to limit how many images are being processed at any one time across all pages currently in flight.

I can use a TransformManyBlock for "Process images", but how do I gather the images back together in the "Save page" block?

         ____________________________
        |         Data pipe          |
        |   BufferBlock<Page>        |
        |   BoundedCapacity = 1      |
        |____________________________|
                      |
     ___________________________________
    |           Load images             |
    | TransformManyBlock<Page, Image[]> |
    | BoundedCapacity = 1               |
    | MaxDegreeOfParallelism = 8        |
    |___________________________________|
      /              |              \
   ______________________________________________
 _|____________________________________________  |
|              Process image                   | |
| TransformBlock<ImageWithPage, ImageWithPage> | |
| BoundedCapacity = 1                          | |
| MaxDegreeOfParallelism = 8                   |_|
|______________________________________________|
      \              |               /
         How to group images by page ?
                     |
        ____________________________
       |        Save page           |
       | ActionBlock<Page>          |
       | BoundedCapacity = 1        |
       | MaxDegreeOfParallelism = 5 |
       |____________________________|

On top of that, any one of the images could fail to be processed, and I don't want to save a page with failed images.

Dune answered 25/10, 2014 at 7:53 Comment(3)
What's the problem exactly? According to the diagram you already figured it out. Set MaxDegreeOfParallelism in the middle step to the desired level.Evidentiary
I've edited the question with the diagram of using TransformManyBlock.Dune
As a side note, configuring a block with BoundedCapacity smaller than the MaxDegreeOfParallelism will reduce the degree of parallelism to the value of the capacity. In other words, the block cannot process 8 images simultaneously if it is allowed to buffer only one.Dupre
B
7

You can group the images together by recording each image that arrives for a given page and then sending the page on once all of its images have arrived. To figure that out, the page needs to know how many images it contains, but I assume you know that.

In code, it could look something like this:

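public static IPropagatorBlock<TSplit, TMerged>
    CreaterMergerBlock<TSplit, TMerged>(
    Func<TSplit, TMerged> getMergedFunc, Func<TMerged, int> getSplitCount)
{
    // Number of split items seen so far for each merged (parent) item.
    // A plain Dictionary is safe because the block below runs with the
    // default MaxDegreeOfParallelism of 1.
    var dictionary = new Dictionary<TMerged, int>();

    return new TransformManyBlock<TSplit, TMerged>(
        split =>
        {
            var merged = getMergedFunc(split);
            int count;
            // count stays 0 if this is the first item seen for this parent
            dictionary.TryGetValue(merged, out count);
            count++;
            if (getSplitCount(merged) == count)
            {
                // last item arrived: forget the parent and emit it downstream
                dictionary.Remove(merged);
                return new[] { merged };
            }

            // still waiting for more items: remember the count, emit nothing
            dictionary[merged] = count;
            return new TMerged[0];
        });
}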

Usage:

var dataPipe = new BufferBlock<Page>();

var splitter = new TransformManyBlock<Page, ImageWithPage>(
    page => page.LoadImages(),
    new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 8 });

var processImage = new TransformBlock<ImageWithPage, ImageWithPage>(
    image =>
    {
        // process the image here
        return image;
    }, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 8 });

var merger = CreaterMergerBlock(
    (ImageWithPage image) => image.Page, page => page.ImageCount);

var savePage = new ActionBlock<Page>(
    page => { /* save the page here */ },
    new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 5 });

dataPipe.LinkTo(splitter);
splitter.LinkTo(processImage);
processImage.LinkTo(merger);
merger.LinkTo(savePage);
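
To make the counting inside the merger easier to follow, here is a minimal sketch of the same logic outside of Dataflow. The `MergeCounter` class, the `Page` and `ImageWithPage` records, and `MergeDemo` are hypothetical names introduced only for this illustration; they are not part of the answer's code or of the Dataflow library.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical stand-ins for the question's types.
public sealed record Page(string Name, int ImageCount);
public sealed record ImageWithPage(Page Page, string Url);

public sealed class MergeCounter<TSplit, TMerged> where TMerged : notnull
{
    private readonly Dictionary<TMerged, int> _seen = new();
    private readonly Func<TSplit, TMerged> _getMerged;
    private readonly Func<TMerged, int> _getSplitCount;

    public MergeCounter(Func<TSplit, TMerged> getMerged, Func<TMerged, int> getSplitCount)
    {
        _getMerged = getMerged;
        _getSplitCount = getSplitCount;
    }

    // Returns the parent once its last child has arrived, otherwise nothing.
    public IEnumerable<TMerged> Offer(TSplit split)
    {
        var merged = _getMerged(split);
        _seen.TryGetValue(merged, out var count); // 0 when seen for the first time
        count++;
        if (count == _getSplitCount(merged))
        {
            _seen.Remove(merged);
            return new[] { merged };
        }
        _seen[merged] = count;
        return Array.Empty<TMerged>();
    }
}

public static class MergeDemo
{
    public static List<string> Run()
    {
        var a = new Page("A", 2);
        var b = new Page("B", 1);
        var counter = new MergeCounter<ImageWithPage, Page>(i => i.Page, p => p.ImageCount);
        var emitted = new List<string>();
        var arrivals = new[]
        {
            new ImageWithPage(a, "a1"), // A: 1 of 2, nothing emitted
            new ImageWithPage(b, "b1"), // B: 1 of 1, B emitted
            new ImageWithPage(a, "a2"), // A: 2 of 2, A emitted
        };
        foreach (var image in arrivals)
            foreach (var page in counter.Offer(image))
                emitted.Add(page.Name);
        return emitted; // "B", then "A"
    }
}
```

Note that with this scheme a page with zero images never produces a split item, so it never reaches the merger; such pages would need to be handled separately.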
Baseless answered 27/10, 2014 at 2:2 Comment(5)
Thanks. That's quite similar to what I've been thinking already and uses Dataflow. But shouldn't you use ConcurrentDictionary?Dune
@MichaelLogutov ConcurrentDictionary is not necessary here, since the TransformManyBlock has MaxDegreeOfParallelism set to 1. That means the Dictionary will never be accessed from more than one thread at any given time.Baseless
Right. I forgot that the default options set the DOP to 1. Thanks.Dune
@Baseless I know it has been a long time, but could you please provide some explanation to how the merging works? Also see my question.Karee
@Baseless love that! Thank you!Amphiaster
D
0

The .NET platform has a nice interface that can represent parent-child relationships, the IGrouping<TKey, TElement> interface. It is simply an IEnumerable that also has a Key property. The key can be anything, and in this case it could be the Page that needs to be processed. The contents of the grouping could be the Images that belong to each page and need to be uploaded. This leads to the idea of a dataflow block that can process IGrouping<TKey, TInput> objects by processing each TInput independently, then aggregating the results per grouping, and finally outputting them as IGrouping<TKey, TOutput> objects. Below is an implementation of this idea:

public static TransformBlock<IGrouping<TKey, TInput>, IGrouping<TKey, TOutput>>
    CreateTransformGroupingBlock<TKey, TInput, TOutput>(
        Func<TKey, TInput, Task<TOutput>> transform,
        ExecutionDataflowBlockOptions dataflowBlockOptions = null)
{
    if (transform == null) throw new ArgumentNullException(nameof(transform));
    dataflowBlockOptions ??= new ExecutionDataflowBlockOptions();

    var actionBlock = new ActionBlock<Task<Task<TOutput>>>(taskTask =>
    {
        // An exception thrown by the following line would cause buggy behavior.
        // According to the documentation it should never fail.
        taskTask.RunSynchronously();
        return taskTask.Unwrap();
    }, dataflowBlockOptions);

    var completionCTS = new CancellationTokenSource();
    _ = actionBlock.Completion
        .ContinueWith(_ => completionCTS.Cancel(), TaskScheduler.Default);

    var transformBlock = new TransformBlock<IGrouping<TKey, TInput>,
        IGrouping<TKey, TOutput>>(async grouping =>
    {
        if (grouping == null) throw new InvalidOperationException("Null grouping.");
        var tasks = new List<Task<TOutput>>();
        foreach (var item in grouping)
        {
            // Create a cold task that will be either executed by the actionBlock,
            // or will be canceled by the completionCTS. This should eliminate
            // any possibility that an awaited task will remain cold forever.
            var taskTask = new Task<Task<TOutput>>(() => transform(grouping.Key, item),
                completionCTS.Token);
            var accepted = await actionBlock.SendAsync(taskTask);
            if (!accepted)
            {
                // The actionBlock has failed.
                // Skip the rest of the items. Pending tasks should still be awaited.
                tasks.Add(Task.FromCanceled<TOutput>(new CancellationToken(true)));
                break;
            }
            tasks.Add(taskTask.Unwrap());
        }
        TOutput[] results = await Task.WhenAll(tasks);
        return results.GroupBy(_ => grouping.Key).Single(); // Convert to IGrouping
    }, dataflowBlockOptions);

    // Cleanup
    _ = transformBlock.Completion
        .ContinueWith(_ => actionBlock.Complete(), TaskScheduler.Default);
    _ = Task.WhenAll(actionBlock.Completion, transformBlock.Completion)
        .ContinueWith(_ => completionCTS.Dispose(), TaskScheduler.Default);

    return transformBlock;
}

// Overload with synchronous lambda
public static TransformBlock<IGrouping<TKey, TInput>, IGrouping<TKey, TOutput>>
    CreateTransformGroupingBlock<TKey, TInput, TOutput>(
        Func<TKey, TInput, TOutput> transform,
        ExecutionDataflowBlockOptions dataflowBlockOptions = null)
{
    if (transform == null) throw new ArgumentNullException(nameof(transform));
    return CreateTransformGroupingBlock<TKey, TInput, TOutput>(
        (key, item) => Task.FromResult(transform(key, item)), dataflowBlockOptions);
}

This implementation consists of two blocks, a TransformBlock that processes the groupings and an internal ActionBlock that processes the individual items. Both are configured with the same user-supplied options. The TransformBlock sends to the ActionBlock the items to be processed one by one, then waits for the results, and finally constructs the output IGrouping<TKey, TOutput> with the following tricky line:

return results.GroupBy(_ => grouping.Key).Single(); // Convert to IGrouping

This compensates for the fact that currently there is no publicly available class that implements the IGrouping interface, in the .NET platform. The GroupBy+Single combo does the trick, but it has the limitation that it doesn't allow the creation of empty IGroupings. In case this is an issue, creating a class that implements this interface is always an option. Implementing one is quite straightforward (here is an example).
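
As a rough illustration of what such a class might look like, here is a minimal sketch. The name `Grouping` and its constructor shape are assumptions made for this example, not the implementation the answer links to.

```csharp
using System.Collections;
using System.Collections.Generic;
using System.Linq;

// Hypothetical minimal IGrouping implementation; unlike GroupBy+Single,
// it can also represent a grouping with no elements.
public sealed class Grouping<TKey, TElement> : IGrouping<TKey, TElement>
{
    private readonly IReadOnlyList<TElement> _elements;

    public Grouping(TKey key, IEnumerable<TElement> elements)
    {
        Key = key;
        _elements = elements.ToList(); // snapshot, so later changes to the source are not observed
    }

    public TKey Key { get; }

    public IEnumerator<TElement> GetEnumerator() => _elements.GetEnumerator();

    IEnumerator IEnumerable.GetEnumerator() => GetEnumerator();
}
```

With a class like this, the conversion line could be written as `new Grouping<TKey, TOutput>(grouping.Key, results)`, which would also work when `results` is empty.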

Usage example of the CreateTransformGroupingBlock method:

var processPages = new TransformBlock<Page, IGrouping<Page, Image>>(page =>
{
    Image[] images = GetImagesFromDB(page);
    return images.GroupBy(_ => page).Single(); // Convert to IGrouping
}, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 8 });

var uploadImages = CreateTransformGroupingBlock<Page, Image, Image>(async (page, image) =>
{
    await UploadImage(image);
    return image;
}, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 8 });

var savePages = new ActionBlock<IGrouping<Page, Image>>(grouping =>
{
    var page = grouping.Key;
    foreach (var image in grouping) SaveImageToDB(image, page);
}, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 5 });

processPages.LinkTo(uploadImages);
uploadImages.LinkTo(savePages);

The type of the uploadImages variable is TransformBlock<IGrouping<Page, Image>, IGrouping<Page, Image>>. In this example the types TInput and TOutput are the same, because the images do not need to be transformed.

Dupre answered 1/12, 2020 at 5:56 Comment(0)
E
-1

Consider merging "Load images" and "Process images" into one TransformBlock. That way you have no trouble keeping the images of a single page together.

In order to achieve your concurrency limit goal, use a SemaphoreSlim:

SemaphoreSlim processImageDopLimiter = new SemaphoreSlim(8);

//...

var page = ...; //TransformBlock<Page, MyPageAndImageDTO> block input
var images = GetImages(page);
ImageWithPage[] processedImages =
 images
 .AsParallel()
 .Select(i => {
    // SemaphoreSlim exposes Wait/Release (not WaitOne/ReleaseOne)
    processImageDopLimiter.Wait();
    try
    {
        return ProcessImage(i);
    }
    finally
    {
        // always give the slot back, even if ProcessImage throws
        processImageDopLimiter.Release();
    }
 })
 .ToArray();
return new { page, processedImages };

This will lead to quite a few threads blocked waiting. You can use an asynchronous version of this processing if you like. This is immaterial to the question.
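
For reference, the asynchronous variant mentioned above could look roughly like the following sketch. `ThrottledImages`, `ProcessAllAsync` and the `processImageAsync` parameter are hypothetical names chosen for illustration; only `SemaphoreSlim.WaitAsync`, `SemaphoreSlim.Release` and `Task.WhenAll` are real library calls.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

public static class ThrottledImages
{
    // Shared by all callers, so at most 8 images are in flight across all pages.
    private static readonly SemaphoreSlim Limiter = new SemaphoreSlim(8);

    public static async Task<TResult[]> ProcessAllAsync<TImage, TResult>(
        IEnumerable<TImage> images, Func<TImage, Task<TResult>> processImageAsync)
    {
        // One task per image; each waits for a free slot without blocking a thread.
        var tasks = images.Select(async image =>
        {
            await Limiter.WaitAsync();
            try
            {
                return await processImageAsync(image);
            }
            finally
            {
                Limiter.Release(); // free the slot even if processing failed
            }
        }).ToList(); // ToList forces the query to run, which starts every task

        // Results come back in the same order as the input images.
        return await Task.WhenAll(tasks);
    }
}
```

If any image fails, the exception surfaces when the returned task is awaited, so the calling block can decide not to pass that page on for saving.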

Evidentiary answered 25/10, 2014 at 11:2 Comment(15)
I was hoping for a more Dataflow-style way of doing things, because I think that artificially limiting the DOP by blocking tasks is like cheating Dataflow, whose main goal is to orchestrate the workload. Could this lead to Dataflow not tuning itself correctly to the workload?Dune
Good questions. Dataflow does not tune anything. It maxes out the DOP you specify, just like our custom solution does. Note that you can say await semaphoreSlim.WaitAsync() and you'll block zero threads while waiting. Internally, the semaphore has a queue and waiters are put in it. I can see no fundamental difference between limiting the DOP "manually" and limiting it by using Dataflow. Do you see any concrete disadvantage? Actually, you don't need Dataflow here at all because semaphores can do all of this. But it kind of makes sense to use Dataflow.Evidentiary
Correct me if I'm wrong, but I think Dataflow already has the same kind of mechanism for maxing out the DOP of its blocks. So implementing another limiter means that we are fighting the Dataflow framework. And as you've mentioned - I can't use this solution with my workflow, because even with SemaphoreSlim I still can't use TransformManyBlock: I still haven't figured out how to group the images back by their page.Dune
Right. Edited the code so that the page and the images stay grouped. What exactly is wrong with the parallelism behavior here? We are getting 8 concurrent threads for GetImages (because the transform block does that limiting). We are getting 8 concurrent threads for ProcessImage. That's exactly like your example is configured.Evidentiary
So why TransformManyBlock then? It gets a Page as input and produces many { page, image[] } results from each page?Dune
I recommend a TransformBlock. My previous edit was incomplete. Fixed.; We are making progress only slowly. Are there any concerns remaining that prevent you from adopting this solution?Evidentiary
Well, I guess it's the only solution currently. Another one is a custom scheduler, which (I think) would be similar at its core.Dune
Yes, exactly. I wouldn't go that route because it requires you to manage your own threads. Remember, that with the async semaphore you don't block threads eitherEvidentiary
Tbh I'm not very fond of your solution - let's take 10 parallel pages, each with 10 images, and a DOP of 10 for image processing. With your solution we end up with 10+100 threads in the thread pool, only 20 of them actually working. We should not spend a thread if we know we can't do work on it yet. That's why I think the scheduler solution is better. But I'm yet to see a good implementation of a DOP-limited scheduler (the LimitedConcurrencyLevelTaskScheduler from the MS TPL extras uses locks and other nasty stuff).Dune
Then use async semaphore waiting. You don't block threads with that while waiting. I have proposed that three times now.Evidentiary
I never talked about Thread blocking. We're not working with the Thread class directly - why are you even talking about it? Of course I know that we aren't blocking a thread - it's a given when working with Tasks unless you go back to the Thread API. I'm talking about wasting slots in the ThreadPool via tasks. You're creating more Task objects than needed. That's the problem with your solution.Dune
When you await a semaphore you don't block a "threadpool slot". Maybe you should find out what await does. It is not a keyword to simply call Wait. My solution, if converted to async (which the sample code is not), uses just as many threads/tasks as dataflow.Evidentiary
Correct me if I'm wrong, but I think AsParallel uses a Partitioner to create Task objects for the partitions. And since we're not limiting it with anything (like a DOP), it will create as many Task objects as there are elements in the source data stream.Dune
That's why I say: convert this idea to an async approach. Such as manually creating one task per image found on the page (potentially 100s). Most tasks will immediately enqueue themselves into the semaphore and return. It's the same idea that the sync code (as posted here) uses, just made async.Evidentiary
The code posted here is not meant for production. This is an illustration of an idea.Evidentiary
