
TPL Dataflow duplicate message to all consumers

Posted 2020-07-22 17:46

Question:

I'm currently writing an application using WPF and TPL Dataflow that should do the following:

  1. Load all files in a directory
  2. Once it starts processing, log something to the UI and process each file
  3. Once completed, log something to the UI

The problem is that the logging needs to happen on the UI thread, and it should only log just before processing of a file actually starts.

The only way I've been able to do this so far is by manually calling the dispatcher from inside the TPL TransformBlock and updating the UI:

Application.Current.Dispatcher.Invoke(new Action(() =>
{
    ProcessedFiles.Add(optimizedFileResult);
}));

I would like to do this through a Dataflow block that runs on the UI thread, using:

ExecutionDataflowBlockOptions.TaskScheduler = TaskScheduler.FromCurrentSynchronizationContext();

However, if I set this option on the block where the optimization happens, the optimizations will also run single-threaded.

If, on the other hand, I create a new block before the processing block and do the logging there, it will say "processing" way before the processing actually starts.

Sample code

I created some sample code to reproduce this issue:

public class TplLoggingToUiIssue
    {

        public IEnumerable<string> RecurseFiles()
        {
            for (int i = 0; i < 20; i++)
            {
                yield return i.ToString();
            }
        }

        public async Task Go()
        {
            var block1 = new TransformBlock<string, string>(input =>
            {
                Console.WriteLine($"1: {input}");
                return input;
            }, new ExecutionDataflowBlockOptions()
            {
                MaxDegreeOfParallelism = 4,
                BoundedCapacity = 10,
                EnsureOrdered = false
            });

            var block2 = new TransformBlock<string, string>(input =>
            {
                Console.WriteLine($"2: {input}\t\t\tStarting {input} now (ui logging)");
                return input;
            }, new ExecutionDataflowBlockOptions()
            {
                //TaskScheduler = TaskScheduler.FromCurrentSynchronizationContext(), (Doesn't work in Console app, but you get the idea)
                MaxDegreeOfParallelism = 1,
                BoundedCapacity = 1,
                EnsureOrdered = false
            });


            var block3 = new TransformBlock<string, string>(async input =>
            {
                Console.WriteLine($"3 start: {input}");
                await Task.Delay(5000);
                Console.WriteLine($"3 end: {input}");
                return input;
            }, new ExecutionDataflowBlockOptions()
            {
                MaxDegreeOfParallelism = 2,
                BoundedCapacity = 10,
                EnsureOrdered = false
            });

            var block4 = new ActionBlock<string>(input =>
            {
                Console.WriteLine($"4: {input}");
            }, new ExecutionDataflowBlockOptions()
            {
                MaxDegreeOfParallelism = 1,
                BoundedCapacity = 1,
                EnsureOrdered = false
            });


            block1.LinkTo(block2, new DataflowLinkOptions() { PropagateCompletion = true });
            block2.LinkTo(block3, new DataflowLinkOptions() { PropagateCompletion = true });
            block3.LinkTo(block4, new DataflowLinkOptions() { PropagateCompletion = true });


            var files = RecurseFiles();
            await Task.Run(async () =>
            {
                foreach (var file in files)
                {
                    Console.WriteLine($"Posting: {file}");
                    var result = await block1.SendAsync(file);

                    if (!result)
                    {
                        Console.WriteLine("Result is false!!!");
                    }
                }
            });

            Console.WriteLine("Completing");
            block1.Complete();
            await block4.Completion;
            Console.WriteLine("Done");
        }
    }

If you run this sample (with only 6 'files'), you'll get the following output:

Posting: 0
Posting: 1
Posting: 2
Posting: 3
Posting: 4
Posting: 5
1: 2
1: 1
1: 3
1: 0
1: 4
1: 5
2: 2                    Starting 2 now (ui logging)
Completing
3 start: 2
2: 0                    Starting 0 now (ui logging)
3 start: 0
2: 3                    Starting 3 now (ui logging)
2: 1                    Starting 1 now (ui logging)
2: 4                    Starting 4 now (ui logging)
2: 5                    Starting 5 now (ui logging)
3 end: 2
3 end: 0
3 start: 3
3 start: 1
4: 2
4: 0
3 end: 3
3 end: 1
4: 3
3 start: 4
3 start: 5
4: 1
3 end: 5
3 end: 4
4: 5
4: 4
Done

As can be seen in this output, the "Starting X now" logging happens way too early. I also played around with using a BroadcastBlock instead, but that overwrites values as new ones arrive, so items get lost.

The ideal situation would be to somehow have the logging block wait until the processing block has capacity, and only then push one item through.
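One way to approximate that (a sketch of my own, not taken from the answers below) is to lean on Dataflow's built-in backpressure: give the logging block a BoundedCapacity of 1 and cap the processing block's capacity at its degree of parallelism. The logger then cannot accept a new item until the processor has taken the previous one, so the log message runs at most one item ahead of the actual processing:

```csharp
using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class BackpressureSketch
{
    public static async Task Run()
    {
        // Processing block: capacity equals parallelism, so it never
        // queues items beyond the ones it is actively working on.
        var process = new TransformBlock<string, string>(async input =>
        {
            await Task.Delay(1000); // simulate the lengthy optimization
            return input;
        }, new ExecutionDataflowBlockOptions
        {
            MaxDegreeOfParallelism = 2,
            BoundedCapacity = 2
        });

        // Logging block: BoundedCapacity = 1 means it holds at most one
        // message, so it only logs when 'process' has drained the
        // previous item and has room for a new one.
        var log = new TransformBlock<string, string>(input =>
        {
            Console.WriteLine($"Starting {input} now (ui logging)");
            return input;
        }, new ExecutionDataflowBlockOptions { BoundedCapacity = 1 });

        var sink = new ActionBlock<string>(s => Console.WriteLine($"Done: {s}"));

        log.LinkTo(process, new DataflowLinkOptions { PropagateCompletion = true });
        process.LinkTo(sink, new DataflowLinkOptions { PropagateCompletion = true });

        for (int i = 0; i < 6; i++)
            await log.SendAsync(i.ToString()); // waits while 'log' is full

        log.Complete();
        await sink.Completion;
    }
}
```

This isn't perfectly exact — for a TransformBlock, BoundedCapacity covers both the input and output buffers, so the log line can still print one item slightly early — but it keeps the logging closely coupled to actual processing capacity.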

Answer 1:

There are several ways to deal with this, as seen in the other answer(s). I'd like to point out an alternative: using a Progress<T>. Although it is designed to work best with tasks, it works for Dataflow as well, like so:

        private void Form1_Load(object sender, EventArgs e)
        {
            var progressReporter = new Progress<string>();
            progressReporter.ProgressChanged += (reporter, message) => label1.Text = message;

            var b1 = new ActionBlock<string>((input) =>
            {
                ((IProgress<string>)progressReporter).Report(input);
            }, new ExecutionDataflowBlockOptions
            {
                MaxDegreeOfParallelism = 10
            }); 

            b1.Post("a");
            b1.Post("b");
            b1.Post("c");
            b1.Post("d");
        }

Overall, to me this looks like a clean alternative that doesn't require adding plumbing to the individual blocks. Note that Progress<T> captures the current SynchronizationContext when it is constructed, so it must be created on the UI thread (as it is here, in Form1_Load).

More info can be found in this excellent blog post



Answer 2:

Here is a somewhat contrived approach that enhances the async lambda passed to an ActionBlock with started/finished events.

public static Func<TInput, Task> Enhance<TInput>(
    Func<TInput, Task> action,
    Action<TInput> onActionStarted = null,
    Action<TInput> onActionFinished = null,
    ISynchronizeInvoke synchronizingObject = null)
{
    return async (item) =>
    {
        RaiseEvent(onActionStarted, item, synchronizingObject);
        await action(item).ConfigureAwait(false);
        RaiseEvent(onActionFinished, item, synchronizingObject);
    };
}

private static void RaiseEvent<T>(Action<T> onEvent, T arg1,
    ISynchronizeInvoke synchronizingObject)
{
    if (onEvent == null) return;
    if (synchronizingObject != null && synchronizingObject.InvokeRequired)
    {
        synchronizingObject.Invoke(onEvent, new object[] { arg1 });
    }
    else
    {
        onEvent(arg1);
    }
}

Usage example:

private void Form_Load(object sender, EventArgs e)
{
    var block = new ActionBlock<string>(Enhance<string>(async item =>
    {
        await Task.Delay(5000); // Simulate some lengthy asynchronous job
    }, onActionStarted: item =>
    {
        this.Text = $"{item} started";
    }, onActionFinished: item =>
    {
        ListBoxCompleted.Items.Add(item);
    }, synchronizingObject: this), new ExecutionDataflowBlockOptions()
    {
        MaxDegreeOfParallelism = 2,
        BoundedCapacity = 10,
        EnsureOrdered = false
    });
}

The onActionStarted and onActionFinished callbacks are invoked once for each processed item, on the UI thread.
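Applied to the question's pipeline, the idea would look roughly like this (a sketch only — statusLabel, processedListBox, and the Task.Delay are hypothetical stand-ins for the asker's actual controls and file-optimization work, and `this` is assumed to be a WinForms Form, which implements ISynchronizeInvoke):

```csharp
// Sketch: the separate logging block (block2) is replaced by an
// enhanced processing block. 'Enhance' is the helper defined above.
var processBlock = new ActionBlock<string>(Enhance<string>(async file =>
{
    await Task.Delay(5000); // stand-in for the real file optimization
}, onActionStarted: file =>
{
    // Runs on the UI thread, immediately before this file's processing starts.
    statusLabel.Text = $"Processing {file}...";
}, onActionFinished: file =>
{
    // Runs on the UI thread once this file is done.
    processedListBox.Items.Add(file);
}, synchronizingObject: this), new ExecutionDataflowBlockOptions
{
    MaxDegreeOfParallelism = 2,
    BoundedCapacity = 10,
    EnsureOrdered = false
});
```

In WPF, where controls don't implement ISynchronizeInvoke, the synchronizingObject argument could be omitted and the callbacks marshaled via the dispatcher instead.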