I am experimenting with building a WebSocket service using Akka HTTP. I need to handle Strict messages, which arrive in their entirety, as well as Streamed messages, which arrive in multiple frames. I am using a route with handleWebSocketMessages() to hand the WebSocket handling off to a flow. The code I have looks something like this:
val route: Route =
  get {
    handleWebSocketMessages(createFlow())
  }

def createFlow(): Flow[Message, Message, Any] = Flow[Message]
  .collect {
    case TextMessage.Strict(msg) => msg
    case TextMessage.Streamed(stream) => ??? // <= What to do here??
  }
  .via(createActorFlow())
  .map {
    case msg: String => TextMessage.Strict(msg)
  }

def createActorFlow(): Flow[String, String, Any] = {
  // Set Up Actors
  // ... (this is working)
  Flow.fromSinkAndSource(in, out)
}
I am not really sure how to handle both Strict and Streamed messages. I realize I could do something like this:
  .collect {
    case TextMessage.Strict(msg) => Future.successful(msg)
    case TextMessage.Streamed(stream) => stream.runFold("")(_ + _)
  }
But now my stream has to handle Future[String] rather than plain Strings, and I am not sure how to deal with that, especially since I need to process messages in order.
I did see this Akka issue, which seems somewhat related but is not exactly what I need (I think):
https://github.com/akka/akka/issues/20096
Any help would be appreciated.
Folding sounds like a sensible option. The resulting futures can be handled in your stream using mapAsync.
Please note that mapAsync preserves the order of the incoming messages, as per the docs.
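To make the mapAsync idea concrete, here is a minimal sketch. The object and method names (StrictOrStreamed, toStrings) are made up for illustration, and the Materializer is assumed to be supplied implicitly by the caller.

```scala
import akka.http.scaladsl.model.ws.{Message, TextMessage}
import akka.stream.Materializer
import akka.stream.scaladsl.Flow
import scala.concurrent.Future

object StrictOrStreamed {
  // Hypothetical helper: turns incoming text messages into plain Strings.
  def toStrings(implicit mat: Materializer): Flow[Message, String, Any] =
    Flow[Message]
      .collect {
        // A strict message is already complete; wrap it so both cases yield a Future.
        case TextMessage.Strict(msg) => Future.successful(msg)
        // A streamed message is concatenated chunk by chunk into one String.
        case TextMessage.Streamed(stream) => stream.runFold("")(_ + _)
      }
      // Unwraps the futures; results are emitted in upstream order.
      .mapAsync(parallelism = 1)(identity)
}
```

The output of this flow is a plain String per message, so it should plug in ahead of .via(createActorFlow()) without changing the rest of the pipeline. Non-text messages are simply dropped by collect in this sketch.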
On a different note, other sensible precautions when handling streamed WS messages are completionTimeout and limit, which bound the time and space spent folding a single message.
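A rough sketch of those two safeguards, under the same assumptions as before: the names BoundedFold, boundedStrings, maxFrames and timeout are hypothetical, and suitable values depend on your application. Note that limit counts stream elements (text chunks), not characters or bytes.

```scala
import akka.http.scaladsl.model.ws.{Message, TextMessage}
import akka.stream.Materializer
import akka.stream.scaladsl.Flow
import scala.concurrent.Future
import scala.concurrent.duration.FiniteDuration

object BoundedFold {
  // Hypothetical helper: like a plain fold, but bounded in chunk count and time.
  def boundedStrings(maxFrames: Long, timeout: FiniteDuration)(
      implicit mat: Materializer): Flow[Message, String, Any] =
    Flow[Message]
      .collect {
        case TextMessage.Strict(msg) => Future.successful(msg)
        case TextMessage.Streamed(stream) =>
          stream
            .limit(maxFrames)           // fails if more than maxFrames chunks arrive
            .completionTimeout(timeout) // fails if the last chunk takes too long
            .runFold("")(_ + _)         // merges the chunks into one String
      }
      .mapAsync(parallelism = 1)(identity)
}
```

With default supervision, a failed fold fails the whole flow, which should end the WebSocket connection. If you would rather skip the offending message, you would need to recover the future before mapAsync.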
Thanks to svezfaz's answer, the final solution turned out to be something along those lines.
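For illustration, here is a sketch of how the pieces discussed in this thread might fit together end to end. It is not the poster's actual code: createActorFlow is replaced by a hypothetical echo stand-in, the WebSocketService object name is invented, and the limit of 100 chunks and the 5 second timeout are arbitrary example values.

```scala
import akka.NotUsed
import akka.http.scaladsl.model.ws.{Message, TextMessage}
import akka.http.scaladsl.server.Directives._
import akka.http.scaladsl.server.Route
import akka.stream.Materializer
import akka.stream.scaladsl.Flow
import scala.concurrent.Future
import scala.concurrent.duration._

object WebSocketService {
  // Hypothetical stand-in for the actor-backed flow from the question.
  def createActorFlow(): Flow[String, String, NotUsed] =
    Flow[String].map(text => s"echo: $text")

  def createFlow()(implicit mat: Materializer): Flow[Message, Message, Any] =
    Flow[Message]
      .collect {
        case TextMessage.Strict(msg) => Future.successful(msg)
        case TextMessage.Streamed(stream) =>
          stream
            .limit(100)                    // example bound on chunks per message
            .completionTimeout(5.seconds)  // example bound on time per message
            .runFold("")(_ + _)
      }
      .mapAsync(parallelism = 1)(identity) // keeps messages in arrival order
      .via(createActorFlow())
      .map(msg => TextMessage.Strict(msg))

  def route(implicit mat: Materializer): Route =
    get {
      handleWebSocketMessages(createFlow())
    }
}
```

The actor flow still sees plain Strings, so nothing downstream of mapAsync needs to know whether a message arrived strict or streamed.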