TPL Complete vs Completion

2019-01-19 07:39发布

问题:

I need to import customer related data from legacy DB and perform several transformations during the process. This means a single entry needs to perform additional "events" (synchronize products, create invoices, etc.).

My initial solution was a simple parallel approach. It works okay, but sometimes it has issues. If the currently processed customers need to wait for the same type of events, their processing queues might got stuck and eventually time out, causing every underlying events to fail too (they depend on the one which failed). It doesn't happen all the time, yet it's annoying.

So I got another idea, work in batches. I mean not only limiting the number of customers being processed at the same time, but also the number of the events which are broadcasted to the queues. While searching around for ideas, I found this answer, which points to the TPL DataFlow.

I made a skeleton to get familiar with it. I set up a simple pipeline, but I'm a bit confused about the usage of Complete() and awaiting Completion().

The steps are the following

  1. Make a list of numbers (the ids of the customers to be imported) - this is outside the import logic, it just there to be able to trigger the rest of the logic
  2. Create a BatchBlock (to be able to limit the number of customers to be processed at the same time)
  3. Create a single MyClass1 item based on the id (TransformBlock<int, MyClass1>)
  4. Perform some logic and generate a collection of MyClass2 (TransformManyBlock<MyClass1, MyClass2>) - as example, sleep for 1 second
  5. Perform some logic on every item of the collection (ActionBlock<MyClass2>) - as example, sleep for 1 second

Here's the full code:

public static class Program
{
    private static void Main(string[] args)
    {
        var batchBlock = new BatchBlock<int>(2);
        for (var i = 1; i < 10; i++)
        {
            batchBlock.Post(i);
        }


        batchBlock.Complete();
        while (batchBlock.TryReceive(null, out var ids))
        {
            var transformBlock = new TransformBlock<int, MyClass1>(delegate (int id)
            {
                Console.WriteLine($"TransformBlock(id: {id})");
                return new MyClass1(id, "Star Wars");
            });
            var transformManyBlock = new TransformManyBlock<MyClass1, MyClass2>(delegate (MyClass1 myClass1)
            {
                Console.WriteLine($"TransformManyBlock(myClass1: {myClass1.Id}|{myClass1.Value})");
                Thread.Sleep(1000);
                return GetMyClass22Values(myClass1);
            });

            var actionBlock = new ActionBlock<MyClass2>(delegate (MyClass2 myClass2)
            {
                Console.WriteLine($"ActionBlock(myClass2: {myClass2.Id}|{myClass2.Value})");
                Thread.Sleep(1000);
            });
            transformBlock.LinkTo(transformManyBlock);
            transformManyBlock.LinkTo(actionBlock);
            foreach (var id in ids)
            {
                transformBlock.Post(id);
            }

            // this is the point when I'm not 100% sure

            //transformBlock.Complete();
            //transformManyBlock.Complete();
            //transformManyBlock.Completion.Wait();
            actionBlock.Complete();
            actionBlock.Completion.Wait();
        }

        Console.WriteLine();
        Console.WriteLine("Press any key to continue...");
        Console.ReadKey();
    }

    private static IEnumerable<MyClass2> GetMyClass22Values(MyClass1 myClass1)
    {
        return new List<MyClass2>
               {
                   new MyClass2(1, myClass1.Id+ " did this"),
                   new MyClass2(2, myClass1.Id+ " did that"),
                   new MyClass2(3, myClass1.Id+ " did this again")
               };
    }
}

public class MyClass1
{
    public MyClass1(int id, string value)
    {
        Id = id;
        Value = value;
    }

    public int Id { get; set; }

    public string Value { get; set; }
}

public class MyClass2
{
    public MyClass1(int id, string value)
    {
        Id = id;
        Value = value;
    }

    public int Id { get; set; }

    public string Value { get; set; }
}

So the point I struggle with is the end, where I'd need to call Complete() or wait for Completion. I can't seem to find the right combination. I'd like to see an output as follows:

TransformBlock(id: 1)
TransformBlock(id: 2)
TransformManyBlock(myClass1: 1|Star Wars)
TransformManyBlock(myClass1: 2|Star Wars)
ActionBlock(myClass2: 1|1 did this)
ActionBlock(myClass2: 2|1 did that)
ActionBlock(myClass2: 3|1 did this again)
ActionBlock(myClass2: 1|2 did this)
ActionBlock(myClass2: 2|2 did that)
ActionBlock(myClass2: 3|2 did this again)
TransformBlock(id: 3)
TransformBlock(id: 4)
TransformManyBlock(myClass1: 3|Star Wars)
TransformManyBlock(myClass1: 4|Star Wars)
ActionBlock(myClass2: 1|3 did this)
ActionBlock(myClass2: 2|3 did that)
ActionBlock(myClass2: 3|3 did this again)
ActionBlock(myClass2: 1|4 did this)
ActionBlock(myClass2: 2|4 did that)
ActionBlock(myClass2: 3|4 did this again)

[the rest of the items]


Press any key to exit...   

Anyone can point me to the right direction?

回答1:

You're almost there, you need to call Complete on the first block in the pipeline then await Completion on the last block. Then in your links you need to propagate completion like this:

private async static void Main(string[] args) {
    var transformBlock = new TransformBlock<int, MyClass1>(delegate (int id)
    {
        Console.WriteLine($"TransformBlock(id: {id})");
        return new MyClass1(id, "Star Wars");
    });
    var transformManyBlock = new TransformManyBlock<MyClass1, MyClass2>(delegate (MyClass1 myClass1)
    {
        Console.WriteLine($"TransformManyBlock(myClass1: {myClass1.Id}|{myClass1.Value})");
        Thread.Sleep(1000);
        return GetMyClass22Values(myClass1);
    });

    var actionBlock = new ActionBlock<MyClass2>(delegate (MyClass2 myClass2)
    {
        Console.WriteLine($"ActionBlock(myClass2: {myClass2.Id}|{myClass2.Value})");
        Thread.Sleep(1000);
    });

    //propagate completion
    transformBlock.LinkTo(transformManyBlock, new DataflowLinkOptions() { PropagateCompletion = true });
    transformManyBlock.LinkTo(actionBlock, new DataflowLinkOptions() { PropagateCompletion = true});
    foreach(var id in ids) {
        transformBlock.Post(id);
    }


    //Complete the first block
    transformBlock.Complete();

    //wait for completion to flow to the last block
    await actionBlock.Completion;
} 

You can also incorporate the batch block into your pipeline and remove the need for the TryRecieve call but that seems like another part of your flow.

Edit

Example of propagating completion to multiple blocks:

public async static void Main(string[] args) {

    var sourceBlock = new BufferBlock<int>();

    var processBlock1 = new ActionBlock<int>(i => Console.WriteLine($"Block1 {i}"));

    var processBlock2 = new ActionBlock<int>(i => Console.WriteLine($"Block2 {i}"));

    sourceBlock.LinkTo(processBlock1);
    sourceBlock.LinkTo(processBlock2);

    var sourceBlockCompletion = sourceBlock.Completion.ContinueWith(tsk => {
        if(!tsk.IsFaulted) {
            processBlock1.Complete();
            processBlock2.Complete();
        } else {
            ((IDataflowBlock)processBlock1).Fault(tsk.Exception);
            ((IDataflowBlock)processBlock2).Fault(tsk.Exception);
        }
    });

    //Send some data...

    sourceBlock.Complete();
    await Task.WhenAll(sourceBlockCompletion, processBlock1.Completion, processBlock2.Completion);
}