Spark Will Not Load Large MySql Table: Java Commun

2019-08-10 20:17发布

I'm trying to get a pretty large table from mysql so I can manipulate using spark/databricks. I can't get it to load into spark - I have tried taking smaller subsets, but even at the smallest reasonable unit, it still fails to load.

I have tried playing with the wait_timeout and interactive_timeout in mysql, but it doesn't seem to make any difference

I am also loading a smaller (different) table, and that loads just fine.

df_dataset = get_jdbc('raw_data_load', predicates=predicates).select('field1','field2', 'field3','date')

df_dataset = df_dataset.repartition('date')                                                                                      
df_dataset.registerTempTable('raw_data')

I then am trying to cache the data for sql purposes using

%sql

cache table raw_data;

And it goes and chugs for a while and his the database, but always times out after 30-40 minutes and I get the error below

Up until the point it times out, I see

Error in SQL statement: SparkException: Job aborted due to stage failure: Task 0 in stage 30.0 failed 4 times, most recent failure: Lost task 0.3 in stage 30.0 (TID 17075, 10.200.240.63, executor 1): com.mysql.jdbc.exceptions.jdbc4.CommunicationsException: Communications link failure

The last packet successfully received from the server was 1,715,280 milliseconds ago.  The last packet sent successfully to the server was 1,715,290 milliseconds ago.
    at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
    at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
    at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
    at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
    at com.mysql.jdbc.Util.handleNewInstance(Util.java:411)
    at com.mysql.jdbc.SQLError.createCommunicationsException(SQLError.java:1121)
    at com.mysql.jdbc.MysqlIO.nextRowFast(MysqlIO.java:2290)
    at com.mysql.jdbc.MysqlIO.nextRow(MysqlIO.java:2046)
    at com.mysql.jdbc.MysqlIO.readSingleRowSet(MysqlIO.java:3554)
    at com.mysql.jdbc.MysqlIO.getResultSet(MysqlIO.java:491)
    at com.mysql.jdbc.MysqlIO.readResultsForQueryOrUpdate(MysqlIO.java:3245)
    at com.mysql.jdbc.MysqlIO.readAllResults(MysqlIO.java:2413)
    at com.mysql.jdbc.MysqlIO.sqlQueryDirect(MysqlIO.java:2836)
    at com.mysql.jdbc.ConnectionImpl.execSQL(ConnectionImpl.java:2825)
    at com.mysql.jdbc.PreparedStatement.executeInternal(PreparedStatement.java:2156)
    at com.mysql.jdbc.PreparedStatement.executeQuery(PreparedStatement.java:2323)
    at org.apache.spark.sql.execution.datasources.jdbc.JDBCRDD.compute(JDBCRDD.scala:301)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
    at org.apache.spark.rdd.RDD$$anonfun$8.apply(RDD.scala:336)
    at org.apache.spark.rdd.RDD$$anonfun$8.apply(RDD.scala:334)
    at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1005)
    at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:996)
    at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:936)
    at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:996)
    at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:700)
    at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:334)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:285)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
    at org.apache.spark.scheduler.Task.run(Task.scala:99)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:322)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.EOFException: Can not read response from server. Expected to read 10 bytes, read 4 bytes before connection was unexpectedly lost.
    at com.mysql.jdbc.MysqlIO.readFully(MysqlIO.java:3166)
    at com.mysql.jdbc.MysqlIO.nextRowFast(MysqlIO.java:2271)
    ... 42 more

Driver stacktrace:
com.databricks.backend.common.rpc.DatabricksExceptions$SQLExecutionException: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 30.0 failed 4 times, most recent failure: Lost task 0.3 in stage 30.0 (TID 17075, 10.200.240.63, executor 1): com.mysql.jdbc.exceptions.jdbc4.CommunicationsException: Communications link failure

The last packet successfully received from the server was 1,715,280 milliseconds ago.  The last packet sent successfully to the server was 1,715,290 milliseconds ago.
    at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
    at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
    at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
    at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
    at com.mysql.jdbc.Util.handleNewInstance(Util.java:411)
    at com.mysql.jdbc.SQLError.createCommunicationsException(SQLError.java:1121)
    at com.mysql.jdbc.MysqlIO.nextRowFast(MysqlIO.java:2290)
    at com.mysql.jdbc.MysqlIO.nextRow(MysqlIO.java:2046)
    at com.mysql.jdbc.MysqlIO.readSingleRowSet(MysqlIO.java:3554)
    at com.mysql.jdbc.MysqlIO.getResultSet(MysqlIO.java:491)
    at com.mysql.jdbc.MysqlIO.readResultsForQueryOrUpdate(MysqlIO.java:3245)
    at com.mysql.jdbc.MysqlIO.readAllResults(MysqlIO.java:2413)
    at com.mysql.jdbc.MysqlIO.sqlQueryDirect(MysqlIO.java:2836)
    at com.mysql.jdbc.ConnectionImpl.execSQL(ConnectionImpl.java:2825)
    at com.mysql.jdbc.PreparedStatement.executeInternal(PreparedStatement.java:2156)
    at com.mysql.jdbc.PreparedStatement.executeQuery(PreparedStatement.java:2323)
    at org.apache.spark.sql.execution.datasources.jdbc.JDBCRDD.compute(JDBCRDD.scala:301)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
    at org.apache.spark.rdd.RDD$$anonfun$8.apply(RDD.scala:336)
    at org.apache.spark.rdd.RDD$$anonfun$8.apply(RDD.scala:334)
    at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1005)
    at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:996)
    at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:936)
    at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:996)
    at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:700)
    at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:334)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:285)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
    at org.apache.spark.scheduler.Task.run(Task.scala:99)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:322)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.EOFException: Can not read response from server. Expected to read 10 bytes, read 4 bytes before connection was unexpectedly lost.
    at com.mysql.jdbc.MysqlIO.readFully(MysqlIO.java:3166)
    at com.mysql.jdbc.MysqlIO.nextRowFast(MysqlIO.java:2271)
    ... 42 more

Driver stacktrace:
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1442)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1430)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1429)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1429)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:803)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:803)
    at scala.Option.foreach(Option.scala:257)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:803)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1657)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1612)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1601)
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:628)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1937)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1950)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1963)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1977)
    at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:936)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
    at org.apache.spark.rdd.RDD.collect(RDD.scala:935)
    at org.apache.spark.sql.execution.SparkPlan.executeCollect(SparkPlan.scala:275)
    at org.apache.spark.sql.Dataset$$anonfun$count$1.apply(Dataset.scala:2409)
    at org.apache.spark.sql.Dataset$$anonfun$count$1.apply(Dataset.scala:2408)
    at org.apache.spark.sql.Dataset$$anonfun$60.apply(Dataset.scala:2791)
    at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:87)
    at org.apache.spark.sql.execution.SQLExecution$.withFileAccessAudit(SQLExecution.scala:53)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:70)
    at org.apache.spark.sql.Dataset.withAction(Dataset.scala:2790)
    at org.apache.spark.sql.Dataset.count(Dataset.scala:2408)
    at org.apache.spark.sql.execution.command.CacheTableCommand.run(cache.scala:45)
    at com.databricks.sql.acl.TrustedRunnableCommand$$anonfun$run$1.apply(TrustedRunnableCommand.scala:29)
    at com.databricks.sql.acl.TrustedRunnableCommand$$anonfun$run$1.apply(TrustedRunnableCommand.scala:29)
    at com.databricks.sql.acl.CheckPermissions$.trusted(CheckPermissions.scala:460)
    at com.databricks.sql.acl.TrustedRunnableCommand.run(TrustedRunnableCommand.scala:29)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:58)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:56)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.executeCollect(commands.scala:67)
    at org.apache.spark.sql.Dataset.<init>(Dataset.scala:185)
    at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:64)
    at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:599)
    at org.apache.spark.sql.SQLContext.sql(SQLContext.scala:698)
    at com.databricks.backend.daemon.driver.SQLDriverLocal$$anonfun$1.apply(SQLDriverLocal.scala:82)
    at com.databricks.backend.daemon.driver.SQLDriverLocal$$anonfun$1.apply(SQLDriverLocal.scala:28)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
    at scala.collection.immutable.List.foreach(List.scala:381)
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
    at scala.collection.immutable.List.map(List.scala:285)
    at com.databricks.backend.daemon.driver.SQLDriverLocal.executeSql(SQLDriverLocal.scala:28)
    at com.databricks.backend.daemon.driver.SQLDriverLocal.repl(SQLDriverLocal.scala:128)
    at com.databricks.backend.daemon.driver.DriverLocal$$anonfun$execute$2.apply(DriverLocal.scala:230)
    at com.databricks.backend.daemon.driver.DriverLocal$$anonfun$execute$2.apply(DriverLocal.scala:211)
    at com.databricks.logging.UsageLogging$$anonfun$withAttributionContext$1.apply(UsageLogging.scala:173)
    at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
    at com.databricks.logging.UsageLogging$class.withAttributionContext(UsageLogging.scala:168)
    at com.databricks.backend.daemon.driver.DriverLocal.withAttributionContext(DriverLocal.scala:39)
    at com.databricks.logging.UsageLogging$class.withAttributionTags(UsageLogging.scala:206)
    at com.databricks.backend.daemon.driver.DriverLocal.withAttributionTags(DriverLocal.scala:39)
    at com.databricks.backend.daemon.driver.DriverLocal.execute(DriverLocal.scala:211)
    at com.databricks.backend.daemon.driver.DriverWrapper$$anonfun$tryExecutingCommand$2.apply(DriverWrapper.scala:589)
    at com.databricks.backend.daemon.driver.DriverWrapper$$anonfun$tryExecutingCommand$2.apply(DriverWrapper.scala:589)
    at scala.util.Try$.apply(Try.scala:192)
    at com.databricks.backend.daemon.driver.DriverWrapper.tryExecutingCommand(DriverWrapper.scala:584)
    at com.databricks.backend.daemon.driver.DriverWrapper.executeCommand(DriverWrapper.scala:488)
    at com.databricks.backend.daemon.driver.DriverWrapper.runInnerLoop(DriverWrapper.scala:391)
    at com.databricks.backend.daemon.driver.DriverWrapper.runInner(DriverWrapper.scala:348)
    at com.databricks.backend.daemon.driver.DriverWrapper.run(DriverWrapper.scala:215)
    at java.lang.Thread.run(Thread.java:745)
Caused by: com.mysql.jdbc.exceptions.jdbc4.CommunicationsException: Communications link failure

The last packet successfully received from the server was 1,715,280 milliseconds ago.  The last packet sent successfully to the server was 1,715,290 milliseconds ago.
    at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
    at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
    at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
    at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
    at com.mysql.jdbc.Util.handleNewInstance(Util.java:411)
    at com.mysql.jdbc.SQLError.createCommunicationsException(SQLError.java:1121)
    at com.mysql.jdbc.MysqlIO.nextRowFast(MysqlIO.java:2290)
    at com.mysql.jdbc.MysqlIO.nextRow(MysqlIO.java:2046)
    at com.mysql.jdbc.MysqlIO.readSingleRowSet(MysqlIO.java:3554)
    at com.mysql.jdbc.MysqlIO.getResultSet(MysqlIO.java:491)
    at com.mysql.jdbc.MysqlIO.readResultsForQueryOrUpdate(MysqlIO.java:3245)
    at com.mysql.jdbc.MysqlIO.readAllResults(MysqlIO.java:2413)
    at com.mysql.jdbc.MysqlIO.sqlQueryDirect(MysqlIO.java:2836)
    at com.mysql.jdbc.ConnectionImpl.execSQL(ConnectionImpl.java:2825)
    at com.mysql.jdbc.PreparedStatement.executeInternal(PreparedStatement.java:2156)
    at com.mysql.jdbc.PreparedStatement.executeQuery(PreparedStatement.java:2323)
    at org.apache.spark.sql.execution.datasources.jdbc.JDBCRDD.compute(JDBCRDD.scala:301)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
    at org.apache.spark.rdd.RDD$$anonfun$8.apply(RDD.scala:336)
    at org.apache.spark.rdd.RDD$$anonfun$8.apply(RDD.scala:334)
    at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1005)
    at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:996)
    at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:936)
    at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:996)
    at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:700)
    at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:334)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:285)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
    at org.apache.spark.scheduler.Task.run(Task.scala:99)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:322)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    ... 1 more
Caused by: java.io.EOFException: Can not read response from server. Expected to read 10 bytes, read 4 bytes before connection was unexpectedly lost.
    at com.mysql.jdbc.MysqlIO.readFully(MysqlIO.java:3166)
    at com.mysql.jdbc.MysqlIO.nextRowFast(MysqlIO.java:2271)
    ... 42 more

    at com.databricks.backend.daemon.driver.SQLDriverLocal.executeSql(SQLDriverLocal.scala:116)
    at com.databricks.backend.daemon.driver.SQLDriverLocal.repl(SQLDriverLocal.scala:128)
    at com.databricks.backend.daemon.driver.DriverLocal$$anonfun$execute$2.apply(DriverLocal.scala:230)
    at com.databricks.backend.daemon.driver.DriverLocal$$anonfun$execute$2.apply(DriverLocal.scala:211)
    at com.databricks.logging.UsageLogging$$anonfun$withAttributionContext$1.apply(UsageLogging.scala:173)
    at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
    at com.databricks.logging.UsageLogging$class.withAttributionContext(UsageLogging.scala:168)
    at com.databricks.backend.daemon.driver.DriverLocal.withAttributionContext(DriverLocal.scala:39)
    at com.databricks.logging.UsageLogging$class.withAttributionTags(UsageLogging.scala:206)
    at com.databricks.backend.daemon.driver.DriverLocal.withAttributionTags(DriverLocal.scala:39)
    at com.databricks.backend.daemon.driver.DriverLocal.execute(DriverLocal.scala:211)
    at com.databricks.backend.daemon.driver.DriverWrapper$$anonfun$tryExecutingCommand$2.apply(DriverWrapper.scala:589)
    at com.databricks.backend.daemon.driver.DriverWrapper$$anonfun$tryExecutingCommand$2.apply(DriverWrapper.scala:589)
    at scala.util.Try$.apply(Try.scala:192)
    at com.databricks.backend.daemon.driver.DriverWrapper.tryExecutingCommand(DriverWrapper.scala:584)
    at com.databricks.backend.daemon.driver.DriverWrapper.executeCommand(DriverWrapper.scala:488)
    at com.databricks.backend.daemon.driver.DriverWrapper.runInnerLoop(DriverWrapper.scala:391)
    at com.databricks.backend.daemon.driver.DriverWrapper.runInner(DriverWrapper.scala:348)
    at com.databricks.backend.daemon.driver.DriverWrapper.run(DriverWrapper.scala:215)
    at java.lang.Thread.run(Thread.java:745)

1条回答
再贱就再见
2楼-- · 2019-08-10 20:47

With extremely large tables you're going to want to partition the query across your executors. By default the JDBC reader will read the query the parallelize it from the driver. If you have an incrementing, sequential key in the table you can parallelize using the lowerBound, upperBound, and numPartitions parameters. Here's an example taken from https://docs.databricks.com/spark/latest/data-sources/sql-databases.html#python-example

df = spark.read.\
      jdbc(url=jdbcUrl, \
              table='employees',\
              column='emp_no',\
              lowerBound=1,\
              upperBound=100000, \
              numPartitions=100)
df.show()

Having said that, you may want to read and write the data out to Parquet as that will perform better than re-reading from JDBC again.

查看更多
登录 后发表回答