斯卡拉:如何遍历流/迭代器收集结果分为几个不同的收藏(Scala: how to traverse

2019-08-16 18:24发布

我经历是太大,不适合到内存中,收集2型表达的日志文件,什么是我的下面迭代片段更好的功能替代?

def streamData(file: File, errorPat: Regex, loginPat: Regex): List[(String, String)]={
  val lines : Iterator[String] = io.Source.fromFile(file).getLines()

  val logins: mutable.Map[String, String] = new mutable.HashMap[String, String]()
  val errors: mutable.ListBuffer[(String, String)] = mutable.ListBuffer.empty

  for (line <- lines){
    line match {
      case errorPat(date,ip)=> errors.append((ip,date))
      case loginPat(date,user,ip,id) =>logins.put(ip, id)
      case _ => ""
    }
  }

  errors.toList.map(line => (logins.getOrElse(line._1,"none") + " " + line._1,line._2))
}

Answer 1:

这里是一个可能的解决方案:

def streamData(file: File, errorPat: Regex, loginPat: Regex): List[(String,String)] = {
  val lines = Source.fromFile(file).getLines
  val (err, log) = lines.collect {
        case errorPat(inf, ip) => (Some((ip, inf)), None)
        case loginPat(_, _, ip, id) => (None, Some((ip, id)))
      }.toList.unzip
  val ip2id = log.flatten.toMap
  err.collect{ case Some((ip,inf)) => (ip2id.getOrElse(ip,"none") + "" + ip, inf) }
}


Answer 2:

更正:
1)去除不需要的类型声明
2)元组解构代替ulgy ._1
3)左折叠的,而不是可变的累加器
4)使用更方便的操作者等的方法:++

def streamData(file: File, errorPat: Regex, loginPat: Regex): List[(String, String)] = {
    val lines = io.Source.fromFile(file).getLines()

    val (logins, errors) =
        ((Map.empty[String, String], Seq.empty[(String, String)]) /: lines) {
            case ((loginsAcc, errorsAcc), next) =>
                next match {
                    case errorPat(date, ip) => (loginsAcc, errorsAcc :+ (ip -> date))
                    case loginPat(date, user, ip, id) => (loginsAcc + (ip -> id) , errorsAcc)
                    case _ => (loginsAcc, errorsAcc)
                }
        }

// more concise equivalent for
// errors.toList.map { case (ip, date) => (logins.getOrElse(ip, "none") + " " + ip) -> date }
    for ((ip, date) <- errors.toList) 
    yield (logins.getOrElse(ip, "none") + " " + ip) -> date


}


Answer 3:

我有几个建议:

  • 取而代之的是对/元组的,它往往是更好地使用自己的类。 它提供了一个有意义的名字,以这两个类型及其字段,这使得代码更易读。
  • 拆分代码放入小零件。 特别是,尝试分离的代码块不需要被捆绑在一起。 这使你的代码更容易理解,更稳健,不易出错,更容易进行测试。 你的情况这会是很好的分离产生的输入(日志文件的线)和消费它产生的结果。 例如,你可以让你的功能自动测试,而不必存储在一个文件中的样本数据。

作为一个例子和锻炼,我试图让基于Scalaz iteratees的解决方案。 这是一个有点长(包括一些辅助代码IteratorEnumerator ),也许它是任务有点大材小用,但也许有人会发现它的帮助。

import java.io._;
import scala.util.matching.Regex
import scalaz._
import scalaz.IterV._

object MyApp extends App {
  // A type for the result. Having names keeps things
  // clearer and shorter.
  type LogResult = List[(String,String)]

  // Represents a state of our computation. Not only it
  // gives a name to the data, we can also put here
  // functions that modify the state.  This nicely
  // separates what we're computing and how.
  sealed case class State(
    logins: Map[String,String],
    errors: Seq[(String,String)]
  ) {
    def this() = {
      this(Map.empty[String,String], Seq.empty[(String,String)])
    }

    def addError(date: String, ip: String): State =
      State(logins, errors :+ (ip -> date));
    def addLogin(ip: String, id: String): State =
      State(logins + (ip -> id), errors);

    // Produce the final result from accumulated data.
    def result: LogResult =
      for ((ip, date) <- errors.toList)
        yield (logins.getOrElse(ip, "none") + " " + ip) -> date
  }

  // An iteratee that consumes lines of our input. Based
  // on the given regular expressions, it produces an
  // iteratee that parses the input and uses State to
  // compute the result.
  def logIteratee(errorPat: Regex, loginPat: Regex):
            IterV[String,List[(String,String)]] = {
    // Consumes a signle line.
    def consume(line: String, state: State): State =
      line match {
        case errorPat(date, ip)           => state.addError(date, ip);
        case loginPat(date, user, ip, id) => state.addLogin(ip, id);
        case _                            => state
      }

    // The core of the iteratee. Every time we consume a
    // line, we update our state. When done, compute the
    // final result.
    def step(state: State)(s: Input[String]): IterV[String, LogResult] =
      s(el    = line => Cont(step(consume(line, state))),
        empty = Cont(step(state)),
        eof   = Done(state.result, EOF[String]))
    // Return the iterate waiting for its first input.
    Cont(step(new State()));
  }


  // Converts an iterator into an enumerator. This
  // should be more likely moved to Scalaz.
  // Adapted from scalaz.ExampleIteratee
  implicit val IteratorEnumerator = new Enumerator[Iterator] {
    @annotation.tailrec def apply[E, A](e: Iterator[E], i: IterV[E, A]): IterV[E, A] = {
      val next: Option[(Iterator[E], IterV[E, A])] =
        if (e.hasNext) {
          val x = e.next();
          i.fold(done = (_, _) => None, cont = k => Some((e, k(El(x)))))
        } else
          None;
       next match {
         case None => i
         case Some((es, is)) => apply(es, is)
       }
    }
  }


  // main ---------------------------------------------------
  {
    // Read a file as an iterator of lines:
    // val lines: Iterator[String] =
    //    io.Source.fromFile("test.log").getLines();

    // Create our testing iterator:
    val lines: Iterator[String] = Seq(
      "Error: 2012/03 1.2.3.4",
      "Login: 2012/03 user 1.2.3.4 Joe",
      "Error: 2012/03 1.2.3.5",
      "Error: 2012/04 1.2.3.4"
    ).iterator;

    // Create an iteratee.
    val iter = logIteratee("Error: (\\S+) (\\S+)".r, 
                           "Login: (\\S+) (\\S+) (\\S+) (\\S+)".r);
    // Run the the iteratee against the input
    // (the enumerator is implicit)
    println(iter(lines).run);
  }
}


文章来源: Scala: how to traverse stream/iterator collecting results into several different collections