How to use IO with Scalaz7 Iteratees without overf

Posted 2019-02-06 19:32

Question:

Consider this code (taken from here and modified to use bytes rather than lines of characters).

import java.io.{ File, InputStream, BufferedInputStream, FileInputStream }
import scalaz._, Scalaz._, effect._, iteratee.{ Iteratee => I, _ }
import std.list._

object IterateeIOExample {
  type ErrorOr[+A] = EitherT[IO, Throwable, A]

  def openStream(f: File) = IO(new BufferedInputStream(new FileInputStream(f)))
  def readByte(s: InputStream) = IO(Some(s.read()).filter(_ != -1))
  def closeStream(s: InputStream) = IO(s.close())

  def tryIO[A, B](action: IO[B]) = I.iterateeT[A, ErrorOr, B] {
    EitherT(action.catchLeft).map(r => I.sdone(r, I.emptyInput))
  }

  def enumBuffered(r: => BufferedInputStream) = new EnumeratorT[Int, ErrorOr] {
    lazy val reader = r
    def apply[A] = (s: StepT[Int, ErrorOr, A]) => s.mapCont(k =>
      tryIO(readByte(reader)) flatMap {
        case None => s.pointI
        case Some(byte) => k(I.elInput(byte)) >>== apply[A]
      })
  }

  def enumFile(f: File) = new EnumeratorT[Int, ErrorOr] {
    def apply[A] = (s: StepT[Int, ErrorOr, A]) =>
      tryIO(openStream(f)).flatMap(stream => I.iterateeT[Int, ErrorOr, A](
        EitherT(
          enumBuffered(stream).apply(s).value.run.ensuring(closeStream(stream)))))
  }

  def main(args: Array[String]): Unit = {
    val action = (
      I.consume[Int, ErrorOr, List] &=
      enumFile(new File(args(0)))).run.run
    println(action.unsafePerformIO())
  }
}

Running this code on a decent-sized file (8 KB) throws a StackOverflowError. Some searching turned up that the error can be avoided by using the Trampoline monad instead of IO, but that doesn't seem like a great solution: it sacrifices functional purity just to get the program to complete at all. The obvious fix is to use IO or Trampoline as a monad transformer wrapping the other, but I can't find a transformer version of either of them, and I'm not enough of a functional-programming guru to write my own (learning more about FP is one of the purposes of this project, but I suspect creating new monad transformers is a bit above my level at the moment). I suppose I could just wrap one big IO action around creating, running, and returning the result of my iteratees, but that feels more like a workaround than a solution.
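For readers unfamiliar with trampolining, the general idea can be sketched with the standard library's scala.util.control.TailCalls. This is only a stand-in for illustration: it is not Scalaz's Trampoline and says nothing about how IO behaves. The names TrampolineSketch, sumNaive and sumTrampolined are hypothetical.

```scala
import scala.util.control.TailCalls.{ TailRec, done, tailcall }

object TrampolineSketch {
  // Plain recursion that is not in tail position: every element keeps a
  // stack frame alive, so a long input may throw StackOverflowError
  // (the exact limit depends on the JVM stack size).
  def sumNaive(xs: List[Int]): Long = xs match {
    case Nil    => 0L
    case h :: t => h + sumNaive(t)
  }

  // Trampolined version: each step returns a description of the next step
  // instead of calling it directly. Calling `result` then runs the steps
  // in a loop, trading stack frames for heap-allocated closures.
  def sumTrampolined(xs: List[Int]): TailRec[Long] = xs match {
    case Nil    => done(0L)
    case h :: t => tailcall(sumTrampolined(t)).map(_ + h)
  }
}
```

Calling TrampolineSketch.sumTrampolined(xs).result evaluates the sum without growing the call stack with the length of xs.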

Presumably some monads can't be converted to monad transformers, so I'd like to know if it's possible to work with large files without dropping IO or overflowing the stack, and if so, how?

Bonus question: I can't think of any way for an iteratee to signal that it's encountered an error while processing except to have it return Either, which makes it less easy to compose them. The code above shows how to use EitherT to handle errors in the enumerator, but how does that work for the iteratees?

Answer 1:

After creating exceptions and printing their stack depth in various places in your code, I concluded that your code itself is not overflowing: it all seems to run in constant stack space. So I looked elsewhere. Eventually I copied the implementation of consume, added some stack-depth printing, and confirmed that the overflow happens there.
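The stack-depth check described above can be sketched roughly as follows. This is a minimal illustration, not the exact code I used: StackDepthSketch, stackDepth and depthAfter are hypothetical names, and the JVM may cap the length of recorded stack traces (1024 frames by default on HotSpot, as far as I know), so this only measures shallow stacks reliably.

```scala
object StackDepthSketch {
  // Number of frames on the current thread's call stack, obtained by
  // creating a Throwable and counting its recorded stack trace elements.
  def stackDepth(): Int = new Throwable().getStackTrace.length

  // Recurses n times, then reports the depth reached at the bottom.
  def depthAfter(n: Int): Int =
    if (n == 0) stackDepth()
    else {
      // Binding the result to a val keeps the call out of tail position,
      // so the compiler does not turn the recursion into a loop.
      val deeper = depthAfter(n - 1)
      deeper
    }
}
```

Printing stackDepth() at the same point in a loop or a recursive step shows whether the depth stays constant or grows with the input.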

So this overflows:

(I.consume[Int, Id, List] &= EnumeratorT.enumStream(Stream.fill(10000)(1))).run

But, I then found out that this doesn't:

(I.putStrTo[Int](System.out) &= EnumeratorT.enumStream(Stream.fill(10000)(1)))
  .run.unsafePerformIO()

putStrTo uses foldM and somehow does not cause an overflow, so I wondered whether consume could be implemented in terms of foldM. I copied a few things over from consume and tweaked them until it compiled:

def consume1[E, F[_]:Monad, A[_]:PlusEmpty:Applicative]: IterateeT[E, F, A[E]] = {
  I.foldM[E, F, A[E]](PlusEmpty[A].empty){ (acc: A[E], e: E) =>
    (Applicative[A].point(e) <+> acc).point[F]
  }
}

And it worked, printing a long list of ints.
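One thing to watch for in a fold-based consume: assuming foldM folds over the input from left to right, an accumulation step that puts each new element in front of the accumulator (as point(e) <+> acc does for List) builds the result in reverse input order. The plain-Scala sketch below illustrates this with the standard library's foldLeft instead of Scalaz; FoldConsumeSketch, consumePrepending and consumeInOrder are hypothetical names.

```scala
object FoldConsumeSketch {
  // Same accumulation shape as consume1, specialised to List:
  // each incoming element is prepended to the accumulator.
  def consumePrepending[E](input: Iterator[E]): List[E] =
    input.foldLeft(List.empty[E])((acc, e) => e :: acc)

  // Prepending is O(1) per element; one reverse at the end restores
  // the original input order.
  def consumeInOrder[E](input: Iterator[E]): List[E] =
    consumePrepending(input).reverse
}
```

With a stream of identical values such as Stream.fill(10000)(1) the order is invisible, so it is worth testing with distinct elements before relying on the result.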