I have written a somewhat complex SparkR script and run it using spark-submit. What the script basically does is read a big Hive/Impala Parquet-based table row by row and generate a new Parquet file with the same number of rows. The problem is that the job stops after almost exactly 100 minutes, which looks like some kind of timeout.
- For up to 500K rows the script works perfectly (because it needs less than 100 minutes).
- For 1, 2, 3 or more million rows the script exits after 100 minutes.
I have checked and tested every parameter I know of whose value is in the 100-minute range, but could not find a solution.
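I cannot post the script itself, but a stripped-down job of the same shape (read a Parquet table, write it back out as Parquet; the paths here are placeholders, not the real ones) looks like this:

library(SparkR)

# SparkR 1.5.x initialization.
sc <- sparkR.init(appName = "SparkR")
sqlContext <- sparkRSQL.init(sc)

# Read the big Parquet-backed table and write a new Parquet file
# with the same number of rows.
df <- read.df(sqlContext, "/path/to/big_table", source = "parquet")
write.df(df, "/path/to/new_table", source = "parquet", mode = "overwrite")

sparkR.stop()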
[user@localhost R]$ time spark-submit sparkr-pre.R
Loading required package: methods
Attaching package: ‘SparkR’
The following objects are masked from ‘package:stats’:
filter, na.omit
The following objects are masked from ‘package:base’:
intersect, rbind, sample, subset, summary, table, transform
15/12/30 18:04:27 WARN MetricsSystem: Using default name DAGScheduler for source because spark.app.id is not set.
[Stage 1:========================================> (7 + 3) / 10]Error in if (returnStatus != 0) { : argument is of length zero
Calls: write.df -> write.df -> .local -> callJMethod -> invokeJava
Execution halted
15/12/30 19:44:52 ERROR InsertIntoHadoopFsRelation: Aborting job.
org.apache.spark.SparkException: Job cancelled because SparkContext was shut down
at org.apache.spark.scheduler.DAGScheduler$$anonfun$cleanUpAfterSchedulerStop$1.apply(DAGScheduler.scala:703)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$cleanUpAfterSchedulerStop$1.apply(DAGScheduler.scala:702)
at scala.collection.mutable.HashSet.foreach(HashSet.scala:79)
at org.apache.spark.scheduler.DAGScheduler.cleanUpAfterSchedulerStop(DAGScheduler.scala:702)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onStop(DAGScheduler.scala:1514)
at org.apache.spark.util.EventLoop.stop(EventLoop.scala:84)
at org.apache.spark.scheduler.DAGScheduler.stop(DAGScheduler.scala:1438)
at org.apache.spark.SparkContext$$anonfun$stop$7.apply$mcV$sp(SparkContext.scala:1724)
at org.apache.spark.util.Utils$.tryLogNonFatalError(Utils.scala:1185)
at org.apache.spark.SparkContext.stop(SparkContext.scala:1723)
at org.apache.spark.SparkContext$$anonfun$3.apply$mcV$sp(SparkContext.scala:587)
at org.apache.spark.util.SparkShutdownHook.run(ShutdownHookManager.scala:264)
at org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ShutdownHookManager.scala:234)
at org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1$$anonfun$apply$mcV$sp$1.apply(ShutdownHookManager.scala:234)
at org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1$$anonfun$apply$mcV$sp$1.apply(ShutdownHookManager.scala:234)
at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1699)
at org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1.apply$mcV$sp(ShutdownHookManager.scala:234)
at org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1.apply(ShutdownHookManager.scala:234)
at org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1.apply(ShutdownHookManager.scala:234)
at scala.util.Try$.apply(Try.scala:161)
at org.apache.spark.util.SparkShutdownHookManager.runAll(ShutdownHookManager.scala:234)
at org.apache.spark.util.SparkShutdownHookManager$$anon$2.run(ShutdownHookManager.scala:216)
at org.apache.hadoop.util.ShutdownHookManager$1.run(ShutdownHookManager.java:54)
at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:567)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1824)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1837)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1914)
at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1.apply$mcV$sp(InsertIntoHadoopFsRelation.scala:150)
at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1.apply(InsertIntoHadoopFsRelation.scala:108)
at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1.apply(InsertIntoHadoopFsRelation.scala:108)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:56)
at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation.run(InsertIntoHadoopFsRelation.scala:108)
at org.apache.spark.sql.execution.ExecutedCommand.sideEffectResult$lzycompute(commands.scala:57)
at org.apache.spark.sql.execution.ExecutedCommand.sideEffectResult(commands.scala:57)
at org.apache.spark.sql.execution.ExecutedCommand.doExecute(commands.scala:69)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:140)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:138)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:147)
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:138)
at org.apache.spark.sql.SQLContext$QueryExecution.toRdd$lzycompute(SQLContext.scala:933)
at org.apache.spark.sql.SQLContext$QueryExecution.toRdd(SQLContext.scala:933)
at org.apache.spark.sql.execution.datasources.ResolvedDataSource$.apply(ResolvedDataSource.scala:197)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:146)
at org.apache.spark.sql.DataFrame.save(DataFrame.scala:1855)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:497)
at org.apache.spark.api.r.RBackendHandler.handleMethodCall(RBackendHandler.scala:132)
at org.apache.spark.api.r.RBackendHandler.channelRead0(RBackendHandler.scala:79)
at org.apache.spark.api.r.RBackendHandler.channelRead0(RBackendHandler.scala:38)
at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:308)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:294)
at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:308)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:294)
at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:244)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:308)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:294)
at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:846)
at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:131)
at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511)
at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111)
at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:137)
at java.lang.Thread.run(Thread.java:745)
15/12/30 19:44:52 ERROR DefaultWriterContainer: Job job_201512301804_0000 aborted.
15/12/30 19:44:52 ERROR RBackendHandler: save on 25 failed
real 100m30.944s
user 1m26.326s
sys 0m19.459s
Environment
Runtime Information
Name Value
Java Home /usr/java/jdk1.8.0_40/jre
Java Version 1.8.0_40 (Oracle Corporation)
Scala Version version 2.10.4
Spark Properties
Name Value
spark.akka.frameSize 1024
spark.app.id application_1451466100034_0019
spark.app.name SparkR
spark.driver.appUIAddress http://x.x.x.x:4040
spark.driver.host x.x.x.x
spark.driver.maxResultSize 8G
spark.driver.memory 100G
spark.driver.port 60471
spark.executor.id driver
spark.executor.memory 14G
spark.executorEnv.LD_LIBRARY_PATH $LD_LIBRARY_PATH:/usr/lib64/R/lib:/usr/local/lib64:/usr/lib/jvm/jre/lib/amd64/server:/usr/lib/jvm/jre/lib/amd64:/usr/lib/jvm/java/lib/amd64:/usr/java/packages/lib/amd64:/lib:/usr/lib::/usr/lib/hadoop/lib/native
spark.externalBlockStore.folderName spark-b60f685e-c46c-435d-ab1b-c9d1279f630f
spark.fileserver.uri http://x.x.x.x:50281
spark.home /datas/spark-1.5.2-bin-hadoop2.6
spark.kryoserializer.buffer.max 2000M
spark.master yarn-client
spark.org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter.param.PROXY_HOSTS CDHPR1.dc.dialog.lk
spark.org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter.param.PROXY_URI_BASES http://CDHPR1.dc.dialog.lk:8088/proxy/application_1451466100034_0019
spark.scheduler.mode FIFO
spark.serializer org.apache.spark.serializer.KryoSerializer
spark.sql.parquet.binaryAsString true
spark.submit.deployMode client
spark.ui.filters org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter
spark.yarn.dist.archives file:/datas/spark-1.5.2-bin-hadoop2.6/R/lib/sparkr.zip#sparkr
spark.yarn.dist.files file:/home/inuser/R/sparkr-pre.R
System Properties
Name Value
SPARK_SUBMIT true
SPARK_YARN_MODE true
awt.toolkit sun.awt.X11.XToolkit
file.encoding UTF-8
file.encoding.pkg sun.io
file.separator /
java.awt.graphicsenv sun.awt.X11GraphicsEnvironment
java.awt.printerjob sun.print.PSPrinterJob
java.class.version 52.0
java.endorsed.dirs /usr/java/jdk1.8.0_40/jre/lib/endorsed
java.ext.dirs /usr/java/jdk1.8.0_40/jre/lib/ext:/usr/java/packages/lib/ext
java.home /usr/java/jdk1.8.0_40/jre
java.io.tmpdir /tmp
java.library.path :/usr/lib/hadoop/lib/native:/usr/java/packages/lib/amd64:/usr/lib64:/lib64:/lib:/usr/lib
java.runtime.name Java(TM) SE Runtime Environment
java.runtime.version 1.8.0_40-b26
java.specification.name Java Platform API Specification
java.specification.vendor Oracle Corporation
java.specification.version 1.8
java.vendor Oracle Corporation
java.vendor.url http://java.oracle.com/
java.vendor.url.bug http://bugreport.sun.com/bugreport/
java.version 1.8.0_40
java.vm.info mixed mode
java.vm.name Java HotSpot(TM) 64-Bit Server VM
java.vm.specification.name Java Virtual Machine Specification
java.vm.specification.vendor Oracle Corporation
java.vm.specification.version 1.8
java.vm.vendor Oracle Corporation
java.vm.version 25.40-b25
line.separator
os.arch amd64
os.name Linux
os.version 2.6.32-431.el6.x86_64
path.separator :
sun.arch.data.model 64
sun.boot.class.path /usr/java/jdk1.8.0_40/jre/lib/resources.jar:/usr/java/jdk1.8.0_40/jre/lib/rt.jar:/usr/java/jdk1.8.0_40/jre/lib/sunrsasign.jar:/usr/java/jdk1.8.0_40/jre/lib/jsse.jar:/usr/java/jdk1.8.0_40/jre/lib/jce.jar:/usr/java/jdk1.8.0_40/jre/lib/charsets.jar:/usr/java/jdk1.8.0_40/jre/lib/jfr.jar:/usr/java/jdk1.8.0_40/jre/classes
sun.boot.library.path /usr/java/jdk1.8.0_40/jre/lib/amd64
sun.cpu.endian little
sun.cpu.isalist
sun.io.unicode.encoding UnicodeLittle
sun.java.command org.apache.spark.deploy.SparkSubmit sparkr-pre.R
sun.java.launcher SUN_STANDARD
sun.jnu.encoding UTF-8
sun.management.compiler HotSpot 64-Bit Tiered Compilers
sun.nio.ch.bugLevel
sun.os.patch.level unknown
user.country US
user.dir /home/user/R
user.home /home/user
user.language en
user.name inuser
user.timezone Asia/Colombo
Classpath Entries
Resource Source
/datas/spark-1.5.2-bin-hadoop2.6/conf/ System Classpath
/datas/spark-1.5.2-bin-hadoop2.6/conf/yarn-conf/ System Classpath
/datas/spark-1.5.2-bin-hadoop2.6/lib/datanucleus-api-jdo-3.2.6.jar System Classpath
/datas/spark-1.5.2-bin-hadoop2.6/lib/datanucleus-core-3.2.10.jar System Classpath
/datas/spark-1.5.2-bin-hadoop2.6/lib/datanucleus-rdbms-3.2.9.jar System Classpath
/datas/spark-1.5.2-bin-hadoop2.6/lib/spark-assembly-1.5.2-hadoop2.6.0.jar System Classpath
spark-defaults.conf
# Default system properties included when running spark-submit.
# This is useful for setting default environmental settings.
# Example:
# spark.master spark://master:7077
# spark.eventLog.enabled true
# spark.eventLog.dir hdfs://namenode:8021/directory
# spark.serializer org.apache.spark.serializer.KryoSerializer
# spark.driver.memory 5g
# spark.executor.extraJavaOptions -XX:+PrintGCDetails -Dkey=value -Dnumbers="one two three"
#
spark.master yarn-client
spark.serializer org.apache.spark.serializer.KryoSerializer
spark.driver.memory 100G
spark.executor.memory 14G
spark.sql.parquet.binaryAsString true
spark.kryoserializer.buffer.max 2000M
spark.driver.maxResultSize 8G
spark.akka.frameSize 1024
#spark.executor.instances 16
I cannot share the SparkR script in public, really sorry about that. But the code works perfectly whenever it needs less than 100 minutes to complete.
Since Spark 2.1, there is an environment variable called SPARKR_BACKEND_CONNECTION_TIMEOUT that controls this timeout. However, the default is still 100 minutes, so you need to set e.g. SPARKR_BACKEND_CONNECTION_TIMEOUT=1209600 on the driver in order to run longer tasks. I would have thought that setting

--conf spark.yarn.appMasterEnv.SPARKR_BACKEND_CONNECTION_TIMEOUT=1209600

in spark-submit would do the trick, but apparently that does not set the variable properly either. So my current workaround is to include this in the R script that is being executed:
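A minimal sketch of such a snippet, assuming the variable only needs to be visible to the driver R process before the Spark context is created:

# Raise the SparkR backend connection timeout to 14 days (1209600 seconds).
# This has to run before the Spark context is initialized.
Sys.setenv(SPARKR_BACKEND_CONNECTION_TIMEOUT = "1209600")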
It is a known bug in Spark 1.6.0, see https://issues.apache.org/jira/browse/SPARK-12609. A quick review of the SparkR code indicates that the bug has actually been present since Spark 1.4.0.

Until a fix is released, the quick and dirty solution is to increase the timeout. As noted in the issue, the problematic function is connectBackend. The function can be patched at runtime using assignInNamespace: the following retrieves the original function, wraps it in a second function that raises the timeout to 48 hours, and then replaces the original function with the wrapper.
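A sketch of that patch, assuming connectBackend keeps the (hostname, port, timeout) signature it has in the SparkR 1.5/1.6 sources:

# Retrieve the original, unexported function from the SparkR namespace.
connectBackend.orig <- getFromNamespace("connectBackend", "SparkR")

# Wrapper with the default timeout raised to 48 hours, in seconds
# (the stock default of 6000 seconds is exactly the 100 minutes observed).
connectBackend.patched <- function(hostname, port, timeout = 48 * 3600) {
  connectBackend.orig(hostname, port, timeout)
}

# Swap the wrapper in for the original at runtime.
assignInNamespace("connectBackend", connectBackend.patched, ns = "SparkR")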
Put this code after loading the SparkR package.
The other solution would be to modify the timeout in the SparkR source code and recompile the package. For compilation instructions, see: https://github.com/apache/spark/blob/branch-1.6/R/install-dev.sh