Creating a time-based chunking Enumeratee

2019-04-04 19:01发布

问题:

I want to create a Play 2 Enumeratee that takes in values and outputs them, chunked together, every x seconds/milliseconds. That way, in a multi-user websocket environment with lots of user input, one could limit the number of received frames per second.

I know that it's possible to group a set number of items together like this:

val chunker = Enumeratee.grouped(
  Traversable.take[Array[Double]](5000) &>> Iteratee.consume()
)

Is there a built-in way to do this based on time rather than based on the number of items?

I was thinking about doing this somehow with a scheduled Akka job, but on first sight this seems inefficient, and I'm not sure if concurency issues would arise.

回答1:

How about like this? I hope this is helpful for you.

 package controllers

 import play.api._
 import play.api.Play.current
 import play.api.mvc._
 import play.api.libs.iteratee._
 import play.api.libs.concurrent.Akka
 import play.api.libs.concurrent.Promise

 object Application extends Controller {

   def index = Action {
     val queue = new scala.collection.mutable.Queue[String]
     Akka.future {
       while( true ){
         Logger.info("hogehogehoge")
         queue += System.currentTimeMillis.toString
         Thread.sleep(100)
       }
     }
     val timeStream = Enumerator.fromCallback { () =>
       Promise.timeout(Some(queue), 200)
     }
     Ok.stream(timeStream.through(Enumeratee.map[scala.collection.mutable.Queue[String]]({ queue =>
       var str = ""
       while(queue.nonEmpty){
         str += queue.dequeue + ", "
       }
       str
     })))
   }

 }

And this document is also helpful for you. http://www.playframework.com/documentation/2.0/Enumerators

UPDATE This is for play2.1 version.

 package controllers

 import play.api._
 import play.api.Play.current
 import play.api.mvc._
 import play.api.libs.iteratee._
 import play.api.libs.concurrent.Akka
 import play.api.libs.concurrent.Promise
 import scala.concurrent._
 import ExecutionContext.Implicits.global

 object Application extends Controller {

   def index = Action {
     val queue = new scala.collection.mutable.Queue[String]
     Akka.future {
       while( true ){
         Logger.info("hogehogehoge")
         queue += System.currentTimeMillis.toString
         Thread.sleep(100)
       }
     }
     val timeStream = Enumerator.repeatM{
       Promise.timeout(queue, 200)
     }
     Ok.stream(timeStream.through(Enumeratee.map[scala.collection.mutable.Queue[String]]({ queue =>
       var str = ""
       while(queue.nonEmpty){
         str += queue.dequeue + ", "
       }
       str
     })))
   }

 }


回答2:

Here I've quickly defined an iteratee that will take values from an input for a fixed time length t measured in milliseconds and an enumeratee that will allow you to group and further process an input stream divided into segments constructed within such length t. It relies on JodaTime to keep track of how much time has passed since the iteratee began.

def throttledTakeIteratee[E](timeInMillis: Long): Iteratee[E, List[E]] = {
  var startTime = new Instant()

  def step(state: List[E])(input: Input[E]): Iteratee[E, List[E]] = {
    val timePassed = new Interval(startTime, new Instant()).toDurationMillis

    input match {
      case Input.EOF => { startTime = new Instant; Done(state, Input.EOF) }
      case Input.Empty => Cont[E, List[E]](i => step(state)(i))
      case Input.El(e) =>
        if (timePassed >= timeInMillis) { startTime = new Instant; Done(e::state, Input.Empty) }
        else Cont[E, List[E]](i => step(e::state)(i))
    }
  }

  Cont(step(List[E]()))
}

def throttledTake[T](timeInMillis: Long) = Enumeratee.grouped(throttledTakeIteratee[T](timeInMillis))