How to refactor this code by using akka streams.

2019-03-27 09:18发布

问题:

The idea is to keep the channel opened to use it later. In playframework 2.5.x the documentation says that you have to use akka streams but does not say anything how to achieve this example. Somebody can help me?

import play.api.mvc._
import play.api.libs.iteratee._
import play.api.libs.concurrent.Execution.Implicits.defaultContext

def socket =  WebSocket.using[String] { request =>

  // Concurrent.broadcast returns (Enumerator, Concurrent.Channel)
  val (out, channel) = Concurrent.broadcast[String]

  // log the message to stdout and send response back to client
  val in = Iteratee.foreach[String] {
    msg => println(msg)
      // the Enumerator returned by Concurrent.broadcast subscribes to the channel and will
      // receive the pushed messages
      channel push("I received your message: " + msg)
  }
  (in,out)
}

回答1:

You'll have to do something like this!

val (subscriber, publisher)=Source.asSubscriber[String]
      .toMat(Sink.asPublisher[String](fanout = true))(Keep.both).run()

def websocketAction=WebSocket.accept { requestHeader =>
    Flow.fromSinkAndSource(Sink.fromSubscriber(subscriber),Source.fromPublisher(publisher))
}

The first part will create, given a sink and a flow, the objects that you'll need to push messages and receive them (subscribe to the publisher).

finally you'll create a flow for every websocket request you receive with that code Flow.fromSinkAndSource... Something that's not clear regarding Akka Streams (Sources, Sinks and Flows) is that they represent the shape of the flow, but not the flow per se... the flow goes when you materialize them (with method runWith or run). Now... Play receives either Sources (when using Server Sent Events) or Flows when using WebSockets. And they are not still materialized... so you need to materialize them (the first line) and then creating a Flow AGAIN! (the websocketAction line)

I'm sorry if I'm not clear enough, however use that code, it will work.



回答2:

I finally found a solution using Actors. I found this:

def conect = WebSocket.accept[JsValue, JsValue] {request => 
  ActorFlow.actorRef(out => UserWebSocket.props(out, users))
}

Then I looked at the source code of ActorFlow.actorRef: https://github.com/playframework/playframework/blob/2.5.0/framework/src/play-streams/src/main/scala/play/api/libs/streams/ActorFlow.scala

and came up with this solution:

import javax.inject._
import play.api.Configuration
import play.api.mvc._
import scala.concurrent._

import akka.stream.{Materializer, OverflowStrategy}
import akka.stream.scaladsl.{Flow, Keep, Sink, Source}
import akka.actor._

class UserActor(out: ActorRef) extends Actor {
  def receive = {
    // receives messages from client browser here
    // out is actor that will send messages back to client(s)
    case msg: String => out ! "Received message "+msg
  }
}
object UserActor {
  def props(out: ActorRef) = Props(new UserActor(out))
}

@Singleton
class NotificationController @Inject()(val config:Configuration)
                          (implicit ec: ExecutionContext, actorSystem:ActorSystem, materializer: Materializer) extends Controller {

  // outActor can be used to send messages to client(s)
  // Sink.asPublisher(true) makes this a broadcast channel (multiple clients can connect to this channel, and messages sent to outActor are broadcast to all of them).  Use Sink.asPublisher(false) to create a unicast channel.
  val (outActor, publisher) = Source.actorRef[String](99, OverflowStrategy.dropNew)
        .toMat(Sink.asPublisher(true))(Keep.both).run()


  def flowsocket = WebSocket.accept[String, String] {request =>
    val aflow:Flow[String, String, _] = {

        val sink = Sink.actorRef( actorSystem.actorOf(UserActor.props(outActor)), akka.actor.Status.Success(()) )

        val source = Source.fromPublisher(publisher)

        Flow.fromSinkAndSource(
            sink, source
        )
    }
    aflow
  }

}

I have since revised my solution to more fully embrace the Actor model. I now have a "UsersBroadcastActor" which is a singleton actor that all other "UserActor"s connect to and can communicate via it:

lazy val broadcastActorRef = actorSystem.actorOf(Props[UsersBroadcastActor])

def flowsocket = WebSocket.accept[JsValue, JsValue] { request =>
    ActorFlow.actorRef(out => UserActor.props(out, broadcastActorRef))
}

When UserActor is instantiated, in its preStart() method, it sends a subscription message to the broadcastActorRef, which saves the references to all the UserActors that "subscribe" to it. I can send send a message to broadcastActorRef, and it forwards it to each of the UserActors. Let me know if you'd like full code sample of this solution as well.



回答3:

I think you are just looking for how to do a an Echo websocket connection with Play 2.5 and the Akka Streams flow.

This should do the trick

  def socket = WebSocket.accept[String, String] { request =>
    Flow[String]
      .map(msg => "I received your message: " + msg)
  }