I have the following pseudo code:
    using System;
    using System.Collections.Generic;
    using System.Diagnostics;
    using System.Linq;
    using System.Threading;
    using System.Threading.Tasks;
    using System.Threading.Tasks.Dataflow;

    var queue = new BufferBlock<int>(new DataflowBlockOptions { BoundedCapacity = 5 });
    var a = new ActionBlock<int>(async item =>
    {
        await Task.Delay(500);
        Trace.TraceInformation(
            $"Target 1: | Type: {typeof(int).Name} | Thread: {Thread.CurrentThread.ManagedThreadId} | Message: {item}");
        // Some processing logic that throws
        if (item >= 5) throw new Exception("Something bad happened");
    }, new ExecutionDataflowBlockOptions { BoundedCapacity = 1, MaxDegreeOfParallelism = 1 });
    queue.LinkTo(a, new DataflowLinkOptions { PropagateCompletion = true });
    var targets = new List<ITargetBlock<int>> { queue };
    var broadcaster = new ActionBlock<int>(
        async item =>
        {
            var processingTasks = targets.Select(async t =>
            {
                try
                {
                    // This condition is always false:
                    // t (the BufferBlock) never faults. The exception is raised in the
                    // downstream ActionBlock that the BufferBlock sends to.
                    if (!await t.SendAsync(item))
                        await t.Completion;
                }
                catch (Exception e)
                {
                    Trace.TraceInformation("Handled exception : " + e.Message);
                }
            });
            try
            {
                // The exception is not rethrown here either
                await Task.WhenAll(processingTasks);
            }
            catch (Exception e)
            {
                Trace.TraceInformation("Handled exception WhenAll : " + e.Message);
            }
        });
    for (var i = 1; i <= 10; i++)
    {
        broadcaster.Post(i);
    }
The pipeline is configured like this: ActionBlock<int> => BufferBlock<int> => ActionBlock<int>. The last ActionBlock<int> throws an exception, but it is not rethrown to the source block, where I would like to handle it.
How can this code be rewritten so that it handles exceptions correctly?
You can find the official guidelines for this topic here. The overall solution is to subscribe to the Completion task of every block, check its state, and, if needed, replace the faulted block (so you should also keep references to all of the blocks). Please refer to the whole article for more information.
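
As a rough illustration of observing a block's Completion task, here is a minimal sketch. It is not taken from the guidelines. Instead of replacing the faulted block, it pushes the fault back upstream so the sender can see it, which is a different technique from the one described above. The names CompletionObserverSketch, RunAsync, worker and observer are hypothetical, and the worker is simplified to a synchronous delegate. With PropagateCompletion, faults only flow downstream, so the BufferBlock has to be faulted explicitly.

```csharp
using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class CompletionObserverSketch
{
    // Hypothetical helper: returns the message of the exception seen by the sender.
    public static async Task<string> RunAsync()
    {
        var queue = new BufferBlock<int>(new DataflowBlockOptions { BoundedCapacity = 5 });
        var worker = new ActionBlock<int>(item =>
        {
            if (item >= 5) throw new InvalidOperationException("Something bad happened");
        }, new ExecutionDataflowBlockOptions { BoundedCapacity = 1 });

        queue.LinkTo(worker, new DataflowLinkOptions { PropagateCompletion = true });

        // Subscribe to the downstream block's Completion task and check its state.
        // If it faulted, fault the upstream buffer as well (assumption: we want the
        // sender to observe the failure rather than swap in a new block).
        var observer = worker.Completion.ContinueWith(t =>
        {
            if (t.IsFaulted)
            {
                // Fault is an explicit interface implementation, hence the cast.
                ((IDataflowBlock)queue).Fault(t.Exception.InnerException);
            }
        }, TaskScheduler.Default);

        for (var i = 1; i <= 10; i++)
        {
            // Once the buffer has been faulted, SendAsync returns false.
            if (!await queue.SendAsync(i)) break;
        }

        // Wait until the worker's outcome has been observed and propagated.
        await observer;

        try
        {
            // Rethrows the worker's exception because the buffer is now faulted.
            await queue.Completion;
            return "no exception";
        }
        catch (InvalidOperationException e)
        {
            return e.Message;
        }
    }
}
```

With this in place, the `if (!await t.SendAsync(item)) await t.Completion;` check from the question has a chance to fire, because the BufferBlock itself ends up in the faulted state. Messages still sitting in the buffer when it is faulted are dropped, so this only fits when losing them is acceptable.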