Broadcasting messages in Play Framework WebSockets

2020-03-07 04:30发布

问题:

I'm pushing messages over Play Framework WebSockets using Concurrent.unicast[JsValue], and I want to optimize sending the same message to multiple users. Is it possible to broadcast a single message across multiple Concurrent.Channel instances?

回答1:

Short answer

Maintain a separate channel for each user, and associate users with groups.

Long answer

package controllers

import akka.actor.Actor
import play.api.libs.iteratee.Enumerator
import play.api.libs.iteratee.Concurrent.Channel
import play.api.libs.iteratee.Concurrent
import play.api.Logger
import play.api.libs.iteratee.Iteratee
import play.api.libs.concurrent.Execution.Implicits.defaultContext

/**
 * Message protocol for the AdvancedRoom actor.
 */
object AdvancedRoomMessages {
  /** Join request from the user called `name`. */
  case class Join(name: String)
  /** Push `msg` to the members of the group called `gName`. */
  case class BroadcastGroup(msg: String, gName: String)
  /** Push `msg` on the room-wide channel. */
  case class BroadcastAll(msg: String)
  /** Register a group called `gName` (initially without members). */
  case class AddGroup(gName: String)
  /** Remove the group called `gName`. */
  case class RemoveGroup(gName: String)
  /** Add the user `userName` to the group `gName`. */
  case class AddUserToGroup(userName: String, gName: String)
  /**
   * Presumably the counterpart of AddUserToGroup: remove `userName` from `gName`.
   * NOTE(review): the lower-case name breaks the UpperCamelCase convention used by
   * the other messages; it is left unchanged here because renaming would break callers.
   */
  case class removeUserFromGroup(userName: String, gName: String)
}


/**
 * Chat-room actor that keeps one broadcast channel per user and lets users be
 * organised into named groups.
 *
 * All mutable state (`users`, `groups`) is only touched from inside `receive`.
 * Iteratee callbacks run on the Play execution context, i.e. outside the actor,
 * so they must never modify that state directly; they send a message to `self`
 * instead.
 */
class AdvancedRoom extends Actor {
  import scala.collection.mutable
  import AdvancedRoomMessages._

  /**
   * Internal notification sent to `self` when a user's websocket is closed, so
   * that the clean-up happens on the actor's own thread.
   */
  private case class UserLeft(name: String)

  /**
   * common channel for communication
   */
  val (enumerator, channel) = Concurrent.broadcast[String]

  /**
   * every user has his own channel
   */
  val users = mutable.Map[String, (Enumerator[String], Channel[String])]()

  /**
   * users can be grouped; `None` marks a registered group without members yet
   */
  val groups = mutable.Map[String, Option[mutable.Set[String]]]()

  def receive = {
    case Join(name) =>
      users.get(name) match {
        case Some((existingEnumerator, _)) =>
          // existing user: hand back the stream already registered for this
          // name and ignore whatever the second connection sends
          sender ! ((Iteratee.ignore[String], existingEnumerator))

        case None =>
          // new user: create a dedicated broadcast channel
          val (userEnumerator, userChannel) = Concurrent.broadcast[String]
          users.update(name, (userEnumerator, userChannel))

          val iteratee = Iteratee.foreach[String] { _ =>
            // do something with the message
            ()
          }.map { _ =>
            // The websocket was closed. This callback runs outside the actor,
            // so only notify the actor; the state is cleaned up in UserLeft.
            self ! UserLeft(name)
          }
          sender ! ((iteratee, userEnumerator))
      }

    case UserLeft(name) =>
      // close the user's channel and forget the user, including group membership
      users.remove(name).foreach { case (_, userChannel) => userChannel.eofAndEnd() }
      groups.values.foreach { members => members.foreach(mates => mates -= name) }

    case BroadcastGroup(msg, gName) =>
      // an unknown group is treated like an empty one instead of throwing
      groups.getOrElse(gName, None) match {
        case Some(gMates) =>
          // skip members that have no channel (e.g. not connected)
          gMates.foreach { person => users.get(person).foreach(_._2.push(msg)) }
        case None => Logger.info("empty group") //ignore sending message
      }

    case BroadcastAll(msg) =>
      // Clients only ever receive their per-user enumerator, so the message has
      // to go through every per-user channel to actually reach them.
      users.values.foreach { case (_, userChannel) => userChannel.push(msg) }
      // Still feed the common channel for anything consuming `enumerator`.
      channel.push(msg)

    case AddGroup(gName) =>
      groups += ((gName, None))

    case RemoveGroup(gName) =>
      groups -= gName

    case AddUserToGroup(userName, gName) =>
      groups.get(gName) match {
        case Some(Some(gMates)) => gMates += userName
        // first member: the new set must be stored, not just created
        case Some(None) => groups.update(gName, Some(mutable.Set(userName)))
        case None => Logger.warn(s"cannot add $userName to unknown group $gName")
      }

    case removeUserFromGroup(userName, gName) =>
      groups.get(gName).foreach { members => members.foreach(mates => mates -= userName) }
  }
}


回答2:

/**
 * Enumeratee that lets through only the JSON messages addressed to `group_id`.
 *
 * A message whose "group_id" field is missing or is not a string is dropped
 * rather than raising an exception, so one malformed message cannot fail the
 * whole outgoing stream.
 *
 * @param group_id the group whose messages should pass
 */
def filter(group_id: String) = Enumeratee.filter[JsValue] { json: JsValue =>
  (json \ "group_id").asOpt[String].exists(_ == group_id)
}

Apply this filter to the outgoing enumerator as follows:

/**
 * WebSocket endpoint for one group: every incoming message is pushed onto the
 * public channel, and the client receives the public stream restricted to
 * `group_id` by `filter`.
 */
def chat(group_id: String) = WebSocket.using[JsValue] { _ =>
  // inbound: forward each received message to the public channel
  val inbound = Iteratee.foreach[JsValue] { incoming => public_channel.push(incoming) }
  // outbound: the public stream, narrowed to this group's messages
  val outbound = public_enumerator &> filter(group_id)
  (inbound, outbound)
}


回答3:

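From my experiments, Concurrent.broadcast does not by itself send to everyone (some unfortunate naming, perhaps?): each call creates a new, independent (enumerator, channel) pair, so a message pushed to one channel only reaches clients consuming that channel's enumerator. Here is what I used that works as expected: keep every channel in a collection and push to all of them.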

package controllers

import play.api._
import play.api.mvc._
import play.api.libs.iteratee.Concurrent
import play.api.libs.iteratee.Iteratee
import play.api.libs.concurrent.Execution.Implicits.defaultContext
import scala.collection.mutable.{Set => MS}
import scala.concurrent._ 

/**
 * Controller that keeps one broadcast channel per websocket connection and can
 * push a message to all of them.
 *
 * The registry `c` is a plain mutable set that is touched from several threads
 * (the future creating a connection, the iteratee callback removing it, and
 * whoever calls `pushHello`), so every access is guarded by `c.synchronized`.
 */
object Application extends Controller {
  val c:MS[(Int, Concurrent.Channel[String])] = MS() // (channelID, Channel))

  // Sequential ids: unlike random ints they can never collide, so removing one
  // connection can never remove another connection's channel as well.
  private val nextChannelId = new java.util.concurrent.atomic.AtomicInteger(0)

  /** Pushes "hello" to ALL registered channels. */
  def pushHello = c.synchronized { c.foreach(_._2.push("hello")) }

  /**
   * WebSocket endpoint: registers a new channel for the connection, echoes every
   * received message back on that channel, and unregisters the channel once the
   * client's input ends.
   */
  def index = WebSocket.async[String] { _ => Future {
      val (out, channel) = Concurrent.broadcast[String]
      val channelID = nextChannelId.incrementAndGet()
      c.synchronized { c.add((channelID, channel)) }
      val in = Iteratee.foreach[String] { any =>
        channel.push("received:"+any) // push to current channel
      }.map { _ => c.synchronized { c.retain(x => x._1 != channelID) } }
      (in, out)
    }
  }
}


回答4:

Short Answer


// One (enumerator, channel) pair: messages pushed into `channel` are delivered
// to the clients that were handed `enumerator`.
val (enumerator, channel) = Concurrent.broadcast[String]

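Create the pair above once, in a place shared by all connections (for example as a field of a single actor), and hand the same enumerator to every client.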

Long answer


package controllers

import play.api._
import play.api.mvc._
import play.libs.Akka
import play.api.libs.concurrent.Execution.Implicits.defaultContext
import play.api.libs.iteratee.Iteratee
import play.api.libs.iteratee.Enumerator
import akka.actor.Props
import akka.pattern.ask
import akka.util.Timeout

import Room._

import scala.concurrent.duration._

/**
 * Controller exposing the index page and the chat websocket, which is backed
 * by a single Room actor.
 */
object Application extends Controller {

  /**
   * reference to the one Room actor used by every connection
   */
  val room = Akka.system.actorOf(Props[Room])

  /** Renders the index page. */
  def index = Action {
    Ok(views.html.index("Your new application is ready."))
  }

  /**
   * Websocket action: asks the room to let `name` join and uses the
   * (iteratee, enumerator) pair it replies with for the connection.
   */
  def chat(name: String) = WebSocket.async[String] { implicit request =>
    // the ask fails if the room does not answer within one second
    implicit val timeout: Timeout = Timeout(1.second)

    val reply = ask(room, Join(name))
    reply.mapTo[(Iteratee[String, _], Enumerator[String])]
  }
}



//Here is the actor
package controllers

import akka.actor.Actor
import play.api.libs.iteratee.Concurrent
import play.api.libs.iteratee.Iteratee
import play.api.libs.concurrent.Execution.Implicits.defaultContext

/**
 * Message protocol for the Room actor.
 */
object Room {
  /** Join request from the user called `name`. */
  case class Join(name: String)
  /** Push `msg` into the room's common channel. */
  case class Broadcast(msg: String)
  /** Close the common channel and stop the actor. */
  case object Quit
}

/**
 * Chat-room actor that broadcasts every message to all connected users through
 * one shared channel.
 *
 * `users` is only modified from inside `receive`. Iteratee callbacks run on the
 * Play execution context, i.e. outside the actor, so they communicate with the
 * actor exclusively by sending messages to `self`.
 */
class Room extends Actor {

  import Room._

  /**
   * Internal notification sent to `self` when a user's websocket is closed, so
   * that `users` is updated on the actor's own thread.
   */
  private case class UserLeft(name: String)

  /**
   * The single channel shared by the whole room. It is created once per actor,
   * so every case in `receive` sees the same pair: pushing a message into this
   * channel and handing this enumerator to every client broadcasts the message.
   */
  val (enumerator, channel) = Concurrent.broadcast[String]

  /**
   * keep track of users
   */
  val users = scala.collection.mutable.Set[String]()

  def receive = {
    case Join(name) =>
      // `add` returns true only when the name was not registered yet
      val iteratee =
        if (users.add(name)) {
          Iteratee.foreach[String] { msg =>
            // process messages from users: here each one is broadcast to
            // everybody in the room (the sender included)
            self ! Broadcast(msg)
          }.map { _ =>
            // The websocket was closed. This callback runs outside the actor,
            // so the removal is delegated to the actor via a message.
            self ! UserLeft(name)
          }
        } else {
          // already added users: their input is ignored
          Iteratee.ignore[String]
        }

      // send the iteratee, enumerator pair to the sender of the join message
      sender ! ((iteratee, enumerator))

    case Broadcast(msg) => channel.push(msg)

    case UserLeft(name) => users -= name

    /**
     * close the common channel only when actor is stopped
     */
    case Quit =>
      channel.eofAndEnd()
      context.stop(self)
  }
}