Teaser: guys, this question is not about how to implement a retry policy. It's about correctly completing a TPL Dataflow block.
This question is mostly a continuation of my previous question, Retry policy within ITargetBlock. The answer to that question was @svick's smart solution that uses a TransformBlock (source) and a TransformManyBlock (target). The only problem left is to complete this block the right way: wait for all the retries to finish first, and then complete the target block. Here is what I ended up with (it's just a snippet, so don't pay too much attention to the non-thread-safe retries set):
    // Messages that are currently waiting for a retry (not thread-safe, see above).
    var retries = new HashSet<RetryableMessage<TInput>>();
    TransformManyBlock<RetryableMessage<TInput>, TOutput> target = null;
    target = new TransformManyBlock<RetryableMessage<TInput>, TOutput>(
        async message =>
        {
            try
            {
                var result = new[] { await transform(message.Data) };
                retries.Remove(message);
                return result;
            }
            catch (Exception ex)
            {
                message.Exceptions.Add(ex);
                if (message.RetriesRemaining == 0)
                {
                    // Out of retries: report the failure and forget the message.
                    if (failureHandler != null)
                        failureHandler(message.Exceptions);
                    retries.Remove(message);
                }
                else
                {
                    // Schedule another attempt after the delay.
                    retries.Add(message);
                    message.RetriesRemaining--;
                    Task.Delay(retryDelay)
                        .ContinueWith(_ => target.Post(message));
                }
                return null;
            }
        }, dataflowBlockOptions);
    source.LinkTo(target);
    source.Completion.ContinueWith(async _ =>
    {
        // Poll until nothing is queued and nothing is waiting for a retry.
        while (target.InputCount > 0 || retries.Any())
            await Task.Delay(100);
        target.Complete();
    });
The idea is to poll until there are no messages still waiting to be processed and no messages that require retrying. But I don't like the idea of polling in this solution.
Yes, I can encapsulate the logic of adding/removing retries in a separate class, and even, for example, perform some action when the set of retries becomes empty, but how do I deal with the target.InputCount > 0 condition? There is no callback that gets called when the block has no pending messages, so it seems that checking target.InputCount in a loop with a small delay is the only option.
Does anybody know a smarter way to achieve this?
Maybe a ManualResetEvent can do the trick for you.
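As a rough illustration of this idea, here is a minimal sketch that replaces polling with an event. It assumes a hypothetical helper class, PendingTracker (the name and its Enter/Exit methods are made up for this example and are not part of TPL Dataflow), which keeps a count of in-flight messages and signals when the count drops to zero. The event lives in a separate helper because TransformManyBlock is sealed and cannot be subclassed.

```csharp
using System;
using System.Threading;

// Hypothetical helper (not part of TPL Dataflow): counts in-flight messages
// and keeps an event signaled whenever the count is zero.
public sealed class PendingTracker
{
    private readonly object _gate = new object();
    private int _pending;

    // Signaled while nothing is pending; starts signaled because the count starts at 0.
    public ManualResetEventSlim Idle { get; } = new ManualResetEventSlim(true);

    // Call when a message enters the pipeline (before it reaches the block).
    public void Enter()
    {
        lock (_gate)
        {
            _pending++;
            Idle.Reset();
        }
    }

    // Call when a message is finished for good (success or retries exhausted).
    public void Exit()
    {
        lock (_gate)
        {
            if (_pending == 0)
                throw new InvalidOperationException("Exit without matching Enter.");
            if (--_pending == 0)
                Idle.Set();
        }
    }
}
```

Under these assumptions, the completion logic could wait on tracker.Idle once the source has completed and only then call target.Complete(). A message that is scheduled for a retry simply stays counted until it finally succeeds or runs out of retries, so there is no need to look at target.InputCount at all.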
Add a public ManualResetEvent property to TransformManyBlock. I am not sure where your target.InputCount is set, but at the place where you change target.InputCount you can add code that signals the event.
Combining hwcverwe's answer and JamieSee's comment could be the ideal solution.
First, you need to create more than one event.
Then, you have to create an observer and subscribe it to the TransformManyBlock, so that you are notified when a relevant event happens. The observer can be quite simple.
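A minimal sketch of what such an observer might look like is shown below. The class name SignalingObserver and its LastError property are assumptions made for this example; the observer ignores the items themselves and only sets one event on completion and another on error.

```csharp
using System;
using System.Threading;

// Hypothetical observer: ignores the items and only signals events.
public sealed class SignalingObserver<T> : IObserver<T>
{
    private readonly EventWaitHandle _completed;
    private readonly EventWaitHandle _failed;

    public SignalingObserver(EventWaitHandle completed, EventWaitHandle failed)
    {
        _completed = completed;
        _failed = failed;
    }

    // The last error reported by the observed sequence, if any.
    public Exception LastError { get; private set; }

    // Items are not needed here; only the terminal notifications matter.
    public void OnNext(T value) { }

    public void OnError(Exception error)
    {
        LastError = error;
        _failed.Set();
    }

    public void OnCompleted()
    {
        _completed.Set();
    }
}
```

With TPL Dataflow, an observer like this could presumably be attached with target.AsObservable().Subscribe(observer), using the AsObservable extension method from DataflowBlock. Note that OnCompleted only fires after the block itself has completed, so this covers the notification side and not the decision of when to call Complete().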
You can then wait for either the signal, or completion (exhaustion of all the source items), or both.
You can inspect the return value of the wait call to find out which event was set, and react accordingly: WaitHandle.WaitAny returns the index of the signaled handle, whereas WaitHandle.WaitAll only returns a bool indicating whether all handles were signaled. You can also add other events to the code and pass them to the observer, so that it can set them when needed. This lets you differentiate your behaviour and, for example, respond differently when an error is raised.
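Waiting on several events and reacting to whichever one fires could be sketched as follows. The class WaitDemo, the method WaitForOutcome, and the returned strings are made up for illustration; the only framework call used is WaitHandle.WaitAny, which returns the array index of the signaled handle or WaitHandle.WaitTimeout.

```csharp
using System;
using System.Threading;

public static class WaitDemo
{
    // Waits until one of the two events is signaled or the timeout elapses,
    // and reports which of these happened.
    public static string WaitForOutcome(WaitHandle completed, WaitHandle failed, TimeSpan timeout)
    {
        var handles = new[] { completed, failed };

        // WaitAny returns the index of a signaled handle in the array,
        // or WaitHandle.WaitTimeout if none was signaled in time.
        int index = WaitHandle.WaitAny(handles, timeout);

        if (index == WaitHandle.WaitTimeout)
            return "timeout";
        return index == 0 ? "completed" : "failed";
    }
}
```

If more than one handle is already signaled, WaitAny reports the one with the smallest index, so the order of the array decides which outcome takes precedence.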