I want to know how to do the following things in Scala:

- Connect to a PostgreSQL database using Spark Scala.
- Write SQL queries like SELECT, UPDATE, etc. to modify a table in that database.

I know how to do it in plain Scala, but how do I import the PostgreSQL connector jar into sbt so it is included when packaging?
Our goal is to run parallel SQL queries from the Spark workers.
Build setup
Add the connector and JDBC to the `libraryDependencies` in `build.sbt`. I've only tried this with MySQL, so I'll use that in my examples, but Postgres should be much the same.
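Something like this sketch (the artifact versions are placeholders; pick ones that match your Spark and database versions):

```scala
libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core" % "1.3.1",
  "mysql" % "mysql-connector-java" % "5.1.34"
  // For PostgreSQL you would pull in the Postgres driver instead, e.g.:
  // "org.postgresql" % "postgresql" % "9.4-1201-jdbc41"
)
```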
Code

When you create the `SparkContext` you tell it which jars to copy to the executors. Include the connector jar. A good-looking way to do this is sketched below. Once the jars are in place, Spark is ready to connect to the database. Each executor will run part of the query, so that the results are ready for distributed computation.
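Here is one sketch of that, assuming the MySQL connector; for Postgres the driver class would be `org.postgresql.Driver`:

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Find the jars that contain our own code and the JDBC connector,
// so Spark can ship both to the executors.
val classes = Seq(
  getClass,                      // the jar with our own application code
  classOf[com.mysql.jdbc.Driver] // the jar with the connector
)
val jars = classes.map(_.getProtectionDomain.getCodeSource.getLocation.getPath)
val conf = new SparkConf().setAppName("jdbc-example").setJars(jars)
val sc = new SparkContext(conf)
```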
There are two options for this. The older approach is to use `org.apache.spark.rdd.JdbcRDD`.
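A sketch of its use; the connection string, the `FOO` table, and its `KEY` and `STUFF` columns are stand-ins for your own schema:

```scala
import java.sql.{DriverManager, ResultSet}
import org.apache.spark.rdd.JdbcRDD

val rdd = new JdbcRDD(
  sc,
  // Called on each worker to open that worker's own connection.
  () => DriverManager.getConnection(
    "jdbc:mysql://mysql.example.com/?user=batman&password=alfred"),
  // The query must contain two '?' placeholders for the key range.
  "SELECT * FROM FOO WHERE ? <= KEY AND KEY <= ?",
  0,    // lower bound of the key range
  1000, // upper bound of the key range
  10,   // number of partitions
  // Convert each ResultSet row into something; here a String.
  (r: ResultSet) => r.getString("STUFF")
)
```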
Check out the documentation for the parameters. Briefly:

- You pass in the `SparkContext`.
- Then a function that creates the connection. It is called on each worker to open that worker's connection to the database.
- Then the SQL query. It has to contain two placeholders for the beginning and end of the key range, as in the example.
- Then the key range (0 to 1000 here) and the number of partitions. The range is split among the partitions, so one executor thread ends up executing `SELECT * FROM FOO WHERE 0 <= KEY AND KEY <= 100` in the example.
- And last a function that converts the `ResultSet` into something. In the example we convert it into a `String`, so you end up with an `RDD[String]`.
Since Apache Spark version 1.3.0 another method is available through the DataFrame API. Instead of the `JdbcRDD` you would create an `org.apache.spark.sql.DataFrame`.
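A sketch with the same stand-in connection details, using the generic `load` method that the 1.3.x `SQLContext` provides:

```scala
import org.apache.spark.sql.SQLContext

val sqlContext = new SQLContext(sc)

val df = sqlContext.load("jdbc", Map(
  "url"     -> "jdbc:mysql://mysql.example.com/?user=batman&password=alfred",
  "dbtable" -> "FOO"))
```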
See https://spark.apache.org/docs/1.3.1/sql-programming-guide.html#jdbc-to-other-databases for the full list of options (the key range and number of partitions can be set just like with `JdbcRDD`).

Updates
`JdbcRDD` does not support updates. But you can simply do them in a `foreachPartition`.
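For example, a sketch that reuses the `RDD[String]` from the `JdbcRDD` example; the UPDATE statement and the `PROCESSED` column are made up for illustration:

```scala
import java.sql.DriverManager

rdd.foreachPartition { it =>
  // One connection per partition, reused for all of its rows.
  val conn = DriverManager.getConnection(
    "jdbc:mysql://mysql.example.com/?user=batman&password=alfred")
  try {
    val stmt = conn.prepareStatement(
      "UPDATE FOO SET PROCESSED = TRUE WHERE STUFF = ?")
    for (row <- it) {
      stmt.setString(1, row)
      stmt.executeUpdate()
    }
  } finally {
    conn.close()
  }
}
```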
(This creates one connection per partition. If that is a concern, use a connection pool!)
`DataFrame`s support updates through the `createJDBCTable` and `insertIntoJDBC` methods.
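A sketch of both (the target table `FOO_COPY` is made up; each method takes the JDBC URL, the table name, and a boolean flag):

```scala
// Create a new table from the DataFrame's contents.
df.createJDBCTable(
  "jdbc:mysql://mysql.example.com/?user=batman&password=alfred",
  "FOO_COPY",
  false) // allowExisting

// Append the DataFrame's rows to an existing table.
df.insertIntoJDBC(
  "jdbc:mysql://mysql.example.com/?user=batman&password=alfred",
  "FOO_COPY",
  false) // overwrite
```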