saveAsTextFile hangs in spark java.io.IOException:

Posted 2019-07-22 11:50

Question:

I am running a Spark application that does a simple diff between two data frames. I run it as a jar file in my cluster environment, which is a 94-node cluster. There are two data sets, 2 GB and 4 GB, which are mapped to data frames.

My job works fine for very small files ...

I suspect saveAsTextFile is where most of the time goes in my application. My cluster configuration details are below:

Total Vmem allocated for Containers     394.80 GB
Total VCores allocated for Containers   36  

This is how I run my Spark job:

spark-submit --queue root.queue --deploy-mode client --master yarn SparkApplication-SQL-jar-with-dependencies.jar

Here is my code:

import org.apache.spark.{ SparkConf, SparkContext }
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{ StructType, StructField, StringType }

object TestDiff {

  def main(args: Array[String]): Unit = {

    val conf = new SparkConf().setAppName("WordCount")

    conf.set("spark.executor.memory", "32g")
    // Note: in client deploy mode the driver JVM has already started by this point,
    // so driver memory has to be passed via spark-submit (--driver-memory) instead.
    conf.set("spark.driver.memory", "32g")
    conf.set("spark.driver.maxResultSize", "4g")

    val sc = new SparkContext(conf) // Create the Spark context
    val sqlContext = new org.apache.spark.sql.SQLContext(sc)

    // Both inputs are pipe-delimited text files with the same three columns
    val schema = StructType(Array(
      StructField("filler1", StringType),
      StructField("dunsnumber", StringType),
      StructField("transactionalindicator", StringType)))

    val textRdd1 = sc.textFile("/home/cloudera/TRF/PCFP/INCR")
    val rowRdd1 = textRdd1.map(line => Row.fromSeq(line.split("\\|", -1)))
    val df1 = sqlContext.createDataFrame(rowRdd1, schema)

    val textRdd2 = sc.textFile("/home/cloudera/TRF/PCFP/MAIN")
    val rowRdd2 = textRdd2.map(line => Row.fromSeq(line.split("\\|", -1)))
    val df2 = sqlContext.createDataFrame(rowRdd2, schema)

    // Rows of df1 with no identical row in df2, i.e. rows where any column has changed
    val diffAnyColumnDF = df1.except(df2)
    diffAnyColumnDF.rdd.coalesce(1).saveAsTextFile("Diffoutput")
  }
}

It runs for more than 30 minutes and then fails with the exception below.

Here are the logs:

Caused by: java.util.concurrent.TimeoutException: Futures timed out after [10 seconds]
    at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
    at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
    at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107)
    at scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
    at scala.concurrent.Await$.result(package.scala:107)
    at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:75)
    ... 14 more
17/09/15 11:55:01 WARN netty.NettyRpcEnv: Ignored message: HeartbeatResponse(false)
17/09/15 11:56:19 WARN netty.NettyRpcEndpointRef: Error sending message [message = Heartbeat(1,[Lscala.Tuple2;@7fe57079,BlockManagerId(1, c755kds.int.int.com, 33507))] in 1 attempts
org.apache.spark.rpc.RpcTimeoutException: Futures timed out after [10 seconds]. This timeout is controlled by spark.executor.heartbeatInterval
    at org.apache.spark.rpc.RpcTimeout.org$apache$spark$rpc$RpcTimeout$$createRpcTimeoutException(RpcTimeout.scala:48)
    at org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcTimeout.scala:63)
    at org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcTimeout.scala:59)
    at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:33)
    at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:76)
    at org.apache.spark.rpc.RpcEndpointRef.askWithRetry(RpcEndpointRef.scala:101)
    at org.apache.spark.executor.Executor.org$apache$spark$executor$Executor$$reportHeartBeat(Executor.scala:491)
    at org.apache.spark.executor.Executor$$anon$1$$anonfun$run$1.apply$mcV$sp(Executor.scala:520)
    at org.apache.spark.executor.Executor$$anon$1$$anonfun$run$1.apply(Executor.scala:520)
    at org.apache.spark.executor.Executor$$anon$1$$anonfun$run$1.apply(Executor.scala:520)
    at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1818)
    at org.apache.spark.executor.Executor$$anon$1.run(Executor.scala:520)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.util.concurrent.TimeoutException: Futures timed out after [10 seconds]
    at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
    at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
    at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107)
    at scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
    at scala.concurrent.Await$.result(package.scala:107)
    at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:75)
    ... 14 more
17/09/15 11:56:19 WARN netty.NettyRpcEnv: Ignored message: HeartbeatResponse(false)

Please suggest how to tune my Spark job.

I just increased the executor memory and the job succeeded, but it is very slow:

conf.set("spark.executor.memory", "64g")

It takes around 15 minutes to complete.

Attaching DAG Visualization

After increasing the timeout configuration, I get the error below:

 executor 5): ExecutorLostFailure (executor 5 exited caused by one of the running tasks) Reason: Executor heartbeat timed out after 175200 ms
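
The post does not show which timeout setting was raised. Purely as an illustration, the sketch below sets the two properties usually involved in heartbeat failures: `spark.executor.heartbeatInterval` (named in the log above) and `spark.network.timeout`. The object name `TimeoutSettings`, the method `withTimeouts`, and both values are assumptions, not taken from the post. The Spark configuration documentation says the heartbeat interval should be significantly less than the network timeout.

```scala
import org.apache.spark.SparkConf

object TimeoutSettings {
  // Hypothetical helper; the values are illustrative assumptions, not recommendations.
  def withTimeouts(conf: SparkConf): SparkConf =
    conf
      .set("spark.network.timeout", "600s")           // general network/RPC timeout
      .set("spark.executor.heartbeatInterval", "60s") // keep well below the network timeout
}
```

Longer timeouts only hide the symptom if an executor is stalled in garbage collection or stuck in one oversized task, which would be consistent with the heartbeat error still appearing after the change.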

Answer 1:

I think your single-partition output file is too big. It takes a long time to stream the data over the TCP channel, and the connection cannot be kept alive for that long, so it gets reset.

Can you coalesce to a higher number of partitions?
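
A minimal sketch of that suggestion, assuming the `diffAnyColumnDF` data frame from the question. The names `DiffWriter`, `writeDiff`, and `numPartitions` are hypothetical, and joining columns with `|` is an assumption about the desired output format (the original code writes `Row.toString` output instead). Note also that, per the `coalesce` API documentation, a drastic coalesce such as `coalesce(1)` can make the upstream computation run on a single node, which may be part of why the job is slow.

```scala
import org.apache.spark.sql.DataFrame

object DiffWriter {
  // Hypothetical helper: writes the diff as up to `numPartitions` part files
  // instead of funnelling every row through a single task.
  def writeDiff(diff: DataFrame, outputPath: String, numPartitions: Int): Unit = {
    require(numPartitions > 0, "numPartitions must be positive")
    diff.rdd
      .map(_.mkString("|"))    // one pipe-delimited line per row (assumed format)
      .coalesce(numPartitions) // merges partitions without a shuffle; never increases the count
      .saveAsTextFile(outputPath)
  }
}
```

In the question's code this would replace the last line of `main`, for example `DiffWriter.writeDiff(diffAnyColumnDF, "Diffoutput", 32)`, where 32 is an arbitrary starting point to tune. If a single output file is still required, the part files can be merged after the job finishes.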