Executing multiple SQL queries on Spark

2020-04-01 07:41发布

问题:

I have a Spark SQL query in a file test.sql -

CREATE GLOBAL TEMPORARY VIEW VIEW_1 AS select a,b from abc

CREATE GLOBAL TEMPORARY VIEW VIEW_2 AS select a,b from VIEW_1

select * from VIEW_2

Now, I start my spark-shell and try to execute it like this -

val sql = scala.io.Source.fromFile("test.sql").mkString
spark.sql(sql).show

This fails with the following error -

org.apache.spark.sql.catalyst.parser.ParseException:
mismatched input '<' expecting {<EOF>, 'GROUP', 'ORDER', 'HAVING', 'LIMIT', 'OR', 'AND', 'WINDOW', 'UNION', 'EXCEPT', 'MINUS', 'INTERSECT', 'SORT', 'CLUSTER', 'DISTRIBUTE'}(line 1, pos 128)

I tried to execute these queries 1 by 1 in different spark.sql statements and it runs fine. The problem is, I have 6-7 queries which creates temporary views and finally i need output from my last view. Is there a way through which i can run these SQL's in a single spark.sql statement. I have worked on Postgres SQL (Redshift) and that is able to execute such kind of queries. In spark sql, i will have to maintain a lot of files in this case.

回答1:

The problem is that mkString concatenates all the lines in a single string, which cannot be properly parsed as a valid SQL query.

Each line from the script file should be executed as a separate query, for example:

scala.io.Source.fromFile("test.sql").getLines()
  .filterNot(_.isEmpty)  // filter out empty lines
  .foreach(query =>
    spark.sql(query).show
  )

Update

If queries are split on more than one line, the case is a bit more complex.

We absolutely need to have a token that marks the end of a query. Let it be the semi-colon character, as in standard SQL.

First, we collect all non-empty lines from the source file:

val lines = scala.io.Source.fromFile(sqlFile).getLines().filterNot(_.isEmpty)

Then we process the collected lines, concatenating each new line with the previous one, if it does not end with a semicolon:

val queries = lines.foldLeft(List[String]()) { case(queries, line) =>
  queries match {
    case Nil => List(line) // case for the very first line
    case init :+ last =>
      if (last.endsWith(";")) {
        // if a query ended on a previous line, we simply append the new line to the list of queries
        queries :+ line.trim
      } else {
        // the query is not terminated yet, concatenate the line with the previous one
        val queryWithNextLine = last + " " + line.trim
        init :+ queryWithNextLine
      }
  }
}