Duplicate exceptions with BroadcastBlock in TPL Dataflow

I am using TPL Dataflow to build a pipeline. Everything works so far, with the pipeline defined as follows (my issue only concerns broadcaster, submissionSucceeded, and submissionFailed):

// Define tasks
var productListingBatchBuffer = new BufferBlock<PostSubmissionState>();
var splitFile = new TransformBlock<PostSubmissionState, PostSubmissionState>(s => SplitFile(s));
var saveFile = new TransformBlock<PostSubmissionState, PostSubmissionState>(s => SaveFile(s));
var postSubmission = new TransformBlock<PostSubmissionState, PostSubmissionState>(s => PostSubmission(s));
var broadcaster = new BroadcastBlock<PostSubmissionState>(state => state);
var submissionSucceeded = new ActionBlock<PostSubmissionState>(s => SubmissionSucceeded(s));
var submissionFailed = new ActionBlock<PostSubmissionState>(s => SubmissionFailed(s));

// Link them up
productListingBatchBuffer.LinkTo(splitFile, new DataflowLinkOptions() { PropagateCompletion = true });
splitFile.LinkTo(saveFile, new DataflowLinkOptions() { PropagateCompletion = true });
saveFile.LinkTo(postSubmission, new DataflowLinkOptions() { PropagateCompletion = true });
postSubmission.LinkTo(broadcaster, new DataflowLinkOptions() { PropagateCompletion = true });
broadcaster.LinkTo(submissionSucceeded, new DataflowLinkOptions() { PropagateCompletion = true }, state => state.PostSucceeded);
broadcaster.LinkTo(submissionFailed, new DataflowLinkOptions() { PropagateCompletion = true }, state => !state.PostSucceeded);

The issue I have is with the propagation of exceptions. Because my BroadcastBlock propagates its completion (and therefore any fault) to two blocks, an exception that occurs upstream is propagated to both of them. Thus, when I do

Task.WaitAll(submissionSucceeded.Completion, submissionFailed.Completion);

I end up with an aggregate exception containing two exceptions. Right now the best I can do is to filter these, i.e.:

try
{
    Task.WaitAll(submissionSucceeded.Completion, submissionFailed.Completion);
}
catch (AggregateException ex)
{
    var uniqueExceptions = new AggregateException(ex.Flatten().InnerExceptions.Distinct());
    Console.WriteLine("An exception was thrown.\n{0}", uniqueExceptions.Flatten());
}

but I'm wondering if there's a better way to do this. That is, if only one exception occurs, I want only one exception raised. I'm new to Dataflow, so I'm still discovering the conventions.
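
The duplication described above can be reproduced in isolation. The following is a minimal sketch, not the pipeline from the question: the int payload, the block names evens and odds, the class name DuplicateFaultDemo, and the direct call to Fault (used here to simulate a single upstream failure) are all assumptions made for illustration. It needs the System.Threading.Tasks.Dataflow package.

```csharp
using System;
using System.Linq;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

static class DuplicateFaultDemo
{
    // Faults a BroadcastBlock once and counts how many exceptions the two
    // linked targets report, before and after de-duplication.
    public static (int Total, int Unique) Run()
    {
        var options = new DataflowLinkOptions { PropagateCompletion = true };

        var broadcaster = new BroadcastBlock<int>(x => x);
        var evens = new ActionBlock<int>(x => { });
        var odds = new ActionBlock<int>(x => { });

        // Same shape as the question: one source, two filtered targets.
        broadcaster.LinkTo(evens, options, x => x % 2 == 0);
        broadcaster.LinkTo(odds, options, x => x % 2 != 0);

        // Assumption: simulate a single upstream failure by faulting the
        // broadcaster directly instead of throwing inside an earlier block.
        ((IDataflowBlock)broadcaster).Fault(new InvalidOperationException("boom"));

        try
        {
            Task.WaitAll(evens.Completion, odds.Completion);
            return (0, 0);
        }
        catch (AggregateException ex)
        {
            // Flatten unwraps nested AggregateExceptions into their leaves.
            var all = ex.Flatten().InnerExceptions;
            return (all.Count, all.Distinct().Count());
        }
    }

    static void Main()
    {
        var (total, unique) = Run();
        Console.WriteLine("{0} reported, {1} unique", total, unique);
    }
}
```

Exception does not override Equals, so Distinct compares by reference here. It therefore only collapses entries that are the same exception instance, which is the case when both targets are faulted from one source.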

Durst answered 3/2, 2014 at 17:47 Comment(3)
Should submissionSucceeded and submissionFailed actually be separate blocks? If you combined them into one block that checks state.PostSucceeded inside, that would solve the issue. – Gammadion
Yes, very possibly, in this situation. I might actually do that. But, more generally, this would happen any time a flow splits. For example, in the link below, if an exception is thrown in broadcaster, you'll end up with it duplicated in storage processor: taskmatics.com/blog/… – Durst
There is a good article about exceptions from MSDN. – Sort
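
The first comment's suggestion, merging the two sinks into a single block that branches on state.PostSucceeded, might look like the sketch below. The stub PostSubmissionState class, the handler bodies, the handleResult name, the SingleSinkDemo class, and the simulated fault are assumptions for illustration. Only the block, method, and property names taken from the question are original.

```csharp
using System;
using System.Threading.Tasks.Dataflow;

// Hypothetical stand-in for the question's state type.
class PostSubmissionState
{
    public bool PostSucceeded { get; set; }
}

static class SingleSinkDemo
{
    // Hypothetical handlers; the real ones are not shown in the question.
    static void SubmissionSucceeded(PostSubmissionState s) { Console.WriteLine("succeeded"); }
    static void SubmissionFailed(PostSubmissionState s) { Console.WriteLine("failed"); }

    // Returns how many leaf exceptions the single sink reports after one fault.
    public static int CountReportedExceptions()
    {
        var postSubmission =
            new TransformBlock<PostSubmissionState, PostSubmissionState>(s => s);

        // One sink replaces the BroadcastBlock plus two filtered ActionBlocks.
        var handleResult = new ActionBlock<PostSubmissionState>(s =>
        {
            if (s.PostSucceeded) SubmissionSucceeded(s);
            else SubmissionFailed(s);
        });

        postSubmission.LinkTo(handleResult,
            new DataflowLinkOptions { PropagateCompletion = true });

        // Assumption: simulate a single upstream failure.
        ((IDataflowBlock)postSubmission).Fault(new InvalidOperationException("boom"));

        try
        {
            handleResult.Completion.Wait();
            return 0;
        }
        catch (AggregateException ex)
        {
            return ex.Flatten().InnerExceptions.Count;
        }
    }

    static void Main()
    {
        Console.WriteLine(CountReportedExceptions());
    }
}
```

With only one terminal block to wait on, the fault has a single path to the caller. As the follow-up comment notes, this does not help when a flow genuinely needs to split.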

I've written a TPL Dataflow example (https://github.com/squideyes/PodFetch) that takes a slightly different approach to completion and error handling. Here's the relevant code, from lines 171 to 201 of Program.cs:

    scraper.LinkTo(fetcher, link => link != null);
    scraper.LinkTo(DataflowBlock.NullTarget<Link>());

    scraper.HandleCompletion(fetcher);

    Status.Info.Log("Fetching APOD's archive list");

    links.ForEach(link => scraper.Post(link));

    scraper.Complete();

    try
    {
        await fetcher.Completion;

        Status.Finished.Log("Fetched: {0:N0}, Skipped: {1:N0}, Errors: {2:N0}, Seconds: {3:N2}",
            fetched, skipped, errored, (DateTime.UtcNow - startedOn).TotalMilliseconds / 1000.0);
    }
    catch (AggregateException errors)
    {
        foreach (var error in errors.InnerExceptions)
            Status.Failure.Log(error.Message);
    }
    catch (TaskCanceledException)
    {
        Status.Cancelled.Log("The process was manually cancelled!");
    }
    catch (Exception error)
    {
        Status.Failure.Log(error.Message);
    }

As you can see, I link a couple of TPL blocks together, then set up completion handling with a HandleCompletion extension method:

    public static void HandleCompletion(
        this IDataflowBlock source, params IDataflowBlock[] targets)
    {
        source.Completion.ContinueWith(
            task =>
            {
                foreach (var target in targets)
                {
                    if (task.IsFaulted)
                        target.Fault(task.Exception);
                    else
                        target.Complete();
                }
            });
    }

Importantly, I call scraper.Complete() when I'm done passing objects to the first block in the chain. The HandleCompletion extension method then deals with the continuation. And since I'm waiting on fetcher (the last block in the chain) to complete, it's easy to catch any resulting errors within a try/catch.

Toed answered 9/10, 2014 at 19:27 Comment(1)
I believe that your HandleCompletion method does the same thing as PropagateCompletion does under the hood. Is there a particular reason you chose to do it this way? I don't think it solves my problem, although interestingly, it does highlight it, since it's clear to see how the exception can be duplicated by calling with multiple targets. – Durst
