I have csv file in Amazon s3 with is 62mb in size (114 000 rows). I am converting it into spark dataset, and taking first 500 rows from it. Code is as follow;
DataFrameReader df = new DataFrameReader(spark).format("csv").option("header", true);
Dataset<Row> set=df.load("s3n://"+this.accessId.replace("\"", "")+":"+this.accessToken.replace("\"", "")+"@"+this.bucketName.replace("\"", "")+"/"+this.filePath.replace("\"", "")+"");
set.take(500)
The whole operation takes 20 to 30 sec.
Now I am trying the same but rather using csv I am using mySQL table with 119 000 rows. MySQL server is in amazon ec2. Code is as follow;
String url ="jdbc:mysql://"+this.hostName+":3306/"+this.dataBaseName+"?user="+this.userName+"&password="+this.password;
SparkSession spark=StartSpark.getSparkSession();
SQLContext sc = spark.sqlContext();
DataFrameReader df = new DataFrameReader(spark).format("csv").option("header", true);
Dataset<Row> set = sc
.read()
.option("url", url)
.option("dbtable", this.tableName)
.option("driver","com.mysql.jdbc.Driver")
.format("jdbc")
.load();
set.take(500);
This is taking 5 to 10 minutes.
I am running spark inside jvm. Using same configuration in both cases.
I can use partitionColumn,numParttition etc but I don't have any numeric column and one more issue is the schema of the table is unknown to me.
My issue is not how to decrease the required time as I know in ideal case spark will run in cluster but what I can not understand is why this big time difference in the above two case?
This problem has been covered multiple times on StackOverflow:
- How to improve performance for slow Spark jobs using DataFrame and JDBC connection?
- spark jdbc df limit... what is it doing?
- How to use JDBC source to write and read data in (Py)Spark?
and in external sources:
- https://github.com/awesome-spark/spark-gotchas/blob/master/05_spark_sql_and_dataset_api.md#parallelizing-reads
so just to reiterate - by default DataFrameReader.jdbc
doesn't distribute data or reads. It uses single thread, single exectuor.
To distribute reads:
use ranges with lowerBound
/ upperBound
:
Properties properties;
Lower
Dataset<Row> set = sc
.read()
.option("partitionColumn", "foo")
.option("numPartitions", "3")
.option("lowerBound", 0)
.option("upperBound", 30)
.option("url", url)
.option("dbtable", this.tableName)
.option("driver","com.mysql.jdbc.Driver")
.format("jdbc")
.load();
predicates
Properties properties;
Dataset<Row> set = sc
.read()
.jdbc(
url, this.tableName,
{"foo < 10", "foo BETWWEN 10 and 20", "foo > 20"},
properties
)
Please follow the steps below
1.download a copy of the JDBC connector for mysql. I believe you already have one.
wget http://central.maven.org/maven2/mysql/mysql-connector-java/5.1.38/mysql-connector-java-5.1.38.jar
2.create a db-properties.flat file in the below format
jdbcUrl=jdbc:mysql://${jdbcHostname}:${jdbcPort}/${jdbcDatabase}
user=<username>
password=<password>
3.create a empty table first where you want to load the data.
invoke spark shell with driver class
spark-shell --driver-class-path <your path to mysql jar>
then import all the required package
import java.io.{File, FileInputStream}
import java.util.Properties
import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.{SparkConf, SparkContext}
initiate a hive context or a sql context
val sQLContext = new HiveContext(sc)
import sQLContext.implicits._
import sQLContext.sql
set some of the properties
sQLContext.setConf("hive.exec.dynamic.partition", "true")
sQLContext.setConf("hive.exec.dynamic.partition.mode", "nonstrict")
Load mysql db properties from file
val dbProperties = new Properties()
dbProperties.load(new FileInputStream(new File("your_path_to/db- properties.flat")))
val jdbcurl = dbProperties.getProperty("jdbcUrl")
create a query to read the data from your table and pass it to read method of #sqlcontext. this is where you can manage your where clause
val df1 = "(SELECT * FROM your_table_name) as s1"
pass the jdbcurl, select query and db properties to read method
val df2 = sQLContext.read.jdbc(jdbcurl, df1, dbProperties)
write it to your table
df2.write.format("orc").partitionBy("your_partition_column_name").mode(SaveMode.Append).saveAsTable("your_target_table_name")