Random tasks from Task.Factory.StartNew never finish

Posted 2019-09-11 06:41

Question:

I am using async/await with the Task.Factory.StartNew method:

public async Task<JobDto> ProcessJob(JobDto jobTask)
{
    try
    {
        var T = Task.Factory.StartNew(() =>
        {
            JobWorker jobWorker = new JobWorker();
            jobWorker.Execute(jobTask);
        });

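        await T;
    }
    catch (Exception ex)
    {
        // (exception handling was not shown in the original question)
    }
    // (the rest of the method, including the JobDto return value, was not shown)
}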

I call this method inside a loop like this:

for(int i=0; i < jobList.Count(); i++)
{
    tasks[i] = ProcessJob(jobList[i]);
}

What I notice is that the new tasks show up in Process Explorer and they also start working (according to the log file). However, out of 10, sometimes 8 and sometimes 7 finish. The rest just never come back.

  1. Why would that be happening?
  2. Are they timing out? Where can I set a timeout for my tasks?

UPDATE

Basically, I would like each task to start running as soon as it is called, and then wait for its result at the await T statement. I am assuming that once each task finishes, execution comes back to await T and does the next action. I am already seeing this for 7 out of 10 tasks, but 3 of them are not coming back.

Thanks

Answer 1:

It is hard to say what the issue is without the rest of the code, but your code can be simplified by making ProcessJob synchronous and then calling it with Task.Run.

public JobDto ProcessJob(JobDto jobTask)
{
    JobWorker jobWorker = new JobWorker();
    return jobWorker.Execute(jobTask); // assumes Execute returns the processed JobDto
}

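Then start the tasks and wait for all of them to finish. Prefer Task.Run over Task.Factory.StartNew, as it has more favourable defaults for pushing work to the background: it always queues work to the default thread pool scheduler (TaskScheduler.Default) rather than TaskScheduler.Current, uses TaskCreationOptions.DenyChildAttach, and unwraps async lambdas.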

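var tasks = new Task<JobDto>[jobList.Count()];
for(int i=0; i < jobList.Count(); i++)
{
    // Copy to a local first: a lambda that reads jobList[i] captures the shared
    // loop variable i, which may already have changed when the task runs.
    var job = jobList[i];
    tasks[i] = Task.Run(() => ProcessJob(job));
}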

try
{
   await Task.WhenAll(tasks);
}
catch(Exception ex)
{
   // handle exception
}
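For reference, here is a self-contained sketch of the same approach that runs as a console program (C# 9+ top-level statements). It is only an illustration under assumptions: JobDto and JobWorker are not available here, so a hypothetical ProcessJob that doubles an int stands in for the real work. It uses LINQ's Select instead of an index loop, which gives every lambda its own job variable.

```csharp
using System;
using System.Linq;
using System.Threading.Tasks;

// Hypothetical stand-in for the synchronous ProcessJob: a "job" is an int and
// its "result" is the job number doubled. Real code would call JobWorker.Execute.
int ProcessJob(int job) => job * 2;

int[] jobList = Enumerable.Range(1, 10).ToArray();

// Select gives each lambda its own `job` parameter, so the closures never
// share a loop counter that might change before the task runs.
Task<int>[] tasks = jobList.Select(job => Task.Run(() => ProcessJob(job))).ToArray();

// WhenAll waits for every task; the results come back in the order of `tasks`.
int[] results = await Task.WhenAll(tasks);
Console.WriteLine(string.Join(", ", results));
```

Because Task.WhenAll returns results in the order of the input tasks, results[i] belongs to jobList[i] no matter which job finished first.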


Answer 2:

It's possible your code is swallowing exceptions. I would add a ContinueWith call to the end of the code that starts the new task. Something like this untested code:

var T = Task.Factory.StartNew(() =>
        {
            JobWorker jobWorker = new JobWorker();
            jobWorker.Execute(jobTask);
        }).ContinueWith(tsk =>
        {
            var flattenedException = tsk.Exception.Flatten();
            Console.WriteLine("Exception! " + flattenedException);
            return true;
        }, TaskContinuationOptions.OnlyOnFaulted);  // Only call if task is faulted
// Note: if the job succeeds, this OnlyOnFaulted continuation is canceled,
// so awaiting T would then throw an OperationCanceledException.
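To illustrate how a silently failing task can look like a job that "never came back", here is a minimal runnable sketch (C# 9+ top-level statements). FailingJob and its message are hypothetical. It shows an OnlyOnFaulted continuation picking up the fault, and also the flip side: on a successful task that kind of continuation is canceled instead of run.

```csharp
using System;
using System.Threading.Tasks;

// Hypothetical job that fails partway through.
void FailingJob() => throw new InvalidOperationException("job failed");

Task work = Task.Run(() => FailingJob());
string logged = "";

// Runs only if `work` faults, like the OnlyOnFaulted continuation above.
Task logOnFault = work.ContinueWith(
    t => { logged = "Exception! " + t.Exception!.Flatten().InnerException!.Message; },
    TaskContinuationOptions.OnlyOnFaulted);
await logOnFault;

// Flip side: on a successful task the OnlyOnFaulted continuation is canceled,
// so awaiting the continuation itself throws a cancellation exception.
Task ok = Task.Run(() => { });
Task skipped = ok.ContinueWith(_ => { }, TaskContinuationOptions.OnlyOnFaulted);
bool skippedWasCanceled = false;
try
{
    await skipped;
}
catch (OperationCanceledException)
{
    skippedWasCanceled = true;
}

Console.WriteLine(logged);
Console.WriteLine($"continuation canceled on success: {skippedWasCanceled}");
```

This is why, when you add such a continuation, it is usually safer to keep a reference to the original task and await that, rather than the continuation.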

Another possibility is that something in one of the tasks is timing out (as you mentioned) or deadlocking. To track down whether a timeout (or maybe a deadlock) is the root cause, you could add some timeout logic (as described in a Stack Overflow answer):

int timeout = 1000; //set to something much greater than the time it should take your task to complete (at least for testing)
var task = TheMethodWhichWrapsYourAsyncLogic(cancellationToken);
if (await Task.WhenAny(task, Task.Delay(timeout, cancellationToken)) == task)
{
    // Task completed within timeout.
    // Consider that the task may have faulted or been canceled.
    // We re-await the task so that any exceptions/cancellation is rethrown.
    await task;

}
else
{
    // timeout/cancellation logic
}
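The same WhenAny pattern can be wrapped in a small reusable helper. This is a sketch, not a library API: CompletesWithin is a hypothetical name and the durations in the usage lines are made up. Timing out only stops the waiting; the job keeps running unless JobWorker itself observes a CancellationToken. On .NET 6 and later, Task.WaitAsync(TimeSpan) is a built-in alternative that throws TimeoutException instead of returning a flag.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical helper (not a .NET API): true if `task` finishes within `timeout`,
// false otherwise. It only stops waiting; it does not cancel `task` itself.
async Task<bool> CompletesWithin(Task task, TimeSpan timeout)
{
    using var cts = new CancellationTokenSource();
    Task delay = Task.Delay(timeout, cts.Token);
    Task winner = await Task.WhenAny(task, delay);
    if (winner == task)
    {
        cts.Cancel(); // the timer is no longer needed
        await task;   // rethrows if the task faulted or was canceled
        return true;
    }
    return false;     // timed out; `task` may still be running in the background
}

// Made-up durations: a quick job, and one that takes far longer than its timeout.
bool fast = await CompletesWithin(Task.Delay(10), TimeSpan.FromSeconds(5));
bool slow = await CompletesWithin(Task.Delay(TimeSpan.FromSeconds(10)), TimeSpan.FromMilliseconds(50));
Console.WriteLine($"fast finished in time: {fast}, slow finished in time: {slow}");
```

For testing, set the timeout well above the expected job duration, as the comment in the snippet above suggests, so that a false result really points at a hung job.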

Check out the documentation on exception handling in the TPL on MSDN.
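One detail from that topic matters when several jobs run at once: awaiting Task.WhenAll rethrows only one of the exceptions, while the combined task's Exception property (an AggregateException) keeps all of them. A minimal sketch with hypothetical failing jobs (Fail and its messages are made up for illustration):

```csharp
using System;
using System.Linq;
using System.Threading.Tasks;

// Hypothetical job body that always fails with the given message.
void Fail(string message) => throw new InvalidOperationException(message);

Task[] tasks =
{
    Task.Run(() => Fail("job 1 failed")),
    Task.Run(() => { }),                  // this job succeeds
    Task.Run(() => Fail("job 3 failed")),
};

Task all = Task.WhenAll(tasks);
string firstMessage = "";
try
{
    await all; // `await` rethrows only the first inner exception
}
catch (InvalidOperationException ex)
{
    firstMessage = ex.Message;
}

// The combined task's AggregateException still holds every failure.
string[] allMessages = all.Exception!.Flatten().InnerExceptions
                          .Select(e => e.Message)
                          .ToArray();
Console.WriteLine($"await saw: {firstMessage}");
Console.WriteLine($"all failures: {string.Join("; ", allMessages)}");
```

So if you catch around await Task.WhenAll(tasks), as in the first answer, inspect the WhenAll task's Exception (or each task's status) to find every job that failed, not just one.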



Answer 3:

First, let's make a reproducible version of your code. This is NOT the best way to achieve what you are trying to do; it is only meant to show what is happening in your code!

I'll keep the code almost the same as yours, except that I'll use a simple int instead of your JobDto, and when each job's Execute() completes, I'll write to a file that we can check later. Here's the code:

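using System;
using System.IO;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

public class SomeMainClass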
{
    public void StartProcessing()
    {
        var jobList = Enumerable.Range(1, 10).ToArray();
        var tasks = new Task[10];
        //[1] start 10 jobs, one-by-one
        for (int i = 0; i < jobList.Count(); i++)
        {
            tasks[i] = ProcessJob(jobList[i]);
        }
        //[4] here we have 10 awaitable Task in tasks
        //[5] do all other unrelated operations
        Thread.Sleep(1500); //assume it works for 1.5 sec
        // Task.WaitAll(tasks); //[6] wait for tasks to complete
        // The PROCESS IS COMPLETE here
    }

    public async Task ProcessJob(int jobTask)
    {
        try
        {
            //[2] start job in a ThreadPool, Background thread
            var T = Task.Factory.StartNew(() =>
            {
                JobWorker jobWorker = new JobWorker();
                jobWorker.Execute(jobTask);
            });
            //[3] await captures the caller's context (if any) and
            await T; //... returns control to the caller, so the loop in [1] continues
        }
        catch (Exception) { /*handle*/ }
    }
}

public class JobWorker
{
    static object locker = new object();
    const string _file = @"C:\YourDirectory\out.txt";
    public void Execute(int jobTask) //on complete, writes in file
    {
        Thread.Sleep(500); //let's assume does something for 0.5 sec
        lock(locker)
        {
            File.AppendAllText(_file, 
                Environment.NewLine + "Writing the value-" + jobTask);
        }
    }
}

After running only StartProcessing(), this is what I get in the file:

Writing the value-4
Writing the value-2
Writing the value-3
Writing the value-1
Writing the value-6
Writing the value-7
Writing the value-8
Writing the value-5

So, 8 of the 10 jobs completed. The number and order will likely change every time you run this, but the point is that not all of the jobs completed!

Now, if I uncomment step [6] (Task.WaitAll(tasks);), this is what I get in my file:

Writing the value-2
Writing the value-3
Writing the value-4
Writing the value-1
Writing the value-5
Writing the value-7
Writing the value-8
Writing the value-6
Writing the value-9
Writing the value-10

So, all my jobs completed here!

Why the code behaves like this is already explained in the code comments. The main thing to note is that your tasks run on thread pool threads, which are background threads. Background threads do not keep a process alive, so if you do not wait for the tasks, they are killed when the main thread exits and the process ends!
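The background-thread claim can be checked directly with a small sketch (C# 9+ top-level statements; the 100 ms sleep is arbitrary). It compares a thread pool thread, which is what Task.Run and Task.Factory.StartNew use by default, with a thread created via new Thread, which is a foreground thread unless IsBackground is set.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Code run through Task.Run executes on a thread pool thread.
bool poolThreadIsBackground = await Task.Run(() => Thread.CurrentThread.IsBackground);

// A thread created with `new Thread` is a foreground thread unless IsBackground is set,
// so the runtime would wait for it before letting the process exit.
var worker = new Thread(() => Thread.Sleep(100));
bool newThreadIsBackground = worker.IsBackground;
worker.Start();
worker.Join();

Console.WriteLine($"thread pool thread is background: {poolThreadIsBackground}");
Console.WriteLine($"new Thread is background: {newThreadIsBackground}");
```

The runtime does not wait for background threads at shutdown, which is why jobs that nobody waits for can simply disappear when Main returns.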

If you still don't want to wait for the tasks there, you can return the array of tasks from this first method and wait for them at the very end of the process, something like this:

public Task[] StartProcessing()
{
    ...
    for (int i = 0; i < jobList.Count(); i++)
    {
        tasks[i] = ProcessJob(jobList[i]);
    }
    ...
    return tasks;
}

//in the MAIN METHOD of your application/process
var tasks = new SomeMainClass().StartProcessing();
// do all other stuffs here, and just at the end of process
Task.WaitAll(tasks);
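Here is a self-contained, runnable sketch of that idea (C# 9+ top-level statements). Execute and the completed counter are hypothetical stand-ins for JobWorker.Execute and the output file, and it ends with await Task.WhenAll, the asynchronous counterpart of Task.WaitAll.

```csharp
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

int completed = 0;

// Hypothetical stand-in for JobWorker.Execute: simulates 0.5 s of work per job.
void Execute(int job)
{
    Thread.Sleep(500);
    Interlocked.Increment(ref completed); // thread-safe count of finished jobs
}

// Starts all jobs and hands the tasks back instead of waiting for them.
Task[] StartProcessing() =>
    Enumerable.Range(1, 10)
              .Select(job => Task.Run(() => Execute(job)))
              .ToArray();

Task[] tasks = StartProcessing();
// ... other unrelated work would go here ...
await Task.WhenAll(tasks); // keep the process alive until every job is done
Console.WriteLine($"completed: {completed}");
```

Unlike the Thread.Sleep(1500) version, the final wait does not depend on guessing how long the jobs take, so every run should report all 10 jobs.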

Hope this clears up the confusion.