In my application I compare two different datasets (a source table from Hive and a destination table from an RDBMS) for duplicates and mismatches. It works fine with smaller datasets, but when I try to compare more than 1 GB of data (the source alone) it hangs and throws a timeout error.
I tried .config("spark.network.timeout", "600s"), but even after increasing the network timeout it throws java.lang.OutOfMemoryError: GC overhead limit exceeded.
val spark = SparkSession.builder().master("local")
  .appName("spark remote")
  .config("javax.jdo.option.ConnectionURL", "jdbc:mysql://192.168.175.160:3306/metastore?useSSL=false")
  .config("javax.jdo.option.ConnectionUserName", "hiveroot")
  .config("javax.jdo.option.ConnectionPassword", "hivepassword")
  .config("hive.exec.scratchdir", "/tmp/hive/${user.name}")
  .config("hive.metastore.uris", "thrift://192.168.175.160:9083")
  .enableHiveSupport()
  .getOrCreate()
import spark.implicits._
import spark.sql
val source = spark.sql("SELECT * from sample.source").rdd.map(_.mkString(","))
SparkSession.clearActiveSession()
SparkSession.clearDefaultSession()
val sparkdestination = SparkSession.builder().master("local").appName("Database")
  .config("spark.network.timeout", "600s")
  .getOrCreate()
val jdbcUsername = "root"
val jdbcPassword = "root"
val url = "jdbc:mysql://192.168.175.35:3306/sample?useSSL=false"
val connectionProperties = new java.util.Properties()
connectionProperties.put("user", jdbcUsername)
connectionProperties.put("password", jdbcPassword)
val queryDestination = "(select * from destination) as dest"
val destination = sparkdestination.read.jdbc(url, queryDestination, connectionProperties).rdd.map(_.mkString(","))
I also tried destination.persist(StorageLevel.MEMORY_AND_DISK_SER) (and MEMORY_AND_DISK and DISK_ONLY as well), but no luck.
EDIT: This is the original error stack:
17/07/11 12:49:43 INFO DAGScheduler: Submitting 22 missing tasks from ShuffleMapStage 1 (MapPartitionsRDD[13] at map at stack.scala:76)
17/07/11 12:49:43 INFO TaskSchedulerImpl: Adding task set 1.0 with 22 tasks
17/07/11 12:49:43 INFO Executor: Running task 0.0 in stage 0.0 (TID 0)
17/07/11 12:51:38 INFO JDBCRDD: closed connection
17/07/11 12:51:38 ERROR Executor: Exception in task 0.0 in stage 0.0 (TID 0)
java.lang.OutOfMemoryError: GC overhead limit exceeded
at com.mysql.jdbc.MysqlIO.nextRowFast(MysqlIO.java:2210)
at com.mysql.jdbc.MysqlIO.nextRow(MysqlIO.java:1989)
at com.mysql.jdbc.MysqlIO.readSingleRowSet(MysqlIO.java:3410)
at com.mysql.jdbc.MysqlIO.getResultSet(MysqlIO.java:470)
at com.mysql.jdbc.MysqlIO.readResultsForQueryOrUpdate(MysqlIO.java:3112)
at com.mysql.jdbc.MysqlIO.readAllResults(MysqlIO.java:2341)
at com.mysql.jdbc.MysqlIO.sqlQueryDirect(MysqlIO.java:2736)
at com.mysql.jdbc.ConnectionImpl.execSQL(ConnectionImpl.java:2490)
at com.mysql.jdbc.PreparedStatement.executeInternal(PreparedStatement.java:1858)
at com.mysql.jdbc.PreparedStatement.executeQuery(PreparedStatement.java:1966)
at org.apache.spark.sql.execution.datasources.jdbc.JDBCRDD.compute(JDBCRDD.scala:301)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
17/07/11 12:51:38 ERROR SparkUncaughtExceptionHandler: Uncaught exception in thread Thread[Executor task launch worker-0,5,main]
java.lang.OutOfMemoryError: GC overhead limit exceeded
at com.mysql.jdbc.MysqlIO.nextRowFast(MysqlIO.java:2210)
at com.mysql.jdbc.MysqlIO.nextRow(MysqlIO.java:1989)
EDIT 2:
I tried using:
val options = Map(
  "url" -> "jdbc:mysql://192.168.175.35:3306/sample?useSSL=false",
  "dbtable" -> queryDestination,
  "user" -> "root",
  "password" -> "root")

val destination = sparkdestination.read.options(options)
  .jdbc(options("url"), options("dbtable"), "0", 1, 5, 4, new java.util.Properties())
  .rdd.map(_.mkString(","))
I checked with a small amount of data and it works, but for large datasets it throws the error below.
ERROR
17/07/11 14:12:46 INFO DAGScheduler: looking for newly runnable stages
17/07/11 14:12:46 INFO DAGScheduler: running: Set(ShuffleMapStage 1)
17/07/11 14:12:46 INFO DAGScheduler: waiting: Set(ResultStage 2)
17/07/11 14:12:46 INFO DAGScheduler: failed: Set()
17/07/11 14:12:50 INFO BlockManagerInfo: Removed broadcast_1_piece0 on 192.168.175.160:39913 in memory (size: 19.9 KB, free: 353.4 MB)
17/07/11 14:14:47 WARN ServerConnector:
17/07/11 14:15:32 WARN QueuedThreadPool:
java.lang.OutOfMemoryError: GC overhead limit exceeded
at java.lang.String.substring(String.java:1969)
17/07/11 14:15:32 ERROR Utils: uncaught error in thread Spark Context Cleaner, stopping SparkContext
java.lang.OutOfMemoryError: GC overhead limit exceeded
at org.apache.spark.ContextCleaner$$anonfun$org$apache$spark$ContextCleaner$$keepCleaning$1.apply$mcV$sp(ContextCleaner.scala:179)
17/07/11 14:15:32 WARN NettyRpcEndpointRef: Error sending message [message = Heartbeat(driver, [Lscala.Tuple2;@1e855db,BlockManagerId (driver, 192.168.175.160, 39913, None))] in 1 attempts
org.apache.spark.rpc.RpcTimeoutException: Futures timed out after [10 seconds]. This timeout is controlled by spark.executor.heartbeatInterval
at org.apache.spark.rpc.RpcTimeout.org$apache$spark$rpc$RpcTimeout$$createRpcTimeoutException(RpcTimeout.scala:48)
at org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcTimeout.scala:63)
Caused by: java.util.concurrent.TimeoutException: Futures timed out after [10 seconds]
at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
17/07/11 14:15:32 ERROR Utils: throw uncaught fatal error in thread Spark Context Cleaner
java.lang.OutOfMemoryError: GC overhead limit exceeded
at org.apache.spark.ContextCleaner$$anonfun$org$apache$spark$ContextCleaner$$keepCleaning$1.apply$mcV$sp(ContextCleaner.scala:179)
at org.apache.spark.util.Utils$.tryOrStopSparkContext(Utils.scala:1245)
17/07/11 14:15:32 WARN QueuedThreadPool: Unexpected thread death: org.spark_project.jetty.util.thread.QueuedThreadPool$3@710104 in SparkUI{STARTED,8<=8<=200,i=5,q=0}
17/07/11 14:15:32 INFO JDBCRDD: closed connection
17/07/11 14:15:32 ERROR Executor: Exception in task 0.0 in stage 1.0 (TID 22)
java.lang.OutOfMemoryError: GC overhead limit exceeded
17/07/11 14:15:32 INFO SparkUI: Stopped Spark web UI at http://192.168.175.160:4040
17/07/11 14:15:32 INFO DAGScheduler: Job 0 failed: collect at stack.scala:93, took 294.365864 s
Exception in thread "main" org.apache.spark.SparkException: Job 0 cancelled because SparkContext was shut down
at org.apache.spark.scheduler.DAGScheduler$$anonfun$cleanUpAfterSchedulerStop$1.apply(DAGScheduler.scala:808)
17/07/11 14:15:32 INFO DAGScheduler: ShuffleMapStage 1 (map at stack.scala:85) failed in 294.165 s due to Stage cancelled because SparkContext was shut down
17/07/11 14:15:32 ERROR LiveListenerBus: SparkListenerBus has already stopped! Dropping event SparkListenerStageCompleted(org.apache.spark.scheduler.StageInfo@cfb906)
17/07/11 14:15:32 ERROR LiveListenerBus: SparkListenerBus has already stopped! Dropping event SparkListenerJobEnd(0,1499762732342,JobFailed(org.apache.spark.SparkException: Job 0 cancelled because SparkContext was shut down))
17/07/11 14:15:32 ERROR SparkUncaughtExceptionHandler: [Container in shutdown] Uncaught exception in thread Thread[Executor task launch worker-1,5,main]
java.lang.OutOfMemoryError: GC overhead limit exceeded
17/07/11 14:15:32 INFO DiskBlockManager: Shutdown hook called
17/07/11 14:15:32 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
17/07/11 14:15:32 INFO ShutdownHookManager: Shutdown hook called
17/07/11 14:15:32 INFO MemoryStore: MemoryStore cleared
17/07/11 14:15:32 INFO BlockManager: BlockManager stopped
17/07/11 14:15:32 INFO BlockManagerMaster: BlockManagerMaster stopped
17/07/11 14:15:32 INFO ShutdownHookManager: Deleting directory /tmp/spark-0b2ea8bd-95c0-45e4-a1cc-bd62b3899b24
17/07/11 14:15:32 INFO ShutdownHookManager: Deleting directory /tmp/spark-0b2ea8bd-95c0-45e4-a1cc-bd62b3899b24/userFiles-194d73ba-fcfa-4616-ae17-78b0bba6b465
17/07/11 14:15:32 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
Spark Configurations
I'm using 2g of memory and 1 core for execution, though it's in development mode. I'm new to Spark, sorry for such a naive question.
Thank you!
First, you are initiating two SparkSessions, which is quite useless; you are just splitting resources. So don't do that!
Secondly, and here is where the problem is: there is a misunderstanding concerning parallelism and the jdbc source with Apache Spark (don't worry, it's a gotcha!). It's mainly due to missing documentation (the last time I checked).
So back to the problem. What's actually happening is that the following line from your code:
val destination = sparkdestination.read.jdbc(url, queryDestination, connectionProperties).rdd.map(_.mkString(","))
is delegating all reads to a single worker.
So even if you had enough memory and succeeded in reading the data, the whole destination dataset would end up in one partition. And one partition means trouble: possible OutOfMemoryErrors, GC overhead, and so on. What happened is that the single executor chosen to fetch the data was overwhelmed and its JVM blew up.
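For instance, you can confirm the partition count of that read with something like this (reusing url, queryDestination and connectionProperties from your code):
// Sketch: inspect the parallelism of the plain (non-partitioned) JDBC read.
val df = sparkdestination.read.jdbc(url, queryDestination, connectionProperties)
println(df.rdd.getNumPartitions) // prints 1: every row goes through a single task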
Let's solve this now:
(Disclaimer: the following code is an excerpt from spark-gotchas, and I'm one of its authors.)
So let's create some sample data and save it in our database:
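A minimal sketch of that step (not the exact spark-gotchas excerpt), assuming a single SparkSession named spark, the MySQL endpoint from the question, and a numeric id column we can partition on later:
import java.util.Properties
import org.apache.spark.sql.SaveMode

val props = new Properties()
props.put("user", "root")
props.put("password", "root")
val mysqlUrl = "jdbc:mysql://192.168.175.35:3306/sample?useSSL=false"

// One million rows with a numeric id column to partition on when reading back.
val sample = spark.range(0, 1000000).selectExpr("id", "id % 100 AS some_value")

// Write them to the destination table over JDBC.
sample.write.mode(SaveMode.Overwrite).jdbc(mysqlUrl, "destination", props)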
Apache Spark provides two methods which can be used for distributed data loading over JDBC. The first one partitions the data using an integer column:
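A sketch of that first method, reusing connectionProperties from above; the column name id and the 0 to 1000000 bounds are assumptions about the destination table:
// Sketch: distributed JDBC read partitioned on a numeric column.
// Spark splits the [lowerBound, upperBound] range into numPartitions slices
// and runs one query per slice, so the data no longer lands in a single task.
val partitioned = sparkdestination.read.jdbc(
  "jdbc:mysql://192.168.175.35:3306/sample?useSSL=false", // url
  "destination",                                          // table
  "id",                                                   // partition column (numeric)
  0L,                                                     // lowerBound
  1000000L,                                               // upperBound
  4,                                                      // numPartitions
  connectionProperties)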
The partition column and bounds can be provided using options as well:
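A sketch of the options-based form (again, partitionColumn and the bounds are assumptions about the table):
// Sketch: the same partitioned read expressed through DataFrameReader options.
val viaOptions = sparkdestination.read
  .format("jdbc")
  .option("url", "jdbc:mysql://192.168.175.35:3306/sample?useSSL=false")
  .option("dbtable", "destination")
  .option("user", "root")
  .option("password", "root")
  .option("partitionColumn", "id")
  .option("lowerBound", "0")
  .option("upperBound", "1000000")
  .option("numPartitions", "4")
  .load()
Either way the resulting DataFrame should now report 4 partitions instead of 1, and the fetch is spread across four tasks.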
Another option would be to use a sequence of predicates but I won't be talking about it here.
You can read more about Spark SQL and the JDBC Source here along with some other gotchas.
I hope this helps.