Continuation Tasks Hanging When Using LimitedConcu

Posted 2019-04-10 14:49

Question:

I've been working with the TPL in C# (.NET 4.0).

I have created a custom API to ease the creation of web requests and downloading the contents (asynchronously, using continuation tasks). That part is working fine.

The problem I have occurs when I try to use the LimitedConcurrencyLevelTaskScheduler (found in the Samples for Parallel Programming and in the MSDN documentation for tasks) with deferred task creation. If you're not familiar with that class, all it does is limit the degree of concurrency of tasks scheduled to an arbitrary number.

Basically I want to defer the creation of web request task chains into a task being scheduled by the LimitedConcurrencyLevelTaskScheduler so that I can limit the number of concurrent downloads.

As suggested by the sage Stephen Toub, when deferring the creation of a Task, the best thing to do is to design your API to return a Func<Task> or Func<Task<TResult>>. I have done this.

Unfortunately, my program hangs after scheduling the first set of concurrent tasks. Say I limit my tasks to a concurrency level of 4. In that case, 4 tasks are started and then the program hangs. The tasks never complete.

I have created a minimal example to illustrate the problem simply. I am using file reads instead of using a WebRequest. I have limited the degrees of concurrency to 1.

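using System;
using System.IO;
using System.Linq;
using System.Threading.Tasks;

// LimitedConcurrencyLevelTaskScheduler comes from the Samples for Parallel
// Programming and has to be added to the project separately.
class Program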
{
    static Func<Task> GetReadTask()
    {
        return () =>
        {
            Console.WriteLine("Opening file.");

            FileStream fileStream = File.Open("C:\\Users\\Joel\\Desktop\\1.txt", FileMode.Open);

            byte[] buffer = new byte[32];

            Console.WriteLine("Beginning read.");
            return Task<int>.Factory.FromAsync(fileStream.BeginRead, fileStream.EndRead, buffer, 0, 32, null).ContinueWith(task => fileStream.Close());
        };
    }

    static void Main()
    {
        LimitedConcurrencyLevelTaskScheduler ts = new LimitedConcurrencyLevelTaskScheduler(1);
        TaskFactory factory = new TaskFactory(ts);

        int[] range = {1, 2, 3};

        var tasks = range.Select(number =>
        {
            Func<Task> getTask = GetReadTask();
            return factory.StartNew(() =>
            {
                var task = getTask();
                task.Wait();
            });
        });

        Task.WaitAll(tasks.ToArray());
    }
}

To clarify what I mean by "it hangs", this is what the output looks like.

Opening file.
Beginning read.

And then nothing else is printed... forever.

Any clues on what is going on?

Answer 1:

Good question!

Firstly, I'm not sure LimitedConcurrencyLevelTaskScheduler is the academically correct solution. To limit the number of concurrent requests to N this way, you have to block N tasks, which rather defeats the purpose of using APM async calls in the first place.

Having said that, it is a whole lot easier to implement than the alternative. You would need a work queue and a count of the in-flight requests, and you would create worker tasks as required. That's not trivial to get right, and if the number N of concurrent requests is small, having N blocked threads is not the end of the world.
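For illustration, here is a minimal sketch of that alternative: a queue of deferred tasks plus an in-flight counter, with no thread blocked while an item waits for a slot. AsyncThrottle, ThrottleDemo and DelayAsync are hypothetical names invented for this sketch (they are not part of the TPL or of the samples), and the timer-based DelayAsync merely stands in for a real APM operation such as a web request. It avoids async/await so that it stays within .NET 4.0.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical helper, not from the original post: starts at most maxInFlight
// deferred tasks at a time and never blocks a thread while waiting for a slot.
class AsyncThrottle
{
    private readonly object _gate = new object();
    private readonly Queue<Action> _pending = new Queue<Action>();
    private readonly int _maxInFlight;
    private int _inFlight;

    public AsyncThrottle(int maxInFlight) { _maxInFlight = maxInFlight; }

    // Queues a deferred task. The returned task completes when the work completes.
    public Task Enqueue(Func<Task> start)
    {
        var done = new TaskCompletionSource<object>();
        Action launch = () =>
        {
            try
            {
                // Explicit scheduler so the bookkeeping never depends on TaskScheduler.Current.
                start().ContinueWith(t =>
                {
                    if (t.IsFaulted) done.TrySetException(t.Exception.InnerExceptions);
                    else if (t.IsCanceled) done.TrySetCanceled();
                    else done.TrySetResult(null);
                    Release();
                }, TaskScheduler.Default);
            }
            catch (Exception ex)
            {
                // start() threw (or returned null): report it and free the slot.
                done.TrySetException(ex);
                Release();
            }
        };

        lock (_gate)
        {
            if (_inFlight >= _maxInFlight)
            {
                _pending.Enqueue(launch);   // no free slot: wait in the queue
                return done.Task;
            }
            _inFlight++;                    // claim a slot
        }
        launch();
        return done.Task;
    }

    // Called when one operation finishes: hand its slot to the next queued item, if any.
    private void Release()
    {
        Action next = null;
        lock (_gate)
        {
            if (_pending.Count > 0) next = _pending.Dequeue();
            else _inFlight--;
        }
        if (next != null) next();
    }
}

class ThrottleDemo
{
    // Simulated asynchronous operation: completes after delayMs without blocking a thread.
    public static Task DelayAsync(int delayMs)
    {
        var tcs = new TaskCompletionSource<object>();
        Timer timer = null;
        timer = new Timer(_ => { timer.Dispose(); tcs.TrySetResult(null); },
                          null, Timeout.Infinite, Timeout.Infinite);
        timer.Change(delayMs, Timeout.Infinite);
        return tcs.Task;
    }

    static void Main()
    {
        var throttle = new AsyncThrottle(2);
        var tasks = new List<Task>();
        for (int i = 1; i <= 6; i++)
        {
            int number = i;   // copy for the closure
            tasks.Add(throttle.Enqueue(() =>
            {
                Console.WriteLine("Starting " + number);
                return DelayAsync(100).ContinueWith(
                    t => Console.WriteLine("Finished " + number), TaskScheduler.Default);
            }));
        }
        Task.WaitAll(tasks.ToArray());
    }
}
```

With a limit of 2, no more than two of the six simulated operations should be in flight at any moment. The only thread that blocks is the one calling Task.WaitAll in Main.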

So, the problem with your code is that tasks created inside a running task use that task's scheduler (TaskScheduler.Current) unless you specify another one. Actually, that's not true for tasks created with FromAsync: these are driven by the underlying APM implementation rather than by a scheduler, so they are a bit different.

You create tasks in Main with:

return factory.StartNew( () =>
    {
        var task = getTask();
        task.Wait();
    }
);

factory uses the LimitedConcurrencyLevelTaskScheduler( 1 ), so only 1 of these tasks can execute concurrently and that one is waiting on the task returned from getTask().

So, in GetReadTask you call Task<int>.Factory.FromAsync. This runs because FromAsync doesn't respect the parent task's scheduler.

Then you create a continuation with .ContinueWith(task => fileStream.Close()). This creates a task that does respect its parent's scheduler. Since the LimitedConcurrencyLevelTaskScheduler is already executing a task (the one in Main that's blocked), the continuation cannot run and you have a deadlock: the blocked task is waiting for a continuation that cannot start until the blocked task finishes.
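The scheduler inheritance described above can be observed directly. The following is a small sketch, not taken from the original code: PassThroughScheduler is a hypothetical stand-in scheduler with no concurrency limit (so the demonstration itself cannot deadlock), and an already completed TaskCompletionSource task stands in for the FromAsync read.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical scheduler for this sketch: it just forwards work to the thread
// pool, but it is a distinct TaskScheduler instance that can be recognised.
class PassThroughScheduler : TaskScheduler
{
    protected override void QueueTask(Task task)
    {
        ThreadPool.QueueUserWorkItem(_ => TryExecuteTask(task));
    }

    protected override bool TryExecuteTaskInline(Task task, bool taskWasPreviouslyQueued)
    {
        return false;   // never run tasks inline on the waiting thread
    }

    protected override IEnumerable<Task> GetScheduledTasks()
    {
        return Enumerable.Empty<Task>();   // only used by debuggers
    }
}

class SchedulerInheritanceDemo
{
    // Returns { implicit continuation ran on custom, explicit continuation ran on custom }.
    public static bool[] Run()
    {
        var custom = new PassThroughScheduler();

        Task<bool[]> outer = Task.Factory.StartNew(() =>
        {
            // Stand-in for the task returned by FromAsync.
            var antecedent = new TaskCompletionSource<int>();
            antecedent.SetResult(32);

            // No scheduler given: the continuation targets TaskScheduler.Current,
            // which is 'custom' because this code is running inside a task on it.
            Task<bool> implicitCont = antecedent.Task.ContinueWith(
                t => TaskScheduler.Current == custom);

            // Scheduler given explicitly: the continuation goes to the thread pool.
            Task<bool> explicitCont = antecedent.Task.ContinueWith(
                t => TaskScheduler.Current == custom, TaskScheduler.Default);

            return new[] { implicitCont.Result, explicitCont.Result };
        }, CancellationToken.None, TaskCreationOptions.None, custom);

        return outer.Result;
    }

    static void Main()
    {
        bool[] ranOnCustom = Run();
        Console.WriteLine("Implicit continuation ran on custom scheduler: " + ranOnCustom[0]);
        Console.WriteLine("Explicit continuation ran on custom scheduler: " + ranOnCustom[1]);
    }
}
```

The first continuation lands on the scheduler of the task that created it and the second on the default scheduler. If the stand-in scheduler were replaced by one that allows only a single running task, the first continuation would be queued behind the task that is waiting for it, which is the hang described in the question.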

The solution is to run the continuation on a normal thread pool thread with TaskScheduler.Default. It then gets to run and the deadlock is broken.

Here's my solution:

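// Note: Output.Write is not defined in this answer; it appears to be a simple
// logging helper, so Console.WriteLine should work in its place.
static Task QueueReadTask( TaskScheduler ts, int number )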
{
    Output.Write( "QueueReadTask( " + number + " )" );

    return Task.Factory.StartNew( () =>
        {
            Output.Write( "Opening file " + number + "." );

            FileStream fileStream = File.Open( "D:\\1KB.txt", FileMode.Open, FileAccess.Read, FileShare.Read );

            byte[] buffer = new byte[ 32 ];

            var tRead = Task<int>.Factory.FromAsync( fileStream.BeginRead, fileStream.EndRead, buffer, 0, 32, null );

            var tClose = tRead.ContinueWith( task =>
                    {
                        Output.Write( "Closing file " + number + ". Read " + task.Result + " bytes." );
                        fileStream.Close();
                    }
                    , TaskScheduler.Default
                );

            tClose.Wait();
        }
        , CancellationToken.None
        , TaskCreationOptions.None
        , ts
    );
}

And Main now looks like this:

static void Main()
{
    LimitedConcurrencyLevelTaskScheduler ts = new LimitedConcurrencyLevelTaskScheduler( 1 );

    int[] range = { 1, 2, 3 };

    var tasks = range.Select( number =>
        {
            var task = QueueReadTask( ts, number );

            return task.ContinueWith( t => Output.Write( "Number " + number + " completed" ) );
        }
    )
    .ToArray();

    Output.Write( "Waiting for " + tasks.Length + " tasks: " + String.Join( " ", tasks.Select( t => t.Status ).ToArray() ) );

    Task.WaitAll( tasks );

    Output.Write( "WaitAll complete for " + tasks.Length + " tasks: " + String.Join( " ", tasks.Select( t => t.Status ).ToArray() ) );
}

There are a couple of things to note:

Moving the task.Wait() into QueueReadTask makes it more obvious that you are blocking a task. Since you are blocking anyway, you can remove the FromAsync call and the continuation and replace them with a normal synchronous read.

The task returned from QueueReadTask can have continuations. By default, a continuation runs on TaskScheduler.Current as seen at the point where ContinueWith is called, not on the antecedent's scheduler. In Main no task is executing, so TaskScheduler.Current is the default scheduler and the continuations run on the thread pool.
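As a rough sketch of the first note, this is what QueueReadTask might look like with a plain blocking read instead of FromAsync plus a continuation. The name QueueSyncReadTask, the extra path parameter and the Task<int> return type are choices made for this sketch only. In Main, a temporary file stands in for D:\1KB.txt and TaskScheduler.Default stands in for the LimitedConcurrencyLevelTaskScheduler, which is not part of the framework.

```csharp
using System;
using System.IO;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

class SyncReadSketch
{
    // Same shape as QueueReadTask, but the read is synchronous, so no
    // continuation is created and nothing depends on TaskScheduler.Current.
    public static Task<int> QueueSyncReadTask(TaskScheduler ts, string path, int number)
    {
        return Task.Factory.StartNew(() =>
        {
            Console.WriteLine("Opening file " + number + ".");

            // 'using' closes the stream even if the read throws.
            using (FileStream fileStream = File.Open(path, FileMode.Open, FileAccess.Read, FileShare.Read))
            {
                byte[] buffer = new byte[32];
                int bytesRead = fileStream.Read(buffer, 0, buffer.Length);

                Console.WriteLine("Closing file " + number + ". Read " + bytesRead + " bytes.");
                return bytesRead;
            }
        }, CancellationToken.None, TaskCreationOptions.None, ts);
    }

    static void Main()
    {
        // Assumption: a 1 KB temporary file replaces the hard-coded path.
        string path = Path.GetTempFileName();
        File.WriteAllBytes(path, new byte[1024]);

        try
        {
            // Assumption: pass a LimitedConcurrencyLevelTaskScheduler here to
            // limit concurrency; TaskScheduler.Default keeps the sketch self-contained.
            TaskScheduler ts = TaskScheduler.Default;

            int[] range = { 1, 2, 3 };
            Task[] tasks = range
                .Select(number => (Task)QueueSyncReadTask(ts, path, number))
                .ToArray();

            Task.WaitAll(tasks);
        }
        finally
        {
            File.Delete(path);
        }
    }
}
```

Each queued task still occupies one scheduler slot and one thread for the duration of the read, exactly as the blocking version above does, but there is no longer any continuation that could end up queued behind the task waiting for it.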