I need to have some kind of object that acts like a BroadcastBlock, but with guaranteed delivery. So i used an answer from this question. But i don't really clearly understand the execution flow here. I have a console app. Here is my code:
static void Main(string[] args)
{
ExecutionDataflowBlockOptions execopt = new ExecutionDataflowBlockOptions { BoundedCapacity = 5 };
List<ActionBlock<int>> blocks = new List<ActionBlock<int>>();
for (int i = 0; i <= 10; i++)
blocks.Add(new ActionBlock<int>(num =>
{
int coef = i;
Console.WriteLine(Thread.CurrentThread.ManagedThreadId + ". " + num * coef);
}, execopt));
ActionBlock<int> broadcaster = new ActionBlock<int>(async num =>
{
foreach (ActionBlock<int> block in blocks) await block.SendAsync(num);
}, execopt);
broadcaster.Completion.ContinueWith(task =>
{
foreach (ActionBlock<int> block in blocks) block.Complete();
});
Task producer = Produce(broadcaster);
List<Task> ToWait = new List<Task>();
foreach (ActionBlock<int> block in blocks) ToWait.Add(block.Completion);
ToWait.Add(producer);
Task.WaitAll(ToWait.ToArray());
Console.ReadLine();
}
static async Task Produce(ActionBlock<int> broadcaster)
{
for (int i = 0; i <= 15; i++) await broadcaster.SendAsync(i);
broadcaster.Complete();
}
Each number must be handled sequentially, so i can't use MaxDegreeOfParallelism in broadcaster block. But all actionblocks that receive the number can run in parallel.
So here is the question:
In the output i can see different thread ids. Do i understand it correctly that works as follows:
Execution hits await block.SendAsync(num);
in a broadcaster.
If current block is not ready to accept the number, execution exits broadcaster and hangs at the Task.WaitAll.
When block accepts the number, the rest of foreach statement in broadcaster is executed in a threadpool.
And the same till the end.
Each iteration of foreach is executed in a threadpool. But actually it happens sequentially.
Am i right or wrong in my understanding? How can i change this code to send the number to all blocks asynchronously?
To make sure that if one of blocks is not ready to receive the number at the moment, i won't wait for it and all others that are ready will receive the number. And that all blocks can run in parallel. And guarantee delivery.