Retry policy within ITargetBlock<TInput>

I need to introduce a retry policy to the workflow. Let's say there are 3 blocks that are connected in the following way:

var executionOptions = new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 3 };
var buffer = new BufferBlock<int>();
var processing = new TransformBlock<int, int>(..., executionOptions);
var send = new ActionBlock<int>(...);

buffer.LinkTo(processing);
processing.LinkTo(send);

So there is a buffer that accumulates data and sends it to the transform block, which processes no more than 3 items at a time; the results are then sent to the action block.

Transient errors are possible while the transform block is processing, and I want to retry the processing of a failed message several times if the error is transient.

I know that blocks generally are not retryable (though the delegates passed into the blocks could be made retryable). So one of the options is to wrap the passed delegate to support retrying.

I also know that there is a very good library, TransientFaultHandling.Core, that provides retry mechanisms for transient faults. It is an excellent library, but not in my case. If I wrap the delegate that is passed to the transform block into the RetryPolicy.ExecuteAsync method, the message inside the transform block will be locked, and until the retry either completes or fails, the transform block won't be able to receive a new message. Imagine that all 3 in-flight messages enter retrying (let's say the next retry attempt is in 2 minutes) and fail: the transform block will be stuck until at least one message leaves it.
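
To illustrate, the wrapping would look roughly like this (just a sketch: retryPolicy here is assumed to be an already configured RetryPolicy instance, and TransformAsync is a stand-in for my real processing delegate):

// Sketch only: retryPolicy and TransformAsync are illustrative placeholders.
var processing = new TransformBlock<int, int>(
    input => retryPolicy.ExecuteAsync(() => TransformAsync(input)),
    executionOptions);
// While ExecuteAsync is waiting out a retry delay, this invocation still
// occupies one of the 3 MaxDegreeOfParallelism slots, so 3 retrying
// messages stall the whole block.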

The only solution I see is to extend TransformBlock (actually, ITargetBlock would be enough too), and do the retry manually (like from here):

do
{
    try { return await transform(input); }
    catch
    {
        if (numRetries <= 0) throw;
        else Task.Delay(timeout).ContinueWith(t => processing.Post(message));
    }
} while (numRetries-- > 0);

i.e. to put the message into the transform block again with a delay; but in this case the retry context (the number of retries left, etc.) would also have to be passed into the block. Sounds too complex...

Does anyone see a simpler approach to implement retry policy for a workflow block?

Subtractive answered 4/7, 2013 at 11:57 Comment(1)
About that "(like from here)" answer, which I have just greatly improved - you might want to take another look.Fourteenth

I think you pretty much have to do that: you have to track the remaining number of retries for a message, and you have to schedule the retry attempt somehow.

But you could make this better by encapsulating it in a separate method. Something like:

// it's a private class, so public fields are okay
private class RetryingMessage<T>
{
    public T Data;
    public int RetriesRemaining;
    public readonly List<Exception> Exceptions = new List<Exception>();
}

public static IPropagatorBlock<TInput, TOutput>
    CreateRetryingBlock<TInput, TOutput>(
    Func<TInput, Task<TOutput>> transform, int numberOfRetries,
    TimeSpan retryDelay, Action<IEnumerable<Exception>> failureHandler)
{
    var source = new TransformBlock<TInput, RetryingMessage<TInput>>(
        input => new RetryingMessage<TInput>
        { Data = input, RetriesRemaining = numberOfRetries });

    // TransformManyBlock, so that we can propagate zero results on failure
    TransformManyBlock<RetryingMessage<TInput>, TOutput> target = null;
    target = new TransformManyBlock<RetryingMessage<TInput>, TOutput>(
        async message =>
        {
            try
            {
                return new[] { await transform(message.Data) };
            }
            catch (Exception ex)
            {
                message.Exceptions.Add(ex);
                if (message.RetriesRemaining == 0)
                {
                    failureHandler(message.Exceptions);
                }
                else
                {
                    message.RetriesRemaining--;
                    Task.Delay(retryDelay)
                        .ContinueWith(_ => target.Post(message));
                }
                return null;
            }
        });

    source.LinkTo(
        target, new DataflowLinkOptions { PropagateCompletion = true });

    return DataflowBlock.Encapsulate(source, target);
}

I have added code to track the exceptions, because I think that failures should not be ignored; they should at the very least be logged.

Also, this code doesn't work very well with completion: if there are retries waiting for their delay and you Complete() the block, it will immediately complete and the retries will be lost. If that's a problem for you, you will have to track outstanding retries and complete target only when source has completed and no retries are waiting.
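
To illustrate one possible way to do that tracking (just a sketch; the names inFlight, allDone, MessageEntered and MessageLeft are illustrative, not an established pattern): count every message that enters the pipeline, decrement only when a message produces output or permanently fails, and propagate completion manually instead of via PropagateCompletion.

int inFlight = 0;
var allDone = new TaskCompletionSource<bool>();

// Call in the source delegate, when a message enters the pipeline.
void MessageEntered() => Interlocked.Increment(ref inFlight);

// Call in the target delegate on success or permanent failure;
// scheduling a retry does NOT count as leaving.
void MessageLeft()
{
    if (Interlocked.Decrement(ref inFlight) == 0
        && source.Completion.IsCompleted)
        allDone.TrySetResult(true);
}

// Use instead of PropagateCompletion on the source -> target link;
// call it once, right after constructing the blocks.
async void PropagateCompletionManually()
{
    try { await source.Completion.ConfigureAwait(false); }
    catch (Exception ex) { ((IDataflowBlock)target).Fault(ex); return; }
    if (Volatile.Read(ref inFlight) == 0) allDone.TrySetResult(true);
    await allDone.Task.ConfigureAwait(false);
    target.Complete();
}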

Dupre answered 4/7, 2013 at 13:4 Comment(2)
Thank you for the excellent answer (especially the note about completion), I'd upvote it twice or even thrice if I could. I've implemented a solution for correct (I hope) completion using something like this: I keep a set of messages being retried (with adding and removing), plus a continuation on the source's completion that polls (like await Task.Delay(100)) and checks whether the target block still has input messages and whether the retry set is empty. When the target has no pending input and the set is empty, I just complete the target block. Could it be implemented without polling?Subtractive
@Alex You don't need polling, you need to check the conditions at the right time. But I think you would also need to know that there are no messages waiting to be processed (because some of them could fail) and I'm not sure how to do that reasonably.Dupre

In addition to svick's excellent answer, there are a couple of other options:

  1. You can use TransientFaultHandling.Core - just set MaxDegreeOfParallelism to Unbounded so the other messages can get through.
  2. You can modify the block output type to include a failure indication and a retry count, and create a dataflow loop, passing a filter to LinkTo that examines whether another retry is necessary (a rough sketch follows below). This approach is more complex; you'd have to add a delay to your block if it is doing a retry, and add a TransformBlock to remove the failure/retry information for the rest of the mesh.
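
Here's a rough sketch of option 2 (Envelope, ProcessAsync, retryDelay and maxAttempts are illustrative assumptions, not part of any library):

// Hypothetical carrier that travels around the loop with its retry state.
class Envelope<T>
{
    public T Data;
    public int Attempts;
    public bool Failed;
}

// Inside your mesh-building method:
var retryDelay = TimeSpan.FromSeconds(2); // illustrative values
var maxAttempts = 3;

var worker = new TransformBlock<Envelope<int>, Envelope<int>>(async env =>
{
    try
    {
        if (env.Failed) await Task.Delay(retryDelay); // the in-block delay
        env.Attempts++;
        await ProcessAsync(env.Data); // stand-in for the real processing
        env.Failed = false;
    }
    catch { env.Failed = true; }
    return env;
}, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 3 });

// The loop: failed envelopes with attempts left re-enter the same block.
worker.LinkTo(worker, env => env.Failed && env.Attempts < maxAttempts);

// Successes are unwrapped for the rest of the mesh.
var unwrap = new TransformBlock<Envelope<int>, int>(env => env.Data);
worker.LinkTo(unwrap, env => !env.Failed);

// Permanently failed envelopes are dropped here (or log them instead).
worker.LinkTo(DataflowBlock.NullTarget<Envelope<int>>());

Note that completion also gets more complicated with a loop, since messages can still be circulating after Complete() is called.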
Blaylock answered 5/7, 2013 at 2:58 Comment(9)
If I set MaxDegreeOfParallelism to Unbounded and e.g. my buffer receives 20K input messages, and let's assume that half of them will require retrying - I think the whole system will get stuck. Am I correct?Subtractive
One more comment in addition to the previous one: I think it is not a very good idea to keep the block occupied when it just needs to wait and could potentially process another message - please correct me if I'm wrong.Subtractive
@Alex: MaxDegreeOfParallelism is just a maximum, it won't cause starvation. I'm not sure what you mean about "keeping" the block; you usually keep the dataflow mesh around as long as there may be more data.Blaylock
@StephenCleary: as I understand it, when using the 'retryPolicy.ExecuteAsync' method from TransientFaultHandling.Core, it stops the execution inside the block and retries after a retry interval. That means that if the block can process at most 3 messages at the same time, and one of them is waiting for a retry, the block won't be able to take a new message in place of the retrying one until the retry either fails or completes.Subtractive
@Alex: MaxDegreeOfParallelism is a setting on the block, so if you set it to 3 and then have one message waiting for retrying, the block is free to spin up another task to process the next message. P.S. I'm assuming that ExecuteAsync will behave intelligently (i.e., that it uses Task.Delay and not Thread.Sleep). If it's stupid (using Thread.Sleep) then the solution will still work but will be very inefficient.Blaylock
I've just tried this code snippet with MaxDegreeOfParallelism set to 1, and the block didn't take anything while retrying the message, and the whole workflow got stuck. That's what I tried to show a bit earlier (sorry for not being too clear).Subtractive
@Alex: Of course it would block the workflow if you set it to 1 (which is the default setting anyway). It's blocking because you told it to only do one at a time. Try setting it to Unbounded.Blaylock
Imagine a scenario where I set it to Unbounded, the buffer receives 20K messages, half of the messages potentially enter the retry, and each message's processing is CPU-heavy - will the whole workflow get stuck?Subtractive
No, it will not get stuck.Blaylock

Here are two methods CreateRetryTransformBlock and CreateRetryActionBlock that operate under these assumptions:

  1. The caller wants all items to be processed, even if some of them have repeatedly failed.
  2. The caller is interested in knowing about all the exceptions that occurred, even for items that finally succeeded (not applicable for the CreateRetryActionBlock).
  3. The caller may want to set an upper limit to the number of total retries, after which the block should transition to a faulted state.
  4. The caller wants to be able to set all available options of a normal block, including the MaxDegreeOfParallelism, BoundedCapacity, CancellationToken and EnsureOrdered, on top of the options related to the retry functionality.

The implementation below uses a SemaphoreSlim to control the level of concurrency between operations that are attempted for the first time, and previously faulted operations that are retried after their delay duration has elapsed.

public class RetryExecutionDataflowBlockOptions : ExecutionDataflowBlockOptions
{
    /// <summary>The limit after which an item is returned as failed.</summary>
    public int MaxAttemptsPerItem { get; set; } = 1;
    /// <summary>The delay duration before retrying an item.</summary>
    public TimeSpan RetryDelay { get; set; } = TimeSpan.Zero;
    /// <summary>The limit after which the block transitions to a faulted
    /// state (unlimited is the default).</summary>
    public int MaxRetriesTotal { get; set; } = -1;
}

public readonly struct RetryResult<TInput, TOutput>
{
    public readonly TInput Input { get; }
    public readonly TOutput Output { get; }
    public readonly bool Success { get; }
    public readonly Exception[] Exceptions { get; }

    public bool Failed => !Success;
    public Exception FirstException => Exceptions != null ? Exceptions[0] : null;
    public int Attempts =>
        Exceptions != null ? Exceptions.Length + (Success ? 1 : 0) : 1;

    public RetryResult(TInput input, TOutput output, bool success,
        Exception[] exceptions)
    {
        Input = input;
        Output = output;
        Success = success;
        Exceptions = exceptions;
    }
}

public class RetryLimitException : Exception
{
    public RetryLimitException(string message, Exception innerException)
        : base(message, innerException) { }
}

public static IPropagatorBlock<TInput, RetryResult<TInput, TOutput>>
    CreateRetryTransformBlock<TInput, TOutput>(
    Func<TInput, Task<TOutput>> transform,
    RetryExecutionDataflowBlockOptions dataflowBlockOptions)
{
    if (transform == null) throw new ArgumentNullException(nameof(transform));
    if (dataflowBlockOptions == null)
        throw new ArgumentNullException(nameof(dataflowBlockOptions));
    int maxAttemptsPerItem = dataflowBlockOptions.MaxAttemptsPerItem;
    int maxRetriesTotal = dataflowBlockOptions.MaxRetriesTotal;
    TimeSpan retryDelay = dataflowBlockOptions.RetryDelay;
    if (maxAttemptsPerItem < 1) throw new ArgumentOutOfRangeException(
        nameof(dataflowBlockOptions.MaxAttemptsPerItem));
    if (maxRetriesTotal < -1) throw new ArgumentOutOfRangeException(
        nameof(dataflowBlockOptions.MaxRetriesTotal));
    if (retryDelay < TimeSpan.Zero) throw new ArgumentOutOfRangeException(
        nameof(dataflowBlockOptions.RetryDelay));
    var cancellationToken = dataflowBlockOptions.CancellationToken;

    var exceptionsCount = 0;
    var semaphore = new SemaphoreSlim(
        dataflowBlockOptions.MaxDegreeOfParallelism);

    async Task<(TOutput, Exception)> ProcessOnceAsync(TInput item)
    {
        await semaphore.WaitAsync(); // Preserve the SynchronizationContext
        try
        {
            var result = await transform(item).ConfigureAwait(false);
            return (result, null);
        }
        catch (Exception ex)
        {
            if (maxRetriesTotal != -1)
            {
                if (Interlocked.Increment(ref exceptionsCount) > maxRetriesTotal)
                {
                    throw new RetryLimitException($"The max retry limit " +
                        $"({maxRetriesTotal}) has been reached.", ex);
                }
            }
            return (default, ex);
        }
        finally
        {
            semaphore.Release();
        }
    }

    async Task<Task<RetryResult<TInput, TOutput>>> ProcessWithRetryAsync(
        TInput item)
    {
        // Creates a two-stages operation. Preserves the context on every await.
        var (result, firstException) = await ProcessOnceAsync(item);
        if (firstException == null) return Task.FromResult(
            new RetryResult<TInput, TOutput>(item, result, true, null));
        return RetryStageAsync();

        async Task<RetryResult<TInput, TOutput>> RetryStageAsync()
        {
            var exceptions = new List<Exception>();
            exceptions.Add(firstException);
            for (int i = 2; i <= maxAttemptsPerItem; i++)
            {
                await Task.Delay(retryDelay, cancellationToken);
                var (result, exception) = await ProcessOnceAsync(item);
                if (exception != null)
                    exceptions.Add(exception);
                else
                    return new RetryResult<TInput, TOutput>(item, result,
                        true, exceptions.ToArray());
            }
            return new RetryResult<TInput, TOutput>(item, default, false,
                exceptions.ToArray());
        };
    }

    // The input block awaits the first stage of each operation
    var input = new TransformBlock<TInput, Task<RetryResult<TInput, TOutput>>>(
        item => ProcessWithRetryAsync(item), dataflowBlockOptions);

    // The output block awaits the second (and final) stage of each operation
    var output = new TransformBlock<Task<RetryResult<TInput, TOutput>>,
        RetryResult<TInput, TOutput>>(t => t, dataflowBlockOptions);

    input.LinkTo(output, new DataflowLinkOptions { PropagateCompletion = true });

    // In case of failure ensure that the input block is faulted too,
    // so that its input/output queues are emptied, and any pending
    // SendAsync operations are aborted
    PropagateFailure(output, input);

    return DataflowBlock.Encapsulate(input, output);

    async void PropagateFailure(IDataflowBlock block1, IDataflowBlock block2)
    {
        try { await block1.Completion.ConfigureAwait(false); }
        catch (Exception ex) { block2.Fault(ex); }
    }
}

public static ITargetBlock<TInput> CreateRetryActionBlock<TInput>(
    Func<TInput, Task> action,
    RetryExecutionDataflowBlockOptions dataflowBlockOptions)
{
    if (action == null) throw new ArgumentNullException(nameof(action));
    var block = CreateRetryTransformBlock<TInput, object>(async input =>
    {
        await action(input).ConfigureAwait(false); return null;
    }, dataflowBlockOptions);
    var nullTarget = DataflowBlock.NullTarget<RetryResult<TInput, object>>();
    block.LinkTo(nullTarget);
    return block;
}
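
For illustration, here is a possible usage of the CreateRetryTransformBlock method (FetchAsync and urls are hypothetical placeholders, and the option values are illustrative; this fragment belongs in an async method):

var block = CreateRetryTransformBlock<string, string>(FetchAsync,
    new RetryExecutionDataflowBlockOptions
    {
        MaxDegreeOfParallelism = 3,
        MaxAttemptsPerItem = 4,
        RetryDelay = TimeSpan.FromSeconds(10),
    });

var printer = new ActionBlock<RetryResult<string, string>>(r =>
    Console.WriteLine(r.Success
        ? $"OK: {r.Input} -> {r.Output} ({r.Attempts} attempt(s))"
        : $"Failed: {r.Input}, first error: {r.FirstException?.Message}"));

block.LinkTo(printer, new DataflowLinkOptions { PropagateCompletion = true });

foreach (var url in urls) await block.SendAsync(url);
block.Complete();
await printer.Completion;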
Cyclamen answered 24/5, 2020 at 13:5 Comment(2)
For an implementation working with Stephen Cleary's elegant and lightweight Try type, instead of the somewhat bulky RetryResult type used here, look at this question.Cyclamen
Update: I am no longer satisfied with the above implementation, because of the unnatural way it prioritizes the retries of repeated failures. My current opinion is that the best implementation of a RetryTransformBlock is a pipeline of linked TransformBlocks, with length equal to the maximum number of retries, through which all messages pass. All the TransformBlocks would have the same logic: process the message if it's not yet processed, or otherwise propagate it downstream along with its result, unchanged. This configuration results in the most natural behavior IMHO.Cyclamen
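
For illustration, a minimal sketch of that pipeline idea (the Attempt carrier type, TransformAsync and maxAttempts are hypothetical; no retry delay is shown):

// Each message travels through maxAttempts identical stages; a stage only
// does work if no earlier stage has succeeded.
class Attempt<T, TResult>
{
    public T Input;
    public TResult Result;
    public bool Succeeded;
}

// Inside your mesh-building method:
TransformBlock<Attempt<int, int>, Attempt<int, int>> CreateStage() =>
    new TransformBlock<Attempt<int, int>, Attempt<int, int>>(async a =>
    {
        if (a.Succeeded) return a; // already processed: propagate unchanged
        try { a.Result = await TransformAsync(a.Input); a.Succeeded = true; }
        catch { /* leave Succeeded == false; a later stage will retry */ }
        return a;
    });

var stages = Enumerable.Range(0, maxAttempts).Select(_ => CreateStage()).ToList();
for (int i = 1; i < stages.Count; i++)
    stages[i - 1].LinkTo(stages[i],
        new DataflowLinkOptions { PropagateCompletion = true });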
