I would be glad for some input on the following implementation of a BroadcastCopyBlock in TPL Dataflow. It copies a received message to all consumers that registered to the BroadcastCopyBlock, and it guarantees delivery to every consumer that is linked to the block at the time it receives the message. (This is unlike the BroadcastBlock, which does not guarantee delivery of a message if the next one comes in before the former has been delivered to all consumers.)
My main concern is the reserving of messages and the releasing of reservations. What would happen if a receiving block decides not to handle the message? My understanding is that this would create a memory leak, since the message would be kept indefinitely. I think I should somehow mark the message as unused, but I'm not sure how. I was thinking about an artificial message sink (an ActionBlock with no action), or can I just mark a message as discarded?
Further input on the implementation is also appreciated.
This is probably almost a duplicate of the following question, but I would prefer to use my own class instead of a method to create the block. Or would that be considered bad style?
BroadcastBlock with Guaranteed Delivery in TPL Dataflow
```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

/// <summary>
/// Broadcasts the same message to multiple consumers. This does NOT clone the message; all consumers receive an identical message.
/// </summary>
/// <typeparam name="T">The type of the messages.</typeparam>
public class BrodcastCopyBlock<T> : IPropagatorBlock<T, T>
{
    private ITargetBlock<T> In { get; }

    /// <summary>
    /// Holds a TransformBlock for each target that subscribed to this block.
    /// </summary>
    private readonly IDictionary<ITargetBlock<T>, TransformBlock<T, T>> _OutBlocks = new Dictionary<ITargetBlock<T>, TransformBlock<T, T>>();

    public BrodcastCopyBlock()
    {
        In = new ActionBlock<T>(message => Process(message));
        In.Completion.ContinueWith(task =>
        {
            if (task.Exception == null)
                Complete();
            else
                Fault(task.Exception);
        });
    }

    /// <summary>
    /// Creates a transform source block for the passed target.
    /// </summary>
    /// <param name="target">The target that is being linked to this block.</param>
    private void CreateOutBlock(ITargetBlock<T> target)
    {
        if (_OutBlocks.ContainsKey(target))
            return;
        var outBlock = new TransformBlock<T, T>(e => e);
        _OutBlocks[target] = outBlock;
    }

    private void Process(T message)
    {
        foreach (var outBlock in _OutBlocks.Values)
        {
            outBlock.Post(message);
        }
    }

    /// <inheritdoc />
    public DataflowMessageStatus OfferMessage(DataflowMessageHeader messageHeader, T messageValue, ISourceBlock<T> source, bool consumeToAccept)
    {
        return In.OfferMessage(messageHeader, messageValue, source, consumeToAccept);
    }

    /// <inheritdoc />
    public void Complete()
    {
        foreach (var outBlock in _OutBlocks.Values)
        {
            ((ISourceBlock<T>)outBlock).Complete();
        }
    }

    /// <inheritdoc />
    public void Fault(Exception exception)
    {
        foreach (var outBlock in _OutBlocks.Values)
        {
            ((ISourceBlock<T>)outBlock).Fault(exception);
        }
    }

    /// <inheritdoc />
    public Task Completion => Task.WhenAll(_OutBlocks.Select(b => b.Value.Completion));

    /// <inheritdoc />
    public IDisposable LinkTo(ITargetBlock<T> target, DataflowLinkOptions linkOptions)
    {
        CreateOutBlock(target);
        return _OutBlocks[target].LinkTo(target, linkOptions);
    }

    /// <inheritdoc />
    public T ConsumeMessage(DataflowMessageHeader messageHeader, ITargetBlock<T> target, out bool messageConsumed)
    {
        return ((ISourceBlock<T>)_OutBlocks[target]).ConsumeMessage(messageHeader, target, out messageConsumed);
    }

    /// <inheritdoc />
    public bool ReserveMessage(DataflowMessageHeader messageHeader, ITargetBlock<T> target)
    {
        return ((ISourceBlock<T>)_OutBlocks[target]).ReserveMessage(messageHeader, target);
    }

    /// <inheritdoc />
    public void ReleaseReservation(DataflowMessageHeader messageHeader, ITargetBlock<T> target)
    {
        ((ISourceBlock<T>)_OutBlocks[target]).ReleaseReservation(messageHeader, target);
    }
}
```
TL;DR
Your implementation uses the Post method inside the ActionBlock, which will still lose data if the target rejects the message. Switch to SendAsync instead. Also, you probably don't need to implement all of these methods; an implementation of the ITargetBlock<in TInput> interface is all you need.

I want to clarify something before coming back to your main question. I think you are confused by some options in the TPL Dataflow library, and I want to explain them a bit here. The behavior you describe ("The first consumer which receives the message deletes it from the queue") is not about the BroadcastBlock; it is about multiple consumers linked to an ISourceBlock, like the BufferBlock:
What the BroadcastBlock does is exactly what you are talking about. Consider this code:
The output will be:
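A minimal sketch of the difference between the two source blocks, assuming two unbounded ActionBlock consumers. The helper name DeliveryDemo.CountDeliveriesAsync is hypothetical. It links both consumers to the same source, posts a few integers, completes the source, and counts how many messages each consumer received.

```csharp
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Hypothetical helper for illustration: links two counting consumers to the
// same source block and reports how many messages each one received.
public static class DeliveryDemo
{
    public static async Task<(int First, int Second)> CountDeliveriesAsync(
        IPropagatorBlock<int, int> source, int count)
    {
        int first = 0, second = 0;

        // Unbounded (greedy) consumers: they accept every message offered to them.
        var consumer1 = new ActionBlock<int>(_ => Interlocked.Increment(ref first));
        var consumer2 = new ActionBlock<int>(_ => Interlocked.Increment(ref second));

        var propagate = new DataflowLinkOptions { PropagateCompletion = true };
        source.LinkTo(consumer1, propagate);
        source.LinkTo(consumer2, propagate);

        for (int i = 0; i < count; i++)
            source.Post(i);
        source.Complete();

        // Wait for the end of the pipeline, not for the source block.
        await Task.WhenAll(consumer1.Completion, consumer2.Completion);
        return (first, second);
    }
}
```

Passing new BufferBlock<int>() should deliver each message to exactly one consumer (with greedy consumers, the first linked one usually takes all of them), while new BroadcastBlock<int>(x => x) should deliver every message to both consumers.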
However, this only works if the incoming data arrives more slowly than it is processed; otherwise your memory will quickly run out as the buffers grow, as you stated in your question. Let's see what happens if we use ExecutionDataflowBlockOptions to limit the incoming data buffer for a slow block:
The output will be:
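A minimal sketch of such a setup, assuming a BroadcastBlock that feeds one fast consumer and one slow consumer whose BoundedCapacity is 1. The class name SlowConsumerDemo and the 100 ms delay are illustrative assumptions.

```csharp
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Hypothetical demo: a BroadcastBlock feeding a fast consumer and a slow,
// bounded consumer. The 100 ms delay is an arbitrary illustrative value.
public static class SlowConsumerDemo
{
    public static async Task<(int Fast, int Slow)> RunAsync(int count)
    {
        int fast = 0, slow = 0;
        var broadcast = new BroadcastBlock<int>(x => x);

        var fastBlock = new ActionBlock<int>(_ => Interlocked.Increment(ref fast));

        // BoundedCapacity = 1: while the slow block is busy it postpones new offers
        // instead of buffering them.
        var slowBlock = new ActionBlock<int>(async _ =>
        {
            await Task.Delay(100);
            Interlocked.Increment(ref slow);
        }, new ExecutionDataflowBlockOptions { BoundedCapacity = 1 });

        var propagate = new DataflowLinkOptions { PropagateCompletion = true };
        broadcast.LinkTo(fastBlock, propagate);
        broadcast.LinkTo(slowBlock, propagate);

        for (int i = 0; i < count; i++)
            broadcast.Post(i);
        broadcast.Complete();

        await Task.WhenAll(fastBlock.Completion, slowBlock.Completion);
        return (fast, slow);
    }
}
```

The fast consumer should see every message, while the slow one only gets messages that are still current in the BroadcastBlock when it has room again; exactly which ones it misses depends on timing.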
As you can see, our slow block lost the last message, which is not what we are looking for. The reason for this is that the BroadcastBlock, by default, uses the Post method to deliver messages. According to the official Intro Document:
So, this method could help us in our mission. Let's introduce a wrapper ActionBlock that does exactly what we want, using SendAsync to send the data to our real processors:
The output will be:
But this wait will never end: our basic wrapper does not propagate completion to the linked blocks, and the ActionBlock can't be linked to anything. We can try to wait for the wrapper's completion instead:
The output will be:
Which is definitely not what we wanted: the ActionBlock finished all its work, but the posting of the last message wasn't awaited. Moreover, we don't even see the second message, because we exit the method before the Sleep method ends! So you definitely need your own implementation for this.

Now, at last, some thoughts about your code:
1. You only need ITargetBlock<in TInput>, so implement only that interface.
2. You use the Post method inside the ActionBlock, which, as we saw, can lead to data loss if something goes wrong on the consumer's side. Consider the SendAsync method instead.
3. Your Completion task actually reverses the order of your dataflow: you are waiting for the targets to complete, which I don't think is good practice. You should probably create an ending block for your dataflow (this could even be a NullTarget block, which simply drops incoming messages synchronously) and wait for it to complete.
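Putting these points together, a minimal sketch of a block that implements only ITargetBlock<T> and forwards each message with SendAsync might look like this. The class name GuaranteedBroadcastBlock, the choice to pass the targets to the constructor instead of supporting LinkTo, and the demo class are assumptions for illustration. Completion is forwarded to the targets by hand, and the caller waits on the targets (the end of the pipeline) rather than on the broadcaster.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Hypothetical sketch: a broadcaster that implements only ITargetBlock<T>.
// Targets are fixed at construction (a simplifying assumption instead of LinkTo).
public sealed class GuaranteedBroadcastBlock<T> : ITargetBlock<T>
{
    private readonly ActionBlock<T> _input;
    private readonly ITargetBlock<T>[] _targets;

    public GuaranteedBroadcastBlock(params ITargetBlock<T>[] targets)
    {
        _targets = targets;

        // Each message is handed to every target with SendAsync; the next message
        // is not processed until all targets have accepted (or declined) this one.
        _input = new ActionBlock<T>(async message =>
        {
            foreach (var target in _targets)
                await target.SendAsync(message);
        });

        // Forward completion (or the fault) to the targets once the input is done.
        _input.Completion.ContinueWith(t =>
        {
            foreach (var target in _targets)
            {
                if (t.IsFaulted)
                    target.Fault(t.Exception);
                else
                    target.Complete();
            }
        }, TaskScheduler.Default);
    }

    public Task Completion => _input.Completion;

    public void Complete() => _input.Complete();

    public void Fault(Exception exception) => ((IDataflowBlock)_input).Fault(exception);

    public DataflowMessageStatus OfferMessage(DataflowMessageHeader messageHeader, T messageValue,
        ISourceBlock<T> source, bool consumeToAccept) =>
        ((ITargetBlock<T>)_input).OfferMessage(messageHeader, messageValue, source, consumeToAccept);
}

// Hypothetical usage: one fast target and one slow, bounded target.
public static class GuaranteedBroadcastDemo
{
    public static async Task<(int Fast, int Slow)> RunAsync(int count)
    {
        int fast = 0, slow = 0;
        var fastBlock = new ActionBlock<int>(_ => Interlocked.Increment(ref fast));
        var slowBlock = new ActionBlock<int>(async _ =>
        {
            await Task.Delay(10);
            Interlocked.Increment(ref slow);
        }, new ExecutionDataflowBlockOptions { BoundedCapacity = 1 });

        var broadcaster = new GuaranteedBroadcastBlock<int>(fastBlock, slowBlock);
        for (int i = 0; i < count; i++)
            broadcaster.Post(i);
        broadcaster.Complete();

        // Wait for the end of the pipeline rather than for the broadcaster itself.
        await Task.WhenAll(fastBlock.Completion, slowBlock.Completion);
        return (fast, slow);
    }
}
```

Because each SendAsync is awaited before the next message is processed, a single slow target throttles the whole broadcast; that is the trade-off for guaranteed delivery in this design. Note also that a target which postpones a message and never consumes or declines it keeps the corresponding SendAsync pending, so the message is held until that target completes.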