I am attempting to use TPL Dataflow to create a pipeline. All is working fine so far, with my pipeline defined as follows (although my issue is just with broadcaster, submissionSucceeded, submissionFailed):
// Define tasks
var productListingBatchBuffer = new BufferBlock<PostSubmissionState>();
var splitFile = new TransformBlock<PostSubmissionState, PostSubmissionState>(s => SplitFile(s));
var saveFile = new TransformBlock<PostSubmissionState, PostSubmissionState>(s => SaveFile(s));
var postSubmission = new TransformBlock<PostSubmissionState, PostSubmissionState>(s => PostSubmission(s));
var broadcaster = new BroadcastBlock<PostSubmissionState>(state => state);
var submissionSucceeded = new ActionBlock<PostSubmissionState>(s => SubmissionSucceeded(s));
var submissionFailed = new ActionBlock<PostSubmissionState>(s => SubmissionFailed(s));
// Link em up
productListingBatchBuffer.LinkTo(splitFile, new DataflowLinkOptions() { PropagateCompletion = true });
splitFile.LinkTo(saveFile, new DataflowLinkOptions() { PropagateCompletion = true });
saveFile.LinkTo(postSubmission, new DataflowLinkOptions() { PropagateCompletion = true });
postSubmission.LinkTo(broadcaster, new DataflowLinkOptions() { PropagateCompletion = true });
broadcaster.LinkTo(submissionSucceeded, new DataflowLinkOptions() { PropagateCompletion = true }, state => state.PostSucceeded);
broadcaster.LinkTo(submissionFailed, new DataflowLinkOptions() { PropagateCompletion = true }, state => !state.PostSucceeded);
The issue I have is with the propagation of Exceptions. Because my BroadcastBlock propagates its completion (and therefore any Fault) to two blocks, if an exception does occur, it gets propagated to both blocks. Thus when I do
Task.WaitAll(submissionSucceeded.Completion, submissionFailed.Completion);
I end up with an aggregate exception containing two exceptions. Right now the best I can do is to filter these, i.e.:
try
{
Task.WaitAll(submissionSucceeded.Completion, submissionFailed.Completion);
}
catch (AggregateException ex)
{
var uniqueExceptions = new AggregateException(ex.Flatten().InnerExceptions.Distinct());
Console.WriteLine("An exception was thrown.\n{0}", uniqueExceptions.Flatten());
}
but I'm wondering if there's a better way to do this. i.e. if only one exception occurs, I only want one exception raised. I'm new to Dataflow, so just discovering all the conventions.
submissionSucceeded
andsubmissionFailed
be actually separate blocks? If you combined them into one that checksstate.PostSucceeded
inside, that would solve the issue. – Gammadion