The idea is to keep the channel opened to use it later. In playframework 2.5.x the documentation says that you have to use akka streams but does not say anything how to achieve this example. Somebody can help me?
import play.api.mvc._
import play.api.libs.iteratee._
import play.api.libs.concurrent.Execution.Implicits.defaultContext
def socket = WebSocket.using[String] { request =>
// Concurrent.broadcast returns (Enumerator, Concurrent.Channel)
val (out, channel) = Concurrent.broadcast[String]
// log the message to stdout and send response back to client
val in = Iteratee.foreach[String] {
msg => println(msg)
// the Enumerator returned by Concurrent.broadcast subscribes to the channel and will
// receive the pushed messages
channel push("I received your message: " + msg)
}
(in,out)
}
You'll have to do something like this!
val (subscriber, publisher)=Source.asSubscriber[String]
.toMat(Sink.asPublisher[String](fanout = true))(Keep.both).run()
def websocketAction=WebSocket.accept { requestHeader =>
Flow.fromSinkAndSource(Sink.fromSubscriber(subscriber),Source.fromPublisher(publisher))
}
The first part will create, given a sink and a flow, the objects that you'll need to push messages and receive them (subscribe to the publisher).
finally you'll create a flow for every websocket request you receive with that code Flow.fromSinkAndSource
... Something that's not clear regarding Akka Streams (Source
s, Sink
s and Flow
s) is that they represent the shape of the flow, but not the flow per se... the flow goes when you materialize them (with method runWith
or run
). Now... Play receives either Source
s (when using Server Sent Events) or Flow
s when using WebSockets. And they are not still materialized... so you need to materialize them (the first line) and then creating a Flow AGAIN! (the websocketAction line)
I'm sorry if I'm not clear enough, however use that code, it will work.
I finally found a solution using Actors. I found this:
def conect = WebSocket.accept[JsValue, JsValue] {request =>
ActorFlow.actorRef(out => UserWebSocket.props(out, users))
}
Then I looked at the source code of ActorFlow.actorRef:
https://github.com/playframework/playframework/blob/2.5.0/framework/src/play-streams/src/main/scala/play/api/libs/streams/ActorFlow.scala
and came up with this solution:
import javax.inject._
import play.api.Configuration
import play.api.mvc._
import scala.concurrent._
import akka.stream.{Materializer, OverflowStrategy}
import akka.stream.scaladsl.{Flow, Keep, Sink, Source}
import akka.actor._
class UserActor(out: ActorRef) extends Actor {
def receive = {
// receives messages from client browser here
// out is actor that will send messages back to client(s)
case msg: String => out ! "Received message "+msg
}
}
object UserActor {
def props(out: ActorRef) = Props(new UserActor(out))
}
@Singleton
class NotificationController @Inject()(val config:Configuration)
(implicit ec: ExecutionContext, actorSystem:ActorSystem, materializer: Materializer) extends Controller {
// outActor can be used to send messages to client(s)
// Sink.asPublisher(true) makes this a broadcast channel (multiple clients can connect to this channel, and messages sent to outActor are broadcast to all of them). Use Sink.asPublisher(false) to create a unicast channel.
val (outActor, publisher) = Source.actorRef[String](99, OverflowStrategy.dropNew)
.toMat(Sink.asPublisher(true))(Keep.both).run()
def flowsocket = WebSocket.accept[String, String] {request =>
val aflow:Flow[String, String, _] = {
val sink = Sink.actorRef( actorSystem.actorOf(UserActor.props(outActor)), akka.actor.Status.Success(()) )
val source = Source.fromPublisher(publisher)
Flow.fromSinkAndSource(
sink, source
)
}
aflow
}
}
I have since revised my solution to more fully embrace the Actor model. I now have a "UsersBroadcastActor" which is a singleton actor that all other "UserActor"s connect to and can communicate via it:
lazy val broadcastActorRef = actorSystem.actorOf(Props[UsersBroadcastActor])
def flowsocket = WebSocket.accept[JsValue, JsValue] { request =>
ActorFlow.actorRef(out => UserActor.props(out, broadcastActorRef))
}
When UserActor is instantiated, in its preStart() method, it sends a subscription message to the broadcastActorRef, which saves the references to all the UserActors that "subscribe" to it. I can send send a message to broadcastActorRef, and it forwards it to each of the UserActors. Let me know if you'd like full code sample of this solution as well.
I think you are just looking for how to do a an Echo websocket connection with Play 2.5 and the Akka Streams flow.
This should do the trick
def socket = WebSocket.accept[String, String] { request =>
Flow[String]
.map(msg => "I received your message: " + msg)
}