Why does Play's PushEnumerator need to close()?

Posted 2019-07-06 12:42

Question:

The following example, adapted from the Play Framework's documentation:

import play.api.libs.iteratee._

val enumerateUsers: Enumerator[String] = {
  Enumerator("Guillaume", "Sadek", "Peter", "Erwan")
}
val consumeOne = Cont[String, String](in =>
  in match {
    case Input.EOF =>
      Done("", Input.Empty)
    case Input.Empty =>
      Done("", Input.Empty)
    case Input.El(s) =>
      Done(s, Input.Empty)
  })
println((enumerateUsers |>> consumeOne).flatMap(_.run).await.get)

prints out Guillaume.

However, if I change it so that the Enumerator is a PushEnumerator:

val enumerateUsers: PushEnumerator[String] = Enumerator.imperative[String]()
// consumeOne as before
val i = enumerateUsers |>> consumeOne
enumerateUsers.push("Guillaume")
enumerateUsers.push("Sadek")
enumerateUsers.push("Peter")
enumerateUsers.push("Erwan")
println(i.flatMap(_.run).await.get)
// Timeout exception

I get a timeout exception on the iteratee's promise.

In order to get it to do the same as before, I need to close the PushEnumerator.

val enumerateUsers: PushEnumerator[String] = Enumerator.imperative[String]()
// consumeOne as before
val i = enumerateUsers |>> consumeOne
enumerateUsers.push("Guillaume")
enumerateUsers.push("Sadek")
enumerateUsers.push("Peter")
enumerateUsers.push("Erwan")
enumerateUsers.close() // <-- magic line
println(i.flatMap(_.run).await.get)

And this prints Guillaume as before.

I can't find the doc that tells me why, or what the semantic difference is here. Can someone please point the way?

Edit: I've found my answer in the Play source - it took some hunting :) I'll mark @huynhjl's answer as correct since he sort of answered it, but the specific answer that I was looking for is pretty simple. The Play implementation uses a side effect to drive the socket, which isn't unreasonable - it's just not what I had assumed (assumptions kill everything :D). Inside play.core.server.netty.Helpers there is a function called socketOut[A](...). socketOut[A](...) has an inner function called step, which returns an Iteratee. This Iteratee writes e to the output channel when the input matches El(e). My assumption was that an Iteratee could partially consume an Enumerator and that you could then get a value out of it, but it would seem that the only way to make that happen is via a side effect... I think :)
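To make the side-effect idea concrete, here is a minimal sketch in plain Scala. It is not Play's socketOut code: the Input and Iteratee types are simplified stand-ins defined locally, and channel, step, feed and demo are hypothetical names chosen for illustration. It only shows the shape of an iteratee that writes each El(e) somewhere and keeps asking for more until EOF.

```scala
import scala.collection.mutable.ArrayBuffer

object SideEffectSketch {
  // Simplified stand-ins for the iteratee types (assumption: NOT Play's classes).
  sealed trait Input[+E]
  final case class El[+E](e: E) extends Input[E]
  case object Empty extends Input[Nothing]
  case object EOF extends Input[Nothing]

  sealed trait Iteratee[E, A]
  final case class Done[E, A](a: A) extends Iteratee[E, A]
  final case class Cont[E, A](k: Input[E] => Iteratee[E, A]) extends Iteratee[E, A]

  // Hypothetical stand-in for an output channel.
  val channel = ArrayBuffer.empty[String]

  // Writes every element to the channel as a side effect and stays in
  // the Cont state until EOF arrives.
  def step: Iteratee[String, Unit] = Cont[String, Unit] {
    case El(e) => channel += e; step
    case Empty => step
    case EOF   => Done[String, Unit](())
  }

  // Feeds one input to an iteratee; a finished iteratee ignores it.
  def feed(it: Iteratee[String, Unit], in: Input[String]): Iteratee[String, Unit] =
    it match {
      case Cont(k) => k(in)
      case done    => done
    }

  // Pushes two elements followed by EOF and returns what reached the channel.
  def demo(): List[String] = {
    channel.clear()
    val afterTwo = List("Guillaume", "Sadek").foldLeft(step)((it, s) => feed(it, El(s)))
    feed(afterTwo, EOF)
    channel.toList
  }
}
```

In this sketch the elements are observable in channel as soon as they are fed, even though the iteratee itself only produces a (unit) result at EOF.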

Answer 1:

The role of the Enumerator object is to feed elements to the iteratee in a specific order until there are no more elements or until the iteratee is done, and then to return that iteratee. The close() call signals that there is no more input, so that the iteratee result can be returned. Until close() is called, more elements could still be pushed. I haven't tried it, but I expect that this would be equivalent to your example:

val i = enumerateUsers |>> consumeOne
i.flatMap(_.run).onRedeem(println) // we will have a result we want to print
// now feed some data
enumerateUsers.push("Guillaume")
enumerateUsers.push("Sadek")
enumerateUsers.push("Peter")
enumerateUsers.push("Erwan")
// no more input, trigger promise computation
enumerateUsers.close()

It is more meaningful if you apply enumerateUsers to an iteratee where you cannot tell ahead of time how many elements it needs to get a result (the general case).
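The behaviour described in this answer can be sketched with a toy model in plain Scala. This is an illustration under stated assumptions, not Play's implementation: the Input and Iteratee types are simplified local stand-ins, and ToyPushEnumerator, result and demo are hypothetical names. The point it tries to show is that a push-style enumerator can advance the iteratee on every push() yet only hand the iteratee back once close() is called.

```scala
import scala.concurrent.{Await, Future, Promise}
import scala.concurrent.duration._

object ToyPush {
  // Simplified stand-ins for the iteratee types (assumption: NOT Play's classes).
  sealed trait Input[+E]
  final case class El[+E](e: E) extends Input[E]
  case object Empty extends Input[Nothing]
  case object EOF extends Input[Nothing]

  sealed trait Iteratee[E, A]
  final case class Done[E, A](a: A) extends Iteratee[E, A]
  final case class Cont[E, A](k: Input[E] => Iteratee[E, A]) extends Iteratee[E, A]

  // Hypothetical push-style enumerator. Not thread-safe; sketch only.
  final class ToyPushEnumerator[E, A](start: Iteratee[E, A]) {
    private var current: Iteratee[E, A] = start
    private val finished = Promise[Iteratee[E, A]]()

    // Completed only by close(), even if the iteratee is Done earlier.
    val result: Future[Iteratee[E, A]] = finished.future

    def push(e: E): Unit = current match {
      case Cont(k) => current = k(El(e)) // advance the iteratee
      case Done(_) => ()                 // already done: drop the element
    }

    def close(): Unit = { finished.trySuccess(current); () }
  }

  // Same idea as consumeOne in the question: finish on the first input.
  val consumeOne: Iteratee[String, String] = Cont[String, String] {
    case El(s) => Done[String, String](s)
    case _     => Done[String, String]("")
  }

  // Returns whether the result was available before close(), and the value after.
  def demo(): (Boolean, String) = {
    val users = new ToyPushEnumerator(consumeOne)
    users.push("Guillaume")
    users.push("Sadek")
    val completedBeforeClose = users.result.isCompleted
    users.close()
    val first = Await.result(users.result, 1.second) match {
      case Done(a) => a
      case Cont(_) => sys.error("iteratee still wants input")
    }
    (completedBeforeClose, first)
  }
}
```

Calling ToyPush.demo() yields (false, "Guillaume"): the iteratee already holds its answer after the first push, but in this model nothing is handed back to the caller until close().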