Scala, Play, Akka, Websocket: how to pass actor me

Posted 2020-05-16 07:05

Question:

I have an actor that is launched with the application. It runs in the background, watches for certain changes, and reports any that occur. At the moment the report is just a println to the console. What I need is for every new message to be sent to the front end over a WebSocket.

This is my Play Global object where the monitoring/listening actor is launched:

object Global extends GlobalSettings {

    override def onStart(app: Application) {

        class Listener extends Actor {
            //This needs to be changed to pass messages to Websocket, how?
            def receive = {
                case Create(path) => println("CREATE " + path)
                case Delete(path) => println("DELETE " + path)
                case Modify(path) => println("MODIFY " + path)
            }
        }

        val listener = Akka.system.actorOf(Props[Listener], "listener")
        val swatch = Akka.system.actorOf(Props[SwatchActor], "swatch")
        swatch ! Watch("/folder/path", Seq(Create, Modify, Delete), true, Some(listener))

    }

}

This is my Play controller:

object Application extends Controller {

    def test = WebSocket.using[String] { request =>

        //This hopefully gets the listener actor reference?
        val listener = Akka.system.actorSelection("/user/listener")

        val (out, channel) = Concurrent.broadcast[String]
        val in = Iteratee.foreach[String] { msg =>
            //Actor messages must be pushed here, how?
            channel push("RESPONSE: " + msg)
        }

        (in, out)

    }   

}

I understand that for a WebSocket connection to be established there has to be an initial "in".

So my questions are:

  1. How do I modify the Listener actor to push messages to the WebSocket?
  2. What do I need to do to prepare the actor to push messages once the WebSocket connection is established?
  3. How do I push messages from the listener actor to the WebSocket?

Answer 1:

I have found the solution.

A case class, defined in a separate file and imported where it is used:

case class Start(out: Concurrent.Channel[String])

Global object:

object Global extends GlobalSettings {

    override def onStart(app: Application) {

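        class Listener extends Actor {
            // Placeholder channel used until a WebSocket connects; nothing is
            // attached to its enumerator, so messages pushed to it are dropped
            var out = {
                val (_, chan) = Concurrent.broadcast[String]
                chan
            }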
            def receive = {
                //Websocket channel out is set here
                case Start(out) => this.out = out
                //Pushing messages to Websocket
                case Create(path) => this.out.push(path.toString)
                case Delete(path) => this.out.push(path.toString)
                case Modify(path) => this.out.push(path.toString)
            }
        }

        val listener = Akka.system.actorOf(Props[Listener], "listener")
        val swatch = Akka.system.actorOf(Props[SwatchActor], "swatch")
        swatch ! Watch("/folder/path", Seq(Create, Modify, Delete), true, Option(listener))

    }

}

Play controller:

object Application extends Controller {

    def test = WebSocket.using[String] { request =>

        val (out, channel) = Concurrent.broadcast[String]

        val listener = Akka.system.actorSelection("akka://application/user/listener")
        //This is where the websocket out channel is being passed to the listener actor
        listener ! Start(channel)

        val in = Iteratee.foreach[String] { msg =>
            channel push("RESPONSE: " + msg)
        }

        (in, out)

    }   

}
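
One limitation of the approach above is that the listener holds a single channel, so each new WebSocket connection replaces the previous one and only the most recent client receives updates. A possible variation, sketched here against the same Play 2.2 iteratee API, creates one broadcast pair up front and shares its enumerator with every connection. `Events` is a hypothetical holder object introduced for illustration, not part of Play, and the listener below forwards any message it receives rather than matching the watcher library's Create, Delete and Modify cases.

```scala
import akka.actor.Actor
import play.api.libs.iteratee.{Concurrent, Iteratee}
import play.api.mvc.{Controller, WebSocket}

// Hypothetical holder: one enumerator/channel pair shared by the whole app
object Events {
  val (out, channel) = Concurrent.broadcast[String]
}

class Listener extends Actor {
  def receive = {
    // Forward the string form of every incoming message to all connected clients
    case event => Events.channel.push(event.toString)
  }
}

object Application extends Controller {
  def test = WebSocket.using[String] { request =>
    // Input from the browser is not needed here, so it is discarded
    val in = Iteratee.ignore[String]
    // Every connection is fed from the same broadcast enumerator
    (in, Events.out)
  }
}
```

With this layout the Start message is no longer needed, because the listener never has to learn about individual connections.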


Answer 2:

Just the other day I wrote this, which will be included in Play 2.3. You can copy it to your own application for now:

https://github.com/jroper/ReactiveMaps/blob/websocket-actor/app/actors/WebSocketActor.scala

You pass it a Props object, and it will create the actor for you. You send to the WebSocket by sending to context.parent.
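
For reference, the API that shipped in Play 2.3 is, to the best of my knowledge, `WebSocket.acceptWithActor`, and it differs slightly from the description above: the actor is handed an `out` ActorRef and sends to that instead of to context.parent. The following is a minimal sketch under that assumption. `FileEvent` and `FileEventSocket` are hypothetical names, and using the actor system's event stream to connect the listener to each socket actor is one possible wiring, not something the answer prescribes.

```scala
import akka.actor.{Actor, ActorRef, Props}
import play.api.Play.current
import play.api.mvc.{Controller, WebSocket}

// Hypothetical event type that the listener would publish on the event stream
case class FileEvent(description: String)

object FileEventSocket {
  def props(out: ActorRef): Props = Props(new FileEventSocket(out))
}

// One instance is created per WebSocket connection
class FileEventSocket(out: ActorRef) extends Actor {
  // Subscribe on start and unsubscribe on stop so closed sockets get no events
  override def preStart(): Unit =
    context.system.eventStream.subscribe(self, classOf[FileEvent])
  override def postStop(): Unit =
    context.system.eventStream.unsubscribe(self)

  def receive = {
    // Events published by the listener are sent on to the browser
    case FileEvent(description) => out ! description
    // Text frames arriving from the browser are echoed back with a prefix
    case fromBrowser: String => out ! ("RESPONSE: " + fromBrowser)
  }
}

object Application extends Controller {
  def test = WebSocket.acceptWithActor[String, String] { request => out =>
    FileEventSocket.props(out)
  }
}
```

On the listener side, each case would then publish rather than print, for example `context.system.eventStream.publish(FileEvent("CREATE " + path))`.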