Consuming both Strict and Streamed WebSocket Messa

2019-07-23 06:56发布

问题:

I am experimenting with building a web socket service using Akka HTTP. I need to handle Strict messages that arrive in totality, as well as handle Streamed messages that arrive in m multiple frames. I am using a route with handleWebSocketMessages() to pass the handling of web sockets off to a flow. The code I have looks something like this:

val route: Route =
  get {
    handleWebSocketMessages(createFlow())
  }

def createFlow(): Flow[Message, Message, Any] = Flow[Message]
  .collect {
    case TextMessage.Strict(msg) ⇒ msg
    case TextMessage.Streamed(stream) => ??? // <= What to do here??
  }
  .via(createActorFlow())
  .map {
    case msg: String ⇒ TextMessage.Strict(msg)
  }

def createActorFlow(): Flow[String, String, Any] = {
  // Set Up Actors
  // ... (this is working)
  Flow.fromSinkAndSource(in, out)
}

I am not really sure how two handle both Strict and Streamed messages. I realize I could do something like this :

  .collect {
    case TextMessage.Strict(msg) ⇒ Future.successful(msg)
    case TextMessage.Streamed(stream) => stream.runFold("")(_ + _)
  }

But now my stream has to handle Future[String] rather than just Strings, which I am then not sure how to handle, especially since obviously I need to handle messages in order.

I did see this akka issue, which seems to be somewhat related, but not exactly what I need (I don't think?).

https://github.com/akka/akka/issues/20096

Any help would be appriciated

回答1:

Folding sounds like a sensible option. Handling future in your streams can be done using (e.g.)

flowOfFutures.mapAsync(parallelism = 3)(identity)

Please note that mapAsync does preserve the order of the incoming messages, as per docs.

On a different note, other sensible precautions to handle streamed WS messages could be to use completionTimeout and limit to bound time and space for the message to fold (e.g.)

stream.limit(x).completionTimeout(5 seconds).runFold(...)


回答2:

The ultimate answer based on the below (thanks to svezfaz) answer turned out to be something like this:

val route: Route =
  get {
    handleWebSocketMessages(createFlow())
  }

def createFlow(): Flow[Message, Message, Any] = Flow[Message]
  .collect {
    case TextMessage.Strict(msg) ⇒ 
      Future.successful(MyCaseClass(msg))
    case TextMessage.Streamed(stream) => stream
      .limit(100)                   // Max frames we are willing to wait for
      .completionTimeout(5 seconds) // Max time until last frame
      .runFold("")(_ + _)           // Merges the frames
      .flatMap(msg => Future.successful(MyCaseClass(msg)))
  }
  .mapAsync(parallelism = 3)(identity)
  .via(createActorFlow())
  .map {
    case msg: String ⇒ TextMessage.Strict(msg)
  }

def createActorFlow(): Flow[MyCaseClass, String, Any] = {
  // Set Up Actors as source and sink (not shown)
  Flow.fromSinkAndSource(in, out)
}