TPL Dataflow pipeline design basics

2

10

I am trying to create a well-designed TPL Dataflow pipeline that makes optimal use of system resources. My project is an HTML parser that adds parsed values to a SQL Server database. I already have all the methods for my future pipeline, and my question is what the optimal way is to place them in Dataflow blocks, and how many blocks I should use. Some of the methods are CPU-bound, and some are I/O-bound (loading from the Internet, SQL Server DB queries). For now I think that placing each I/O operation in a separate block is the right way, as in this scheme (image: TPL Dataflow pipeline).

What are the basic rules of designing pipelines in that case?

Sialoid answered 10/3, 2014 at 10:15 Comment(0)
F
7

One way to choose how to divide the blocks is to decide which parts you want to scale independently of the others. A good starting point is to divide the CPU-bound portions from the I/O-bound portions. I'd consider combining the last two blocks, since they are both I/O-bound (presumably to the same database).
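
A minimal sketch of that split, assuming the `System.Threading.Tasks.Dataflow` package is available. `DownloadAsync`, `Parse`, and `SaveAsync` are hypothetical stand-ins for the real methods, and the parallelism and capacity numbers are arbitrary placeholders that would need tuning by measurement:

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class ParserPipelineSketch
{
    // Hypothetical stand-in for the I/O-bound download step.
    static async Task<string> DownloadAsync(string url)
    {
        await Task.Delay(10);                      // simulates network latency
        return "<html>" + url + "</html>";
    }

    // Hypothetical stand-in for the CPU-bound parsing step.
    static int Parse(string html) => html.Length;

    // Hypothetical stand-in for the I/O-bound database write.
    static async Task SaveAsync(int value, List<int> sink)
    {
        await Task.Delay(5);                       // simulates a database round trip
        lock (sink) sink.Add(value);
    }

    public static async Task<List<int>> RunAsync(IEnumerable<string> urls)
    {
        var saved = new List<int>();

        // I/O-bound: parallelism is not tied to the number of cores.
        var download = new TransformBlock<string, string>(
            url => DownloadAsync(url),
            new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 8, BoundedCapacity = 32 });

        // CPU-bound: roughly one worker per core.
        var parse = new TransformBlock<string, int>(
            html => Parse(html),
            new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = Environment.ProcessorCount, BoundedCapacity = 32 });

        // I/O-bound: a single block for all database work.
        var save = new ActionBlock<int>(
            async v => await SaveAsync(v, saved),
            new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 2, BoundedCapacity = 32 });

        var link = new DataflowLinkOptions { PropagateCompletion = true };
        download.LinkTo(parse, link);
        parse.LinkTo(save, link);

        foreach (var url in urls)
            await download.SendAsync(url);         // waits when the first block is full

        download.Complete();
        await save.Completion;                     // completion propagates down the links
        return saved;
    }
}
```

Because each block has its own `MaxDegreeOfParallelism`, the download, parse, and save stages can be scaled separately, and `BoundedCapacity` keeps a fast stage from flooding a slow one.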

Felon answered 11/3, 2014 at 12:46 Comment(0)
C
1

Here is a sample of a general pipeline from Concurrent Programming on Windows. A good pipeline is a balanced pipeline, meaning that no single stage becomes a bottleneck within the pipeline. Based on the sample code, you can create as many threads as you need to execute each stage.

Source code:

using System;
using System.Collections;
using System.Collections.Generic;

// IPipeline is not defined in this excerpt.
public class Pipeline<TSource, TDest> : IPipeline
{
  private readonly IPipelineStage[] _stages;

  public Pipeline(Func<TSource, TDest> transform, int degree) :
     this(new IPipelineStage[0], new PipelineStage<TSource, TDest>(transform, degree)) {}

  internal Pipeline(IPipelineStage[] toCopy, IPipelineStage newStage)
  {
     // Copy the existing stages and append the new one at the end.
     _stages = new IPipelineStage[toCopy.Length + 1];
     Array.Copy(toCopy, _stages, toCopy.Length);
     _stages[_stages.Length - 1] = newStage;
  }

  public Pipeline<TSource, TNew> AddStage<TNew>(Func<TDest, TNew> transform, int degree)
  {
     return new Pipeline<TSource, TNew>(_stages, new PipelineStage<TDest, TNew>(transform, degree));
  }

  public IEnumerator<TDest> GetEnumerator(IEnumerable<TSource> arg)
  {
     IEnumerable er = arg;

     // Chain the stages: each stage consumes the output of the previous one.
     for (int i = 0; i < _stages.Length; i++)
       er = _stages[i].Start(er);

     foreach (TDest elem in er)
       yield return elem;
  }
}

class PipelineStage<TInput, TOutput> : IPipelineStage
{
   private readonly Func<TInput, TOutput> _transform;
   private readonly int _degree;

   internal PipelineStage(Func<TInput, TOutput> transform, int degree)
   {
      _transform = transform;
      _degree = degree;
   }

   public IEnumerable Start(IEnumerable src)
   {
       //...
   }
}

interface IPipelineStage
{
   IEnumerable Start(IEnumerable src);
}
Candytuft answered 10/3, 2014 at 14:12 Comment(1)
The question is specifically about TPL Dataflow, which already contains code similar to what you posted. So I don't see how the code answers the question. - Sunroom
