Batching requests with play.api.libs.ws

Posted 2019-07-15 19:18

I have a script that makes a lot of web requests (~300,000). It looks something like this:

// Set up a new wsClient
val config = new NingAsyncHttpClientConfigBuilder(DefaultWSClientConfig()).build
val builder = new AsyncHttpClientConfig.Builder(config)
val wsClient = new NingWSClient(builder.build)

// Each of these use the wsClient
def getAs: Future[Seq[A]] = { ... }
def getBs: Future[Seq[B]] = { ... }
def getCs: Future[Seq[C]] = { ... }
def getDs: Future[Seq[D]] = { ... }

(for {
    as <- getAs
    bs <- getBs
    cs <- getCs
    ds <- getDs
} yield (as, bs, cs, ds)).map(tuple => println("done"))

The problem is that I run into a "Too many open files" exception, because each function asynchronously makes thousands of requests and each request uses a file descriptor.

I tried reorganizing my functions so that each one would make its requests in batches, with a fresh client per batch:

def getAs: Future[Seq[A]] = {
    someCollection.grouped(1000).map(batch => {
        val client = new NingWSClient(builder.build) // Make a new client for every batch
        Future.sequence(batch.map(thing => {
            client.url(...).get().map(...)
        })).map(things => {
            client.close() // Close the client
            things
        })
    })
}

But this causes the for-comprehension to finish early, without any error messages or exceptions:

(for {
    as <- getAs
    bs <- getBs // This doesn't happen
    cs <- getCs // Or any of the following ones
    ds <- getDs
} yield (as, bs, cs, ds)).map(tuple => println("done"))

I am just looking for the right way to make a large number of HTTP requests without opening too many file descriptors.

2 Answers
对你真心纯属浪费
Answer 2 · 2019-07-15 19:34

You can use Octoparts:

https://m3dev.github.io/octoparts/

but it really sounds like you want to reverse the pattern, so that the wsClient makes the calls on the outside and you flatMap the Future[WSResponse] coming back out. That throttles the number of in-flight futures to the internal Netty thread pool used by AsyncHttpClient, and you can change config settings to increase or decrease the number of threads in the Netty channel pool.
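For reference, here is a hedged sketch of tightening the client config from the question so the pool itself bounds how many sockets (and thus file descriptors) can be open at once. The method names assume AsyncHttpClient 1.9.x; older 1.8.x builds call them `setMaximumConnectionsTotal` / `setMaximumConnectionsPerHost`, so check the version on your classpath:

```scala
// Sketch: cap the client's connection pool so requests beyond the cap
// wait for (or fail fast on) a free connection instead of each opening
// its own socket. Limits chosen arbitrarily for illustration.
val config = new NingAsyncHttpClientConfigBuilder(DefaultWSClientConfig()).build
val builder = new AsyncHttpClientConfig.Builder(config)
  .setMaxConnections(200)        // total open connections across all hosts
  .setMaxConnectionsPerHost(50)  // cap per individual host
val wsClient = new NingWSClient(builder.build)
```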

等我变得足够好
Answer 3 · 2019-07-15 19:35

I had a similar problem: too many requests to one web service (~500+). Your grouping example is almost correct; however, it gives you an Iterator[Future[List[Int]]], or, if you Future.sequence it, a Future[Iterator[List[Int]]]. Either way, all the batches still run concurrently. You need to fire the first batch, flatMap it (wait until it is finished), and only then fire the next batch. This is what I managed to write, following this answer:

val futureIterator = list.grouped(50).foldLeft(Future.successful[List[Int]](Nil)) {
  (fItems, items) =>
    fItems flatMap { processed =>
      println("PROCESSED: " + processed); println("SPAWNED: " + items);
      Future.traverse(items)(getFuture) map (res => processed ::: res)
    }
}
println(Await.result(futureIterator, Duration.Inf))
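You can try the same fold-and-flatMap pattern standalone. A minimal, self-contained sketch with a stubbed request function (the hypothetical getFuture here just squares its input inside a Future, standing in for a wsClient call):

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.duration.Duration
import scala.concurrent.ExecutionContext.Implicits.global

// Hypothetical stand-in for an HTTP call: completes with the square of n.
def getFuture(n: Int): Future[Int] = Future(n * n)

val list = (1 to 10).toList

// Process in batches of 3: each batch is fired only after the previous
// one completes, because flatMap chains the batches sequentially.
val result: Future[List[Int]] =
  list.grouped(3).foldLeft(Future.successful(List.empty[Int])) {
    (acc, batch) =>
      acc.flatMap { processed =>
        Future.traverse(batch)(getFuture).map(res => processed ::: res)
      }
  }

println(Await.result(result, Duration.Inf))
// List(1, 4, 9, 16, 25, 36, 49, 64, 81, 100)
```

Future.traverse preserves the order within each batch, and the fold preserves the order across batches, so the result comes back in input order.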

Hope this helps!
