Functionally processing a database cursor in Scala

Posted 2019-02-24 20:46

Question:

When I need to read millions of rows from a PostgreSQL database through the JDBC driver, I always use a cursor; otherwise I get an OutOfMemoryError. Here is the pattern (in pseudocode) that I use:

begin transaction
execute("declare cursor...")
while (true) {
  boolean processedSomeRows = false
  resultSet = executeQuery("fetch forward...")
  while (resultSet.next()) {
    processedSomeRows = true
    ...
  }
  if (!processedSomeRows) break
}
close cursor
commit

Here is a more "functional" equivalent that I came up with for Scala:

begin transaction
execute("declare cursor...")

@tailrec
def loop(resultSet: ResultSet,
         processed: Boolean): Boolean = {
  if (!resultSet.next()) processed
  else {
    // Process current result set row
    loop(resultSet, true)
  }
}

while (loop(executeQuery("fetch forward..."), false))
  ; //Empty loop

close cursor
commit

I know this is contrived, but is there a better way without resorting to mutability? If I were trying to do this in Haskell, I might come up with a solution that involves monads, but I don't want to send my mind down those "twisty little passages, all alike," because it might never return...

Answer 1:

Here is a Scala solution that I came up with:

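import java.sql.ResultSet
import scala.annotation.tailrec

// `query` is by-name, so each use runs a fresh fetch. Rows are passed to
// `process` until a fetch returns no rows at all.
@tailrec
def processCursor(query: => ResultSet)(process: ResultSet => Unit): Unit = {
  // Returns true if this result set contained at least one row.
  @tailrec
  def loop(resultSet: ResultSet,
           processed: Boolean): Boolean = {
    if (!resultSet.next()) processed
    else {
      process(resultSet) // handle the current row
      loop(resultSet, true)
    }
  }
  if (loop(query, false)) processCursor(query)(process)
}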

Call it like this:

begin transaction
execute("declare cursor...")

processCursor(statement.executeQuery("fetch forward...")) {
  resultSet =>
  // process current row of the ResultSet
}

close cursor
commit

How can this be improved?
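
One possible direction, offered as a sketch under assumptions rather than a definitive answer: thread an accumulator through the recursion so the caller gets a value back instead of relying on side effects inside `process`. The names `CursorFold`, `foldBatches`, `fetchBatch`, `acc`, and `step` are hypothetical. The batch source is kept generic (any function returning a `Seq`) so the sketch runs without a database; with JDBC, `fetchBatch` would presumably wrap `executeQuery("fetch forward...")` and copy the needed column values out of each row.

```scala
import scala.annotation.tailrec

object CursorFold {
  // Pulls batches until one comes back empty, folding every row into `acc`.
  // `fetchBatch` is a hypothetical stand-in for one "fetch forward" round trip.
  @tailrec
  def foldBatches[A, B](fetchBatch: () => Seq[A])(acc: B)(step: (B, A) => B): B = {
    val batch = fetchBatch()
    if (batch.isEmpty) acc // an empty batch means the cursor is exhausted
    else foldBatches(fetchBatch)(batch.foldLeft(acc)(step))(step)
  }
}
```

Only one batch is held in memory at a time, and the only state that survives between batches is the accumulator.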



Answer 2:

An alternative based on @Ralph's answer, which collects the result of processing each row into a Seq:

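  def processCursor[T](resultSet: ResultSet)(process: ResultSet => T): Seq[T] = {
    @tailrec
    def loop(seq: Seq[T], resultSet: ResultSet): Seq[T] = {
      if (resultSet.next()) loop(seq :+ process(resultSet), resultSet)
      else seq
    }
    // Start from a Vector: appending is effectively constant time,
    // whereas the default Seq (a List) appends in O(n).
    loop(Vector.empty, resultSet)
  }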
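
The loop above handles a single ResultSet and keeps every processed row in the returned Seq. A lazy variant can be sketched with `Iterator`, assuming rows should be streamed across repeated fetches instead. The names `CursorIterators`, `rows`, `drain`, `extract`, and `fetchBatch` are hypothetical and not part of JDBC or of either answer.

```scala
import java.sql.ResultSet

object CursorIterators {
  // Lazily walks one ResultSet, applying `extract` to the current row.
  // `extract` should copy values out, since the ResultSet itself is mutable.
  def rows[A](rs: ResultSet)(extract: ResultSet => A): Iterator[A] =
    Iterator.continually(rs).takeWhile(_.next()).map(extract)

  // Keeps requesting batches until one comes back empty, then stops.
  // The next batch is only fetched once the previous one has been consumed.
  def drain[A](fetchBatch: () => Iterator[A]): Iterator[A] =
    Iterator.continually(fetchBatch()).takeWhile(_.hasNext).flatten
}
```

With JDBC, a call might look like `drain(() => rows(statement.executeQuery("fetch forward..."))(rs => rs.getLong(1)))`. The resulting iterator has to be fully consumed before the cursor is closed and the transaction is committed, because nothing is read from the database until the iterator is traversed.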