How can I rewrite the code so that it completes only when BOTH transform blocks have completed? I thought completion means that a block is marked complete AND its output queue is empty?
using System;
using System.Threading;
using System.Threading.Tasks.Dataflow;

public class Test
{
    private readonly BroadcastBlock<int> broadCastBlock;
    private readonly TransformBlock<int, string> transformBlock1;
    private readonly TransformBlock<int, string> transformBlock2;
    private readonly ActionBlock<string> processorBlock;

    public Test()
    {
        broadCastBlock = new BroadcastBlock<int>(i => i);

        transformBlock1 = new TransformBlock<int, string>(i =>
        {
            Console.WriteLine("1 input count: " + transformBlock1.InputCount);
            Thread.Sleep(50);
            return "1_" + i;
        });

        transformBlock2 = new TransformBlock<int, string>(i =>
        {
            Console.WriteLine("2 input count: " + transformBlock2.InputCount);
            Thread.Sleep(20);
            return "2_" + i;
        });

        processorBlock = new ActionBlock<string>(i => Console.WriteLine(i));

        // Linking
        broadCastBlock.LinkTo(transformBlock1, new DataflowLinkOptions { PropagateCompletion = true });
        broadCastBlock.LinkTo(transformBlock2, new DataflowLinkOptions { PropagateCompletion = true });
        transformBlock1.LinkTo(processorBlock, new DataflowLinkOptions { PropagateCompletion = true });
        transformBlock2.LinkTo(processorBlock, new DataflowLinkOptions { PropagateCompletion = true });
    }

    public void Start()
    {
        const int numElements = 100;
        for (int i = 1; i <= numElements; i++)
        {
            broadCastBlock.SendAsync(i);
        }

        // Mark completion
        broadCastBlock.Complete();
        processorBlock.Completion.Wait();
        Console.WriteLine("Finished");
        Console.ReadLine();
    }
}
I edited the code, adding an input buffer count for each transform block. Clearly all 100 items are streamed to each of the transform blocks. But as soon as one of the transform blocks finishes, the processor block does not accept any more items; instead, the input buffer of the incomplete transform block is simply flushed without its items being processed.
The issue is exactly what casperOne said in his answer. Once the first transform block completes, the processor block goes into “finishing mode”: it will process remaining items in its input queue, but it won't accept any new items.
There is a simpler fix than splitting your processor block in two, though: don't set PropagateCompletion on the links into the processor block, but instead complete the processor block manually when both transform blocks complete:
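A minimal sketch of that wiring, reusing the block names from the question (the transform bodies here are simplified stand-ins for the original work):

```csharp
using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

var broadCastBlock = new BroadcastBlock<int>(i => i);
var transformBlock1 = new TransformBlock<int, string>(i => "1_" + i);
var transformBlock2 = new TransformBlock<int, string>(i => "2_" + i);
var processorBlock = new ActionBlock<string>(Console.WriteLine);

// Completion still flows from the broadcast block into each transform block.
broadCastBlock.LinkTo(transformBlock1, new DataflowLinkOptions { PropagateCompletion = true });
broadCastBlock.LinkTo(transformBlock2, new DataflowLinkOptions { PropagateCompletion = true });

// No PropagateCompletion here: the processor must not shut down when only ONE source is done.
transformBlock1.LinkTo(processorBlock);
transformBlock2.LinkTo(processorBlock);

// Complete the processor block manually once BOTH transform blocks have completed.
Task.WhenAll(transformBlock1.Completion, transformBlock2.Completion)
    .ContinueWith(_ => processorBlock.Complete());
```

With this in place, waiting on processorBlock.Completion only returns after both transform blocks have drained.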
Other answers are quite clear about why PropagateCompletion = true messes things up when a block has more than one source. To provide a simple solution to the problem, you may want to look at an open source library, DataflowEx, that solves this kind of problem with smarter completion rules built in. (It uses TPL Dataflow linking internally but supports complex completion propagation. The implementation looks similar to WhenAll but also handles dynamic link adding. Please check Dataflow.RegisterDependency() and TaskEx.AwaitableWhenAll() for implementation details.)
I slightly changed your code to make everything work using DataflowEx:
Full code is here.
Disclaimer: I am the author of DataflowEx, which is published under MIT license.
An addition to svick's answer: to be consistent with the behaviour you get with the PropagateCompletion option, you also need to forward exceptions in case a preceding block faulted. An extension method like the following takes care of that as well:
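One possible shape for such an extension method (the name CompleteWhenAll is illustrative, not from any library):

```csharp
using System.Linq;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class DataflowExtensions
{
    // Completes the target once all sources have completed; if any source
    // faulted, the aggregated exception is forwarded to the target instead,
    // mirroring what PropagateCompletion does for a single source.
    public static void CompleteWhenAll(this IDataflowBlock target, params IDataflowBlock[] sources)
    {
        Task.WhenAll(sources.Select(source => source.Completion))
            .ContinueWith(task =>
            {
                if (task.IsFaulted)
                    target.Fault(task.Exception);
                else
                    target.Complete();
            });
    }
}
```

Usage then becomes a single call, e.g. processorBlock.CompleteWhenAll(transformBlock1, transformBlock2).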
The issue here is that you are setting the PropagateCompletion property each time you call the LinkTo method to link the blocks, combined with the difference in wait times in your transformation blocks.

From the documentation for the Complete method on the IDataflowBlock interface (emphasis mine):

"Signals to the IDataflowBlock that it *should not accept nor produce any more messages* nor consume any more postponed messages."

Because you stagger out your wait times in each of the TransformBlock<TInput, TOutput> instances, transformBlock2 (waiting for 20 ms) is finished before transformBlock1 (waiting for 50 ms). transformBlock2 completes first, and then sends the signal to processorBlock, which then says "I'm not accepting anything else" (and transformBlock1 hasn't produced all of its messages yet).

Note that transformBlock2 finishing before transformBlock1 is not absolutely guaranteed; it's feasible that the thread pool (assuming you're using the default scheduler) will process the tasks in a different order (but more than likely it will not, as it will steal work from the queues once the 20 ms items are done).

Your pipeline looks like this:

broadCastBlock --> transformBlock1 --> processorBlock
               \-> transformBlock2 -/
In order to get around this, you want to have a pipeline that looks like this:

broadCastBlock --> transformBlock1 --> processorBlock1
               \-> transformBlock2 --> processorBlock2

Which is accomplished by just creating two separate ActionBlock<TInput> instances, like so:

You then need to wait on both processor blocks instead of just one:
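Sketched concretely (the names processorBlock1 and processorBlock2 are illustrative; the shared delegate mirrors the original processor logic, and the Post/Complete calls at the end stand in for the broadcast block driving the pipeline):

```csharp
using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

var transformBlock1 = new TransformBlock<int, string>(i => "1_" + i);
var transformBlock2 = new TransformBlock<int, string>(i => "2_" + i);

// One shared delegate, two separate processor blocks.
Action<string> process = i => Console.WriteLine(i);
var processorBlock1 = new ActionBlock<string>(process);
var processorBlock2 = new ActionBlock<string>(process);

// Each transform block propagates completion to its OWN processor block.
transformBlock1.LinkTo(processorBlock1, new DataflowLinkOptions { PropagateCompletion = true });
transformBlock2.LinkTo(processorBlock2, new DataflowLinkOptions { PropagateCompletion = true });

// (In the real pipeline the broadcast block feeds and completes the transforms.)
transformBlock1.Post(1);
transformBlock2.Post(1);
transformBlock1.Complete();
transformBlock2.Complete();

// Wait on BOTH processor blocks instead of just one.
Task.WhenAll(processorBlock1.Completion, processorBlock2.Completion).Wait();
```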
A very important note here: when creating an ActionBlock<TInput>, the default is to have the MaxDegreeOfParallelism property on the ExecutionDataflowBlockOptions instance passed to it set to one.

This means that the calls to the Action<T> delegate that you pass to the ActionBlock<TInput> are thread-safe; only one will execute at a time. Because you now have two ActionBlock<TInput> instances pointing to the same Action<T> delegate, you aren't guaranteed thread-safety.

If your method is thread-safe, then you don't have to do anything (which would allow you to set the MaxDegreeOfParallelism property to DataflowBlockOptions.Unbounded, since there's no reason to block).

If it's not thread-safe, and you need to guarantee it, you need to resort to traditional synchronization primitives, like the lock statement.

In this case, you'd do it like so (although it's clearly not needed, as the WriteLine method on the Console class is thread-safe):
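A lock-based version might look like this (the gate object is illustrative):

```csharp
using System;
using System.Threading.Tasks.Dataflow;

// A shared gate that serializes the delegate across both ActionBlock instances.
object gate = new object();

Action<string> process = i =>
{
    // Only one thread at a time may run the body, even though two
    // ActionBlock instances invoke the same delegate concurrently.
    lock (gate)
    {
        Console.WriteLine(i);
    }
};

var processorBlock1 = new ActionBlock<string>(process);
var processorBlock2 = new ActionBlock<string>(process);
```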