I'm pushing messages in Play Framework WebSockets using Concurrent.unicast[JsValue]
, and I want to optimize sending the same message to multiple users. Is it possible to broadcast message using somehow multiple Concurrent.Channel
?
可以将文章内容翻译成中文,广告屏蔽插件可能会导致该功能失效(如失效,请关闭广告屏蔽插件后再试):
问题:
回答1:
Short answer
Maintain separate channel for each user and have groups associated with users
Long answer
package controllers
import akka.actor.Actor
import play.api.libs.iteratee.Enumerator
import play.api.libs.iteratee.Concurrent.Channel
import play.api.libs.iteratee.Concurrent
import play.api.Logger
import play.api.libs.iteratee.Iteratee
import play.api.libs.concurrent.Execution.Implicits.defaultContext
object AdvancedRoomMessages {
case class Join(name: String)
case class BroadcastGroup(msg: String, gName: String)
case class BroadcastAll(msg: String)
case class AddGroup(gName: String)
case class RemoveGroup(gName: String)
case class AddUserToGroup(userName: String, gName: String)
case class removeUserFromGroup(userName: String, gName: String)
}
class AdvancedRoom extends Actor {
import scala.collection.mutable._
/**
* common channel for communication
*/
val (enumerator, channel) = Concurrent.broadcast[String]
/**
* every user has his own channel
*/
val users = Map[String, (Enumerator[String],Channel[String])]()
/**
* users can be grouped
*/
val groups = Map[String, Option[Set[String]]]()
import AdvancedRoomMessages._
def receive = {
case Join(name) => {
/**
* join request from the user
*/
if(users contains name) {
/**
* existing user
*/
val iteratee = Iteratee.ignore[String]
sender ! ((iteratee, users(name)._1))
}else {
/**
* join request from a new user
*/
/**
* create new broadcast channel
*/
val (enumerator, channel) = Concurrent.broadcast[String]
users += ((name, (enumerator, channel)))
val iteratee = Iteratee.foreach[String](msg => {
//do something with the message
}).map{ _ => {
/**
* user closed his websocket client, so remove the user
* warning ... also remove the corresponding user name in groups
*/
users(name)._2.eofAndEnd()
users -= name
}}
sender ! (iteratee, enumerator)
}
}
case BroadcastGroup(msg, gName) => {
groups(gName) match {
case Some(gMates) => {
gMates.foreach { person => users(person)._2.push(msg)}
}
case None => Logger.info("empty group") //ignore sending message
}
}
case BroadcastAll(msg) => {
channel push msg
}
case AddGroup(gName: String) => {
groups += ((gName, None))
}
case RemoveGroup(gName: String) => {
groups -= gName
}
case AddUserToGroup(userName, gName) => {
groups(gName) match {
case Some(gMates) => gMates += userName
case None => Set(userName)
}
}
}
}
回答2:
def filter(group_id: String) = Enumeratee.filter[JsValue]{ json: JsValue =>
group_id == (json\"group_id").as[String]
}
This filter has to be applied as,
def chat(group_id: String) = WebSocket.using[JsValue] { request =>
val in = Iteratee.foreach[JsValue]{ msg=>
public_channel.push(msg)
}
(in, public_enumerator &> filter(group_id))
}
回答3:
From my experiments, Concurrent.broadcast
does not send to everyone (some unfortunate naming perhaps?)
Here is what I used that works as expected.
package controllers
import play.api._
import play.api.mvc._
import play.api.libs.iteratee.Concurrent
import play.api.libs.iteratee.Iteratee
import play.api.libs.concurrent.Execution.Implicits.defaultContext
import scala.collection.mutable.{Set => MS}
import scala.concurrent._
object Application extends Controller {
val c:MS[(Int, Concurrent.Channel[String])] = MS() // (channelID, Channel))
def pushHello = c.foreach(_._2.push("hello")) // push to ALL channels
def index = WebSocket.async[String] { _ => future{
val (out,channel) = Concurrent.broadcast[String]
val channelID = scala.util.Random.nextInt
c.add((channelID, channel))
val in = Iteratee.foreach[String] {
_ match {
case any => channel.push("received:"+any) // push to current channel
}
}.map { _ => c.retain(x => x._1 != channelID) }
(in, out)
}
}
}
回答4:
Short Answer
val (enumerator, channel) = Concurrent.broadcast[String]
use above thing globally
Long answer
package controllers
import play.api._
import play.api.mvc._
import play.libs.Akka
import play.api.libs.concurrent.Execution.Implicits.defaultContext
import play.api.libs.iteratee.Iteratee
import play.api.libs.iteratee.Enumerator
import akka.actor.Props
import akka.pattern.ask
import akka.util.Timeout
import Room._
import scala.concurrent.duration._
object Application extends Controller {
def index = Action {
Ok(views.html.index("Your new application is ready."))
}
/**
* get actor ref
*/
val room = Akka.system.actorOf(Props[Room])
/**
* websocket action
*/
def chat(name: String) = WebSocket.async[String](implicit request => {
implicit val timeout = Timeout(1 seconds)
(room ? Join(name)).mapTo[(Iteratee[String, _], Enumerator[String])]
})
}
//Here is the actor
package controllers
import akka.actor.Actor
import play.api.libs.iteratee.Concurrent
import play.api.libs.iteratee.Iteratee
import play.api.libs.concurrent.Execution.Implicits.defaultContext
object Room {
case class Join(name: String)
case class Broadcast(msg: String)
case object Quit
}
class Room extends Actor {
/**
* here is the meat
* Creating channel globally is important here
* This can be accessed across all cases in receive method
* pushing the message into this channel and returning this enumerator to all ,
* broadcasts the message
*/
val (enumerator, channel) = Concurrent.broadcast[String]
/**
* keep track of users
*/
val users = scala.collection.mutable.Set[String]()
import Room._
def receive = {
case Join(name) => {
/**
* add new users
*/
if(!users.contains(name)) {
users += name
val iteratee = Iteratee.foreach[String]{
msg => {
/**
* process messages from users
* here we are broadcast it to all other users
*/
self ! Broadcast(msg)
}
}.map( _ => {
/**
* user closed his websocket.
* remove him from users
*/
users -= name
})
/**
* send iteratee, enumerator pair to the sender of join message
*/
sender ! (iteratee, enumerator)
} else {
/**
* already added users
*/
val iteratee = Iteratee.ignore[String]
/**
* send iteratee and enumerator pair
*/
sender ! (iteratee, enumerator)
}
}
case Broadcast(msg) => channel push(msg)
/**
* close the common channel only when actor is stopped
*/
case Quit => channel eofAndEnd(); context.stop(self)
}
}