Selective request-throttling using akka-http strea

Posted 2019-07-25 15:31


I have an API that calls two downstream APIs. One downstream API (https://test/foo) is really important and very fast. The other, slower downstream API (https://test/bar) is limited: it can only handle 50 requests per second.

I would like to make sure the downstream API https://test/foo has higher priority than https://test/bar. For example, if the API thread pool is 75, I would allow only 50 parallel incoming connections to go through to https://test/bar. The rest of the connections should be reserved for https://test/foo. That way https://test/bar would never fail.
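The cap described above (at most 50 calls to https://test/bar in flight, everything else left free for https://test/foo) could be sketched without any streaming machinery, using a plain semaphore. This is only an illustration under assumptions: `BarGate` and `tryCall` are hypothetical names, and the call being guarded is assumed to be blocking, as `scala.io.Source.fromURL` is.

```scala
import java.util.concurrent.Semaphore

// Hypothetical helper: caps how many calls to the slow API run at the same time.
final class BarGate(maxInFlight: Int) {
  private val permits = new Semaphore(maxInFlight)

  // Runs `call` only if a permit is free right now. Returns None immediately
  // (without waiting) when all permits are taken, so the caller can reject
  // the request instead of queueing it.
  def tryCall[A](call: => A): Option[A] =
    if (permits.tryAcquire()) {
      try Some(call)
      finally permits.release() // always free the slot, even if `call` throws
    } else None
}
```

A handler could wrap the https://test/bar call in `gate.tryCall(...)` and answer with an error status such as 503 when it gets `None`. Note that this limits concurrency, not rate: 50 calls in flight is the same as 50 requests per second only if each call takes about one second.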

I guess I should apply throttle or maybe buffer with OverflowStrategy.dropNew for https://test/bar.

Here is the code snippet.

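import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.model.{HttpRequest, HttpResponse, Uri}
import akka.http.scaladsl.model.HttpMethods.GET
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Flow, Sink, Source}
import scala.concurrent.Future

implicit val actorSystem = ActorSystem("api")
implicit val flowMaterializer = ActorMaterializer()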

val httpService = Http()

val serverSource: Source[Http.IncomingConnection, Future[Http.ServerBinding]] =
  httpService.bind(interface = "0.0.0.0", 3000)

val binding: Future[Http.ServerBinding] =
  serverSource
    .to(Sink.foreach { connection =>
      connection.handleWith(
        Flow[HttpRequest]
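          .map {
            // Both downstream calls are blocking and run on the stream's thread.
            case HttpRequest(GET, Uri.Path("/priority-1"), _, _, _) =>
              HttpResponse(entity = scala.io.Source.fromURL("https://test/foo").mkString)
            case HttpRequest(GET, Uri.Path("/priority-2"), _, _, _) =>
              HttpResponse(entity = scala.io.Source.fromURL("https://test/bar").mkString)
            // Any other request: drain the entity and answer 404 instead of throwing a MatchError.
            case other: HttpRequest =>
              other.discardEntityBytes()
              HttpResponse(404, entity = "Unknown resource!")
          }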
      )
    }).run()

Question 1: Where should I put throttle(50, 1 seconds, 5000, ThrottleMode.Shaping) so that it applies only to the https://test/bar threshold?

Question 2: Do I need to apply buffer and OverflowStrategy.dropNew if I want to prioritise https://test/foo requests? In other words, all excess connections for https://test/bar should be dropped.

Question 3: Is there a better way to implement this requirement? I am using connection.handleWith[Flow[HttpRequest, HttpResponse]] in the Sink and I am not sure this is the right place.
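For reference, one possible arrangement for the throttle and dropNew ideas in the questions above is sketched below. The flow passed to connection.handleWith is materialized once per connection, so a throttle placed inside it would limit each connection separately; the sketch instead materializes a single queue that all connections share. It assumes Akka 2.5-era APIs (Source.queue with an OverflowStrategy, ActorMaterializer) and Scala 2.12 or later. `BarThrottleSketch`, `callBar` and `throttledBar` are hypothetical names, and `callBar` is a stand-in for a real non-blocking call to https://test/bar.

```scala
import akka.actor.ActorSystem
import akka.stream.{ActorMaterializer, OverflowStrategy, QueueOfferResult, ThrottleMode}
import akka.stream.scaladsl.{Sink, Source}
import scala.concurrent.{Future, Promise}
import scala.concurrent.duration._
import scala.util.Success

object BarThrottleSketch {
  implicit val system: ActorSystem = ActorSystem("sketch")
  implicit val materializer: ActorMaterializer = ActorMaterializer()
  import system.dispatcher

  // Hypothetical stand-in for the real call to https://test/bar.
  def callBar(): Future[String] = Future.successful("bar-response")

  // Materialized once and shared by all connections, so the limit is global.
  private val barQueue =
    Source
      .queue[Promise[String]](50, OverflowStrategy.dropNew) // at most 50 requests waiting
      .throttle(50, 1.second, 50, ThrottleMode.Shaping)      // at most 50 released per second
      .mapAsync(50) { promise =>
        // Hand the outcome to the waiting caller; a failed call must not fail the stream.
        callBar().transform { result => promise.complete(result); Success(()) }
      }
      .to(Sink.ignore)
      .run()

  // Some(body) if the request was accepted, None if it was dropped because
  // the buffer was full (or the queue was closed or failed).
  def throttledBar(): Future[Option[String]] = {
    val promise = Promise[String]()
    barQueue.offer(promise).flatMap {
      case QueueOfferResult.Enqueued => promise.future.map(Some(_))
      case _                         => Future.successful(None)
    }
  }
}
```

With something like this, the /priority-2 branch of the handler would call `throttledBar()` from a `mapAsync` stage instead of `map` and reply with an error status when the result is `None`, while /priority-1 requests never pass through the queue at all. The burst size of 50 here is a choice made for the sketch; the 5000 in the original throttle call would allow a much larger burst.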

If some code snippets could be provided, that would be much appreciated and super awesome :)

Thanks in advance
