I want to create a Play 2 Enumeratee that takes in values and outputs them, chunked together, every x
seconds/milliseconds. That way, in a multi-user websocket environment with lots of user input, one could limit the number of received frames per second.
I know that it's possible to group a set number of items together like this:
val chunker = Enumeratee.grouped(
Traversable.take[Array[Double]](5000) &>> Iteratee.consume()
)
Is there a built-in way to do this based on time rather than based on the number of items?
I was thinking about doing this somehow with a scheduled Akka job, but on first sight this seems inefficient, and I'm not sure if concurency issues would arise.
How about like this? I hope this is helpful for you.
package controllers
import play.api._
import play.api.Play.current
import play.api.mvc._
import play.api.libs.iteratee._
import play.api.libs.concurrent.Akka
import play.api.libs.concurrent.Promise
object Application extends Controller {
def index = Action {
val queue = new scala.collection.mutable.Queue[String]
Akka.future {
while( true ){
Logger.info("hogehogehoge")
queue += System.currentTimeMillis.toString
Thread.sleep(100)
}
}
val timeStream = Enumerator.fromCallback { () =>
Promise.timeout(Some(queue), 200)
}
Ok.stream(timeStream.through(Enumeratee.map[scala.collection.mutable.Queue[String]]({ queue =>
var str = ""
while(queue.nonEmpty){
str += queue.dequeue + ", "
}
str
})))
}
}
And this document is also helpful for you.
http://www.playframework.com/documentation/2.0/Enumerators
UPDATE
This is for play2.1 version.
package controllers
import play.api._
import play.api.Play.current
import play.api.mvc._
import play.api.libs.iteratee._
import play.api.libs.concurrent.Akka
import play.api.libs.concurrent.Promise
import scala.concurrent._
import ExecutionContext.Implicits.global
object Application extends Controller {
def index = Action {
val queue = new scala.collection.mutable.Queue[String]
Akka.future {
while( true ){
Logger.info("hogehogehoge")
queue += System.currentTimeMillis.toString
Thread.sleep(100)
}
}
val timeStream = Enumerator.repeatM{
Promise.timeout(queue, 200)
}
Ok.stream(timeStream.through(Enumeratee.map[scala.collection.mutable.Queue[String]]({ queue =>
var str = ""
while(queue.nonEmpty){
str += queue.dequeue + ", "
}
str
})))
}
}
Here I've quickly defined an iteratee that will take values from an input for a fixed time length t measured in milliseconds and an enumeratee that will allow you to group and further process an input stream divided into segments constructed within such length t. It relies on JodaTime to keep track of how much time has passed since the iteratee began.
def throttledTakeIteratee[E](timeInMillis: Long): Iteratee[E, List[E]] = {
var startTime = new Instant()
def step(state: List[E])(input: Input[E]): Iteratee[E, List[E]] = {
val timePassed = new Interval(startTime, new Instant()).toDurationMillis
input match {
case Input.EOF => { startTime = new Instant; Done(state, Input.EOF) }
case Input.Empty => Cont[E, List[E]](i => step(state)(i))
case Input.El(e) =>
if (timePassed >= timeInMillis) { startTime = new Instant; Done(e::state, Input.Empty) }
else Cont[E, List[E]](i => step(e::state)(i))
}
}
Cont(step(List[E]()))
}
def throttledTake[T](timeInMillis: Long) = Enumeratee.grouped(throttledTakeIteratee[T](timeInMillis))