How to throttle concurrent Async webrequests

Posted 2019-07-11 03:53

Question:

I often need to make a large number of web requests without overloading the network.

I currently do this by running synchronous requests in parallel, using ThreadPool.SetMinThreads and MaxDegreeOfParallelism to specify exactly how many requests run concurrently.

Now this works just fine, but it feels wrong.

I would really like to use async methods, but I can't work out how to limit the number of concurrent requests.

A simplified example of my parallel approach (using a WebClient and no error handling, for brevity):

Private Function SearchSitesForKeywordInParallel(ByVal keyword As String, ByVal sites As String(), ByVal maxConcurrency As Integer) As String()
    Dim po As New ParallelOptions
    po.MaxDegreeOfParallelism = maxConcurrency
    Threading.ThreadPool.SetMinThreads(maxConcurrency, 2)
    Dim sitesContainingKeyword As New Concurrent.ConcurrentBag(Of String)

    Parallel.For(0, sites.Count, po, Sub(i)
                                         Dim wc As New Net.WebClient
                                         wc.Proxy = Nothing
                                         Dim pageSource As String = wc.DownloadString(sites(i))
                                         If pageSource.Contains(keyword) Then
                                             sitesContainingKeyword.Add(sites(i))
                                         End If
                                     End Sub)
    Return sitesContainingKeyword.ToArray
End Function

This is a blocking function, which is what I require. I have tested the WebClient.DownloadStringAsync method in a regular For loop, and it fires all the requests pretty much at once, overloading the network.

What I would like to do is initially make X requests, then make a new one as each response comes back.

I am fairly sure tasks are the way to go, and I'm positive I have read some very nice implementations in C#, but my C# experience is limited, and I have a hard time translating C# lambdas to VB.NET.

I am also limited to VS2010 and .NET 4, so the niceties of .NET 4.5 async/await are not an option for me.

Any help is very much appreciated.

Answer 1:

You can do this asynchronously in VB.NET using the Wintellect Powerthreading library's AsyncEnumerator class, which you can get from NuGet.

This gives you some of the functionality of Await but works in VS2010 with .Net 2.0 to 4.0 while giving you an upgrade path to the 4.5 async features.

The downside is that the WebClient async methods require an EAP-to-APM shim based on Task<> to be used with AsyncEnumerator, so the code is quite a lot more complicated.

The simplest way to control the number of concurrent requests is to initiate X async operations, then just initiate another every time one completes.

Example code:

Imports System.Collections.Generic
Imports System.Runtime.CompilerServices
Imports System.Threading.Tasks
Imports System.Net
Imports Wintellect.Threading.AsyncProgModel

Module TaskExtension
    REM http://msdn.microsoft.com/en-us/library/hh873178.aspx
    <Extension()>
    Public Function AsApm(Of T1)(ByVal task As Task(Of T1), callback As AsyncCallback, state As Object) As IAsyncResult
        If (task Is Nothing) Then
            Throw New ArgumentNullException("task")
        End If
        Dim tcs = New TaskCompletionSource(Of T1)(state)
        task.ContinueWith(Sub(t As Task(Of T1))
                              If (t.IsFaulted) Then
                                  tcs.TrySetException(t.Exception.InnerExceptions)
                              ElseIf t.IsCanceled Then
                                  tcs.TrySetCanceled()
                              Else : tcs.TrySetResult(t.Result)
                              End If
                              If (Not callback Is Nothing) Then
                                  callback(tcs.Task)
                              End If
                          End Sub, TaskScheduler.Default)
        Return tcs.Task
    End Function
End Module

Module ApmAsyncDownload
    Public Function DownloadStringAsync(url As Uri) As Task(Of String)
        Dim tcs As New TaskCompletionSource(Of String)
        Dim wc As New WebClient()
        AddHandler wc.DownloadStringCompleted, Sub(s As Object, e As System.Net.DownloadStringCompletedEventArgs)
                                                   If (Not (e.Error Is Nothing)) Then
                                                       tcs.TrySetException(e.Error)
                                                   ElseIf e.Cancelled Then
                                                       tcs.TrySetCanceled()
                                                   Else : tcs.TrySetResult(e.Result)
                                                   End If
                                               End Sub
        wc.DownloadStringAsync(url)
        Return tcs.Task
    End Function
    Public Function BeginDownloadString(url As Uri, callback As AsyncCallback, state As Object) As IAsyncResult
        Return DownloadStringAsync(url).AsApm(callback, state)
    End Function
    Public Function EndDownloadString(asyncResult As IAsyncResult) As String
        ' Explicit cast so this also compiles with Option Strict On
        Dim castToTask As Task(Of String) = DirectCast(asyncResult, Task(Of String))
        Return castToTask.Result
    End Function
End Module

Public Class AsyncIterators
    Private Shared Iterator Function SearchUrl(ae As AsyncEnumerator(Of Boolean), keyword As String, uri As Uri) As IEnumerator(Of Int32)
        ae.Result = False
        ApmAsyncDownload.BeginDownloadString(uri, ae.End(0, AddressOf ApmAsyncDownload.EndDownloadString), Nothing)
        Yield 1
        If (ae.IsCanceled()) Then
            Return
        End If
        Try
            Dim page As String = ApmAsyncDownload.EndDownloadString(ae.DequeueAsyncResult)
            ae.Result = page.Contains(keyword)
        Catch ex As AggregateException
            ' Download failed: ae.Result stays False, so this URL is simply not reported
        End Try
    End Function
    Public Shared Iterator Function SearchIterator(ae As AsyncEnumerator(Of List(Of String)), keyword As String, urls As List(Of Uri)) As IEnumerator(Of Int32)
        ae.Result = New List(Of String)
        'Control how many searches are started asynchronously
        Dim startSearches = Math.Min(3, urls.Count)
        Dim enumerator = urls.GetEnumerator
        Dim toBeCompleted = urls.Count
        Do Until (toBeCompleted <= 0)
            While (startSearches > 0)
                If enumerator.MoveNext Then
                    Dim subAe = New AsyncEnumerator(Of Boolean)()
                    subAe.SyncContext = Nothing
                    subAe.BeginExecute(SearchUrl(subAe, keyword, enumerator.Current), ae.End(0, Function(ar As IAsyncResult) As AsyncEnumerator.EndObjectXxx
                                                                                                    subAe.EndExecute(ar)
                                                                                                End Function), enumerator.Current)
                End If
                startSearches = startSearches - 1
            End While
            'Wait for first async search to complete
            Yield 1
            toBeCompleted = toBeCompleted - 1
            If (ae.IsCanceled()) Then
                Exit Do
            End If
            'Get result of the search and add to results
            Dim result = ae.DequeueAsyncResult()
            Dim completedAe = AsyncEnumerator(Of Boolean).FromAsyncResult(result)
            If (completedAe.EndExecute(result)) Then
                Dim uri As Uri = DirectCast(result.AsyncState, Uri)
                ae.Result.Add(uri.OriginalString)
            End If
            'Start 1 more search
            startSearches = startSearches + 1
        Loop
    End Function
End Class

Module Module1
    Sub Main()
        Dim searchAe = New AsyncEnumerator(Of List(Of String))()
        searchAe.SyncContext = Nothing
        Dim urlStrings = New List(Of String) From {"http://www.google.com", "http://www.yahoo.com", "http://www.dogpile.com"}
        Dim uris = urlStrings.Select(Function(urlString As String) As Uri
                                         Return New Uri(urlString)
                                     End Function).ToList()
        For Each Str As String In searchAe.EndExecute(searchAe.BeginExecute(AsyncIterators.SearchIterator(searchAe, "search", uris), Nothing, Nothing))
            Console.WriteLine(Str)
        Next
        Console.ReadKey()
    End Sub
End Module

And I now see what you mean about translating C# lambdas!



Answer 2:

I'm not sure I completely understand what exactly you want to achieve, but if you want to use async methods, you can do it like this:

    Dim google As String = "http://www.google.com/#&q="

    Dim qsites As New Concurrent.ConcurrentQueue(Of String)
    For Each k In {"foo", "bar", "john", "jack", "stackoverflow", "basic", "ship", "car", "42"}
        qsites.Enqueue(google & k)
    Next

    Dim cde As New System.Threading.CountdownEvent(qsites.Count)

    Dim strings As New Concurrent.ConcurrentBag(Of String)
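    Dim completedhandler = Sub(wco As Object, ev As Net.DownloadStringCompletedEventArgs)
                               Dim wc = DirectCast(wco, Net.WebClient)
                               ' ev.Result throws if the request failed or was cancelled, so check first
                               If ev.Error Is Nothing AndAlso Not ev.Cancelled Then
                                   Debug.Print("got one!")
                                   strings.Add(ev.Result)
                               End If
                               ' Signal on every completion, successful or not, so cde.Wait() cannot hang
                               cde.Signal()
                               ' Reuse this client for the next queued URL, if any is left
                               Dim s As String = String.Empty
                               If qsites.TryDequeue(s) Then
                                   Debug.Print("downloading from {0}", s)
                                   wc.DownloadStringAsync(New Uri(s))
                               End If
                           End Sub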

    Dim numthreads As Integer = 4

    System.Threading.Tasks.Task.Factory.StartNew(Sub()
                                                     For i = 1 To numthreads
                                                         Dim s As String = String.Empty
                                                         If qsites.TryDequeue(s) Then
                                                             Dim wc As New Net.WebClient
                                                             wc.Proxy = Nothing
                                                             AddHandler wc.DownloadStringCompleted, completedhandler
                                                             Debug.Print("downloading from {0}", s)
                                                             wc.DownloadStringAsync(New Uri(s))
                                                         End If
                                                     Next
                                                 End Sub)

    cde.Wait()

The async downloads only need to be started from a different thread/task because (as far as I know) the WebClient's DownloadStringCompleted events are raised on the UI thread (or whatever SynchronizationContext was current when the download was started), and a cde.Wait() blocking that thread would prevent the events from being handled.



Answer 3:

I know this is a year old, but I want to add another answer, as I recently solved a similar problem. For details, see: "Need help in deciding when is it good idea to limit the 'number of thread pool threads .net app consumes'?" (the code snippet there is in C#, but it should give the idea).

I used to send a number of synchronous HTTP requests in parallel to an HTTP server on different threads, and limited the number of requests in flight with a semaphore.
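As a rough illustration of that semaphore approach, here is a minimal sketch that should work on .NET 4. It is not the code from the linked question: the module name, DownloadAll and its parameters are hypothetical, and error handling is left out apart from always releasing the semaphore.

```vbnet
Imports System.Collections.Concurrent
Imports System.Collections.Generic
Imports System.Net
Imports System.Threading
Imports System.Threading.Tasks

Module SemaphoreThrottleSketch
    ' Hypothetical helper: downloads every URL while allowing at most
    ' maxConcurrency synchronous requests to be in flight at once.
    Public Function DownloadAll(urls As IEnumerable(Of String), maxConcurrency As Integer) As String()
        Dim gate As New SemaphoreSlim(maxConcurrency)
        Dim pages As New ConcurrentBag(Of String)
        Dim tasks As New List(Of Task)

        For Each url As String In urls
            Dim current As String = url ' copy the loop variable for the lambda
            gate.Wait()                 ' blocks here until a slot is free
            tasks.Add(Task.Factory.StartNew(Sub()
                                                Try
                                                    Using wc As New WebClient()
                                                        pages.Add(wc.DownloadString(current))
                                                    End Using
                                                Finally
                                                    gate.Release() ' free the slot even if the download failed
                                                End Try
                                            End Sub))
        Next

        ' Throws AggregateException if any download failed
        Task.WaitAll(tasks.ToArray())
        Return pages.ToArray()
    End Function
End Module
```

Each request still occupies a thread-pool thread while it waits on the network, which is the cost the asynchronous approach avoids.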

Now I have switched to the newer TPL features (C# 5.0 async/await, which is quite handy; the continuations introduced in the TPL felt natural to me, and async/await makes them much easier to use) to invoke network I/O asynchronously.

That is, ideally I now use only one thread in the caller (unless I really need the results before continuing), and let .NET, the OS and the I/O completion port threads work together to invoke my continuation code on the thread pool to complete the operation (the 'callback' in APM, the completed event in the event-based pattern, the 'continuation' in TPL, the code after await in C# 5.0 / .NET 4.5).

The principle I followed when I embraced async I/O is simple: don't let a thread wait and waste CPU and resources unless it is really necessary!
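For readers who can target .NET 4.5 (so not the VS2010/.NET 4 setup from the question), the same throttling idea might look roughly like the sketch below in VB.NET. The module and function names are hypothetical; SemaphoreSlim.WaitAsync and WebClient.DownloadStringTaskAsync are the .NET 4.5 members this relies on.

```vbnet
Imports System.Linq
Imports System.Net
Imports System.Threading
Imports System.Threading.Tasks

Module AsyncThrottleSketch
    ' Hypothetical helper: checks one site, holding a semaphore slot
    ' only for the duration of the download.
    Private Async Function ContainsKeywordAsync(site As String, keyword As String, gate As SemaphoreSlim) As Task(Of Boolean)
        Await gate.WaitAsync() ' waits for a free slot without blocking a thread
        Try
            Using wc As New WebClient()
                Dim page As String = Await wc.DownloadStringTaskAsync(site)
                Return page.Contains(keyword)
            End Using
        Finally
            gate.Release()
        End Try
    End Function

    ' Hypothetical entry point: at most maxConcurrency downloads run at once.
    Public Async Function SearchSitesAsync(keyword As String, sites As String(), maxConcurrency As Integer) As Task(Of String())
        Dim gate As New SemaphoreSlim(maxConcurrency)
        ' Every task is created immediately, but each one waits at the gate
        Dim checks = sites.Select(Function(s) ContainsKeywordAsync(s, keyword, gate)).ToArray()
        Dim hits As Boolean() = Await Task.WhenAll(checks)
        ' hits(i) corresponds to sites(i) because WhenAll preserves order
        Return sites.Where(Function(s, i) hits(i)).ToArray()
    End Function
End Module
```

A caller that needs a blocking result, as in the question, could read `SearchSitesAsync(keyword, sites, 4).Result` from a console application; on a UI thread that would risk a deadlock, so awaiting the task is the safer choice there.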

Regards.