I want to scrape a website that has many pages of interesting data. Because the source is very large, I want to multithread the downloads while limiting the load.
I use a Parallel.ForEach to start each chunk of 10 tasks, and I wait in the main for loop until the number of active threads drops below a threshold. For that I use a counter of active threads, which I increment when starting a new thread with a WebClient and decrement when the DownloadStringCompleted event of the WebClient is triggered.
Originally the question was how to use DownloadStringTaskAsync instead of DownloadString and wait until each of the threads started in the Parallel.ForEach has completed. This has been solved with a workaround: a counter (activeThreads) and a Thread.Sleep in the main for loop.
Is using await DownloadStringTaskAsync instead of DownloadString supposed to improve the speed at all by freeing a thread while waiting for the DownloadString data to arrive?
And to get back to the original question, is there a more elegant way to do this with the TPL, without the counter workaround?
private static volatile int activeThreads = 0;

public static void RecordData()
{
    var groupSize = 10;
    var source = db.ListOfUrls; // Thousands of URLs
    // Round up so the last, partial group is not skipped
    var iterations = (source.Length + groupSize - 1) / groupSize;
    for (int i = 0; i < iterations; i++)
    {
        var subList = source.Skip(groupSize * i).Take(groupSize);
        Parallel.ForEach(subList, (item) => RecordUri(item));
        // I want to wait here before processing further data, to avoid overload
        while (activeThreads > 30) Thread.Sleep(100);
    }
}

private static async Task RecordUri(Uri uri)
{
    using (WebClient wc = new WebClient())
    {
        Interlocked.Increment(ref activeThreads);
        wc.DownloadStringCompleted += (sender, e) => Interlocked.Decrement(ref activeThreads);
        var jsonData = await wc.DownloadStringTaskAsync(uri);
        var root = JsonConvert.DeserializeObject<RootObject>(jsonData);
        RecordData(root); // overload that stores the parsed object (not shown)
    }
}
If you want an elegant solution you should use Microsoft's Reactive Framework. It's dead simple:
var source = db.ListOfUrls; // Thousands of URLs

var query =
    from uri in source.ToObservable()
    from jsonData in Observable.Using(
        () => new WebClient(),
        wc => Observable.FromAsync(() => wc.DownloadStringTaskAsync(uri)))
    select new { uri, json = JsonConvert.DeserializeObject<RootObject>(jsonData) };

IDisposable subscription =
    query.Subscribe(x =>
    {
        /* Do something with x.uri && x.json */
    });
That's the entire code. It's nicely multi-threaded and it's kept under control.
Just NuGet "System.Reactive" to get the bits.
Parallel.ForEach runs the function for each item in the source enumerable on thread-pool tasks, starting with roughly one per processor core. It takes care that there are not too many tasks at once, and it waits until all items and tasks have been processed.
Task.WhenAll only awaits the tasks it is given; it does not start them. It is up to you to start them in a proper way, and not too many at once.
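As a rough sketch of what starting the tasks yourself "in a proper way" could look like, the following caps the number of operations in flight with a SemaphoreSlim and waits for all of them with Task.WhenAll. The names Throttled, RunAsync, maxConcurrency and work are made up for illustration; they are not part of the question's code or of any library.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical helper, not a framework type.
public static class Throttled
{
    // Runs 'work' for every item with at most 'maxConcurrency' operations
    // in flight, and completes once all of them have finished.
    // Results come back in the same order as 'items'.
    public static async Task<TResult[]> RunAsync<T, TResult>(
        IEnumerable<T> items, int maxConcurrency, Func<T, Task<TResult>> work)
    {
        using (var gate = new SemaphoreSlim(maxConcurrency))
        {
            var tasks = items.Select(async item =>
            {
                await gate.WaitAsync(); // wait for a free slot without blocking a thread
                try
                {
                    return await work(item);
                }
                finally
                {
                    gate.Release(); // free the slot even if 'work' throws
                }
            }).ToList(); // materialize so that every task is started exactly once

            return await Task.WhenAll(tasks);
        }
    }
}
```

For the question's scenario, work would be a lambda that downloads one uri with DownloadStringTaskAsync and returns the string, and maxConcurrency would play the role of the counter threshold.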
But there is a flaw in your code. The function RecordUri returns a task that has to be awaited; otherwise the ForEach will just keep starting more and more of them, because it never knows when the current task has completed. Another problem is that you create a task within a task, and the outer task does nothing but wait for the inner one.
You might also want to take a look at this overload of Parallel.ForEach:
https://msdn.microsoft.com/en-us/library/dd782934(v=vs.110).aspx
Edit
Is using await DownloadStringTaskAsync instead of DownloadString supposed to improve at all the speed by freeing a thread while waiting for the DownloadString data to arrive?
No. When a task is waiting on an external resource it enters a suspended state (this is handled by the Windows API, not by some old, dirty polling loop), so there is not much difference.
What differs is the overhead the compiler generates when compiling your async code. DownloadStringTaskAsync creates a task that contains the long-running operation. If you await it, you attach yourself to that task (via ContinueWith), so you just create one Task to await another. This is the overhead I was talking about above.
My approach would be: use the synchronous method inside your Parallel.ForEach. The threading will be handled by the TPL and you are free to go on.
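A minimal sketch of that approach, assuming you also want an explicit cap on concurrency: a synchronous delegate inside Parallel.ForEach, with ParallelOptions.MaxDegreeOfParallelism limiting how many items are processed at once. BoundedParallel, Run, maxParallelism and work are hypothetical names used only for this example.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading.Tasks;

// Hypothetical helper, not a framework type.
public static class BoundedParallel
{
    // Applies the synchronous 'work' to every item, running at most
    // 'maxParallelism' items at the same time. Returns only after all
    // items are done, so no counter or Thread.Sleep loop is needed.
    // Assumes the items are distinct, since they are used as dictionary keys.
    public static IDictionary<T, TResult> Run<T, TResult>(
        IEnumerable<T> items, int maxParallelism, Func<T, TResult> work)
    {
        var results = new ConcurrentDictionary<T, TResult>();
        var options = new ParallelOptions { MaxDegreeOfParallelism = maxParallelism };

        Parallel.ForEach(items, options, item =>
        {
            results[item] = work(item); // thread-safe write
        });

        return results;
    }
}
```

In the question's scenario, work would create a WebClient, call the synchronous DownloadString for the uri, and return the result.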
Remember "KISS"