I have an Akka Streams source that goes through a flow and posts an HTTP request:
source.map(toRequest)
.via(Http().outgoingConnection(host))
.map(toMessage)
Suppose that the toRequest
method maps a string to an HttpRequest
, and the toMessage
method maps the HttpResponse
to a message class that is needed to process downstream. Suppose that the message class needs to contain some of the original information.
Is it possible to get the original HttpRequest
from the HttpResponse
? If not, is there any way to keep some of the original information?
One approach is to use a Future
-based variant of the client API and a custom case class that holds the information you want to propagate downstream. For example:
case class Message(request: HttpRequest, response: HttpResponse)
source
.map(toRequest)
.mapAsync(parallelism = 3) { request => // adjust the level of parallelism as needed
Http().singleRequest(request).map(response => Message(request, response))
}
// continue processing; at this point you have a Source[Message, _]
You could use your graph api to by-pass your HttpRequest. One example would be:
object Main extends App {
implicit val as = ActorSystem("Test")
implicit val m = ActorMaterializer()
implicit val ec = as.dispatcher
def src1 = Source.fromIterator(() => List(1, 2, 3).toIterator)
val srcGraph = Source.fromGraph(GraphDSL.create() { implicit builder: GraphDSL.Builder[NotUsed] =>
import GraphDSL.Implicits._
// Create one flow to prepend the 'Number' string to the input integer
def flow1 = Flow[Int].map { el => s"Number $el" }
// Create a broadcast stage
val broadcast = builder.add(Broadcast[Int](2))
val zip = builder.add(Zip[Int, String]())
src1 ~> broadcast.in
// The 0 port sends the int to the zip stage directly
broadcast.out(0) ~> zip.in0
broadcast.out(1) ~> flow1 ~> zip.in1
SourceShape(zip.out)
})
Source.fromGraph(srcGraph).runForeach(println(_))
}
The graph api provides many options to do things like this.