TPL Dataflow and exception handling in downstream

Posted 2019-07-14 06:04

I have the following pseudo code:

var queue = new BufferBlock<int>(new DataflowBlockOptions { BoundedCapacity = 5 });
var a = new ActionBlock<int>(async item =>
    {
        await Task.Delay(500);
        Trace.TraceInformation(
            $"Target 1: | Type: {typeof(int).Name} | Thread: {Thread.CurrentThread.ManagedThreadId} | Message: {item}");
        // handling some logic but it throws
        if (item >= 5) throw new Exception("Something bad happened");

    }, new ExecutionDataflowBlockOptions { BoundedCapacity = 1, MaxDegreeOfParallelism = 1 });

queue.LinkTo(a, new DataflowLinkOptions { PropagateCompletion = true });

var targets = new List<ITargetBlock<int>> {queue};

var broadcaster = new ActionBlock<int>(
    async item =>
    {
        var processingTasks = targets.Select(async t =>
        {
            try
            {
                // This condition is always false:
                // t (the BufferBlock) never faults. The exception is raised in the downstream ActionBlock it forwards to.
                if (!await t.SendAsync(item))
                    await t.Completion;
            }
            catch (Exception e)
            {
                Trace.TraceInformation("Handled exception : " + e.Message);
            }
        });

        try
        {
            // The exception is not rethrown here either
            await Task.WhenAll(processingTasks);
        }
        catch (Exception e)
        {
            Trace.TraceInformation("Handled exception WhenAll : " + e.Message);
        }
    });

for (var i = 1; i <= 10; i++)
{
    broadcaster.Post(i);
}

The pipeline is configured like this: ActionBlock<int> => BufferBlock<int> => ActionBlock<int>.

The last ActionBlock<int> throws an exception, but it is not rethrown to the source block, where I would like to handle it.

How can this code be rewritten so that it handles exceptions correctly?

1 answer
#2 · 2019-07-14 06:40

You can find the official guidelines for this topic here. The overall solution is to subscribe to the Completion task of every block, check its state, and, if needed, replace the faulted block (which means you should also keep references to all the blocks). Please refer to the whole article for more information.
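As a minimal sketch of the "subscribe to Completion and check its state" idea, the code below watches the downstream block and, when it faults, faults the upstream BufferBlock as well. That step is my own assumption about one way to react, not something the guidelines prescribe, and PropagateFaultUpstream is a hypothetical helper rather than part of TPL Dataflow. Once the upstream block is faulted, SendAsync returns false and awaiting its Completion rethrows the exception in the producer.

```csharp
using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class FaultPropagationSketch
{
    // Hypothetical helper (not part of TPL Dataflow): when `downstream` faults,
    // fault `upstream` with the same root exception so producers can observe it.
    public static Task PropagateFaultUpstream(IDataflowBlock downstream, IDataflowBlock upstream)
    {
        return downstream.Completion.ContinueWith(t =>
        {
            if (t.IsFaulted)
                upstream.Fault(t.Exception.GetBaseException());
        }, TaskScheduler.Default);
    }

    // Returns the message of the exception seen by the producer, or null if none was seen.
    public static async Task<string> RunAsync()
    {
        var queue = new BufferBlock<int>(new DataflowBlockOptions { BoundedCapacity = 5 });
        var worker = new ActionBlock<int>(item =>
        {
            if (item >= 5) throw new Exception("Something bad happened");
        }, new ExecutionDataflowBlockOptions { BoundedCapacity = 1, MaxDegreeOfParallelism = 1 });

        queue.LinkTo(worker, new DataflowLinkOptions { PropagateCompletion = true });
        var watcher = PropagateFaultUpstream(worker, queue);

        try
        {
            for (var i = 1; i <= 10; i++)
            {
                // SendAsync returns false once the queue has completed or faulted;
                // awaiting Completion then rethrows the stored exception.
                if (!await queue.SendAsync(i))
                    await queue.Completion;
            }

            queue.Complete();
            // If every item was accepted, the fault still surfaces on the leaf block.
            await worker.Completion;
            return null;
        }
        catch (Exception e)
        {
            return e.Message;
        }
    }
}
```

Calling `await FaultPropagationSketch.RunAsync()` hands the downstream failure back to the code that feeds the pipeline, which is where the question wants to handle it. Replacing the faulted block, as the guidelines suggest, would start from the same continuation.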

Behaviors of a network with Faulted blocks

  1. Reserved Messages
    In order to avoid message corruption, a faulted block should clear its message queues and move into a Faulted state as soon as possible. There is a single scenario that does not obey to this rule: a source block holding a message reserved by a target. If a block that encounters an internal exception has a message that was reserved by a target, the reserved message must not be dropped, and the block should not be moved into the Faulted state until the message is released or consumed.

  2. Hanging Networks
    ...

    • Keep a reference to all the blocks in the network and use Task.WaitAll or Task.WhenAll to wait for them (synchronously or asynchronously). If a block faults, its Completion task will complete in the Faulted state.
    • Use DataflowLinkOptions with PropagateCompletion == true when building a linear network. That will propagate block completion from source to target. In this case it is enough to wait on the network leaf block.
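
The second recommendation above can be sketched as follows, assuming a simple two-block linear network; the block names and the doubling transform are made up for illustration. Completion is propagated from head to leaf, and only the leaf's Completion task is awaited.

```csharp
using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class LeafCompletionSketch
{
    // Returns true if the leaf block's fault was observed by the caller.
    public static async Task<bool> LeafFaultObservedAsync()
    {
        var head = new TransformBlock<int, int>(x => x * 2);
        var leaf = new ActionBlock<int>(x =>
        {
            if (x >= 10) throw new InvalidOperationException("Something bad happened");
        });

        // Completion (and faults) flow from head to leaf, never backwards.
        head.LinkTo(leaf, new DataflowLinkOptions { PropagateCompletion = true });

        for (var i = 1; i <= 10; i++) head.Post(i);
        head.Complete();

        try
        {
            // Awaiting the leaf is enough for a linear network.
            // head.Completion is deliberately not awaited: after the leaf faults,
            // head may still hold undelivered output and might never complete.
            await leaf.Completion;
            return false;
        }
        catch (InvalidOperationException)
        {
            return true;
        }
    }
}
```

The comment on head.Completion reflects the hanging-network concern: waiting on a block upstream of a faulted target can block indefinitely, so when you wait on all blocks, consider also faulting or draining the upstream ones.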