Here is a sample general-purpose pipeline from Concurrent Programming on Windows. A good pipeline is a balanced pipeline, meaning that no single stage becomes a bottleneck within the pipeline. Based on the sample code, you can create as many threads as you need to execute each stage.
Source code:
/// <summary>
/// An ordered chain of pipeline stages that converts a sequence of
/// <typeparamref name="TSource"/> items into a sequence of
/// <typeparamref name="TDest"/> items.
/// </summary>
/// <remarks>
/// Instances are never mutated after construction: <see cref="AddStage{TNew}"/>
/// returns a new pipeline whose stage array is a copy of this one plus the
/// appended stage.
/// </remarks>
/// <typeparam name="TSource">Element type fed into the first stage.</typeparam>
/// <typeparam name="TDest">Element type produced by the last stage.</typeparam>
public class Pipeline<TSource, TDest> : IPipeline
{
    private readonly IPipelineStage[] _stages;

    /// <summary>
    /// Creates a single-stage pipeline.
    /// </summary>
    /// <param name="transform">Function applied by the stage to each element.</param>
    /// <param name="degree">
    /// Forwarded unchanged to the stage; presumably the number of threads
    /// used to run the stage (confirm against the stage implementation).
    /// </param>
    public Pipeline(Func<TSource, TDest> transform, int degree)
        : this(new IPipelineStage[0], transform, degree)
    {
    }

    /// <summary>
    /// Creates a pipeline made of <paramref name="toCopy"/> followed by a new
    /// stage built from <paramref name="transform"/>.
    /// </summary>
    /// <param name="toCopy">Existing stages; the array is copied, not retained.</param>
    /// <param name="transform">Function applied by the appended stage.</param>
    /// <param name="degree">Forwarded unchanged to the appended stage.</param>
    internal Pipeline(IPipelineStage[] toCopy, Func<TSource, TDest> transform, int degree)
        : this(toCopy, new PipelineStage<TSource, TDest>(transform, degree))
    {
    }

    /// <summary>
    /// Creates a pipeline made of <paramref name="toCopy"/> followed by an
    /// already-constructed stage. Used by <see cref="AddStage{TNew}"/>, whose
    /// transform maps TDest to TNew and therefore cannot go through the
    /// Func-based constructor of the resulting pipeline type.
    /// </summary>
    private Pipeline(IPipelineStage[] toCopy, IPipelineStage lastStage)
    {
        // One extra slot for the appended stage.
        _stages = new IPipelineStage[toCopy.Length + 1];

        // Copy only the elements that exist in the source array.
        Array.Copy(toCopy, _stages, toCopy.Length);
        _stages[_stages.Length - 1] = lastStage;
    }

    /// <summary>
    /// Returns a new pipeline consisting of this pipeline's stages followed by
    /// a stage that applies <paramref name="transform"/>.
    /// </summary>
    /// <typeparam name="TNew">Element type produced by the new last stage.</typeparam>
    /// <param name="transform">Function applied to each output of the current last stage.</param>
    /// <param name="degree">Forwarded unchanged to the new stage.</param>
    /// <returns>A new pipeline; this instance is left unchanged.</returns>
    public Pipeline<TSource, TNew> AddStage<TNew>(Func<TDest, TNew> transform, int degree)
    {
        return new Pipeline<TSource, TNew>(
            _stages,
            new PipelineStage<TDest, TNew>(transform, degree));
    }

    /// <summary>
    /// Runs <paramref name="arg"/> through every stage in order and yields the
    /// elements produced by the last stage.
    /// </summary>
    /// <remarks>
    /// This is an iterator method, so no stage is started until the returned
    /// enumerator is first advanced.
    /// </remarks>
    /// <param name="arg">Input sequence handed to the first stage.</param>
    /// <returns>An enumerator over the output of the final stage.</returns>
    public IEnumerator<TDest> GetEnumerator(IEnumerable<TSource> arg)
    {
        // Each stage consumes the sequence produced by the previous one.
        IEnumerable current = arg;
        for (int i = 0; i < _stages.Length; i++)
        {
            current = _stages[i].Start(current);
        }

        // The stages are untyped, so each element is cast back to TDest here.
        foreach (TDest elem in current)
        {
            yield return elem;
        }
    }
}
/// <summary>
/// A single pipeline stage that holds a transform from
/// <typeparamref name="TInput"/> to <typeparamref name="TOutput"/> together
/// with a degree setting.
/// </summary>
/// <typeparam name="TInput">Element type the transform accepts.</typeparam>
/// <typeparam name="TOutput">Element type the transform produces.</typeparam>
class PipelineStage<TInput, TOutput> : IPipelineStage
{
    private readonly Func<TInput, TOutput> _transform;

    // NOTE(review): presumably the number of threads used to run this stage;
    // the code that consumes it is not shown here - confirm.
    private readonly int _degree;

    /// <summary>
    /// Stores the transform and degree for later use by <see cref="Start"/>.
    /// </summary>
    /// <param name="transform">Function to apply to each input element.</param>
    /// <param name="degree">Degree setting for this stage.</param>
    internal PipelineStage(Func<TInput, TOutput> transform, int degree)
    {
        _transform = transform;
        _degree = degree;
    }

    /// <summary>
    /// Starts the stage over <paramref name="src"/> and returns the sequence
    /// it produces.
    /// </summary>
    /// <remarks>
    /// Declared public because an implicit implementation of an interface
    /// member must be public.
    /// </remarks>
    /// <param name="src">Input sequence for this stage.</param>
    /// <returns>The stage's output sequence.</returns>
    public IEnumerable Start(IEnumerable src)
    {
        //...
        // The stage body is elided in the original listing. Fail loudly
        // instead of leaving a method with no return value.
        throw new NotImplementedException("PipelineStage.Start body was omitted from the original listing.");
    }
}
/// <summary>
/// Untyped view of a pipeline stage, so that stages with different element
/// types can be stored together in one array.
/// </summary>
interface IPipelineStage
{
    /// <summary>
    /// Starts the stage over the given input sequence.
    /// </summary>
    /// <param name="Src">Input sequence for the stage.</param>
    /// <returns>The sequence produced by the stage.</returns>
IEnumerable Start(IEnumerable Src);
}