Retry policy within ITargetBlock

2019-02-05 00:18发布

I need to introduce a retry policy to the workflow. Let's say there are 3 blocks that are connected in such a way:

var executionOptions = new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 3 };
var buffer = new BufferBlock<int>();
var processing = new TransformBlock<int, int>(..., executionOptions);
var send = new ActionBlock<int>(...);

buffer.LinkTo(processing);
processing.LinkTo(send);

So there is a buffer which accumulates data, then send it to the transform block that processes not more that 3 items at one time, and then the result send to the action block.

Potentially during processing the transform block transient errors are possible, and I want retry the block if the error is transient for several times.

I know that blocks generally are not retryable (delegates that passed into the blocks could be made retryable). And one of the options is to wrap the delegate passed to support retrying.

I also know that there is a very good library TransientFaultHandling.Core that provides the retry mechanisms to transient faults. This is an excellent library but not in my case. If I wrap the delegate that is passed to the transform block into the RetryPolicy.ExecuteAsync method, the message inside the transform block will be locked, and until retry either completes or fails, the transform block won't be able to receive a new message. Imagine, if all the 3 messages are entered into the retrying (let's say, the next retry attempt will be in 2 minutes) and fail, the transform block will be stuck until at least one message leave the transform block.

The only solution I see is to extend the TranformBlock (actually, ITargetBlock will be enough too), and do the retry manually (like from here):

do
 {
    try { return await transform(input); }
    catch
    { 
        if( numRetries <= 0 ) throw;
        else Task.Delay(timeout).ContinueWith(t => processing.Post(message));
    }
 } while( numRetries-- > 0 );

i.g. to put the message inside the transform block again with a delay, but in this case the retry context (number of retries left, etc.) also should be passed into this block. Sounds too complex...

Does anyone see a simpler approach to implement retry policy for a workflow block?

2条回答
霸刀☆藐视天下
2楼-- · 2019-02-05 00:30

I think you pretty much have to do that, you have to track the remaining number of retries for a message and you have to schedule the retried attempt somehow.

But you could make this better by encapsulating it in a separate method. Something like:

// it's a private class, so public fields are okay
private class RetryingMessage<T>
{
    public T Data;
    public int RetriesRemaining;
    public readonly List<Exception> Exceptions = new List<Exception>();
}

public static IPropagatorBlock<TInput, TOutput>
    CreateRetryingBlock<TInput, TOutput>(
    Func<TInput, Task<TOutput>> transform, int numberOfRetries,
    TimeSpan retryDelay, Action<IEnumerable<Exception>> failureHandler)
{
    var source = new TransformBlock<TInput, RetryingMessage<TInput>>(
        input => new RetryingMessage<TInput>
        { Data = input, RetriesRemaining = numberOfRetries });

    // TransformManyBlock, so that we can propagate zero results on failure
    TransformManyBlock<RetryingMessage<TInput>, TOutput> target = null;
    target = new TransformManyBlock<RetryingMessage<TInput>, TOutput>(
        async message =>
        {
            try
            {
                return new[] { await transform(message.Data) };
            }
            catch (Exception ex)
            {
                message.Exceptions.Add(ex);
                if (message.RetriesRemaining == 0)
                {
                    failureHandler(message.Exceptions);
                }
                else
                {
                    message.RetriesRemaining--;
                    Task.Delay(retryDelay)
                        .ContinueWith(_ => target.Post(message));
                }
                return null;
            }
        });

    source.LinkTo(
        target, new DataflowLinkOptions { PropagateCompletion = true });

    return DataflowBlock.Encapsulate(source, target);
}

I have added code to track the exceptions, because I think that failures should not be ignored, they should be at the very least logged.

Also, this code doesn't work very well with completion: if there are retries waiting for their delay and you Complete() the block, it will immediately complete and the retries will be lost. If that's a problem for you, you will have to track outstanding reties and complete target when source completes and no retries are waiting.

查看更多
【Aperson】
3楼-- · 2019-02-05 00:51

In addition to svick's excellent answer, there are a couple of other options:

  1. You can use TransientFaultHandling.Core - just set MaxDegreeOfParallelism to Unbounded so the other messages can get through.
  2. You can modify the block output type to include failure indication and a retry count, and create a dataflow loop, passing a filter to LinkTo that examines whether another retry is necessary. This approach is more complex; you'd have to add a delay to your block if it is doing a retry, and add a TransformBlock to remove the failure/retry information for the rest of the mesh.
查看更多
登录 后发表回答