I'm working with TPL Dataflow and need to implement my own action block.
This action block should accept messages from two different input blocks, put these messages into a single queue, and then process that queue sequentially. The main point is that the two different actions must never execute concurrently, and I don't want to use locks.
Here is my solution, but it doesn't work properly:
public class OrderedActionBlock<TInputLhs, TInputRhs> : IDataflowBlock
    where TInputLhs : class
    where TInputRhs : class
{
    public ITargetBlock<TInputLhs> InputLhs { get { return inputLhs; } }
    public ITargetBlock<TInputRhs> InputRhs { get { return inputRhs; } }

    private readonly BufferBlock<TInputLhs> inputLhs = new BufferBlock<TInputLhs>();
    private readonly BufferBlock<TInputRhs> inputRhs = new BufferBlock<TInputRhs>();
    private ITargetBlock<object> queue;

    public OrderedActionBlock(Action<TInputLhs> actionLhs, Action<TInputRhs> actionRhs)
    {
        queue = new ActionBlock<object>(x =>
        {
            if (x is TInputLhs)
            {
                actionLhs(x as TInputLhs);
            }
            else
            {
                actionRhs(x as TInputRhs);
            }
        });

        inputLhs.LinkTo(queue, new DataflowLinkOptions() { PropagateCompletion = true });
        inputRhs.LinkTo(queue, new DataflowLinkOptions() { PropagateCompletion = true });
    }

    public void Complete()
    {
        queue.Complete();
    }

    public Task Completion
    {
        get { return queue.Completion; }
    }

    public void Fault(Exception exception)
    {
        queue.Fault(exception);
    }
}
Simple usage example:
static void Main(string[] args)
{
    var splitBlock = new SplitBlock<string>(new Predicate<string>(s => s.Length % 2 == 0));
    var batchBlock = new BatchBlock<string>(3);
    var processInOrderBlock = new OrderedActionBlock<string, string[]>(
        new Action<string>((str) =>
        {
            Console.WriteLine(str);
        }),
        new Action<string[]>((batch) =>
        {
            Console.WriteLine("BATCH - " + string.Join(", ", batch));
        }));

    splitBlock.SourceFiltered.LinkTo(processInOrderBlock.InputLhs, new DataflowLinkOptions() { PropagateCompletion = true });
    splitBlock.SourceNonFiltered.LinkTo(batchBlock, new DataflowLinkOptions() { PropagateCompletion = true });
    batchBlock.LinkTo(processInOrderBlock.InputRhs, new DataflowLinkOptions() { PropagateCompletion = true });

    for (int i = 1; i <= 10; i++)
    {
        splitBlock.Post(new string(Enumerable.Repeat('x', i).ToArray()));
    }

    splitBlock.Complete();
    processInOrderBlock.Completion.Wait();
    return;
}
The output:
xx
xxxx
xxxxxx
xxxxxxxx
xxxxxxxxxx
BATCH - x, xxx, xxxxx
Press any key to continue . . .
It looks like messages get stuck in batchBlock, and I don't know why.
Message from one and Message[] from another) and perform different operations on them. I need a custom block here. – Loaves
queue = new ActionBlock... line you are doing just that: creating an ActionBlock that accepts two different sources of different types. Now configure MaxDegreeOfParallelism = 1 for that block. – Splint
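For what it's worth, the idea in the last comment (one ActionBlock fed by two sources, handling one message at a time) could be sketched roughly as below. This is only a sketch under assumptions, not a confirmed fix: the class name MergeSketch and the Run helper are made up for illustration, and a plain BufferBlock stands in for the SplitBlock output, since SplitBlock is not shown in the question. It also assumes that the missing last batch comes from PropagateCompletion on both links, which would complete queue as soon as the first input finishes. Here the links do not propagate completion, and queue is completed only after both sources are done.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Hypothetical sketch, not the original OrderedActionBlock.
public static class MergeSketch
{
    public static List<string> Run()
    {
        var log = new List<string>();

        // Stand-ins for the two inputs: single strings and batches of three.
        var lhs = new BufferBlock<string>();
        var rhs = new BatchBlock<string>(3);

        // One consumer for both inputs. MaxDegreeOfParallelism = 1 is already
        // the default; it is spelled out here to make the intent visible.
        var queue = new ActionBlock<object>(x =>
        {
            if (x is string s)
            {
                log.Add(s);
            }
            else if (x is string[] batch)
            {
                log.Add("BATCH - " + string.Join(", ", batch));
            }
        }, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 1 });

        // Assumption: completion is NOT propagated through the links, so the
        // first source to finish cannot close the queue on the other one.
        lhs.LinkTo(queue, new DataflowLinkOptions());
        rhs.LinkTo(queue, new DataflowLinkOptions());

        // Complete the queue only once BOTH sources have completed.
        Task.WhenAll(lhs.Completion, rhs.Completion)
            .ContinueWith(_ => queue.Complete());

        for (int i = 1; i <= 10; i++)
        {
            var item = new string('x', i);
            if (i % 2 == 0) lhs.Post(item); else rhs.Post(item);
        }

        lhs.Complete();
        rhs.Complete();          // flushes the remaining partial batch
        queue.Completion.Wait();
        return log;
    }
}
```

Calling MergeSketch.Run() should return the five even-length strings plus two batch entries, including the partial batch of the two longest odd-length strings. The relative order of single strings and batches is not guaranteed, because the two sources feed the queue independently. Fault handling is left out of the sketch.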