Strange enumerator/iteratee/Future behaviour when

Posted 2019-09-11 13:28

Question:

I am new to Scala and Play! programming but have a reasonable amount of experience with webapps in Django, and plenty of general programming experience.

I have been doing some exercises of my own to try to improve my understanding of Play! and this behaviour has me totally stumped. This is a follow-up question to the more general one here: Trying to understand Scala enumerator/iteratees

I am trying to query a database, using Enumerators, Iteratees and Futures.

When I code my controller thus:

  def index = Action {
    db.withConnection { conn=>
      val stmt = conn.createStatement()
      val result = stmt.executeQuery("select * from datatable")

      val resultEnum:Enumerator[TestDataObject] = Enumerator.generateM {
        logger.debug("called enumerator")
        result.next() match {
          case true =>
            val obj = TestDataObject(result.getString("name"), result.getString("object_type"),
              result.getString("quantity").toInt, result.getString("cost").toFloat)
            logger.info(obj.toJsonString)
            Future(Some(obj))
          case false =>
            logger.warn("reached end of iteration")
            stmt.close()
            Future(None)
        }
      }

      val consume:Iteratee[TestDataObject,Seq[TestDataObject]] = {
        Iteratee.fold[TestDataObject,Seq[TestDataObject]](Seq.empty[TestDataObject]) { (result,chunk) => result :+ chunk }
      }

      val newIteree = Iteratee.flatten(resultEnum(consume))
      val eventuallyResult:Future[Seq[TestDataObject]] = newIteree.run

      Ok(Await.result(eventuallyResult,60 seconds))

    }
  }

I get the expected result in the log:

10:50:27.765 [ForkJoinPool-3-worker-15] DEBUG TestDataObjectController - called enumerator
10:50:27.856 [ForkJoinPool-3-worker-15] INFO  TestDataObjectController - {"name":"thingamajig","objtype":"widget","quantity":200,"cost":3.99}
10:50:27.860 [ForkJoinPool-3-worker-15] DEBUG TestDataObjectController - called enumerator
10:50:27.863 [ForkJoinPool-3-worker-15] INFO  TestDataObjectController - {"name":"doofus","objtype":"widget","quantity":900,"cost":1.99}
10:50:27.863 [ForkJoinPool-3-worker-11] DEBUG TestDataObjectController - called enumerator
10:50:27.868 [ForkJoinPool-3-worker-11] INFO  TestDataObjectController - {"name":"wotsit","objtype":"widget","quantity":30,"cost":0.49}
10:50:27.868 [ForkJoinPool-3-worker-13] DEBUG TestDataObjectController - called enumerator
10:50:27.871 [ForkJoinPool-3-worker-13] INFO  TestDataObjectController - {"name":"foo","objtype":"thingy","quantity":490,"cost":1.49}
10:50:27.871 [ForkJoinPool-3-worker-11] DEBUG TestDataObjectController - called enumerator
10:50:27.871 [ForkJoinPool-3-worker-11] WARN  TestDataObjectController - reached end of iteration

and I get the expected JSON object (with an implicit converter defined in the controller class, not shown here).

However, when I try to code it properly, using Action.async:

  def index = Action.async {
    db.withConnection { conn=>
      val stmt = conn.createStatement()
      val result = stmt.executeQuery("select * from datatable")

      val resultEnum:Enumerator[TestDataObject] = Enumerator.generateM {
        logger.debug("called enumerator")
        result.next() match {
          case true =>
            val obj = TestDataObject(result.getString("name"), result.getString("object_type"),
              result.getString("quantity").toInt, result.getString("cost").toFloat)
            logger.info(obj.toJsonString)
            Future(Some(obj))
          case false =>
            logger.warn("reached end of iteration")
            stmt.close()
            Future(None)
        }
      }

      val consume:Iteratee[TestDataObject,Seq[TestDataObject]] = {
        Iteratee.fold[TestDataObject,Seq[TestDataObject]](Seq.empty[TestDataObject]) { (result,chunk) => result :+ chunk }
      }

      val newIteree = Iteratee.flatten(resultEnum(consume))
      val eventuallyResult:Future[Seq[TestDataObject]] = newIteree.run

      eventuallyResult.map { data=> Ok(data)}

    }
  }

then the enumerator terminates on the first run!

[info] play.api.Play - Application started (Dev)
10:53:47.571 [ForkJoinPool-3-worker-13] DEBUG TestDataObjectController - called enumerator
10:53:47.572 [ForkJoinPool-3-worker-13] WARN  TestDataObjectController - reached end of iteration

and I get an empty JSON array returned.

It appears that result.next() is behaving differently in the two contexts, but I can't work out why. I was wondering whether different iterations are being called in parallel threads, but this is reproduced reliably every time I run the code so I would not expect it to be a thread concurrency issue. Thanks for your time!

Answer 1:

I may be completely wrong about this, but I now have a theory as to what is going on.

If I am using Action.async, the enumerator callbacks are only actually evaluated once the Action block has finished, by which point the stmt and result objects have been finalised and no longer work (presumably because db.withConnection closes the connection as soon as its block returns). If, on the other hand, I block the Action until the result is available by using Await (which I realise the documentation says you shouldn't do), then the values within the Action block have not yet been finalised, and therefore I get the expected result.
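The theory can be illustrated without Play or a database. The sketch below is only a model of the situation, not the actual Play or JDBC behaviour: FakeCursor, withCursor, drain and the Promise "gate" are all hypothetical names invented for the illustration. FakeCursor stands in for the ResultSet and simply reports "no more rows" once closed (a real driver may throw instead), withCursor stands in for db.withConnection, and the gate forces the asynchronous read to start only after the block has returned, which is what appears to happen in the Action.async version.

```scala
import scala.concurrent.{Await, Future, Promise}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object LifetimeDemo {
  // Hypothetical stand-in for a JDBC ResultSet: next() yields nothing once closed.
  final class FakeCursor(rows: List[String]) {
    @volatile private var closed = false
    private val it = rows.iterator
    def next(): Option[String] = synchronized {
      if (closed || !it.hasNext) None else Some(it.next())
    }
    def close(): Unit = closed = true
  }

  // Loan pattern (assumed analogue of db.withConnection):
  // the cursor is closed as soon as `block` returns.
  def withCursor[A](rows: List[String])(block: FakeCursor => A): A = {
    val cursor = new FakeCursor(rows)
    try block(cursor) finally cursor.close()
  }

  // Read rows until the cursor reports the end.
  def drain(cursor: FakeCursor): List[String] =
    Iterator.continually(cursor.next())
      .takeWhile(_.isDefined)
      .collect { case Some(row) => row }
      .toList

  val sample = List("thingamajig", "doofus", "wotsit")

  // Like the Await version: blocks inside the loan, so the cursor is still open.
  def blockingRows(): List[String] =
    withCursor(sample) { c => Await.result(Future(drain(c)), 5.seconds) }

  // Like the Action.async version: the Future escapes the loan.
  // The gate makes the read start only after the cursor has been closed.
  def asyncRows(): List[String] = {
    val gate = Promise[Unit]()
    val eventually = withCursor(sample) { c => gate.future.map(_ => drain(c)) }
    gate.success(())
    Await.result(eventually, 5.seconds)
  }

  // One possible arrangement: open and drain the cursor inside the Future itself.
  def scopedRows(): List[String] =
    Await.result(Future(withCursor(sample) { c => drain(c) }), 5.seconds)

  def main(args: Array[String]): Unit = {
    println(blockingRows()) // all three rows
    println(asyncRows())    // empty list: the cursor was closed before the read
    println(scopedRows())   // all three rows
  }
}
```

If this model matches what Play is doing, then keeping the whole read inside the scope that owns the connection (as in scopedRows) would avoid the empty result without blocking the Action itself. The Await calls in the sketch are there only so the demo can print its results.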

As I said in the question, I am still trying to get my head around how Scala and Play work, so if you know better, please comment!