Akka Stream + Akka Http - Get Request on Error

2019-03-31 12:56发布

问题:

I have the following stream that works pretty well:

source
  .map(x => HttpRequest(uri = x.rawRequest))
  .via(Http().outgoingConnection(host, port))
  .to(Sink.actorRef(myActor, IsDone))
  .run()

and a simple actor to handle the response status and the final message when the stream completes:

/**
  * A simple actor to count how many rows have been processed
  * in the complete process given a http status
  *
  * It also finish the main thread upon a message of type [[IsDone]] is received
  */
class MyActor extends Actor with ActorLogging {

  var totalProcessed = 0

  def receive = LoggingReceive {

    case response: HttpResponse =>

      if(response.status.isSuccess()) {
        totalProcessed = totalProcessed + 1
      } else if(response.status.isFailure()) {
        log.error(s"Http response error: ${response.status.intValue()} - ${response.status.reason()}")
      } else {
        log.error(s"Error: ${response.status.intValue()} - ${response.status.reason()}")
      }

    case IsDone =>
      println(s"total processed: $totalProcessed")
      sys.exit()
  }
}

case object IsDone

I don't know if this is the best approach to count things and also to handle response status, but it's working so far.

The question is how to pass the original request to the actor in a way I could know what request caused a specific error.

My actor could expect the following instead:

case (request: String, response: HttpResponse) =>

But how to pass that information that I have at the beginning of my pipeline?

I was thinking of to map like this:

source
  .map(x => (HttpRequest(uri = x.rawRequest), x.rawRequest))

But I have no idea on how to fire the Http flow.

Any suggestion?

回答1:

With @cmbaxter help, I could solve my problem using the following piece of code:

val poolClientFlow = Http().cachedHostConnectionPool[String](host, port)

source
  .map(url => HttpRequest(uri = url) -> url)
  .via(poolClientFlow)
  .to(Sink.actorRef(myActor, IsDone))
  .run()

Now my actor is able to receive this:

case (respTry: Try[HttpResponse], request: String) =>