Close akka-http websocket connection from server

2020-03-01 22:59发布

问题:

In my scenario, a client sends "goodbye" websocket message and I need to close previously established connection at the server side.

From akka-http docs:

Closing connections is possible by cancelling the incoming connection Flow from your server logic (e.g. by connecting its downstream to a Sink.cancelled and its upstream to a Source.empty). It is also possible to shut down the server's socket by cancelling the IncomingConnection source connections.

But it's not clear to me how to do that taking into account that Sink and Source are set once when negotiating a new connection:

(get & path("ws")) {
  optionalHeaderValueByType[UpgradeToWebsocket]() {
    case Some(upgrade) ⇒
      val connectionId = UUID()
      complete(upgrade.handleMessagesWithSinkSource(sink, source))
    case None ⇒
      reject(ExpectedWebsocketRequestRejection)
  }
}

回答1:

HINT: This answer is based on akka-stream-experimental version 2.0-M2. The API may be slightly different in other versions.


An easy way to close the connection is by using a PushStage:

import akka.stream.stage._

val closeClient = new PushStage[String, String] {
  override def onPush(elem: String, ctx: Context[String]) = elem match {
    case "goodbye" ⇒
      // println("Connection closed")
      ctx.finish()
    case msg ⇒
      ctx.push(msg)
  }
}

Every element that is received at the client side or at the server side (and in general every element that goes through a Flow) goes through such a Stage component. In Akka, the full abstraction is called GraphStage, more information can be found in the official documentation.

With a PushStage we can watch concrete incoming elements for their value and than transform the context accordingly. In the example above, once the goodbye message is received we finish the context otherwise we just forward the value through the push method.

Now, we can connect the closeClient component to an arbitrary flow through the transform method:

val connection = Tcp().outgoingConnection(address, port)

val flow = Flow[ByteString]
  .via(Framing.delimiter(
      ByteString("\n"),
      maximumFrameLength = 256,
      allowTruncation = true))
  .map(_.utf8String)
  .transform(() ⇒ closeClient)
  .map(_ ⇒ StdIn.readLine("> "))
  .map(_ + "\n")
  .map(ByteString(_))

connection.join(flow).run()

The flow above receives a ByteString and returns a ByteString, which means it can be connected to connection through the join method. Inside of the flow we first convert the bytes to a string before we send them to closeClient. If the PushStage doesn't finish the stream, the element is forwarded in the stream, where it gets dropped and replaced by some input from stdin, which is then sent back over the wire. In case the stream is finished, all further stream processing steps after the stage component will be dropped - the stream is now closed.



回答2:

This can be accomplished by the following in the current (2.4.14) version of akka-stream

package com.trackabus.misc

import akka.stream.stage._
import akka.stream.{Attributes, FlowShape, Inlet, Outlet}

// terminates the flow based on a predicate for a message of type T
// if forwardTerminatingMessage is set the message is passed along the flow
// before termination
// if terminate is true the stage is failed, if it is false the stage is completed
class TerminateFlowStage[T](
    pred: T => Boolean, 
    forwardTerminatingMessage: Boolean = false, 
    terminate: Boolean = true)
  extends GraphStage[FlowShape[T, T]]
{
  val in = Inlet[T]("TerminateFlowStage.in")
  val out = Outlet[T]("TerminateFlowStage.out")
  override val shape = FlowShape.of(in, out)

  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = 
    new GraphStageLogic(shape) {

      setHandlers(in, out, new InHandler with OutHandler {
        override def onPull(): Unit = { pull(in) }

        override def onPush(): Unit = {
          val chunk = grab(in)

          if (pred(chunk)) {
            if (forwardTerminatingMessage)
              push(out, chunk)
            if (terminate)
              failStage(new RuntimeException("Flow terminated by TerminateFlowStage"))
            else
              completeStage()
          }
          else
            push(out, chunk)
        }
      })
  }
}

To use it define your stage

val termOnKillMe = new TerminateFlowStage[Message](_.isInstanceOf[KillMe])

and then include it as part of the flow

.via(termOnKillMe)


回答3:

Another way is to manage the connection using the queue from Source.queue. The queue can be used to send messages to the client as well as closing the connection.

def socketFlow: Flow[Message, Message, NotUsed] = {
  val (queue, source) = Source.queue[Message](5, OverflowStrategy.fail).preMaterialize()

  // receive client message 
  val sink = Sink.foreach[Message] {
    case TextMessage.Strict("goodbye") =>
      queue.complete() // this closes the connection
    case TextMessage.Strict(text) =>
      // send message to client by using offer
      queue.offer(TextMessage(s"you sent $text")) 
  }
  Flow.fromSinkAndSource(sink, source)
}

// you then produce the upgrade response like this
val response = upgrade.handleMessages(socketFlow)

A benefit with using the queue for WebSockets is that you can use it to send messages whenever you want as long as you have access to it instead of having to wait for an incoming message to reply to.