How to create akka-http client with backpressure o

2019-08-31 07:28发布

问题:

I have undefined amount of akka-http client flows downloading data from an http service. I'm using akka-http host-level connection pooling because I would like to customise the pool, since there are long running requests going through it.

Since, the number of clients is undefined and dynamic, I don't know how to configure the connection pool (max-open-requests/max-connections). Additionally, I might want the connection pool to be small (less than number of clients) to not damage the bandwidth.

Thus, I would like to set up a client flow so that new connections and requests to the pool are backpressured:

1.Does this mean I will need to have a single materialised client flow?

2.How I materialise as many client flows as I want, such that if there are no available connections (demand from downstream) requests will be back pressured.

My first attempt was Source.single pattern, however this method can exceed max-open-request and throw an exception as it creates a new flow instance each time a request is sent to a server.

My second attempt was Source.Queue, This method creates a single flow to which all requests are enqueued: however despite the documentaiton SourceQueue's OverflowStrategy backpressured does not work and when it exceeds max-connection or max-open-request, akka-http throws an exception

Can I accomplish backpressure using host-level streaming fashion and have one client flow and add new requests using with MergeHub?

This is my solution:

  private lazy val poolFlow: Flow[(HttpRequest, Promise[HttpResponse]), (Try[HttpResponse], Promise[HttpResponse]), Http.HostConnectionPool] =
    Http().cachedHostConnectionPool[Promise[HttpResponse]](host.split("http[s]?://").tail.head, port, connectionPoolSettings)

  val ServerSink =
    poolFlow.async.toMat(Sink.foreach({
      case ((Success(resp), p)) => p.success(resp)
      case ((Failure(e), p)) => p.failure(e)
    }))(Keep.left)

  // Attach a MergeHub Source to the consumer. This will materialize to a
  // corresponding Sink.
  val runnableGraph: RunnableGraph[Sink[(HttpRequest, Promise[HttpResponse]), NotUsed]] =
  MergeHub.source[(HttpRequest, Promise[HttpResponse])](perProducerBufferSize = 16).to(ServerSink)


  val toConsumer: Sink[(HttpRequest, Promise[HttpResponse]), NotUsed] = runnableGraph.run()


  protected[akkahttp] def executeRequest[T](httpRequest: HttpRequest, unmarshal: HttpResponse => Future[T]): Future[T] = {
    val responsePromise = Promise[HttpResponse]()
    Source.single((httpRequest -> responsePromise)).runWith(toConsumer)
    responsePromise.future.flatMap(handleHttpResponse(_, unmarshal))
    )
  }