How to keep track of faulted items in a TPL pipeline in a thread-safe way
I am using the TPL Dataflow pipeline design together with Stephen Cleary's Try library. In short, it wraps a value/exception and floats it down the pipeline, so even items that have thrown exceptions inside their processing methods have Status == RanToCompletion at the end, when I await resultsBlock.Completion;. So I need another way to register faulted items. Here is a small sample:

var downloadBlock = new TransformBlock<int, Try<int>>(construct => Try.Create(() =>
{
    //SomeProcessingMethod();
    return 1;
}));
var processBlock = new TransformBlock<Try<int>, Try<int>>(construct => construct.Map(value =>
{
    //SomeProcessingMethod();
    return 1;
}));
var resultsBlock = new ActionBlock<Try<int>>(construct =>
{
    if (construct.IsException)
    {
        var exception = construct.Exception;
        switch (exception)
        {
            case GoogleApiException gex:
                //_notificationService.NotifyUser("OMG, my dear sir, I think I messed something up:/"
                //Register that this item was faulted, so we know that we need to retry it.
                break;
            default:
                break;
        }
    }
});

One solution would be to create a List<int> FaultedItems and add every faulted item to it in my exception-handling block; then, after await resultsBlock.Completion;, I could check whether the list is non-empty and create a new pipeline for the faulted items. My question: if I use a List<int>, am I at risk of thread-safety problems once I start playing with the MaxDegreeOfParallelism settings, and would I be better off using one of the concurrent collections? Or is this approach flawed in some other way?
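For what it's worth, a minimal sketch of the concurrent-collection alternative (the class and member names are illustrative, not part of the pipeline above): List<T> is not safe for concurrent writes, so with MaxDegreeOfParallelism > 1 a plain list could be corrupted, while a ConcurrentQueue<int> can be written to from multiple threads safely.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;

// Illustrative fault registry: Register is safe to call from the
// resultsBlock even when MaxDegreeOfParallelism > 1.
public class FaultRegistry
{
    private readonly ConcurrentQueue<int> _faultedItems = new ConcurrentQueue<int>();

    public void Register(int item) => _faultedItems.Enqueue(item);

    // Call after 'await resultsBlock.Completion;', when no more writes can occur.
    public int[] Drain() => _faultedItems.ToArray();
}
```

After the pipeline completes, Drain() could seed a second pipeline for the retries.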

Diarmit answered 2/6, 2020 at 15:1 Comment(5)
You misunderstood Stephen Cleary's solution. The data contains the faults, not the blocks themselves. The pipeline still works, so in the end the blocks should complete successfully. If you don't want to use Result<T> objects (which would make your code a lot simpler), you can redirect fault messages to BufferBlocks (not Lists), or even to ActionBlocks that write to a logger.Veron
LinkTo accepts a predicate that can be used to direct messages either to the next step in the pipeline or to some other block, like a BufferBlock, a logging block, or even a NullTarget block. Be careful though: messages that aren't matched by any predicate will stay in their block's output buffer, essentially blocking the pipeline.Veron
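A minimal sketch of the predicate-based routing described in this comment, using plain ints instead of Try<T> so the sample stays self-contained (negative values stand in for faulted messages; all block names are illustrative; requires the System.Threading.Tasks.Dataflow package):

```csharp
using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

var source = new TransformBlock<int, int>(n => n * 2);
var faultedBuffer = new BufferBlock<int>();  // collects "faulted" messages
var okBlock = new ActionBlock<int>(n => Console.WriteLine($"ok: {n}"));

var linkOptions = new DataflowLinkOptions { PropagateCompletion = true };

// Messages matching the predicate go to the buffer; the rest continue on.
source.LinkTo(faultedBuffer, linkOptions, n => n < 0);
source.LinkTo(okBlock, linkOptions);
// A final catch-all link would guarantee nothing can get stuck in the
// source's output buffer:
// source.LinkTo(DataflowBlock.NullTarget<int>());

source.Post(-1);  // "faulted": ends up in faultedBuffer as -2
source.Post(3);   // ok: handled by okBlock as 6
source.Complete();
await source.Completion;
```

The order of the LinkTo calls matters: the predicate link must be registered first, otherwise the unconditional link would consume everything.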
@Panagiotis Thank you for the answer! I am totally confused right now. I do understand that the data flowing down the pipe contains the faults, and I would gladly put that to use to make my code as simple as possible. So you mean that from my resultsBlock I should redirect messages to some BufferBlock if they contain a fault? I am sorry if this sounds dumb to you; this stuff is new to me.Diarmit
Take a look at this: Retry policy within ITargetBlock<TInput>. Implementing a dataflow block with retry functionality is quite tricky, because some must-have options create inherent difficulties (EnsureOrdered, BoundedCapacity), and it is also not obvious how to enforce a specific delay between repeated attempts for the same item. It is doable though.Leaseholder
@Theodor Thank you for answering. I will examine the link you have provided. Seems like quite a lot to grasp.Diarmit

I converted a retry-block implementation from an answer to a similar question, to work with Stephen Cleary's Try types as input and output. The method CreateRetryTransformBlock returns a TransformBlock<Try<TInput>, Try<TOutput>>, and the method CreateRetryActionBlock returns something that is practically an ActionBlock<Try<TInput>>.

Three more options are available on top of the standard execution options: MaxAttemptsPerItem, MinimumRetryDelay and MaxRetriesTotal.

public class RetryExecutionDataflowBlockOptions : ExecutionDataflowBlockOptions
{
    /// <summary>The limit after which an item is returned as failed.</summary>
    public int MaxAttemptsPerItem { get; set; } = 1;
    /// <summary>The minimum delay duration before retrying an item.</summary>
    public TimeSpan MinimumRetryDelay { get; set; } = TimeSpan.Zero;
    /// <summary>The limit after which the block transitions to a faulted
    /// state (unlimited is the default).</summary>
    public int MaxRetriesTotal { get; set; } = -1;
}

public class RetryLimitException : Exception
{
    public RetryLimitException(string message, Exception innerException)
        : base(message, innerException) { }
}

public static TransformBlock<Try<TInput>, Try<TOutput>>
    CreateRetryTransformBlock<TInput, TOutput>(
    Func<TInput, Task<TOutput>> transform,
    RetryExecutionDataflowBlockOptions dataflowBlockOptions)
{
    if (transform == null) throw new ArgumentNullException(nameof(transform));
    if (dataflowBlockOptions == null)
        throw new ArgumentNullException(nameof(dataflowBlockOptions));
    int maxAttemptsPerItem = dataflowBlockOptions.MaxAttemptsPerItem;
    int maxRetriesTotal = dataflowBlockOptions.MaxRetriesTotal;
    TimeSpan retryDelay = dataflowBlockOptions.MinimumRetryDelay;
    if (maxAttemptsPerItem < 1) throw new ArgumentOutOfRangeException(
        nameof(dataflowBlockOptions.MaxAttemptsPerItem));
    if (maxRetriesTotal < -1) throw new ArgumentOutOfRangeException(
        nameof(dataflowBlockOptions.MaxRetriesTotal));
    if (retryDelay < TimeSpan.Zero) throw new ArgumentOutOfRangeException(
        nameof(dataflowBlockOptions.MinimumRetryDelay));

    var internalCTS = CancellationTokenSource
        .CreateLinkedTokenSource(dataflowBlockOptions.CancellationToken);

    var maxDOP = dataflowBlockOptions.MaxDegreeOfParallelism;
    var taskScheduler = dataflowBlockOptions.TaskScheduler;

    var exceptionsCount = 0;
    SemaphoreSlim semaphore;
    if (maxDOP == DataflowBlockOptions.Unbounded)
    {
        semaphore = new SemaphoreSlim(Int32.MaxValue);
    }
    else
    {
        semaphore = new SemaphoreSlim(maxDOP, maxDOP);

        // The degree of parallelism is controlled by the semaphore
        dataflowBlockOptions.MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded;

        // Use a limited-concurrency scheduler for preserving the processing order
        dataflowBlockOptions.TaskScheduler = new ConcurrentExclusiveSchedulerPair(
            taskScheduler, maxDOP).ConcurrentScheduler;
    }

    var block = new TransformBlock<Try<TInput>, Try<TOutput>>(async item =>
    {
        // Continue on captured context after every await
        if (item.IsException) return Try<TOutput>.FromException(item.Exception);
        var result1 = await ProcessOnceAsync(item);
        if (item.IsException || result1.IsValue) return result1;
        for (int i = 2; i <= maxAttemptsPerItem; i++)
        {
            await Task.Delay(retryDelay, internalCTS.Token);
            var result = await ProcessOnceAsync(item);
            if (result.IsValue) return result;
        }
        return result1; // Return the first-attempt exception
    }, dataflowBlockOptions);

    dataflowBlockOptions.MaxDegreeOfParallelism = maxDOP; // Restore initial value
    dataflowBlockOptions.TaskScheduler = taskScheduler; // Restore initial value

    _ = block.Completion.ContinueWith(_ => internalCTS.Dispose(),
        TaskScheduler.Default);

    return block;

    async Task<Try<TOutput>> ProcessOnceAsync(Try<TInput> item)
    {
        await semaphore.WaitAsync(internalCTS.Token);
        try
        {
            var result = await item.Map(transform);
            if (item.IsValue && result.IsException)
            {
                ObserveNewException(result.Exception);
            }
            return result;
        }
        finally
        {
            semaphore.Release();
        }
    }

    void ObserveNewException(Exception ex)
    {
        if (maxRetriesTotal == -1) return;
        uint newCount = (uint)Interlocked.Increment(ref exceptionsCount);
        if (newCount <= (uint)maxRetriesTotal) return;
        if (newCount == (uint)maxRetriesTotal + 1)
        {
            internalCTS.Cancel(); // The block has failed
            throw new RetryLimitException($"The max retry limit " +
                $"({maxRetriesTotal}) has been reached.", ex);
        }
        throw new OperationCanceledException();
    }
}

public static ITargetBlock<Try<TInput>> CreateRetryActionBlock<TInput>(
    Func<TInput, Task> action,
    RetryExecutionDataflowBlockOptions dataflowBlockOptions)
{
    if (action == null) throw new ArgumentNullException(nameof(action));
    var block = CreateRetryTransformBlock<TInput, object>(async input =>
    {
        await action(input).ConfigureAwait(false); return null;
    }, dataflowBlockOptions);
    var nullTarget = DataflowBlock.NullTarget<Try<object>>();
    block.LinkTo(nullTarget);
    return block;
}

Usage example:

var downloadBlock = CreateRetryTransformBlock(async (int construct) =>
{
    int result = await DownloadAsync(construct);
    return result;
}, new RetryExecutionDataflowBlockOptions()
{
    MaxDegreeOfParallelism = 10,
    MaxAttemptsPerItem = 3,
    MaxRetriesTotal = 100,
    MinimumRetryDelay = TimeSpan.FromSeconds(10)
});

var processBlock = new TransformBlock<Try<int>, Try<int>>(
    construct => construct.Map(async value =>
{
    return await ProcessAsync(value);
}));

downloadBlock.LinkTo(processBlock,
    new DataflowLinkOptions() { PropagateCompletion = true });

To keep things simple, when an item has been retried the maximum number of times, the exception preserved is the first one that occurred; the subsequent exceptions are lost. In most cases the lost exceptions will be of the same type as the first one anyway.

Caution: The above implementation does not have an efficient input queue. If you feed this block with millions of items, the memory usage will explode.
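As a general note (not a fix specific to the implementation above), the standard dataflow technique for keeping memory bounded is to set BoundedCapacity and feed the block with SendAsync, so the producer is throttled instead of queueing everything up front. A minimal sketch under that assumption:

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

int processed = 0;

var block = new ActionBlock<int>(async n =>
{
    await Task.Delay(1); // simulate work
    Interlocked.Increment(ref processed);
}, new ExecutionDataflowBlockOptions
{
    BoundedCapacity = 100,       // at most ~100 items buffered inside the block
    MaxDegreeOfParallelism = 4
});

for (int i = 0; i < 200; i++)
{
    // SendAsync completes only when the block has room, throttling the producer.
    await block.SendAsync(i);
}
block.Complete();
await block.Completion;
```

Whether this fully tames the retry block above depends on how it spawns its retry tasks internally, so treat it as a mitigation to test rather than a guaranteed fix.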

Leaseholder answered 2/6, 2020 at 17:44 Comment(21)
Hi Theodor! Thank you for the answer. I will most definitely get back to you with some questions when I dig into it!Diarmit
@Diarmit I updated my answer with a somewhat simpler implementation. The idea is to use a single SemaphoreSlim for controlling the concurrency of both the initial attempts and the retries, instead of using two internally linked blocks.Leaseholder
Hmm, is this approach in any way inferior to the previous one?Diarmit
@Diarmit I think that both approaches are roughly equivalent. Both are also a bit experimental. I am quite confident in their robustness and efficiency under normal usage though.Leaseholder
I find this code pretty hard to debug (my lack of understanding), and maybe this is a false assumption, but can it be that, even if I set MaxDegreeOfParallelism = 1, when one item has faulted and entered its retry loop, other items are being processed simultaneously while the faulted one has not completed its retry cycle yet?Diarmit
@Diarmit yep, with MaxDegreeOfParallelism = 1 only one item will be actively processed at any moment. If an item fails, it will wait for the RetryDelay period and then ask for the semaphore again. It may have to wait a long time before acquiring it, because of all the other items that may have asked for it, either for their first attempt or for a retry. The RetryDelay is just the minimum duration between retries; the average delay may be much longer. The SemaphoreSlim class maintains an internal FIFO queue of its awaiters, so the processing of all items will happen with fairness.Leaseholder
So, just to make clear: if I push var data = new List<string>() { a, b, c, d } into the pipeline and "a" fails, could it happen that while "a" is awaiting its retryDelay, the SemaphoreSlim allows another entry, let's say "b", to proceed, and puts the first item "a" at the end of the line?Diarmit
@Diarmit yes, the "a" will be retried only after "b", "c" and "d" have all been attempted once. Since the EnsureOrdered option defaults to true though, the "a" item will still be the first to be exported by the block, even if all the other letters were successfully processed earlier.Leaseholder
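The EnsureOrdered behavior discussed here can be observed with a toy block (illustrative; delays stand in for items that need retries): with EnsureOrdered = false, outputs are emitted as they finish rather than in input order.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

var block = new TransformBlock<int, int>(async n =>
{
    await Task.Delay(n); // larger values finish later
    return n;
}, new ExecutionDataflowBlockOptions
{
    MaxDegreeOfParallelism = 4,
    EnsureOrdered = false // emit results in completion order, not input order
});

foreach (var n in new[] { 200, 50, 100, 10 }) block.Post(n);
block.Complete();

var results = new List<int>();
while (await block.OutputAvailableAsync())
    while (block.TryReceive(null, out var n)) results.Add(n);

// With the default EnsureOrdered = true the output order would be
// 200, 50, 100, 10; here the fast items overtake the slow ones.
Console.WriteLine(string.Join(", ", results));
```

The exact output order depends on timing, so this is best treated as a demonstration rather than something to assert on in tests.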
I would like to test this behavior, but setting new RetryExecutionDataflowBlockOptions() { MaxAttemptsPerItem = 3, RetryDelay = TimeSpan.FromSeconds(10), EnsureOrdered = false } does not let other items get in front of the failed one. At the end it still comes out a, b, c, d.Diarmit
@Diarmit I can't reproduce it. I have linked the "Retry" block to an ActionBlock that writes the letter it receives to the Console, and it writes the letters in the order b, c, d, a. Are you sure that processing the "a" results in an exception?Leaseholder
I am sorry, it was my mistake. Yes, it works now. Thank you for help, I will now proceed with further testing!Diarmit
@Diarmit I noticed a possible drawback of the above implementation: it prioritizes processing new items over retrying failed ones, resulting in all failed items being retried at the end. This can be a bit inefficient in case the failed items have a higher probability of failing again, because of the delay imposed between retries. So the block may end up being less productive in its final stage (too much delay, too little work). This could easily be fixed if .NET had a semaphore class supporting priorities, but unfortunately it doesn't.Leaseholder
Thank you for informing me about this. I have spent 2 days trying to figure out how to elegantly return a value tuple (TId, Try<TOutput>) instead of Try<(TId, TOutput)>. Let's say I use it like this: CreateRetryTransformBlock<MyClass, (string, int)>(...). It would return Try<(string, int)>, but how do I achieve that only the int value is wrapped inside Try? You have already dedicated a tremendous amount of time to this answer, so I wanted to figure it out myself, but once again I have realized that my knowledge on this subject is limited.Diarmit
@Diarmit I updated my answer with some minor improvements that don't drastically change the behavior of the method. Small API change: the RetryDelay option is renamed to the more appropriate MinimumRetryDelay. For a version that also propagates the id, CreateRetryTransformBlock<TId, TInput, TOutput>(...), check this fiddle.Leaseholder
@Diarmit btw, a year ago I would have had a hard time wrapping my head around all this async-await stuff. It's anything but trivial. Rest assured that you'll get better with practice. 😃Leaseholder
Yes, it now works! Thank you! I am exploring your answer further, and I think I will "improve" it a bit. For example, there might be exceptions that are not temporary. Let's say the user has forgotten to plug in the internet cable; if the pipeline contains 100 items, MinimumRetryDelay = 5 sec, and each faulted item has to wait for the SemaphoreSlim at the end of the line, by the time an item reaches its MaxAttemptsPerItem a considerable amount of time might have passed just to realize that (for example) the cable is not plugged in.Diarmit
I could of course reduce MaxRetriesTotal, but that could backfire in case of a genuinely temporary exception. So I think I could implement some custom filter that tracks the count of specific exceptions and faults the entire TransformBlock once a specified threshold is reached, before the block itself lets the exception escape when an item has finally reached its MaxAttemptsPerItem and still failed.Diarmit
Hi @niks! Yeah, there is certainly ample room for improvement. For example, you could add a new option public Type[] FatalExceptionTypes { get; set; } in the RetryExecutionDataflowBlockOptions class, and check for these types inside the ObserveNewException method. Or, for more flexibility, you could add an event-type property, like public Action<TInput, Exception> OnException { get; set; }, that would be invoked inside ObserveNewException.Leaseholder
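The FatalExceptionTypes idea from this comment could be sketched roughly like this (a hypothetical helper, not part of the answer's code): a check that ObserveNewException could run before counting an exception toward the retry budget.

```csharp
using System;
using System.Linq;

public static class FatalExceptionFilter
{
    // Hypothetical helper: returns true when the exception is (or derives
    // from) one of the configured fatal types, in which case the block
    // could fault immediately instead of scheduling a retry.
    public static bool IsFatal(Exception ex, Type[] fatalExceptionTypes) =>
        fatalExceptionTypes != null &&
        fatalExceptionTypes.Any(t => t.IsInstanceOfType(ex));
}
```

For example, IsFatal(new TimeoutException(), new[] { typeof(TimeoutException) }) is true, while an exception type not in the list is treated as retryable.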
Thank you for the suggestions, Theodor! I have one last question: before the edit you used the original CancellationToken, and then you switched to a linked token. Is there any particular reason for that?Diarmit
@Diarmit yes, it is a small improvement. In case of a fatal exception (currently only the RetryLimitException), the internalCTS is canceled, causing all waits on the semaphore and all retry delays to be canceled immediately. This increases the responsiveness of the block in case of a fatal exception. Btw, feel free to ask as many questions as needed; answering them is my pleasure. :-)Leaseholder
Ah, yes. I was thinking that you could use the original CancellationToken for this, but you can't call Cancel on it; you need a CancellationTokenSource for that. What a silly oversight on my part. I think I am done with this question for now. Thank you for your tremendous effort, answering in such detail and following through with all the questions so quickly, even after working hours! :) It feels like a premium support package! Efcharistó! :)Diarmit
