I have undefined amount of akka-http client flows downloading data from an http service. I'm using akka-http host-level connection pooling because I would like to customise the pool, since there are long running requests going through it.
Since, the number of clients is undefined and dynamic, I don't know how to configure the connection pool (max-open-requests/max-connections). Additionally, I might want the connection pool to be small (less than number of clients) to not damage the bandwidth.
Thus, I would like to set up a client flow so that new connections and requests to the pool are backpressured:
1.Does this mean I will need to have a single materialised client flow?
2.How I materialise as many client flows as I want, such that if there are no available connections (demand from downstream) requests will be back pressured.
My first attempt was Source.single pattern, however this method can exceed max-open-request and throw an exception as it creates a new flow instance each time a request is sent to a server.
My second attempt was Source.Queue, This method creates a single flow to which all requests are enqueued: however despite the documentaiton SourceQueue's OverflowStrategy backpressured does not work and when it exceeds max-connection or max-open-request, akka-http throws an exception
Can I accomplish backpressure using host-level streaming fashion and have one client flow and add new requests using with MergeHub?
This is my solution:
private lazy val poolFlow: Flow[(HttpRequest, Promise[HttpResponse]), (Try[HttpResponse], Promise[HttpResponse]), Http.HostConnectionPool] =
Http().cachedHostConnectionPool[Promise[HttpResponse]](host.split("http[s]?://").tail.head, port, connectionPoolSettings)
val ServerSink =
poolFlow.async.toMat(Sink.foreach({
case ((Success(resp), p)) => p.success(resp)
case ((Failure(e), p)) => p.failure(e)
}))(Keep.left)
// Attach a MergeHub Source to the consumer. This will materialize to a
// corresponding Sink.
val runnableGraph: RunnableGraph[Sink[(HttpRequest, Promise[HttpResponse]), NotUsed]] =
MergeHub.source[(HttpRequest, Promise[HttpResponse])](perProducerBufferSize = 16).to(ServerSink)
val toConsumer: Sink[(HttpRequest, Promise[HttpResponse]), NotUsed] = runnableGraph.run()
protected[akkahttp] def executeRequest[T](httpRequest: HttpRequest, unmarshal: HttpResponse => Future[T]): Future[T] = {
val responsePromise = Promise[HttpResponse]()
Source.single((httpRequest -> responsePromise)).runWith(toConsumer)
responsePromise.future.flatMap(handleHttpResponse(_, unmarshal))
)
}