I'm using Akka Streams, and I need to conditionally skip a segment of my graph because the flow can't handle certain values. Specifically, I have a flow that takes a string and makes HTTP requests, but the server can't handle an empty string. In that case I just need to return an empty string instead. Is there a way to do this without sending an HTTP request that I know will fail? I basically have this:
val source = Source(List("1", "2", "", "3", "4"))
val httpRequest: Flow[String, HttpRequest, _]
val httpResponse: Flow[HttpResponse, String, _]
val flow = source.via(httpRequest).via(httpResponse)
The only thing I can think of doing is catching the 400 error in my httpResponse flow and returning a default value. But I'd like to be able to avoid the overhead of hitting the server for a request I know is going to fail beforehand.
Viktor Klang's solution is concise and elegant. I just wanted to demonstrate an alternative using Graphs.
You can split your source of Strings into two streams and filter one stream for valid Strings and the other stream for invalid Strings. Then merge the results ("cross the streams").
Based on the documentation:
val g = RunnableGraph.fromGraph(FlowGraph.create() { implicit builder: FlowGraph.Builder[Unit] =>
  import FlowGraph.Implicits._

  val source = Source(List("1", "2", "", "3", "4"))
  val sink: Sink[String, _] = ???

  // Fan every input out to both branches, then fan the branches back in.
  val bcast = builder.add(Broadcast[String](2))
  val merge = builder.add(Merge[String](2))

  // Each branch keeps only its own kind of input.
  val validReq = Flow[String].filter(_.nonEmpty)
  val invalidReq = Flow[String].filter(_.isEmpty)

  val httpRequest: Flow[String, HttpRequest, _] = ???
  val makeHttpCall: Flow[HttpRequest, HttpResponse, _] = ???
  val httpResponse: Flow[HttpResponse, String, _] = ???
  val someHttpTransformation = httpRequest via makeHttpCall via httpResponse

  // Non-empty strings go through the HTTP flow; empty strings bypass it.
  source ~> bcast ~> validReq ~> someHttpTransformation ~> merge ~> sink
            bcast ~> invalidReq ~> merge
  ClosedShape
})
Note: this solution splits the stream, so the Sink may receive the resulting Strings in a different order than the corresponding inputs.
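For anyone who wants to try the split-and-merge idea end to end, here is a minimal sketch. It assumes Akka 2.6, where the graph DSL is called GraphDSL rather than FlowGraph and the materializer is derived from the implicit ActorSystem. The `fakeHttp` flow and the `SplitAndMerge` object are hypothetical stand-ins for the real `httpRequest via makeHttpCall via httpResponse` chain, not part of the original code.

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.ClosedShape
import akka.stream.scaladsl.{Broadcast, Flow, GraphDSL, Merge, RunnableGraph, Sink, Source}
import scala.concurrent.Await
import scala.concurrent.duration._

object SplitAndMerge {
  // Hypothetical stand-in for the real HTTP round trip (assumption).
  val fakeHttp: Flow[String, String, NotUsed] = Flow[String].map(s => s"response-$s")

  def run(inputs: List[String]): Seq[String] = {
    implicit val system: ActorSystem = ActorSystem("split-and-merge")
    try {
      // Passing Sink.seq to create() exposes its Future as the materialized value.
      val graph = RunnableGraph.fromGraph(GraphDSL.create(Sink.seq[String]) { implicit builder => sink =>
        import GraphDSL.Implicits._

        val bcast = builder.add(Broadcast[String](2))
        val merge = builder.add(Merge[String](2))

        // Non-empty strings take the "HTTP" branch; empty strings bypass it.
        Source(inputs) ~> bcast ~> Flow[String].filter(_.nonEmpty) ~> fakeHttp ~> merge ~> sink.in
                          bcast ~> Flow[String].filter(_.isEmpty) ~> merge
        ClosedShape
      })
      Await.result(graph.run(), 5.seconds)
    } finally system.terminate()
  }
}
```

Calling `SplitAndMerge.run(List("1", "2", "", "3", "4"))` should return all five results, but because Merge emits whichever branch is ready first, compare them as a sorted or unordered collection rather than relying on position.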
You could use flatMapConcat:
(Warning: was never compiled, but you'll get the gist of it)
val source = Source(List("1", "2", "", "3", "4"))
val httpRequest: Flow[String, HttpRequest, _] = ???
val httpResponse: Flow[HttpResponse, String, _] = ???
val makeHttpCall: Flow[HttpRequest, HttpResponse, _] = ???
val someHttpTransformation = httpRequest via makeHttpCall via httpResponse
val emptyStringSource = Source.single("")
// Empty strings bypass the HTTP flow; everything else goes through it.
val cleanerSource = source.flatMapConcat {
  case "" => emptyStringSource
  case other => Source.single(other) via someHttpTransformation
}
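A compilable version of the same idea might look like the sketch below. It assumes Akka 2.6 (the materializer comes from the implicit ActorSystem), and `fakeHttp` and `SkipEmptyStrings` are hypothetical names standing in for the real `someHttpTransformation`, so treat it as an illustration rather than the exact code from the question.

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Flow, Sink, Source}
import scala.concurrent.Await
import scala.concurrent.duration._

object SkipEmptyStrings {
  // Hypothetical stand-in for httpRequest via makeHttpCall via httpResponse (assumption).
  val fakeHttp: Flow[String, String, NotUsed] = Flow[String].map(s => s"response-$s")

  def run(inputs: List[String]): Seq[String] = {
    implicit val system: ActorSystem = ActorSystem("skip-empty-strings")
    try {
      val results = Source(inputs)
        .flatMapConcat {
          // Empty strings never reach the HTTP flow.
          case ""    => Source.single("")
          // Everything else is wrapped in a one-element source and sent through it.
          case other => Source.single(other).via(fakeHttp)
        }
        .runWith(Sink.seq)
      Await.result(results, 5.seconds)
    } finally system.terminate()
  }
}
```

Because flatMapConcat fully consumes each inner source before moving on to the next element, the results come out in input order, with the empty string still in third position for the sample input. The trade-off is that only one request is in flight at a time.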