Why are tasks not running in parallel?

Posted 2019-08-02 15:57

Question:

I have a number of locations (called cells) where I run tests. The tests are implemented as asynchronous tasks and run consecutively. The user can select which tests to run for each cell. If I select exactly the same tests on all cells, they run more or less in parallel.

Given tests A, B, and C, if I select tests A and B on cells 1 and 2 and only test C on cell 3, then the tests in cells 1 and 2 start running, but test C in cell 3 does not start until tests A and B in cells 1 and 2 have finished. In effect, all tests in all cells tend to run in the same order. That is not what I want: each cell's chain of tests should run independently of the other cells. Here is how I implemented it.

private async void buttonStartTest_Click(object sender, EventArgs e)
{
    var cells = objectListView.CheckedObjects.Cast<Cell>().ToList();
    if (cells.Count == 0) // ToList() never returns null, so check for an empty selection instead
        return;

    var blockPrepare = CreateExceptionCatchingTransformBlock(new Func<Cell, Task<Cell>>(Tests.Prepare), new Action<Exception, Cell>(HandleUnhandledException), new ExecutionDataflowBlockOptions
    {
        BoundedCapacity = 10000,
        MaxDegreeOfParallelism = 40,
    });

    var blockFinalize = CreateExceptionCatchingActionBlock(new Func<Cell, Task>(Tests.Finalize), new Action<Exception, Cell>(HandleUnhandledException), new ExecutionDataflowBlockOptions
    {
        BoundedCapacity = 10000,
        MaxDegreeOfParallelism = 40,
    });

    List<IPropagatorBlock<Cell, Cell>> blockList = new List<IPropagatorBlock<Cell, Cell>>();
    var funcs = tests.Select(x => x.Value);
    foreach (var func in funcs)
    {
        var blockNew = CreateExceptionCatchingTransformBlock(new Func<Cell, Task<Cell>>(func), new Action<Exception, Cell>(HandleUnhandledException), new ExecutionDataflowBlockOptions
        {
            BoundedCapacity = 10000,
            MaxDegreeOfParallelism = 40,
        });
        blockList.Add(blockNew);
    }

    // link
    for (int i = 0; i < blockList.Count - 1; i++)
    {
        var b1 = blockList[i];
        var b2 = blockList[i + 1];
        b1.LinkTo(b2);
    }

    // link first and last
    blockPrepare.LinkTo(blockList[0], new DataflowLinkOptions { PropagateCompletion = true });
    blockList[blockList.Count - 1].LinkTo(blockFinalize, new DataflowLinkOptions { PropagateCompletion = true });

    foreach (Cell c in cells)
    {
        c.Reset();
        c.State = Cell.States.InProgress;
        var progressHandler = new Progress<string>(value =>
        {
            c.Status = value;
        });

        c.Progress = progressHandler as IProgress<string>;
        blockPrepare.Post(c);
    }

    blockPrepare.Complete();
    try
    {
        await blockFinalize.Completion;
    }
    catch (Exception ex)
    {
        logger.Debug(ex.InnerException.InnerException.Message);
    }
}

Above you can see the 2 mandatory blocks for each cell: prepare and finalize. Here is how I create the blocks:

public IPropagatorBlock<TInput, TOutput> CreateExceptionCatchingTransformBlock<TInput, TOutput>(
                Func<TInput, Task<TOutput>> transform,
                Action<Exception, Cell> exceptionHandler,
                ExecutionDataflowBlockOptions dataflowBlockOptions)
{
    return new TransformManyBlock<TInput, TOutput>(async input =>
    {
        try
        {
            var result = await transform(input);
            return new[] { result };
        }
        catch (Exception ex)
        {
            exceptionHandler(ex, (input as Cell));

            return Enumerable.Empty<TOutput>();
        }
    }, dataflowBlockOptions);
}

public ITargetBlock<TInput> CreateExceptionCatchingActionBlock<TInput>(
                Func<TInput, Task> action,
                Action<Exception, Cell> exceptionHandler,
                ExecutionDataflowBlockOptions dataflowBlockOptions)
{
    return new ActionBlock<TInput>(async input =>
    {
        try
        {
            await action(input);
        }
        catch (Exception ex)
        {
            exceptionHandler(ex, (input as Cell));
        }
    }, dataflowBlockOptions);
}

A test itself looks like this:

public static async Task<Cell> TestDoorsAsync(Cell c)
{
    int thisTestID = TEST_DOORS;
    TestConfiguration conf = c.GetConfiguration(thisTestID);
    if (conf.Enabled)
    {
       ... // execute test
    }
    else
    {
       // report that test was skipped due to user configuration
    }

    return c;
}

So is there some option that I missed, or is the software design wrong, so that tests in one cell wait for tests in other cells to complete?

UPDATE

The repo is a minimal console app demonstrating the issue.

There are still 3 cells and 3 tests (tasks). On cells 1 and 2 I select all tests, while on cell 3 I select only test 3. What I expect, right after the preparation task for cell 3, is to immediately see tests 1 and 2 skipped and test 3 running.

What I see is (# is the cell number):

#1 Preparing...
#2 Preparing...
#3 Preparing...

#1 Test1 running...
#2 Test1 running...
#3 Test1 skipped
#1 Test2 running...
#2 Test2 running...
#3 Test2 skipped
#1 Test3 running...
#2 Test3 running...
#3 Test3 running...

#2 Finalizing...
#1 Finalizing...
#3 Finalizing...

The tests in cell 3 are synchronized with the tests in cells 1 and 2. All tests finish at the same time, whereas the single test in cell 3 should have finished earlier than those in the other cells.

Answer 1:

Thanks for the edit. Add EnsureOrdered = false to the block options. By default, a block emits its outputs in the same order as the inputs arrived, so a cell that finishes early (cell 3 here) is held back until the cells queued ahead of it are done. This default is usually preferable, but not in your case.
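A minimal sketch of the effect, not the asker's actual code: `FakeTestAsync` and the delays are hypothetical stand-ins for a skipped test on cell 3 and longer-running tests on cells 1 and 2. It assumes a version of System.Threading.Tasks.Dataflow that exposes `EnsureOrdered`.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class OrderingDemo
{
    // Hypothetical test: cell 3 "skips" almost instantly, cells 1 and 2 take longer.
    private static async Task<int> FakeTestAsync(int cell)
    {
        await Task.Delay(cell == 3 ? 10 : 300);
        return cell;
    }

    // Returns the cell numbers in the order the test block emitted them.
    public static async Task<List<int>> RunAsync(bool ensureOrdered)
    {
        var options = new ExecutionDataflowBlockOptions
        {
            MaxDegreeOfParallelism = 40,
            EnsureOrdered = ensureOrdered
        };

        var emitted = new List<int>();
        var test = new TransformBlock<int, int>(new Func<int, Task<int>>(FakeTestAsync), options);
        // The sink keeps the default MaxDegreeOfParallelism of 1, so the list needs no lock.
        var sink = new ActionBlock<int>(cell => emitted.Add(cell));

        test.LinkTo(sink, new DataflowLinkOptions { PropagateCompletion = true });

        foreach (var cell in new[] { 1, 2, 3 })
            await test.SendAsync(cell);

        test.Complete();
        await sink.Completion;
        return emitted;
    }

    public static async Task Main()
    {
        // Ordered: cell 3 is held back behind cells 1 and 2.
        Console.WriteLine(string.Join(", ", await RunAsync(ensureOrdered: true)));
        // Unordered: cell 3 is passed along as soon as it is done.
        Console.WriteLine(string.Join(", ", await RunAsync(ensureOrdered: false)));
    }
}
```

With ordering enabled the sink receives the cells in posting order. With it disabled, the fast cell should come out first, and the relative order of the two slow cells is not guaranteed.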

Looks like I was wrong when I commented that there was nothing wrong in the current code.



Answer 2:

It's hard to say for sure, but I can see two flaws in your code:

  1. You're not propagating completion between the transform blocks in your list.
  2. You're delivering messages with .Post instead of SendAsync. Post does not wait: it returns false immediately and the message is dropped if the block cannot accept it, whereas awaiting SendAsync waits asynchronously until the block has room, which is what you need for an async flow here.

Also, you need to understand that BoundedCapacity introduces throttling into your pipeline, so check the buffer sizes; producers may simply be waiting for a place in a queue to become available.
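The points above can be sketched as follows. This is an illustration under assumptions, not a drop-in fix: the string "cells", the three hypothetical stages, and the deliberately tiny BoundedCapacity of 2 are only there to make the back-pressure visible.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class LinkingDemo
{
    public static async Task<List<string>> RunAsync()
    {
        // A small capacity so that producers actually have to wait for room.
        var options = new ExecutionDataflowBlockOptions { BoundedCapacity = 2 };
        var link = new DataflowLinkOptions { PropagateCompletion = true };

        // Three hypothetical test stages, each tagging the cell it processed.
        var stages = new List<TransformBlock<string, string>>();
        for (int i = 1; i <= 3; i++)
        {
            int n = i; // copy for the closure
            stages.Add(new TransformBlock<string, string>(s => s + " T" + n, options));
        }

        // Propagate completion on every link, not only the first and last.
        for (int i = 0; i < stages.Count - 1; i++)
            stages[i].LinkTo(stages[i + 1], link);

        var results = new List<string>();
        var sink = new ActionBlock<string>(s => results.Add(s), options);
        stages[stages.Count - 1].LinkTo(sink, link);

        // SendAsync waits asynchronously when the first block is full;
        // Post would return false here and the cell would be lost.
        foreach (var cell in new[] { "#1", "#2", "#3", "#4", "#5" })
            await stages[0].SendAsync(cell);

        stages[0].Complete();
        await sink.Completion; // completes only because every link propagates completion
        return results;
    }
}
```

If any intermediate link omitted PropagateCompletion, awaiting the sink's Completion would never finish, because the downstream blocks would not learn that the input has ended.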

Another thing you can try is tuning the DataflowBlockOptions.MaxMessagesPerTask property. It is meant for cases where one greedy block keeps processing message after message without letting other blocks do their work. Internally, each block does its handling in a Task, and the default of -1 means an unlimited number of messages per task. Setting it to a positive number forces the block to restart its internal task after that many messages, giving other blocks room to run.

For more advanced tips, please refer to the official docs.
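As a rough sketch of where this option goes (the value 1 and the counting block are assumptions chosen for illustration, not a recommendation for the asker's pipeline):

```csharp
using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class FairnessDemo
{
    public static async Task<int> RunAsync()
    {
        var options = new ExecutionDataflowBlockOptions
        {
            // The default is DataflowBlockOptions.Unbounded (-1).
            // With 1, the block gives up its task after every message.
            MaxMessagesPerTask = 1
        };

        int processed = 0;
        // Default MaxDegreeOfParallelism is 1, so the counter needs no synchronization.
        var block = new ActionBlock<int>(_ => processed++, options);

        // The block has no BoundedCapacity, so Post is accepted every time here.
        for (int i = 0; i < 100; i++)
            block.Post(i);

        block.Complete();
        await block.Completion;
        return processed;
    }
}
```

The option changes how the work is scheduled, not what is processed: all 100 messages are still handled, just in more, shorter tasks.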