Akka Streams / HTTP: Get original request from res

2019-05-28 07:32发布

问题:

I have an Akka Streams source that goes through a flow and posts an HTTP request:

source.map(toRequest)
  .via(Http().outgoingConnection(host))
  .map(toMessage) 

Suppose that the toRequest method maps a string to an HttpRequest, and the toMessage method maps the HttpResponse to a message class that is needed to process downstream. Suppose that the message class needs to contain some of the original information.

Is it possible to get the original HttpRequest from the HttpResponse? If not, is there any way to keep some of the original information?

回答1:

One approach is to use a Future-based variant of the client API and a custom case class that holds the information you want to propagate downstream. For example:

case class Message(request: HttpRequest, response: HttpResponse)

source
  .map(toRequest)
  .mapAsync(parallelism = 3) { request => // adjust the level of parallelism as needed
    Http().singleRequest(request).map(response => Message(request, response))
  }
  // continue processing; at this point you have a Source[Message, _]


回答2:

You could use your graph api to by-pass your HttpRequest. One example would be:

object Main extends App {

  implicit val as = ActorSystem("Test")
  implicit val m = ActorMaterializer()
  implicit val ec = as.dispatcher

  def src1 = Source.fromIterator(() => List(1, 2, 3).toIterator)

  val srcGraph = Source.fromGraph(GraphDSL.create() { implicit builder: GraphDSL.Builder[NotUsed] =>
    import GraphDSL.Implicits._

    // Create one flow to prepend the 'Number' string to the input integer
    def flow1 = Flow[Int].map { el => s"Number $el"  }

    // Create a broadcast stage 
    val broadcast = builder.add(Broadcast[Int](2))
    val zip       = builder.add(Zip[Int, String]())

    src1 ~> broadcast.in

    // The 0 port sends the int to the zip stage directly
    broadcast.out(0) ~>          zip.in0
    broadcast.out(1) ~> flow1 ~> zip.in1

    SourceShape(zip.out)
  })

  Source.fromGraph(srcGraph).runForeach(println(_))
}

The graph api provides many options to do things like this.