I want to run a custom function on all tables in a SQLite database. The function is more or less the same, but depends on the schema of the individual table. Also, the tables and their schemata are only known at runtime (the program is called with an argument that specifies the path of the database).
This is what I have so far:
val conf = new SparkConf().setAppName("MyApp")
val sc = new SparkContext(conf)
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
// somehow bind sqlContext to DB
val allTables = sqlContext.tableNames
for (t <- allTables) {
  val df = sqlContext.table(t)
  val schema = df.columns
  sqlContext.sql("SELECT * FROM " + t + "...").map(x => myFunc(x, schema))
}
The only hint I found so far needs to know the table in advance, which is not the case in my scenario:
val tableData =
  sqlContext.read.format("jdbc")
    .options(Map("url" -> "jdbc:sqlite:/path/to/file.db", "dbtable" -> t))
    .load()
I am using the xerial sqlite jdbc driver. So how can I connect solely to a database, not to a table?
Edit: Using Beryllium's answer as a start I updated my code to this:
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
val metaData = sqlContext.read.format("jdbc")
  .options(Map("url" -> "jdbc:sqlite:/path/to/file.db",
    "dbtable" -> "(SELECT * FROM sqlite_master) AS t")).load()
val myTableNames = metaData.select("tbl_name").distinct()
for (t <- myTableNames) {
  println(t.toString)
  val tableData = sqlContext.table(t.toString)
  for (record <- tableData.select("*")) {
    println(record)
  }
}
At least I can now read the table names at runtime, which is a huge step forward for me. But I can't read the tables themselves. I tried both
val tableData = sqlContext.table(t.toString)
and
val tableData = sqlContext.read.format("jdbc")
  .options(Map("url" -> "jdbc:sqlite:/path/to/file.db",
    "dbtable" -> t.toString)).load()
in the loop, but in both cases I get a NullPointerException. Although I can print the table names, it seems I cannot connect to them.
Last but not least, I always get an "SQLITE_ERROR: Connection is closed" error. It looks like the same issue described in this question: SQLITE_ERROR: Connection is closed when connecting from Spark via JDBC to SQLite database
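A possible explanation for the NullPointerException, offered as a sketch rather than a confirmed fix: myTableNames is a DataFrame, so the for loop over it becomes a foreach that runs inside Spark tasks, where sqlContext is generally not usable. In addition, t is a Row, and t.toString yields the name wrapped in brackets (for example "[mytable]") rather than the bare table name. The variant below collects the names to the driver first and extracts the string from each Row. The object name ReadAllTables, the url value, and taking the database path from args(0) are assumptions made for illustration; the sketch does not address the "Connection is closed" error.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

// Hypothetical entry point; the name and argument handling are illustrative only.
object ReadAllTables {
  def main(args: Array[String]): Unit = {
    // Assumption: the path to the SQLite file is the first program argument.
    val url = "jdbc:sqlite:" + args(0)

    val sc = new SparkContext(new SparkConf().setAppName("MyApp"))
    val sqlContext = new SQLContext(sc)

    // Read SQLite's catalog table through the JDBC data source, as in the question.
    val metaData = sqlContext.read.format("jdbc")
      .options(Map("url" -> url,
        "dbtable" -> "(SELECT * FROM sqlite_master) AS t"))
      .load()

    // collect() brings the rows to the driver, so the loop below is an ordinary
    // Scala loop over an Array[String] and not a distributed foreach.
    val tableNames: Array[String] = metaData
      .select("tbl_name")
      .distinct()
      .collect()
      .map(_.getString(0)) // take the bare name, not Row.toString

    for (name <- tableNames) {
      // Load each table on the driver, where sqlContext is valid.
      val tableData = sqlContext.read.format("jdbc")
        .options(Map("url" -> url, "dbtable" -> name))
        .load()
      println(name)
      tableData.printSchema()
    }

    sc.stop()
  }
}
```

With the table loaded this way, tableData.columns gives the schema needed for a per-table function such as myFunc.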