WhenAll on a large number of Tasks [closed]

Posted 2019-05-10 17:38

Question:

I need your help to find the best solution. This is my original code:

public async Task Test()
{
    var tasks = new List<Task>();
    string line;
    using (var streamReader = File.OpenText(InputPath))
    {
        while ((line = streamReader.ReadLine()) != null)
        {
            tasks.Add(Process(line));
        }
    }

    await Task.WhenAll(tasks.ToArray());
}

private Task Process(string line)
{
    return Task.Run(() =>
    {
        Console.WriteLine(line);
    });
}

It reads a file line by line and processes each line in its own task. However, if the file has more than 1 million lines, the array of tasks gets very large. Is this code still good, or should I find another solution? Please help me. Thanks.

Answer 1:

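That's a bad idea. It creates a separate task for every line up front (a million lines means a million queued tasks), which could launch way too many threads.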

A much better way to do this is to simply use Parallel.ForEach() like so:

using System;
using System.IO;
using System.Threading.Tasks;

namespace Demo
{
    static class Program
    {
        static void Main()
        {
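            string filename = @"Your test filename goes here";
            // File.ReadLines() streams the file lazily, and Parallel.ForEach()
            // spreads the lines across threadpool threads.
            Parallel.ForEach(File.ReadLines(filename), process);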
        }

        private static void process(string line)
        {
            Console.WriteLine(line);
        }
    }
}

This doesn't use async/await, however, but you could wrap the entire call to Parallel.ForEach() in a task if you wanted.
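A minimal sketch of that idea follows. The class name ParallelFileProcessor, the method ProcessFileAsync and the default cap of 4 are hypothetical choices, not part of the original answer. The synchronous loop is pushed onto the threadpool with Task.Run so callers can await it, and ParallelOptions.MaxDegreeOfParallelism bounds how many lines are processed at once:

```csharp
using System;
using System.IO;
using System.Threading.Tasks;

public static class ParallelFileProcessor
{
    // Hypothetical helper: runs the synchronous Parallel.ForEach on a threadpool
    // thread so the caller can await it. maxDegree is illustrative, not tuned.
    public static Task ProcessFileAsync(string path, Action<string> processLine, int maxDegree = 4)
    {
        var options = new ParallelOptions { MaxDegreeOfParallelism = maxDegree };

        // File.ReadLines streams the file lazily, so the whole file is never
        // loaded into memory at once.
        return Task.Run(() => Parallel.ForEach(File.ReadLines(path), options, processLine));
    }
}
```

Usage would look like `await ParallelFileProcessor.ProcessFileAsync(filename, Console.WriteLine);`. Note that this only frees the calling thread; the per-line work still runs synchronously on threadpool threads.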

Alternatively, if you want to use the TPL Dataflow library (Microsoft's System.Threading.Tasks.Dataflow NuGet package), you can do something like this:

using System;
using System.IO;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

namespace Demo
{
    static class Program
    {
        static async Task Main()
        {
            string filename = @"Your filename goes here";
            await processFile(filename);
        }

        static async Task processFile(string filename)
        {
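            // At most 8 lines are processed concurrently, and the block accepts at
            // most 100 lines at a time, so SendAsync() waits while it is full.
            var options = new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 8, BoundedCapacity = 100 };
            var action = new ActionBlock<string>(s => process(s), options);

            // File.ReadLines() streams the file lazily instead of loading it all.
            foreach (var line in File.ReadLines(filename))
                await action.SendAsync(line);

            // Signal that no more input is coming, then wait for the block to drain.
            action.Complete();

            await action.Completion;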
        }

        static void process(string line)
        {
            Thread.Sleep(100);  // Simulate work.
            Console.WriteLine(Thread.CurrentThread.ManagedThreadId + " " + line);
        }
    }
}

This gives you async support.
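One way to check that the block options really bound the work is to count how many items are inside the delegate at the same moment. This is a hedged sketch, assuming the System.Threading.Tasks.Dataflow library is referenced as in the example above; DataflowThrottleDemo and MeasurePeakConcurrencyAsync are hypothetical names:

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class DataflowThrottleDemo
{
    // Hypothetical helper: sends every item through an ActionBlock and returns
    // the largest number of items that were being processed at the same moment.
    public static async Task<int> MeasurePeakConcurrencyAsync(
        IEnumerable<string> items, int maxDegree, int capacity)
    {
        var gate = new object();
        int current = 0, peak = 0;

        var options = new ExecutionDataflowBlockOptions
        {
            MaxDegreeOfParallelism = maxDegree, // at most this many delegates run at once
            BoundedCapacity = capacity          // at most this many items held by the block
        };

        var block = new ActionBlock<string>(item =>
        {
            lock (gate)
            {
                current++;
                if (current > peak) peak = current;
            }

            Thread.Sleep(10); // Simulated work, standing in for process(line).

            lock (gate) { current--; }
        }, options);

        foreach (var item in items)
            await block.SendAsync(item); // Waits asynchronously while the block is full.

        block.Complete();
        await block.Completion;
        return peak;
    }
}
```

With maxDegree set to 4, the returned peak should never exceed 4, however many items are sent.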


Addendum: A demonstration of threadpool throttling.

(This is in response to shay__'s comments.)

If you start many long-running tasks (tasks that each take longer than a second or so to run), you may see threadpool throttling.

This happens if the number of threadpool threads in the current process equals or exceeds the worker count returned by ThreadPool.GetMinThreads(out workers, out ports).

If this happens, creating a new threadpool thread is delayed for a short while (about one second on my system). Often another threadpool thread becomes available in the meantime and is used instead, which of course is a major reason for the throttling.
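To see where a process stands relative to that threshold, the relevant numbers can be read directly. A small sketch, assuming .NET Core 3.0 or later (ThreadPool.ThreadCount is not available in .NET Framework); ThreadPoolInfo, Snapshot and AtOrAboveMinimum are hypothetical names:

```csharp
using System;
using System.Threading;

public static class ThreadPoolInfo
{
    // Reads the worker-thread limits and the number of threadpool threads that
    // currently exist. ThreadPool.ThreadCount requires .NET Core 3.0 or later.
    public static (int MinWorkers, int MaxWorkers, int CurrentThreads) Snapshot()
    {
        ThreadPool.GetMinThreads(out int minWorkers, out _);
        ThreadPool.GetMaxThreads(out int maxWorkers, out _);
        return (minWorkers, maxWorkers, ThreadPool.ThreadCount);
    }

    // True once the pool has at least the minimum number of worker threads,
    // the point from which new threads were observed to start with a delay.
    public static bool AtOrAboveMinimum()
    {
        var s = Snapshot();
        return s.CurrentThreads >= s.MinWorkers;
    }
}
```

Calling Snapshot() before and after queuing a batch of long-running tasks shows the current thread count climbing past MinWorkers.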

The following code demonstrates the issue:

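using System;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;

int workers, ports;
ThreadPool.GetMinThreads(out workers, out ports);
Console.WriteLine("Min workers = " + workers); // Prints 8 on my system.
var sw = Stopwatch.StartNew();

// Queue 100 tasks that each block their threadpool thread for 10 seconds.
for (int i = 0; i < 100; ++i)
{
    Task.Run(() =>
    {
        Console.WriteLine($"Thread {Thread.CurrentThread.ManagedThreadId} started at time {sw.Elapsed}");
        Thread.Sleep(10000);
    });
}

// Keep the process alive while the tasks run.
Console.ReadLine();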

On my system, this prints the following:

Min workers = 8
Thread 3 started at time 00:00:00.0098651
Thread 6 started at time 00:00:00.0098651
Thread 8 started at time 00:00:00.0099841
Thread 5 started at time 00:00:00.0099680
Thread 7 started at time 00:00:00.0099918
Thread 4 started at time 00:00:00.0098739
Thread 10 started at time 00:00:00.0100828
Thread 9 started at time 00:00:00.0101833
Thread 11 started at time 00:00:01.0096247
Thread 12 started at time 00:00:02.0098105
Thread 13 started at time 00:00:03.0099824
Thread 14 started at time 00:00:04.0100671
Thread 15 started at time 00:00:05.0098035
Thread 16 started at time 00:00:06.0099449
Thread 17 started at time 00:00:07.0096293
Thread 18 started at time 00:00:08.0106774
Thread 19 started at time 00:00:09.0098193
Thread 20 started at time 00:00:10.0104156
Thread 3 started at time 00:00:10.0109315
Thread 8 started at time 00:00:10.0112171
Thread 7 started at time 00:00:10.0112531
Thread 9 started at time 00:00:10.0117256
Thread 4 started at time 00:00:10.0117920
Thread 10 started at time 00:00:10.0117298
Thread 6 started at time 00:00:10.0109381
Thread 5 started at time 00:00:10.0112276
Thread 21 started at time 00:00:11.0095859
Thread 11 started at time 00:00:11.0101189
Thread 22 started at time 00:00:12.0095421
Thread 12 started at time 00:00:12.0111173
Thread 23 started at time 00:00:13.0095932
...

Note how the first 8 threads start very quickly, but then new threads are throttled to around one per second until the first batch of tasks finishes and their threads can be reused.

Also note that this effect only occurs if the tasks take a relatively long time to complete.
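For contrast, here is a sketch (the ShortTaskDemo class is hypothetical) that queues many short tasks and records which threadpool threads ran them. With work items of only a few milliseconds, the pool typically keeps reusing a small set of threads instead of injecting new ones, though the exact numbers depend on the machine and runtime version:

```csharp
using System;
using System.Collections.Concurrent;
using System.Diagnostics;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

public static class ShortTaskDemo
{
    // Hypothetical helper: starts `count` short tasks and reports how many
    // distinct threadpool threads ran them and how long the batch took.
    public static (int DistinctThreads, TimeSpan Elapsed) Run(int count, int workMs)
    {
        var threadIds = new ConcurrentDictionary<int, bool>();
        var sw = Stopwatch.StartNew();

        var tasks = Enumerable.Range(0, count).Select(_ => Task.Run(() =>
        {
            threadIds.TryAdd(Environment.CurrentManagedThreadId, true);
            Thread.Sleep(workMs); // Short simulated work.
        })).ToArray();

        Task.WaitAll(tasks);
        return (threadIds.Count, sw.Elapsed);
    }
}
```

Running `ShortTaskDemo.Run(100, 10)` and comparing DistinctThreads with the 100 tasks started gives a rough sense of how much thread reuse happens for short work.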