Akka Flow hangs when making http requests via conn

Published 2020-06-16 08:55

Question:

I'm using Akka 2.4.4 and trying, so far unsuccessfully, to move away from Apache HttpAsyncClient.

Below is a simplified version of the code that I use in my project.

The problem is that it hangs if I send more than 1-3 requests to the flow. After 6 hours of debugging I still can't locate the problem. I don't see any exceptions, error logs, or events in the Decider. NOTHING :)

I tried reducing the connection-timeout setting to 1s, thinking that maybe it was waiting for a response from the server, but it didn't help.

What am I doing wrong?

import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.model.headers.Referer
import akka.http.scaladsl.model.{HttpRequest, HttpResponse}
import akka.http.scaladsl.settings.ConnectionPoolSettings
import akka.stream.Supervision.Decider
import akka.stream.scaladsl.{Sink, Source}
import akka.stream.{ActorAttributes, Supervision}
import com.typesafe.config.ConfigFactory

import scala.collection.immutable.{Seq => imSeq}
import scala.concurrent.{Await, Future}
import scala.concurrent.duration.Duration
import scala.util.Try

object Main {

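  implicit val system = ActorSystem("root")
  implicit val executor = system.dispatcher
  // Materializer required to build the pool flow and to run the stream
  implicit val materializer = akka.stream.ActorMaterializer()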
  val config = ConfigFactory.load()

  private val baseDomain = "www.google.com"
  private val poolClientFlow = Http()(system).cachedHostConnectionPool[Any](baseDomain, 80, ConnectionPoolSettings(config))

  private val decider: Decider = {
    case ex =>
      ex.printStackTrace()
      Supervision.Stop
  }

  private def sendMultipleRequests[T](items: Seq[(HttpRequest, T)]): Future[Seq[(Try[HttpResponse], T)]] =

    Source.fromIterator(() => items.toIterator)
      .via(poolClientFlow)
      .log("Logger")(log = myAdapter)
      .recoverWith {
        case ex =>
          println(ex)
          null
      }
      .withAttributes(ActorAttributes.supervisionStrategy(decider))
      .runWith(Sink.seq)
      .map { v =>
        println(s"Got ${v.length} responses in Flow")
        v.asInstanceOf[Seq[(Try[HttpResponse], T)]]
      }

  def main(args: Array[String]): Unit = {

    val headers = imSeq(Referer("https://www.google.com/"))
    val reqPair = HttpRequest(uri = "/intl/en/policies/privacy").withHeaders(headers) -> "some req ID"
    val requests = List.fill(10)(reqPair)
    val qwe = sendMultipleRequests(requests).map { case responses =>
      println(s"Got ${responses.length} responses")

      system.terminate()
    }

    Await.ready(system.whenTerminated, Duration.Inf)
  }
}

Also, what's up with proxy support? It doesn't seem to work for me either.

Answer 1:

You need to consume the body of the response fully so that the connection is made available for subsequent requests. If you don't care about the response entity at all, then you can just drain it to a Sink.ignore, something like this:

resp.entity.dataBytes.runWith(Sink.ignore)
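To show where that one-liner could sit in a pool-based flow like the one in the question, here is a minimal sketch. The names `DrainExample`, `drain` and `sendAndDrain`, the `String` request ID type, and the parallelism of 4 are assumptions made for illustration, not part of the original code. It assumes an implicit `ActorMaterializer` is in scope, which `runWith` needs.

```scala
import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.model.{HttpRequest, HttpResponse}
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Sink, Source}

import scala.concurrent.Future
import scala.util.{Success, Try}

object DrainExample {
  implicit val system = ActorSystem("root")
  implicit val materializer = ActorMaterializer()
  import system.dispatcher

  // Same kind of pool flow as in the question, with default pool settings
  private val pool = Http().cachedHostConnectionPool[String]("www.google.com", 80)

  // Discards the body of a successful response; the returned Future completes
  // once the entity stream has been fully read. Failures pass through untouched.
  def drain[T](result: (Try[HttpResponse], T)): Future[(Try[HttpResponse], T)] =
    result match {
      case (Success(resp), _) =>
        resp.entity.dataBytes.runWith(Sink.ignore).map(_ => result)
      case _ =>
        Future.successful(result)
    }

  def sendAndDrain(items: List[(HttpRequest, String)]): Future[Seq[(Try[HttpResponse], String)]] =
    Source(items)
      .via(pool)
      // Drain each response as it arrives so its connection goes back to the pool
      .mapAsync(parallelism = 4)(r => drain(r))
      .runWith(Sink.seq)
}
```

After draining, the status and headers of each `HttpResponse` are still usable, but the body is gone. If the body is needed, it would have to be read in full at this point instead of being sent to `Sink.ignore`.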

With the default config, a host connection pool has its max connections set to 4. Each pool has its own queue where requests wait until one of the open connections becomes available. If that queue ever goes over 32 (the default, which can be changed but must be a power of 2), you will start seeing failures. In your case you only send 10 requests, so you don't hit that limit. But by not consuming the response entity you never free up the connection, and everything else just queues up behind it, waiting for a connection to become available.
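For reference, the two limits mentioned above live under `akka.http.host-connection-pool`. The sketch below overrides them in code and builds the `ConnectionPoolSettings` the same way the question does. The object name `PoolConfigExample` and the values 8 and 64 are arbitrary illustrations. Raising the limits would not fix the hang by itself, since undrained responses would still hold on to their connections.

```scala
import akka.http.scaladsl.settings.ConnectionPoolSettings
import com.typesafe.config.ConfigFactory

object PoolConfigExample {
  // Illustrative values only; the defaults are 4 and 32
  private val overrides = ConfigFactory.parseString(
    """
      |akka.http.host-connection-pool {
      |  max-connections   = 8
      |  max-open-requests = 64   # must be a power of 2
      |}
    """.stripMargin)

  // Overrides take precedence; everything else comes from the regular config
  val config = overrides.withFallback(ConfigFactory.load())
  val poolSettings = ConnectionPoolSettings(config)
}
```

`PoolConfigExample.poolSettings` could then be passed as the third argument to `cachedHostConnectionPool`, in place of `ConnectionPoolSettings(config)` in the question.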