I've been working on using the TPL in C# (.NET 4.0).
I have created a custom API to ease the creation of web requests and the downloading of their contents (asynchronously, using continuation tasks). That part is working fine.
The problem I have occurs when I try to use the LimitedConcurrencyLevelTaskScheduler (found in the Samples for Parallel Programming and in the MSDN documentation for tasks) with deferred task creation. If you're not familiar with that class, all it does is cap the number of tasks scheduled to it that may run concurrently at an arbitrary number.
Basically, I want to defer the creation of web request task chains into a task scheduled by the LimitedConcurrencyLevelTaskScheduler so that I can limit the number of concurrent downloads.
As suggested by the sage Stephen Toub, when deferring the creation of a Task, the best thing to do is to design your API to return a Func<Task> or Func<Task<TResult>>. I have done this.
Unfortunately, my program hangs after scheduling the first set of concurrent tasks. Say I have limited my tasks to a degree of concurrency of 4. In that case, 4 tasks would start and then the program would hang. The tasks would never complete.
I have created a minimal example to illustrate the problem. It uses file reads instead of a WebRequest, and it limits the degree of concurrency to 1.
using System;
using System.IO;
using System.Linq;
using System.Threading.Tasks;
// LimitedConcurrencyLevelTaskScheduler comes from the Samples for Parallel Programming.

class Program
{
    static Func<Task> GetReadTask()
    {
        return () =>
        {
            Console.WriteLine("Opening file.");
            FileStream fileStream = File.Open("C:\\Users\\Joel\\Desktop\\1.txt", FileMode.Open);
            byte[] buffer = new byte[32];
            Console.WriteLine("Beginning read.");
            return Task<int>.Factory.FromAsync(fileStream.BeginRead, fileStream.EndRead, buffer, 0, 32, null)
                .ContinueWith(task => fileStream.Close());
        };
    }

    static void Main()
    {
        LimitedConcurrencyLevelTaskScheduler ts = new LimitedConcurrencyLevelTaskScheduler(1);
        TaskFactory factory = new TaskFactory(ts);
        int[] range = {1, 2, 3};
        var tasks = range.Select(number =>
        {
            Func<Task> getTask = GetReadTask();
            return factory.StartNew(() =>
            {
                var task = getTask();
                task.Wait();
            });
        });
        Task.WaitAll(tasks.ToArray());
    }
}
To clarify what I mean by "it hangs", this is what the output looks like.
Opening file.
Beginning read.
And then nothing else is printed... forever.
Any clues on what is going on?
Good question!
Firstly, I'm not sure LimitedConcurrencyLevelTaskScheduler is the academically correct solution. To limit the number of concurrent requests to N this way, you have to block N tasks (and therefore N threads), which somewhat defeats the purpose of using APM async calls in the first place.
Having said that, it is a whole lot easier to implement than the alternative. You would need to keep a work queue and a count of the in-flight requests, then start new work as earlier requests complete. That's not trivial to get right, and if the number N of concurrent requests is small, having N blocked threads is not the end of the world.
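For comparison, the work-queue alternative could look roughly like the following minimal sketch. Throttler, Enqueue, Pump and RunDemo are hypothetical names, not part of any library. The idea is that no thread ever waits: each queued Func<Task> is started only when a slot is free, and the continuation of a finished operation releases its slot and starts the next queued item. Task.Delay and Volatile.Read (.NET 4.5+) are assumed; Task.Delay stands in for a web request.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical throttler: runs at most maxInFlight deferred operations at once
// without blocking a thread while an operation is in flight.
class Throttler
{
    private readonly object _gate = new object();
    private readonly Queue<Tuple<Func<Task>, TaskCompletionSource<object>>> _pending =
        new Queue<Tuple<Func<Task>, TaskCompletionSource<object>>>();
    private readonly int _maxInFlight;
    private int _inFlight; // started but not yet completed (guarded by _gate)

    public Throttler(int maxInFlight) { _maxInFlight = maxInFlight; }

    // Queues the work; the returned task completes when the work's own task completes.
    public Task Enqueue(Func<Task> work)
    {
        var tcs = new TaskCompletionSource<object>();
        lock (_gate) { _pending.Enqueue(Tuple.Create(work, tcs)); }
        Pump();
        return tcs.Task;
    }

    // Starts queued work while there is a free slot.
    private void Pump()
    {
        while (true)
        {
            Tuple<Func<Task>, TaskCompletionSource<object>> next;
            lock (_gate)
            {
                if (_inFlight >= _maxInFlight || _pending.Count == 0) return;
                next = _pending.Dequeue();
                _inFlight++;
            }

            Task started;
            try { started = next.Item1(); }
            catch (Exception ex)
            {
                // Turn a synchronous throw into a faulted task so the slot is still released.
                var failed = new TaskCompletionSource<object>();
                failed.SetException(ex);
                started = failed.Task;
            }

            // Free the slot, propagate the outcome, then start the next queued item.
            started.ContinueWith(t =>
            {
                lock (_gate) { _inFlight--; }
                if (t.IsFaulted) next.Item2.SetException(t.Exception.InnerExceptions);
                else if (t.IsCanceled) next.Item2.SetCanceled();
                else next.Item2.SetResult(null);
                Pump();
            }, TaskScheduler.Default);
        }
    }
}

class Program
{
    // Runs 'count' fake downloads through the throttler; returns the peak concurrency seen.
    public static int RunDemo(int maxInFlight, int count)
    {
        var throttler = new Throttler(maxInFlight);
        int running = 0, peak = 0;
        Task[] tasks = Enumerable.Range(1, count).Select(i => throttler.Enqueue(() =>
        {
            int now = Interlocked.Increment(ref running);
            int seen;
            // Record the highest number of operations running at the same time.
            while (now > (seen = Volatile.Read(ref peak)) &&
                   Interlocked.CompareExchange(ref peak, now, seen) != seen) { }
            // Task.Delay stands in for an asynchronous web request.
            return Task.Delay(50).ContinueWith(_ => Interlocked.Decrement(ref running),
                                               TaskScheduler.Default);
        })).ToArray();
        Task.WaitAll(tasks);
        return peak;
    }

    static void Main()
    {
        Console.WriteLine("Peak concurrency: " + RunDemo(2, 6));
    }
}
```

The price of not blocking is the bookkeeping in Pump: the in-flight count and the queue must be updated under the same lock, and a synchronous throw from the deferred Func<Task> must still release its slot.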
So, the problem with your code is that tasks created inside another task (with StartNew or ContinueWith and no explicit scheduler) use TaskScheduler.Current, which is the scheduler running that outer task. That's not true for tasks created with FromAsync, as these wrap the underlying APM implementation and so behave a bit differently.
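The scheduler inheritance can be observed directly with a small sketch like the one below. It assumes .NET 4.5+, where the ConcurrentScheduler of a ConcurrentExclusiveSchedulerPair with a maximum concurrency of 1 stands in for LimitedConcurrencyLevelTaskScheduler(1); SchedulerDemo and Run are hypothetical names. Inside a task running on that scheduler, both a StartNew and a ContinueWith that name no scheduler end up on it.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

class SchedulerDemo
{
    // Returns { child ran on 'limited', continuation ran on 'limited' }.
    public static bool[] Run()
    {
        // Stand-in for LimitedConcurrencyLevelTaskScheduler(1).
        TaskScheduler limited =
            new ConcurrentExclusiveSchedulerPair(TaskScheduler.Default, 1).ConcurrentScheduler;

        Task<Task<bool>[]> outer = Task.Factory.StartNew(() =>
        {
            // Neither call names a scheduler, so both use TaskScheduler.Current,
            // which inside this task is 'limited'.
            Task<bool> child = Task.Factory.StartNew(() => TaskScheduler.Current == limited);
            Task<bool> cont = child.ContinueWith(_ => TaskScheduler.Current == limited);
            return new[] { child, cont };
        }, CancellationToken.None, TaskCreationOptions.None, limited);

        // The outer task does not block, so its single slot frees up for child and cont.
        Task<bool>[] inner = outer.Result;
        Task.WaitAll(inner);
        return new[] { inner[0].Result, inner[1].Result };
    }

    static void Main()
    {
        bool[] r = Run();
        Console.WriteLine("StartNew ran on the limited scheduler: " + r[0]);
        Console.WriteLine("ContinueWith ran on the limited scheduler: " + r[1]);
    }
}
```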
You create tasks in Main with:
return factory.StartNew( () =>
    {
        var task = getTask();
        task.Wait();
    }
);
factory uses the LimitedConcurrencyLevelTaskScheduler( 1 ), so only 1 of these tasks can execute concurrently, and that one is waiting on the task returned from getTask().
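So, in GetReadTask you call Task<int>.Factory.FromAsync. This runs because FromAsync doesn't respect the parent task's scheduler: it calls BeginRead directly on the current thread, and the task it returns is completed by the APM callback rather than queued to a scheduler.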
Then you create a continuation with .ContinueWith(task => fileStream.Close()). Because no scheduler is passed, this continuation does respect the current scheduler, which inside the outer task is the LimitedConcurrencyLevelTaskScheduler. Since that scheduler is already executing a task (the one started in Main, which is blocked in task.Wait()), the continuation cannot run, and you have a deadlock.
The solution is to run the continuation on a normal thread pool thread by passing TaskScheduler.Default to ContinueWith. It then gets to run, and the deadlock is broken.
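A minimal, self-contained sketch of that fix follows. It assumes .NET 4.5+, where the ConcurrentScheduler of a ConcurrentExclusiveSchedulerPair with a maximum concurrency of 1 stands in for LimitedConcurrencyLevelTaskScheduler(1), and a thread-pool task returning a made-up byte count stands in for the FromAsync read. DefaultContinuationDemo and Run are hypothetical names. The outer task occupies the only slot and blocks, yet the continuation still completes because it targets TaskScheduler.Default.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

class DefaultContinuationDemo
{
    public static bool Run()
    {
        // Stand-in for LimitedConcurrencyLevelTaskScheduler(1).
        TaskScheduler limited =
            new ConcurrentExclusiveSchedulerPair(TaskScheduler.Default, 1).ConcurrentScheduler;

        Task<bool> outer = Task.Factory.StartNew(() =>
        {
            // Stand-in for the FromAsync read: a task completed on the thread pool.
            Task<int> read = Task.Factory.StartNew(() => 32, CancellationToken.None,
                TaskCreationOptions.None, TaskScheduler.Default);

            // TaskScheduler.Default keeps the continuation off 'limited',
            // whose only slot is taken by this task.
            Task<bool> close = read.ContinueWith(t => t.Result == 32, TaskScheduler.Default);

            // Blocking is now safe; the timeout only keeps the demo from hanging forever.
            return close.Wait(TimeSpan.FromSeconds(5)) && close.Result;
        }, CancellationToken.None, TaskCreationOptions.None, limited);

        return outer.Result;
    }

    static void Main()
    {
        Console.WriteLine("Completed without deadlock: " + Run());
    }
}
```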
Here's my solution:
static Task QueueReadTask( TaskScheduler ts, int number )
{
    Console.WriteLine( "QueueReadTask( " + number + " )" );
    return Task.Factory.StartNew( () =>
        {
            Console.WriteLine( "Opening file " + number + "." );
            FileStream fileStream = File.Open( "D:\\1KB.txt", FileMode.Open, FileAccess.Read, FileShare.Read );
            byte[] buffer = new byte[ 32 ];
            var tRead = Task<int>.Factory.FromAsync( fileStream.BeginRead, fileStream.EndRead, buffer, 0, 32, null );
            var tClose = tRead.ContinueWith( task =>
                {
                    Console.WriteLine( "Closing file " + number + ". Read " + task.Result + " bytes." );
                    fileStream.Close();
                }
                , TaskScheduler.Default
            );
            tClose.Wait();
        }
        , CancellationToken.None
        , TaskCreationOptions.None
        , ts
    );
}
And Main now looks like this:
static void Main()
{
    LimitedConcurrencyLevelTaskScheduler ts = new LimitedConcurrencyLevelTaskScheduler( 1 );
    int[] range = { 1, 2, 3 };
    var tasks = range.Select( number =>
        {
            var task = QueueReadTask( ts, number );
            return task.ContinueWith( t => Console.WriteLine( "Number " + number + " completed" ) );
        }
    )
    .ToArray();
    Console.WriteLine( "Waiting for " + tasks.Length + " tasks: " + String.Join( " ", tasks.Select( t => t.Status ).ToArray() ) );
    Task.WaitAll( tasks );
    Console.WriteLine( "WaitAll complete for " + tasks.Length + " tasks: " + String.Join( " ", tasks.Select( t => t.Status ).ToArray() ) );
}
There are a couple of things to note:
Moving the task.Wait() into QueueReadTask makes it more obvious that you are blocking a task. You can remove the FromAsync call and the continuation and replace them with a normal synchronous call, since you are blocking anyway.
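Because the task holds its scheduler slot until the read finishes, a synchronous variant of QueueReadTask could look like the sketch below. It assumes .NET 4.5+ (a ConcurrentExclusiveSchedulerPair with a maximum concurrency of 1 stands in for LimitedConcurrencyLevelTaskScheduler), a temporary 1 KB file in place of D:\1KB.txt, and a using block in place of the explicit Close(); SyncReadDemo and Run are hypothetical names.

```csharp
using System;
using System.IO;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

class SyncReadDemo
{
    // Synchronous variant: a plain Read replaces FromAsync plus the continuation.
    static Task<int> QueueReadTask(TaskScheduler ts, string path, int number)
    {
        return Task.Factory.StartNew(() =>
        {
            Console.WriteLine("Opening file " + number + ".");
            using (FileStream fileStream = File.Open(path, FileMode.Open, FileAccess.Read, FileShare.Read))
            {
                byte[] buffer = new byte[32];
                int read = fileStream.Read(buffer, 0, buffer.Length);
                Console.WriteLine("Closing file " + number + ". Read " + read + " bytes.");
                return read;
            }
        }, CancellationToken.None, TaskCreationOptions.None, ts);
    }

    public static int[] Run()
    {
        // Temporary 1 KB file in place of the original hard-coded path.
        string path = Path.GetTempFileName();
        File.WriteAllBytes(path, new byte[1024]);
        try
        {
            // Stand-in for LimitedConcurrencyLevelTaskScheduler(1).
            TaskScheduler ts =
                new ConcurrentExclusiveSchedulerPair(TaskScheduler.Default, 1).ConcurrentScheduler;
            Task<int>[] tasks = new[] { 1, 2, 3 }.Select(n => QueueReadTask(ts, path, n)).ToArray();
            Task.WaitAll(tasks);
            return tasks.Select(t => t.Result).ToArray();
        }
        finally
        {
            File.Delete(path);
        }
    }

    static void Main()
    {
        Console.WriteLine("Bytes read: " + string.Join(", ", Run()));
    }
}
```

With no continuation left inside the task, there is nothing for the limited scheduler to deadlock on.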
The task returned from QueueReadTask can have continuations. By default, these run on TaskScheduler.Current at the point where ContinueWith is called, not on the antecedent's scheduler. In this case ContinueWith is called from Main, which is not running inside any task, so the default scheduler is used.
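This can be checked with a short sketch, again assuming .NET 4.5+ with a ConcurrentExclusiveSchedulerPair standing in for LimitedConcurrencyLevelTaskScheduler(1); MainContinuationDemo and Run are hypothetical names. The antecedent runs on the limited scheduler, but the continuation, attached from outside any task, runs on TaskScheduler.Default.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

class MainContinuationDemo
{
    public static bool Run()
    {
        // Stand-in for LimitedConcurrencyLevelTaskScheduler(1).
        TaskScheduler limited =
            new ConcurrentExclusiveSchedulerPair(TaskScheduler.Default, 1).ConcurrentScheduler;

        // The antecedent runs on the limited scheduler.
        Task work = Task.Factory.StartNew(() => { }, CancellationToken.None,
            TaskCreationOptions.None, limited);

        // ContinueWith is called outside any task, so TaskScheduler.Current is
        // TaskScheduler.Default here; the antecedent's scheduler plays no part.
        Task<bool> cont = work.ContinueWith(_ => TaskScheduler.Current == TaskScheduler.Default);
        return cont.Result;
    }

    static void Main()
    {
        Console.WriteLine("Continuation ran on the default scheduler: " + Run());
    }
}
```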