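The goal of this question is to document how to read and write data over JDBC connections in PySpark, together with known issues and workarounds.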
With small changes these methods should work with other supported languages including Scala and R.
Writing data
Include the applicable JDBC driver when you submit the application or start the shell. You can use, for example, --packages:
bin/pyspark --packages group:name:version
or by combining driver-class-path and jars:
bin/pyspark --driver-class-path $PATH_TO_DRIVER_JAR --jars $PATH_TO_DRIVER_JAR
These properties can also be set using the PYSPARK_SUBMIT_ARGS environment variable before the JVM instance has been started, or by using conf/spark-defaults.conf to set spark.jars.packages or spark.jars / spark.driver.extraClassPath.
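As a rough sketch of the PYSPARK_SUBMIT_ARGS route, the helper below assembles the variable from optional driver settings. The function name build_submit_args is hypothetical, and "group:name:version" is the same placeholder used above, not a real coordinate. The sketch assumes the value should end with pyspark-shell, which is what PySpark expects when it is launched from a plain Python process.

```python
import os


def build_submit_args(packages=None, jars=None, driver_class_path=None):
    """Assemble a PYSPARK_SUBMIT_ARGS value from optional driver settings."""
    parts = []
    if packages:
        parts += ["--packages", ",".join(packages)]
    if jars:
        parts += ["--jars", ",".join(jars)]
    if driver_class_path:
        parts += ["--driver-class-path", driver_class_path]
    # Assumption: the value ends with "pyspark-shell" when starting from Python.
    parts.append("pyspark-shell")
    return " ".join(parts)


# Must run before the SparkContext (and therefore the JVM) is created.
# "group:name:version" is a placeholder, replace it with real coordinates.
os.environ["PYSPARK_SUBMIT_ARGS"] = build_submit_args(
    packages=["group:name:version"]
)
```

Setting the variable after a SparkContext already exists has no effect on the running JVM, so this belongs at the very top of a script or notebook.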
Choose the desired mode. The Spark JDBC writer supports the following modes:
- append: Append contents of this DataFrame to existing data.
- overwrite: Overwrite existing data.
- ignore: Silently ignore this operation if data already exists.
- error (default case): Throw an exception if data already exists.
Upserts or other fine-grained modifications are not supported.
mode = ...
Prepare the JDBC URI, for example:
# You can encode credentials in URI or pass
# separately using properties argument
# of jdbc method or options
url = "jdbc:postgresql://localhost/foobar"
(Optional) Create a dictionary of JDBC arguments.
properties = {
"user": "foo",
"password": "bar"
}
properties / options can also be used to set supported JDBC connection properties.
Use DataFrame.write.jdbc to save the data (see pyspark.sql.DataFrameWriter for details):
df.write.jdbc(url=url, table="baz", mode=mode, properties=properties)
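The write steps can be wrapped in a small helper that rejects an unknown mode before anything reaches the database. This is only a sketch: write_jdbc and VALID_MODES are hypothetical names, and the set contains just the four modes listed above (your Spark version may accept additional aliases).

```python
# Modes listed above; an assumption, not an exhaustive list for every Spark version.
VALID_MODES = {"append", "overwrite", "ignore", "error"}


def write_jdbc(df, url, table, mode="error", properties=None):
    """Validate the save mode, then delegate to df.write.jdbc."""
    if mode not in VALID_MODES:
        raise ValueError("unsupported mode: %r" % (mode,))
    # df is expected to be a pyspark.sql.DataFrame (anything exposing .write.jdbc).
    df.write.jdbc(url=url, table=table, mode=mode, properties=properties or {})
```

With the names used above, a call would look like write_jdbc(df, url, "baz", "append", properties).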
Known issues:
Suitable driver cannot be found when the driver has been included using --packages (java.sql.SQLException: No suitable driver found for jdbc: ...)
Assuming there is no driver version mismatch, you can solve this by adding the driver class to the properties. For example:
properties = {
...
"driver": "org.postgresql.Driver"
}
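One way to avoid forgetting the driver entry is to derive it from the URL prefix. The sketch below is hypothetical (with_driver and DRIVERS are illustrative names), and the mapping contains only the two driver classes that appear in this thread; other databases would need their own entries.

```python
# Driver class names taken from the examples in this thread; extend as needed.
DRIVERS = {
    "jdbc:postgresql:": "org.postgresql.Driver",
    "jdbc:mysql:": "com.mysql.jdbc.Driver",
}


def with_driver(url, properties=None):
    """Return a copy of properties with "driver" set from the URL prefix."""
    props = dict(properties or {})
    for prefix, driver in DRIVERS.items():
        if url.startswith(prefix):
            # Keep an explicitly supplied driver, otherwise fill in the default.
            props.setdefault("driver", driver)
            break
    return props
```

An unknown URL prefix leaves the properties unchanged, so the original "No suitable driver" error would still surface instead of being masked.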
Using df.write.format("jdbc").options(...).save() may result in:
java.lang.RuntimeException: org.apache.spark.sql.execution.datasources.jdbc.DefaultSource does not allow create table as select.
Solution unknown.
In PySpark 1.3 you can try calling the Java method directly:
df._jdf.insertIntoJDBC(url, "baz", True)
Reading data
- Follow steps 1-4 from Writing data
Use sqlContext.read.jdbc:
sqlContext.read.jdbc(url=url, table="baz", properties=properties)
or sqlContext.read.format("jdbc"):
(sqlContext.read.format("jdbc")
.options(url=url, dbtable="baz", **properties)
.load())
Known issues and gotchas:
- Suitable driver cannot be found - see: Writing data
Spark SQL supports predicate pushdown with JDBC sources, although not all predicates can be pushed down. It also doesn't delegate limits or aggregations. A possible workaround is to replace the dbtable / table argument with a valid subquery. See for example:
- Does spark predicate pushdown work with JDBC?
- More than one hour to execute pyspark.sql.DataFrame.take(4)
- How to use SQL query to define table in dbtable?
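The subquery workaround amounts to wrapping a SELECT statement in parentheses and giving it an alias, so the database runs the filter, limit or aggregation itself. A minimal sketch follows; as_dbtable is a hypothetical helper, the column names are made up for illustration, and the "AS alias" form is an assumption that works on PostgreSQL and MySQL but not on every database.

```python
def as_dbtable(query, alias="tmp"):
    """Wrap a SELECT statement so it can be passed as dbtable / table."""
    # Strip whitespace and a trailing semicolon, which is invalid inside a subquery.
    return "({}) AS {}".format(query.strip().rstrip(";"), alias)


# Hypothetical columns on the "baz" table used earlier in this answer.
dbtable = as_dbtable("SELECT id, name FROM baz WHERE id > 100 LIMIT 10")
```

The resulting string can be used wherever a table name is expected, for example sqlContext.read.jdbc(url=url, table=dbtable, properties=properties).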
By default, JDBC data sources load data sequentially using a single executor thread. To ensure distributed data loading you can:
- Provide a partitioning column (must be IntegerType), lowerBound, upperBound, numPartitions.
- Provide a list of mutually exclusive predicates (predicates), one for each desired partition.
See:
- Partitioning in spark while reading from RDBMS via JDBC
- How to optimize partitioning when migrating data from JDBC source?
- How to improve performance for slow Spark jobs using DataFrame and JDBC connection?
- How to partition Spark RDD when importing Postgres using JDBC?
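For the predicates option, the list can be generated rather than written by hand. The sketch below splits an integer range into contiguous, non-overlapping WHERE clauses; range_predicates is a hypothetical helper and this is not a reproduction of Spark's internal partitioning logic. The first clause also picks up NULLs and values below the lower bound, and the last one picks up everything above the final cut, so no row is dropped.

```python
def range_predicates(column, lower, upper, num_partitions):
    """Build mutually exclusive WHERE clauses, one per desired partition."""
    if num_partitions < 2 or upper - lower < num_partitions:
        raise ValueError("need at least 2 partitions and a wide enough range")
    step = (upper - lower) // num_partitions
    # Interior cut points between consecutive partitions.
    cuts = [lower + step * i for i in range(1, num_partitions)]
    predicates = ["{c} < {hi} OR {c} IS NULL".format(c=column, hi=cuts[0])]
    for lo, hi in zip(cuts[:-1], cuts[1:]):
        predicates.append(
            "{c} >= {lo} AND {c} < {hi}".format(c=column, lo=lo, hi=hi)
        )
    predicates.append("{c} >= {lo}".format(c=column, lo=cuts[-1]))
    return predicates


# Hypothetical integer column "id" with values roughly between 0 and 100.
preds = range_predicates("id", 0, 100, 4)
```

The list would then be passed as sqlContext.read.jdbc(url=url, table="baz", predicates=preds, properties=properties), giving one partition per clause.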
In a distributed mode (with partitioning column or predicates) each executor operates in its own transaction. If the source database is modified at the same time there is no guarantee that the final view will be consistent.
Where to find suitable drivers:
Other options
Depending on the database, a specialized source might exist and be preferred in some cases:
- Greenplum - Pivotal Greenplum-Spark Connector
- Apache Phoenix - Apache Spark Plugin
- Microsoft SQL Server - Spark connector for Azure SQL Databases and SQL Server
- Amazon Redshift - Databricks Redshift connector (current versions are available only in the proprietary Databricks Runtime; a discontinued open source version is available on GitHub).
Download the mysql-connector-java driver and keep it in the Spark jars folder. The Python code below writes data into "actor1"; the actor1 table structure has to be created in the MySQL database beforehand.
from pyspark.sql import SparkSession, SQLContext

# Raw string so the backslashes in the Windows path are not treated as escapes
spark = SparkSession.builder.appName("prasadad").master('local').config('spark.driver.extraClassPath', r'D:\spark-2.1.0-bin-hadoop2.7\jars\mysql-connector-java-5.1.41-bin.jar').getOrCreate()
sc = spark.sparkContext
sqlContext = SQLContext(sc)
# Read the existing "actor" table over JDBC
df = sqlContext.read.format("jdbc").options(url="jdbc:mysql://localhost:3306/sakila", driver="com.mysql.jdbc.Driver", dbtable="actor", user="root", password="****").load()
mysql_url = "jdbc:mysql://localhost:3306/sakila?user=root&password=****"
# Append the rows to "actor1"
df.write.jdbc(mysql_url, table="actor1", mode="append")
Refer to this link to download the JDBC driver for PostgreSQL and follow the steps to download the jar file:
https://jaceklaskowski.gitbooks.io/mastering-apache-spark/exercises/spark-exercise-dataframe-jdbc-postgresql.html
The jar file will be downloaded to a path like this:
"/home/anand/.ivy2/jars/org.postgresql_postgresql-42.1.1.jar"
If your Spark version is 2:
from pyspark.sql import SparkSession
spark = SparkSession.builder \
    .appName("sparkanalysis") \
    .config("spark.driver.extraClassPath",
            "/home/anand/.ivy2/jars/org.postgresql_postgresql-42.1.1.jar") \
    .getOrCreate()
# for localhost database
pgDF = spark.read \
.format("jdbc") \
.option("url", "jdbc:postgresql:postgres") \
.option("dbtable", "public.user_emp_tab") \
.option("user", "postgres") \
.option("password", "Jonsnow@100") \
.load()
print(pgDF)
pgDF.filter(pgDF["user_id"]>5).show()
Save the file as a Python script and run "python respectivefilename.py".