I have a script which makes a lot of web requests (~300,000). It looks something like this:
    // Set up a new wsClient
    val config = new NingAsyncHttpClientConfigBuilder(DefaultWSClientConfig()).build
    val builder = new AsyncHttpClientConfig.Builder(config)
    val wsClient = new NingWSClient(builder.build)

    // Each of these uses the wsClient
    def getAs: Future[Seq[A]] = { ... }
    def getBs: Future[Seq[B]] = { ... }
    def getCs: Future[Seq[C]] = { ... }
    def getDs: Future[Seq[D]] = { ... }

    (for {
      as <- getAs
      bs <- getBs
      cs <- getCs
      ds <- getDs
    } yield (as, bs, cs, ds)).map(tuple => println("done"))
The problem is that I run into a `Too many open files` exception, because each function asynchronously makes thousands of requests, each of which uses a file descriptor.
I tried reorganizing my functions so that each one would make batches, each with its own client:
    def getAs: Future[Seq[A]] = {
      someCollection.grouped(1000).map(batch => {
        val client = new NingWSClient(builder.build) // Make a new client for every batch
        Future.sequence(batch.map(thing => {
          client.url(...).map(...)
        })).map(things => {
          client.close() // Close the client when the batch is done
          things
        })
      })
    }
But this causes the for-comprehension to end early (without any error messages or exceptions):
    (for {
      as <- getAs
      bs <- getBs // This doesn't happen
      cs <- getCs // Or any of the following ones
      ds <- getDs
    } yield (as, bs, cs, ds)).map(tuple => println("done"))
I am just looking for the right way to make a large number of HTTP requests without opening too many file descriptors.
You can use Octoparts (https://m3dev.github.io/octoparts/), but it really sounds like you want to reverse the pattern, so the `wsClient` is making calls on the outside, and then you `flatMap` the `Future[WSResponse]` coming back out. That will throttle the number of futures to the internal Netty thread pool used by AsyncHttpClient, and you can change config settings to increase or decrease the number of threads in the Netty channel pool.
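As a sketch of the config route, you can cap the client's connection pool so it throttles open sockets itself. This is an untested fragment: `setMaximumConnectionsTotal` and `setMaximumConnectionsPerHost` are the builder methods in AsyncHttpClient 1.8.x (the version behind Play 2.3's `NingWSClient`); in AHC 1.9+ the equivalents are `setMaxConnections` / `setMaxConnectionsPerHost`:

```scala
// Untested config sketch: cap open connections (and thus file descriptors).
// Method names are from AsyncHttpClient 1.8.x and differ in other versions.
val config = new NingAsyncHttpClientConfigBuilder(DefaultWSClientConfig()).build
val builder = new AsyncHttpClientConfig.Builder(config)
  .setMaximumConnectionsTotal(200)  // overall cap on open connections
  .setMaximumConnectionsPerHost(50) // cap per target host
val wsClient = new NingWSClient(builder.build)
```

With a hard cap in place, requests beyond the limit queue inside the client instead of each grabbing a new file descriptor.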
I had a similar problem: too many requests to one web service (~500+). Your code example with grouping is almost correct; however, you will get an `Iterator[Future[List[Int]]]`, or, if you `Future.sequence` it, a `Future[Iterator[List[Int]]]`. But I think the batches will still all run asynchronously. You need to fire the first batch, then `flatMap` it (wait until it has finished), and only then fire the next batch. This is what I've managed to write, following this answer. Hope this helps!
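A minimal sketch of that fire-one-batch-then-`flatMap` approach, chaining batches with a `foldLeft`. Here `fakeRequest` is a hypothetical stand-in for the real `wsClient.url(...).get().map(...)` call:

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

// Hypothetical stand-in for a real WS call; it just doubles its input.
def fakeRequest(id: Int): Future[Int] = Future(id * 2)

// Fire the batches strictly one after another: flatMap chains each batch
// onto the completion of the previous one, so at most `batchSize`
// requests (and file descriptors) are in flight at a time.
def inBatches(ids: Seq[Int], batchSize: Int): Future[Seq[Int]] =
  ids.grouped(batchSize).foldLeft(Future.successful(Seq.empty[Int])) {
    (acc, batch) =>
      acc.flatMap { done =>
        Future.sequence(batch.map(fakeRequest)).map(done ++ _)
      }
  }

val result = Await.result(inBatches(1 to 10, batchSize = 3), 10.seconds)
// result == Seq(2, 4, 6, 8, 10, 12, 14, 16, 18, 20)
```

Note that each batch's futures are only created inside the `flatMap`, after the previous batch's `Future` has completed, which is what actually serializes the batches.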