How do I read a large CSV file with Scala Stream c

2020-01-28 03:39发布

问题:

How do I read a large CSV file (> 1 Gb) with a Scala Stream? Do you have a code example? Or would you use a different way to read a large CSV file without loading it into memory first?

回答1:

Just use Source.fromFile(...).getLines as you already stated.

That returns an Iterator, which is already lazy (You'd use stream as a lazy collection where you wanted previously retrieved values to be memoized, so you can read them again)

If you're getting memory problems, then the problem will lie in what you're doing after getLines. Any operation like toList, which forces a strict collection, will cause the problem.



回答2:

I hope you don't mean Scala's collection.immutable.Stream with Stream. This is not what you want. Stream is lazy, but does memoization.

I don't know what you plan to do, but just reading the file line-by-line should work very well without using high amounts of memory.

getLines should evaluate lazily and should not crash (as long as your file does not have more than 2³² lines, afaik). If it does, ask on #scala or file a bug ticket (or do both).



回答3:

If you are looking to process the large file line-by-line while avoiding requiring the entire file's contents be loaded into memory all at once, then you can use the Iterator returned by scala.io.Source.

I have a small function, tryProcessSource, (containing two sub-functions) which I use for exactly these types of use-cases. The function takes up to four parameters, of which only the first is required. The other parameters have sane default values provided.

Here's the function profile (full function implementation is at the bottom):

def tryProcessSource(
  file: File,
  parseLine: (Int, String) => Option[List[String]] =
    (index, unparsedLine) => Some(List(unparsedLine)),
  filterLine: (Int, List[String]) => Option[Boolean] =
    (index, parsedValues) => Some(true),
  retainValues: (Int, List[String]) => Option[List[String]] =
    (index, parsedValues) => Some(parsedValues),
): Try[List[List[String]]] = {
  ???
}

The first parameter, file: File, is required. And it is just any valid instance of java.io.File which points to a line-oriented text file, like a CSV.

The second parameter, parseLine: (Int, String) => Option[List[String]], is optional. And if provided, it must be a function expecting to receive two input parameters; index: Int, unparsedLine: String. And then return an Option[List[String]]. The function may return a Some wrapped List[String] consisting of the valid column values. Or it may return a None which indicates the entire streaming process is aborting early. If this parameter is not provided, a default value of (index, line) => Some(List(line)) is provided. This default results in the entire line being returned as a single String value.

The third parameter, filterLine: (Int, List[String]) => Option[Boolean], is optional. And if provided, it must be a function expecting to receive two input parameters; index: Int, parsedValues: List[String]. And then return an Option[Boolean]. The function may return a Some wrapped Boolean indicating whether this particular line should be included in the output. Or it may return a None which indicates the entire streaming process is aborting early. If this parameter is not provided, a default value of (index, values) => Some(true) is provided. This default results in all lines being included.

The fourth and final parameter, retainValues: (Int, List[String]) => Option[List[String]], is optional. And if provided, it must be a function expecting to receive two input parameters; index: Int, parsedValues: List[String]. And then return an Option[List[String]]. The function may return a Some wrapped List[String] consisting of some subset and/or alteration of the existing column values. Or it may return a None which indicates the entire streaming process is aborting early. If this parameter is not provided, a default value of (index, values) => Some(values) is provided. This default results in the values parsed by the second parameter, parseLine.

Consider a file with the following contents (4 lines):

street,street2,city,state,zip
100 Main Str,,Irving,TX,75039
231 Park Ave,,Irving,TX,75039
1400 Beltline Rd,Apt 312,Dallas,Tx,75240

The following calling profile...

val tryLinesDefaults =
  tryProcessSource(new File("path/to/file.csv"))

...results in this output for tryLinesDefaults (the unaltered contents of the file):

Success(
  List(
    List("street,street2,city,state,zip"),
    List("100 Main Str,,Irving,TX,75039"),
    List("231 Park Ave,,Irving,TX,75039"),
    List("1400 Beltline Rd,Apt 312,Dallas,Tx,75240")
  )
)

The following calling profile...

val tryLinesParseOnly =
  tryProcessSource(
      new File("path/to/file.csv")
    , parseLine =
        (index, unparsedLine) => Some(unparsedLine.split(",").toList)
  )

...results in this output for tryLinesParseOnly (each line parsed into the individual column values):

Success(
  List(
    List("street","street2","city","state","zip"),
    List("100 Main Str","","Irving,TX","75039"),
    List("231 Park Ave","","Irving","TX","75039"),
    List("1400 Beltline Rd","Apt 312","Dallas","Tx","75240")
  )
)

The following calling profile...

val tryLinesIrvingTxNoHeader =
  tryProcessSource(
      new File("C:/Users/Jim/Desktop/test.csv")
    , parseLine =
        (index, unparsedLine) => Some(unparsedLine.split(",").toList)
    , filterLine =
        (index, parsedValues) =>
          Some(
            (index != 0) && //skip header line
            (parsedValues(2).toLowerCase == "Irving".toLowerCase) && //only Irving
            (parsedValues(3).toLowerCase == "Tx".toLowerCase)
          )
  )

...results in this output for tryLinesIrvingTxNoHeader (each line parsed into the individual column values, no header and only the two rows in Irving,Tx):

Success(
  List(
    List("100 Main Str","","Irving,TX","75039"),
    List("231 Park Ave","","Irving","TX","75039"),
  )
)

Here's the entire tryProcessSource function implementation:

import scala.io.Source
import scala.util.Try

import java.io.File

def tryProcessSource(
  file: File,
  parseLine: (Int, String) => Option[List[String]] =
    (index, unparsedLine) => Some(List(unparsedLine)),
  filterLine: (Int, List[String]) => Option[Boolean] =
    (index, parsedValues) => Some(true),
  retainValues: (Int, List[String]) => Option[List[String]] =
    (index, parsedValues) => Some(parsedValues)
): Try[List[List[String]]] = {
  def usingSource[S <: Source, R](source: S)(transfer: S => R): Try[R] =
    try {Try(transfer(source))} finally {source.close()}
  def recursive(
    remaining: Iterator[(String, Int)],
    accumulator: List[List[String]],
    isEarlyAbort: Boolean =
      false
  ): List[List[String]] = {
    if (isEarlyAbort || !remaining.hasNext)
      accumulator
    else {
      val (line, index) =
        remaining.next
      parseLine(index, line) match {
        case Some(values) =>
          filterLine(index, values) match {
            case Some(keep) =>
              if (keep)
                retainValues(index, values) match {
                  case Some(valuesNew) =>
                    recursive(remaining, valuesNew :: accumulator) //capture values
                  case None =>
                    recursive(remaining, accumulator, isEarlyAbort = true) //early abort
                }
              else
                recursive(remaining, accumulator) //discard row
            case None =>
              recursive(remaining, accumulator, isEarlyAbort = true) //early abort
          }
        case None =>
          recursive(remaining, accumulator, isEarlyAbort = true) //early abort
      }
    }
  }
  Try(Source.fromFile(file)).flatMap(
    bufferedSource =>
      usingSource(bufferedSource) {
        source =>
          recursive(source.getLines().buffered.zipWithIndex, Nil).reverse
      }
  )
}

While this solution is relatively succinct, it took me considerable time and many refactoring passes before I was finally able to get to here. Please let me know if you see any ways it might be improved.


UPDATE: I have just asked the issue below as it's own StackOverflow question. And it now has an answer fixing the error mentioned below.

I had the idea to try and make this even more generic changing the retainValues parameter to transformLine with the new generics-ified function definition below. However, I keep getting the highlight error in IntelliJ "Expression of type Some[List[String]] doesn't conform to expected type Option[A]" and wasn't able to figure out how to change the default value so the error goes away.

def tryProcessSource2[A <: AnyRef](
  file: File,
  parseLine: (Int, String) => Option[List[String]] =
    (index, unparsedLine) => Some(List(unparsedLine)),
  filterLine: (Int, List[String]) => Option[Boolean] =
    (index, parsedValues) => Some(true),
  transformLine: (Int, List[String]) => Option[A] =
    (index, parsedValues) => Some(parsedValues)
): Try[List[A]] = {
  ???
}

Any assistance on how to make this work would be greatly appreciated.