C# Running many async tasks at the same time

Published 2019-03-05 05:11

Question:

I'm fairly new to async tasks.

I have a function that takes a student ID and scrapes data from a specific university website for that ID.

    private static HttpClient client = new HttpClient();
    public static async Task<Student> ParseAsync(string departmentLink, int id, CancellationToken ct)
    {
        string website = string.Format(departmentLink, id);
        try
        {
            string data;
            var response = await client.GetAsync(website, ct);
            using (var reader = new StreamReader(await response.Content.ReadAsStreamAsync(), Encoding.GetEncoding("windows-1256")))
                data = await reader.ReadToEndAsync();

            //Parse data here and return Student.
        }
        catch (Exception ex)
        {
            Console.WriteLine(ex.Message);
        }
        return null;
    }

And it works correctly. Sometimes, though, I need to run this function for a lot of students, so I use the following:

        for(int i = ids.first; i <= ids.last; i++)
        {
            tasks[i - ids.first] = ParseStudentData.ParseAsync(entity.Link, i, cts.Token).ContinueWith(t =>
            {
                Dispatcher.Invoke(() =>
                {
                    listview_students.Items.Add(t.Result);
                    //Students.Add(t.Result);
                    //lbl_count.Content = $"{listview_students.Items.Count}/{testerino.Length}";
                });
            });
        }

I'm storing the tasks in an array so I can wait for them later.

This also works fine as long as the student count is somewhere below ~600 (the exact number seems random). Beyond that, every remaining student that still hasn't been parsed throws "A task was cancelled".

Keep in mind that I never trigger the cancellation token at all.

I need to run this function on so many students that it can reach ~9000 async tasks altogether. So what's happening?

Answer 1:

You are basically creating a denial-of-service attack on the website when you queue up 9000 requests in such a short time frame. Not only is this causing your errors, it could take down the website. (The "A task was cancelled" message itself comes from HttpClient's default 100-second Timeout: requests that sit queued longer than that are cancelled and surface as a TaskCanceledException.) It would be best to limit the number of concurrent requests to a more reasonable value (say 30). While there are several ways to do this, one that comes to mind is the following:

private async Task Test()
{
  var tasks = new List<Task>();
  for (int i = ids.first; i <= ids.last; i++)
  {
    tasks.Add(/* Do stuff */);
    await WaitList(tasks, 30);
  }
}

private async Task WaitList(IList<Task> tasks, int maxSize)
{
  while (tasks.Count > maxSize)
  {
    var completed = await Task.WhenAny(tasks).ConfigureAwait(false);
    tasks.Remove(completed);
  }
}

Other approaches might leverage the producer/consumer pattern using .NET classes such as BlockingCollection.
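Yet another common way to cap concurrency is a SemaphoreSlim throttle. This is just a sketch, not part of either answer; the wrapper name FetchThrottledAsync and the limit of 30 are illustrative, while ParseStudentData.ParseAsync and the CancellationToken come from the question:

```csharp
// Sketch: allow at most 30 requests in flight at once.
private static readonly SemaphoreSlim throttle = new SemaphoreSlim(30);

public static async Task<Student> FetchThrottledAsync(string link, int id, CancellationToken ct)
{
    // Wait for a free slot before starting the request.
    await throttle.WaitAsync(ct);
    try
    {
        return await ParseStudentData.ParseAsync(link, id, ct);
    }
    finally
    {
        // Always release the slot, even if the request failed.
        throttle.Release();
    }
}
```

With this wrapper you can still start all ~9000 tasks up front and `await Task.WhenAll(...)` them; the semaphore ensures only 30 HTTP requests actually run at a time.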



Answer 2:

This is what I ended up with, based on @erdomke's code:

    public static async Task ForEachParallel<T>(
      this IEnumerable<T> list, 
      Func<T, Task> action, 
      int dop)
    {
        var tasks = new List<Task>(dop);
        foreach (var item in list)
        {
            tasks.Add(action(item));

            while (tasks.Count >= dop)
            {
                var completed = await Task.WhenAny(tasks).ConfigureAwait(false);
                tasks.Remove(completed);
            }
        }

        // Wait for all remaining tasks.
        await Task.WhenAll(tasks).ConfigureAwait(false);
    }

    // usage
    await Enumerable
        .Range(1, 500)
        .ForEachParallel(i => ProcessItem(i), Environment.ProcessorCount);
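Applied to the original scraping scenario, the extension could look like this (a sketch: ParseStudentData.ParseAsync, entity.Link, cts, ids, and listview_students are the names from the question, and 30 is an assumed concurrency limit):

```csharp
// Parse all students with at most 30 requests in flight at a time.
await Enumerable
    .Range(ids.first, ids.last - ids.first + 1)
    .ForEachParallel(async i =>
    {
        var student = await ParseStudentData.ParseAsync(entity.Link, i, cts.Token);
        // Marshal the UI update back onto the dispatcher thread.
        Dispatcher.Invoke(() => listview_students.Items.Add(student));
    }, 30);
```

This also avoids the ContinueWith/t.Result pattern from the question, which would rethrow wrapped exceptions for any faulted task.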