Connect to SQLite in Apache Spark

Posted 2019-04-11 07:33

Question:

I want to run a custom function on all tables in a SQLite database. The function is more or less the same for every table, but it depends on the schema of the individual table. Also, the tables and their schemas are only known at runtime (the program is called with an argument that specifies the path of the database).

This is what I have so far:

import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf().setAppName("MyApp")
val sc = new SparkContext(conf)
val sqlContext = new org.apache.spark.sql.SQLContext(sc)

// somehow bind sqlContext to DB

val allTables = sqlContext.tableNames

for (t <- allTables) {
    val df = sqlContext.table(t)
    val schema = df.columns
    sqlContext.sql("SELECT * FROM " + t + "...").map(x => myFunc(x, schema))
}

The only hint I have found so far requires knowing the table name in advance, which is not the case in my scenario:

val tableData = 
  sqlContext.read.format("jdbc")
    .options(Map("url" -> "jdbc:sqlite:/path/to/file.db", "dbtable" -> t))
    .load()

I am using the xerial SQLite JDBC driver. So how can I connect solely to a database, not to a table?

Edit: Using Beryllium's answer as a starting point, I updated my code to this:

val sqlContext = new org.apache.spark.sql.SQLContext(sc)

// Read SQLite's built-in catalog table to discover the table names at runtime
val metaData = sqlContext.read.format("jdbc")
    .options(Map("url" -> "jdbc:sqlite:/path/to/file.db",
                 "dbtable" -> "(SELECT * FROM sqlite_master) AS t")).load()

val myTableNames = metaData.select("tbl_name").distinct()

for (t <- myTableNames) {
    println(t.toString)

    val tableData = sqlContext.table(t.toString)  // fails, see below

    for (record <- tableData.select("*")) {
        println(record)
    }
}

At least I can read the table names at runtime, which is a huge step forward for me. But I cannot read the tables themselves. I tried both

val tableData = sqlContext.table(t.toString)

and

val tableData = sqlContext.read.format("jdbc")
    .options(Map("url" -> "jdbc:sqlite:/path/to/file.db",
                 "dbtable" -> t.toString)).load()

in the loop, but in both cases I get a NullPointerException. Although I can print the table names, it seems I cannot connect to the tables.

Last but not least, I always get an SQLITE_ERROR: Connection is closed error. It looks like the same issue described in this question: SQLITE_ERROR: Connection is closed when connecting from Spark via JDBC to SQLite database

Answer 1:

There are two options you can try:

Use JDBC directly

  • Open a separate, plain JDBC connection in your Spark job
  • Get the table names from the JDBC metadata
  • Feed these into your for comprehension (see the sketch after this list)
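
A minimal sketch of this approach, assuming the database path from the question and the standard java.sql.DriverManager / DatabaseMetaData API (neither appears in the original post):

import java.sql.DriverManager

// Open a plain JDBC connection on the driver (the path is a placeholder)
val connection = DriverManager.getConnection("jdbc:sqlite:/path/to/file.db")
val tableNames =
  try {
    // Standard JDBC metadata call: list all objects of type TABLE
    val rs = connection.getMetaData.getTables(null, null, "%", Array("TABLE"))
    val names = scala.collection.mutable.ListBuffer[String]()
    while (rs.next()) {
      names += rs.getString("TABLE_NAME")
    }
    names.toList
  } finally {
    connection.close()
  }

// These names can then drive the per-table Spark JDBC reads shown below
tableNames.foreach(println)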

Use a SQL query for the "dbtable" argument

You can specify a query as the value for the dbtable argument. Syntactically, this query must "look" like a table, so it must be wrapped in a subquery.

In that query, get the metadata from the database:

val df = sqlContext.read.format("jdbc").options(
  Map(
    "url" -> "jdbc:postgresql:xxx",
    "user" -> "x",
    "password" -> "x",
    "dbtable" -> "(select * from pg_tables) as t")).load()

This example works with PostgreSQL; you have to adapt it for SQLite.
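
For SQLite, the equivalent catalog is the built-in sqlite_master table, which the asker's edit above already queries. A sketch (the path is a placeholder; the type = 'table' filter excludes indexes and views):

val metaData = sqlContext.read.format("jdbc")
  .options(Map(
    "url" -> "jdbc:sqlite:/path/to/file.db",
    "dbtable" -> "(SELECT tbl_name FROM sqlite_master WHERE type = 'table') AS t"))
  .load()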

Update

It seems that the JDBC driver only supports iterating over one result set at a time. In any case, when you materialize the list of table names using collect() (so the loop runs locally on the driver rather than inside a distributed operation), the following snippet should work:

// collect() brings the table names to the driver as a plain Array[String]
val myTableNames = metaData.select("tbl_name").map(_.getString(0)).collect()

for (t <- myTableNames) {
  println(t)

  // Open a fresh JDBC-backed DataFrame for each table
  val tableData = sqlContext.read.format("jdbc")
    .options(
      Map(
        "url" -> "jdbc:sqlite:/x.db",
        "dbtable" -> t)).load()

  tableData.show()
}
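
From here, the per-table function from the question can replace the show() call; a sketch reusing the asker's hypothetical myFunc and the schema-extraction idiom from the question:

// Capture the column names on the driver, then apply the custom function
val schema = tableData.columns
val results = tableData.map(row => myFunc(row, schema))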