我听说iteratees懒惰,但如何懒惰究竟是什么人? 可替代地,可以用iteratees后处理功能进行融合,从而使中间数据结构不具有待建?
我可以在我的iteratee例如建100万件Stream[Option[String]]
从java.io.BufferedReader
,并且随后筛选出None
S,在组成方式,而不需要整个流中举行记忆? 而在同一时间的保证,我不吹堆栈? 或类似的东西-它没有使用Stream
。
我目前使用Scalaz 6,但如果其他iteratee实现是能够做到这一点以更好的方式,我很想知道。
请提供完整的解决方案,其中包括关闭BufferedReader
,并呼吁unsafePerformIO
,如果适用。
下面是使用演示你感兴趣的属性Scalaz 7库快速iteratee例如:常量内存和堆栈使用。
问题
首先,假设我们已经得到了与十进制数字在每行一个字符串一个大文本文件,我们想找到所有包含至少20个零线。 我们可以产生这样一些样本数据:
val w = new java.io.PrintWriter("numbers.txt")
val r = new scala.util.Random(0)
(1 to 1000000).foreach(_ =>
w.println((1 to 100).map(_ => r.nextInt(10)).mkString)
)
w.close()
现在,我们已经有了一个文件名为numbers.txt
。 让我们从一个打开BufferedReader
:
val reader = new java.io.BufferedReader(new java.io.FileReader("numbers.txt"))
这不是过大(〜97兆字节),但它足够大,让我们容易看到我们的内存使用是否真正保持不变,而我们处理它。
建立我们的枚举
首先对部分进口:
import scalaz._, Scalaz._, effect.IO, iteratee.{ Iteratee => I }
和枚举(请注意,我改变IoExceptionOr
s转换Option
S表示方便起见):
val enum = I.enumReader(reader).map(_.toOption)
Scalaz 7目前没有提供一个很好的方式来枚举文件行,所以我们通过在时刻的文件,一个字符组块。 当然,这将是非常缓慢的,但我不会担心,在这里,因为这个演示的目的是要表明,我们可以处理在不断的记忆和不吹堆栈这个大十岁上下的文件。 这个答案的最后一节给出了更好的性能的方法,但在这里我们只劈在换行符:
val split = I.splitOn[Option[Char], List, IO](_.cata(_ != '\n', false))
而如果这样的事实splitOn
需要一个谓词指定在不拆分混淆你,你并不孤单。 split
是我们的enumeratee的第一个例子。 我们会继续和我们的包裹在枚举它:
val lines = split.run(enum).map(_.sequence.map(_.mkString))
现在,我们已经有了一个枚举Option[String]
S IN的IO
单子。
过滤文件与enumeratee
接下来我们断言,记得我们说,我们希望线条与至少20个零:
val pred = (_: String).count(_ == '0') >= 20
我们可以把它变成一个过滤enumeratee和包装我们的枚举在于:
val filtered = I.filter[Option[String], IO](_.cata(pred, true)).run(lines)
我们将建立一个简单的动作,只是打印一切,使得它通过这个过滤器:
val printAction = (I.putStrTo[Option[String]](System.out) &= filtered).run
当然,我们还没有真正阅读任何东西。 要做到这一点,我们使用unsafePerformIO
:
printAction.unsafePerformIO()
现在,我们可以看Some("0946943140969200621607610...")
力量慢慢的,而我们的内存使用量保持不变滚动。 它是慢,错误处理和输出是有点麻烦,但不是太糟糕了,我想约九行代码。
获得从iteratee输出
那是foreach
-ish使用。 我们还可以创建更象折叠,例如收拾,使其通过过滤器并以列表返回它们的元素的iteratee。 只要重复上述的一切,直到printAction
定义,然后写这个:
val gatherAction = (I.consume[Option[String], IO, List] &= filtered).run
踢的动作关:
val xs: Option[List[String]] = gatherAction.unsafePerformIO().sequence
现在去喝杯咖啡(它可能需要是相当远)。 当你回来你要么有None
(在的情况下, IOException
沿途某处)或Some
含1943名字符串列表。
完成(快)的例子,自动关闭文件
要回答你的问题有关关闭的读者,这里有一个完整的工作示例这是大致相当于上面的第二个方案,但与需要用于打开和关闭读者负责的枚举。 这也是很多,要快得多,因为它读取线,而不是字符。 首先进口和几个辅助方法:
import java.io.{ BufferedReader, File, FileReader }
import scalaz._, Scalaz._, effect._, iteratee.{ Iteratee => I, _ }
def tryIO[A, B](action: IO[B]) = I.iterateeT[A, IO, Either[Throwable, B]](
action.catchLeft.map(
r => I.sdone(r, r.fold(_ => I.eofInput, _ => I.emptyInput))
)
)
def enumBuffered(r: => BufferedReader) =
new EnumeratorT[Either[Throwable, String], IO] {
lazy val reader = r
def apply[A] = (s: StepT[Either[Throwable, String], IO, A]) => s.mapCont(
k =>
tryIO(IO(reader.readLine())).flatMap {
case Right(null) => s.pointI
case Right(line) => k(I.elInput(Right(line))) >>== apply[A]
case e => k(I.elInput(e))
}
)
}
而现在的枚举:
def enumFile(f: File): EnumeratorT[Either[Throwable, String], IO] =
new EnumeratorT[Either[Throwable, String], IO] {
def apply[A] = (s: StepT[Either[Throwable, String], IO, A]) => s.mapCont(
k =>
tryIO(IO(new BufferedReader(new FileReader(f)))).flatMap {
case Right(reader) => I.iterateeT(
enumBuffered(reader).apply(s).value.ensuring(IO(reader.close()))
)
case Left(e) => k(I.elInput(Left(e)))
}
)
}
我们已经准备好了:
val action = (
I.consume[Either[Throwable, String], IO, List] %=
I.filter(_.fold(_ => true, _.count(_ == '0') >= 20)) &=
enumFile(new File("numbers.txt"))
).run
现在,当处理完成后,读者将被关闭。
我应该读得远一点......这恰恰是什么enumeratees的。 Enumeratees在Scalaz 7定义和播放2,但不是在Scalaz 6。
Enumeratees是“垂直的”组合物(在“垂直整合工业”的意义上),而普通iteratees在一个“水平的”方式monadically构成。