Run async method 8 times in parallel

2019-01-18 14:47发布

问题:

How do I turn the following into a Parallel.ForEach?

public async void getThreadContents(String[] threads)
{
    HttpClient client = new HttpClient();
    List<String> usernames = new List<String>();
    int i = 0;

    foreach (String url in threads)
    {
        i++;
        progressLabel.Text = "Scanning thread " + i.ToString() + "/" + threads.Count<String>();
        HttpResponseMessage response = await client.GetAsync(url);
        String content = await response.Content.ReadAsStringAsync();
        String user;
        Predicate<String> userPredicate;
        foreach (Match match in regex.Matches(content))
        {
            user = match.Groups[1].ToString();
            userPredicate = (String x) => x == user;
            if (usernames.Find(userPredicate) != user)
            {
                usernames.Add(match.Groups[1].ToString());
            }
        }
        progressBar1.PerformStep();
    }
}

I coded it in the assumption that asynchronous and parallel processing would be the same, and I just realized it isn't. I took a look at all the questions I could find on this, and I really can't seem to find an example that does it for me. Most of them lack readable variable names. Using single-letter variable names which don't explain what they contain is a horrible way to state an example.

I normally have between 300 and 2000 entries in the array named threads (Contains URL's to forum threads) and it would seem that parallel processing (Due to the many HTTP requests) would speed up the execution).

Do I have to remove all the asynchrony (I got nothing async outside the foreach, only variable definitions) before I can use Parallel.ForEach? How should I go about doing this? Can I do this without blocking the main thread?

I am using .NET 4.5 by the way.

回答1:

I coded it in the assumption that asynchronous and parallel processing would be the same

Asynchronous processing and parallel processing are quite different. If you don't understand the difference, I think you should first read more about it (for example what is the relation between Asynchronous and parallel programming in c#?).

Now, what you want to do is actually not that simple, because you want to process a big collection asynchronously, with a specific degree of parallelism (8). With synchronous processing, you could use Parallel.ForEach() (along with ParallelOptions to configure the degree of parallelism), but there is no simple alternative that would work with async.

In your code, this is complicated by the fact that you expect everything to execute on the UI thread. (Though ideally, you shouldn't access the UI directly from your computation. Instead, you should use IProgress, which would mean the code no longer has to execute on the UI thread.)

Probably the best way to do this in .Net 4.5 is to use TPL Dataflow. Its ActionBlock does exactly what you want, but it can be quite verbose (because it's more flexible than what you need). So it makes sense to create a helper method:

public static Task AsyncParallelForEach<T>(
    IEnumerable<T> source, Func<T, Task> body,
    int maxDegreeOfParallelism = DataflowBlockOptions.Unbounded,
    TaskScheduler scheduler = null)
{
    var options = new ExecutionDataflowBlockOptions
    {
        MaxDegreeOfParallelism = maxDegreeOfParallelism
    };
    if (scheduler != null)
        options.TaskScheduler = scheduler;

    var block = new ActionBlock<T>(body, options);

    foreach (var item in source)
        block.Post(item);

    block.Complete();
    return block.Completion;
}

In your case, you would use it like this:

await AsyncParallelForEach(
    threads, async url => await DownloadUrl(url), 8,
    TaskScheduler.FromCurrentSynchronizationContext());

Here, DownloadUrl() is an async Task method that processes a single URL (the body of your loop), 8 is the degree of parallelism (probably shouldn't be a literal constant in real code) and FromCurrentSynchronizationContext() makes sure the code executes on the UI thread.



回答2:

Stephen Toub has a good blog post on implementing a ForEachAsync. Svick's answer is quite good for platforms on which Dataflow is available.

Here's an alternative, using the partitioner from the TPL:

public static Task ForEachAsync<T>(this IEnumerable<T> source,
    int degreeOfParallelism, Func<T, Task> body)
{
  var partitions = Partitioner.Create(source).GetPartitions(degreeOfParallelism);
  var tasks = partitions.Select(async partition =>
  {
    using (partition) 
      while (partition.MoveNext()) 
        await body(partition.Current); 
  });
  return Task.WhenAll(tasks);
}

You can then use this as such:

public async Task getThreadContentsAsync(String[] threads)
{
  HttpClient client = new HttpClient();
  ConcurrentDictionary<String, object> usernames = new ConcurrentDictionary<String, object>();

  await threads.ForEachAsync(8, async url =>
  {
    HttpResponseMessage response = await client.GetAsync(url);
    String content = await response.Content.ReadAsStringAsync();
    String user;
    foreach (Match match in regex.Matches(content))
    {
      user = match.Groups[1].ToString();
      usernames.TryAdd(user, null);
    }
    progressBar1.PerformStep();
  });
}


回答3:

Yet another alternative is using SemaphoreSlim or AsyncSemaphore (which is included in my AsyncEx library and supports many more platforms than SemaphoreSlim):

public async Task getThreadContentsAsync(String[] threads)
{
  SemaphoreSlim semaphore = new SemaphoreSlim(8);
  HttpClient client = new HttpClient();
  ConcurrentDictionary<String, object> usernames = new ConcurrentDictionary<String, object>();

  await Task.WhenAll(threads.Select(async url =>
  {
    await semaphore.WaitAsync();
    try
    {
      HttpResponseMessage response = await client.GetAsync(url);
      String content = await response.Content.ReadAsStringAsync();
      String user;
      foreach (Match match in regex.Matches(content))
      {
        user = match.Groups[1].ToString();
        usernames.TryAdd(user, null);
      }
      progressBar1.PerformStep();
    }
    finally
    {
      semaphore.Release();
    }
  }));
}


回答4:

You can try the ParallelForEachAsync extension method from AsyncEnumerator NuGet Package:

using System.Collections.Async;

public async void getThreadContents(String[] threads)
{
    HttpClient client = new HttpClient();
    List<String> usernames = new List<String>();
    int i = 0;

    await threads.ParallelForEachAsync(async url =>
    {
        i++;
        progressLabel.Text = "Scanning thread " + i.ToString() + "/" + threads.Count<String>();
        HttpResponseMessage response = await client.GetAsync(url);
        String content = await response.Content.ReadAsStringAsync();
        String user;
        Predicate<String> userPredicate;
        foreach (Match match in regex.Matches(content))
        {
            user = match.Groups[1].ToString();
            userPredicate = (String x) => x == user;
            if (usernames.Find(userPredicate) != user)
            {
                usernames.Add(match.Groups[1].ToString());
            }
        }

        // THIS CALL MUST BE THREAD-SAFE!
        progressBar1.PerformStep();
    },
    maxDegreeOfParallelism: 8);
}