-->

Parallel requests to scrape multiple pages of a website

Posted 2019-06-09 10:50

Question:

I want to scrape a website that has many pages of interesting data, but because the source is very large I want to multithread the work while limiting the load. I use Parallel.ForEach to start each chunk of 10 tasks, and in the main for loop I wait until the number of active threads drops below a threshold. For that I use a counter of active threads, which I increment when starting a new thread with a WebClient and decrement when the WebClient's DownloadStringCompleted event is triggered.

Originally the question was how to use DownloadStringTaskAsync instead of DownloadString and wait until each of the threads started in the Parallel.ForEach has completed. This has been solved with a workaround: a counter (activeThreads) and a Thread.Sleep in the main for loop.

Is using await DownloadStringTaskAsync instead of DownloadString supposed to improve speed at all, by freeing a thread while waiting for the download data to arrive?

And to get back to the original question: is there a way to do this more elegantly using the TPL, without the counter workaround?

private static volatile int activeThreads = 0;

public static void RecordData()
{
  var groupSize = 10; // URLs per batch
  var source = db.ListOfUrls; // thousands of URLs
  // Round up so the final partial batch is not dropped
  var iterations = (source.Length + groupSize - 1) / groupSize;
  for (int i = 0; i < iterations; i++)
  {
    var subList = source.Skip(groupSize* i).Take(groupSize);
    Parallel.ForEach(subList, (item) => RecordUri(item)); 
    // I want to wait here before processing further data, to avoid overload
    while (activeThreads > 30) Thread.Sleep(100);
  }
}

private static async Task RecordUri(Uri uri)
{
   using (WebClient wc = new WebClient())
   {
      Interlocked.Increment(ref activeThreads);
      wc.DownloadStringCompleted += (sender, e) => Interlocked.Decrement(ref activeThreads);
      var jsonData = await wc.DownloadStringTaskAsync(uri);
      var root = JsonConvert.DeserializeObject<RootObject>(jsonData);
      RecordData(root);
    }
}

Answer 1:

If you want an elegant solution you should use Microsoft's Reactive Framework. It's dead simple:

var source = db.ListOfUrls; // thousands of URLs

var query =
    from uri in source.ToObservable()
    from jsonData in Observable.Using(
        () => new WebClient(),
        wc => Observable.FromAsync(() => wc.DownloadStringTaskAsync(uri)))
    select new { uri, json = JsonConvert.DeserializeObject<RootObject>(jsonData) };

IDisposable subscription =
    query.Subscribe(x =>
    {
        /* Do something with x.uri && x.json */
    });

That's the entire code. It's nicely multi-threaded and it's kept under control.

Just install the "System.Reactive" NuGet package and add using System.Reactive.Linq; to get the bits.



Answer 2:

Parallel.ForEach

Creates a number of tasks (about ProcessorCount) to execute the function for each item in the source enumerable. It makes sure there are not too many tasks and waits until all items and tasks have finished.

Task.WhenAll

Only awaits the given tasks; it does not execute them. It is up to you to start them properly and not too many at once.
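As a minimal sketch of starting tasks "not too many at once" with Task.WhenAll, a SemaphoreSlim can cap how many are in flight. The helper name Throttle.ForEachAsync and its parameters are hypothetical (they are not part of the TPL), and the body delegate stands in for something like RecordUri.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

public static class Throttle
{
    // Hypothetical helper: runs body for every item, with at most
    // maxConcurrency invocations in flight at the same time.
    public static async Task ForEachAsync<T>(
        IEnumerable<T> source, int maxConcurrency, Func<T, Task> body)
    {
        using (var gate = new SemaphoreSlim(maxConcurrency))
        {
            var tasks = source.Select(async item =>
            {
                await gate.WaitAsync();   // wait for a free slot without blocking a thread
                try
                {
                    await body(item);
                }
                finally
                {
                    gate.Release();       // free the slot for the next item
                }
            }).ToList();                  // ToList starts every wrapper; the gate throttles them

            await Task.WhenAll(tasks);    // completes once every item has been processed
        }
    }
}
```

With such a helper, the batching loop from the question could shrink to something like await Throttle.ForEachAsync(source, 10, RecordUri); with no activeThreads counter and no Thread.Sleep.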

But there is a fault in your code. The function RecordUri returns a task that has to be awaited; otherwise the ForEach will just create more and more of them, because it never knows when the current task has completed. Another problem is that you create a task inside a task, and the outer task does nothing other than wait for the inner one.
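The effect can be seen in a small sketch. FakeDownloadAsync is a hypothetical stand-in for RecordUri that uses Task.Delay instead of a real request. Parallel.ForEach only accepts an Action, so the Task returned by the lambda is discarded and the loop returns as soon as each call reaches its first await. Keeping the tasks and awaiting them with Task.WhenAll is one way to actually wait for completion.

```csharp
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

public static class FireAndForgetDemo
{
    // Hypothetical stand-in for RecordUri: simulates I/O latency, then reports completion.
    private static async Task FakeDownloadAsync(Action onDone)
    {
        await Task.Delay(200);
        onDone();
    }

    // Returns how many downloads had finished when Parallel.ForEach returned.
    public static int CountAfterParallelForEach(int n)
    {
        int completed = 0;
        // The Task returned by FakeDownloadAsync is dropped here,
        // so the loop does not wait for the downloads to finish.
        Parallel.ForEach(Enumerable.Range(0, n),
            _ => FakeDownloadAsync(() => Interlocked.Increment(ref completed)));
        return Volatile.Read(ref completed);
    }

    // Returns how many downloads had finished after awaiting every task.
    public static async Task<int> CountAfterWhenAllAsync(int n)
    {
        int completed = 0;
        var tasks = Enumerable.Range(0, n)
            .Select(_ => FakeDownloadAsync(() => Interlocked.Increment(ref completed)));
        await Task.WhenAll(tasks);
        return Volatile.Read(ref completed);
    }
}
```

The first method will typically report fewer completions than n (often none), because the simulated downloads are still pending when the loop returns. The second reports n.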

You might also want to take a look at this overload of Parallel.ForEach: https://msdn.microsoft.com/en-us/library/dd782934(v=vs.110).aspx

Edit

Is using await DownloadStringTaskAsync instead of DownloadString supposed to improve speed at all, by freeing a thread while waiting for the download data to arrive?

No. When a task is waiting on an external resource it enters a suspended state (the Windows API does not use some old, dirty polling loop), so there is not much difference. What differs is the overhead the compiler generates when compiling your async code. DownloadStringTaskAsync creates a task that contains the long operation. If you await it, you attach yourself to that task (by ContinueWith). So you just create one task to await another. This is the overhead I was talking about above.

My approach would be: use the synchronous method inside your Parallel.ForEach. The threading will be handled by the TPL and you are free to go on.
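A minimal sketch of this approach, assuming a blocking call such as WebClient.DownloadString is used in the loop body. The helper name SyncParallel.ProcessAll, the download delegate and the maxParallel parameter are hypothetical. Capping the loop through ParallelOptions.MaxDegreeOfParallelism is an addition on top of the answer, used here to bound how many requests run at once.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading.Tasks;

public static class SyncParallel
{
    // Hypothetical helper: downloads every URI with a blocking call and
    // returns the results keyed by URI. At most maxParallel downloads run at once.
    public static IDictionary<Uri, string> ProcessAll(
        IEnumerable<Uri> uris, Func<Uri, string> download, int maxParallel)
    {
        var results = new ConcurrentDictionary<Uri, string>();
        var options = new ParallelOptions { MaxDegreeOfParallelism = maxParallel };

        // Parallel.ForEach blocks until every item has been processed,
        // so no counter or Thread.Sleep is needed.
        Parallel.ForEach(uris, options, uri =>
        {
            results[uri] = download(uri);   // synchronous call, e.g. WebClient.DownloadString
        });

        return results;
    }
}
```

A call could look like SyncParallel.ProcessAll(source, uri => { using (var wc = new WebClient()) return wc.DownloadString(uri); }, 10); followed by deserializing each result.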

Remember "KISS"