After playing around with Dataflow I ran into a new problem. I would like to limit the input queue of all blocks. My producing block (an ActionBlock) creates 5000 elements very quickly and posts them to a BroadcastBlock. If I set the BoundedCapacity of the BroadcastBlock to 100, it throws a lot of data away. I would prefer the producing block to wait for free slots in the input queue of my BufferBlock.
Is there any way to get rid of this problem?
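That's exactly what BufferBlock is for. If you set its BoundedCapacity and it gets full, it will postpone receiving any messages until someone consumes them. This means that, for example, Post() will return false immediately without accepting the item, and SendAsync() will return a Task that does not complete until there is room in the block. So a producer that should wait has to await SendAsync() rather than call Post().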
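To make the difference between the two calls concrete, here is a minimal sketch of a BufferBlock with a BoundedCapacity of 1. The class and method names (BoundedBufferDemo, RunAsync) are made up for illustration, and the values in the comments are what I would expect from the documented behavior rather than captured output.

```csharp
using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

static class BoundedBufferDemo
{
    // Hypothetical helper: returns what each call reported so it can be inspected.
    public static async Task<(bool firstAccepted, bool secondAccepted,
                              bool sendWasPending, bool thirdAccepted)> RunAsync()
    {
        var buffer = new BufferBlock<int>(
            new DataflowBlockOptions { BoundedCapacity = 1 });

        // The block is empty, so the first item should be accepted.
        bool firstAccepted = buffer.Post(1);

        // The block is now full: Post does not wait, it reports failure.
        bool secondAccepted = buffer.Post(2);

        // SendAsync is postponed instead; its task stays incomplete while the block is full.
        Task<bool> pendingSend = buffer.SendAsync(3);
        bool sendWasPending = !pendingSend.IsCompleted;

        // Consuming an item frees a slot, which lets the postponed send go through.
        await buffer.ReceiveAsync();
        bool thirdAccepted = await pendingSend;

        return (firstAccepted, secondAccepted, sendWasPending, thirdAccepted);
    }

    static async Task Main()
    {
        var result = await RunAsync();
        Console.WriteLine(result);
    }
}
```

In a real producer you would simply write `await buffer.SendAsync(item);` inside the loop, which gives the "wait for a free slot" behavior asked about.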
EDIT: There is no built-in block that sends to multiple targets and never throws data away. But you can easily build one yourself from an ActionBlock and a sending loop:
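    using System.Collections.Generic;
    using System.Linq;
    using System.Threading.Tasks.Dataflow;

    static ITargetBlock<T> CreateMultipleTargetsBlock<T>(
        IEnumerable<ITargetBlock<T>> targets, int boundedCapacity)
    {
        // Snapshot the targets so later changes to the sequence have no effect.
        var targetsList = targets.ToList();

        var block = new ActionBlock<T>(
            async item =>
            {
                // Offer the item to every target in turn; each SendAsync
                // waits until that target has room, so nothing is dropped.
                foreach (var target in targetsList)
                {
                    await target.SendAsync(item);
                }
            },
            new ExecutionDataflowBlockOptions { BoundedCapacity = boundedCapacity });

        // TODO: propagate completion from block to targets

        return block;
    }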
This code assumes that you don't need to clone the data for each target and that the list of targets never changes. Modifying the code for that should be fairly simple.
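One way the TODO about completion could be filled in, together with a small usage example, is sketched below. This is only one possible approach: the names MultipleTargetsDemo, CreateMultipleTargetsBlockWithCompletion and RunAsync are invented for this example, and the choice to fault the targets when the fan-out block faults is my assumption about what you would want.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

static class MultipleTargetsDemo
{
    // Same idea as CreateMultipleTargetsBlock, plus completion propagation.
    public static ITargetBlock<T> CreateMultipleTargetsBlockWithCompletion<T>(
        IEnumerable<ITargetBlock<T>> targets, int boundedCapacity)
    {
        var targetsList = targets.ToList();
        var block = new ActionBlock<T>(
            async item =>
            {
                foreach (var target in targetsList)
                {
                    await target.SendAsync(item);
                }
            },
            new ExecutionDataflowBlockOptions { BoundedCapacity = boundedCapacity });

        // When the fan-out block finishes, complete (or fault) every target.
        block.Completion.ContinueWith(completion =>
        {
            foreach (var target in targetsList)
            {
                if (completion.IsFaulted)
                    target.Fault(completion.Exception);
                else
                    target.Complete();
            }
        });

        return block;
    }

    // Hypothetical usage: two bounded consumers, one of them deliberately slow.
    public static async Task<(int countA, int countB)> RunAsync()
    {
        int countA = 0, countB = 0;
        var options = new ExecutionDataflowBlockOptions { BoundedCapacity = 5 };
        var consumerA = new ActionBlock<int>(_ => { countA++; }, options);
        var consumerB = new ActionBlock<int>(
            async _ => { await Task.Delay(1); countB++; }, options);

        var fanOut = CreateMultipleTargetsBlockWithCompletion(
            new ITargetBlock<int>[] { consumerA, consumerB }, 5);

        // The producer awaits SendAsync, so it slows down to the slowest consumer.
        for (int i = 0; i < 50; i++)
        {
            await fanOut.SendAsync(i);
        }

        fanOut.Complete();
        await Task.WhenAll(consumerA.Completion, consumerB.Completion);
        return (countA, countB);
    }

    static async Task Main()
    {
        var counts = await RunAsync();
        Console.WriteLine(counts);
    }
}
```

Because every queue here is bounded and every send is awaited, both consumers should end up seeing all 50 items, with the producer throttled by the slower one.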