Why is my Spark job stuck in Kafka streaming?

Posted 2019-07-31 20:08

Question:

Output after the Spark job is submitted to the Spark cluster running in a Kubernetes cluster created by Minikube:

----------------- RUNNING ----------------------
[Stage 0:>                                                          (0 + 0) / 2]17/06/16 16:08:15 INFO VerifiableProperties: Verifying properties
17/06/16 16:08:15 INFO VerifiableProperties: Property group.id is overridden to xxx
17/06/16 16:08:15 INFO VerifiableProperties: Property zookeeper.connect is overridden to 
xxxxxxxxxxxxxxxxxxxxx
[Stage 0:>                                                          (0 + 0) / 2]

Information from the Spark web UI:

foreachRDD at myfile.scala:49

org.apache.spark.streaming.dstream.DStream.foreachRDD(DStream.scala:625)
myfile.run(myfile.scala:49)
Myjob$.main(Myjob.scala:100)
Myjob.main(Myjob.scala)
sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
java.lang.reflect.Method.invoke(Method.java:498)
org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:743)
org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:187)
org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:212)
org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:126)
org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

My code:

  println("----------------- RUNNING ----------------------");
    eventsStream.foreachRDD { rdd =>
        println("xxxxxxxxxxxxxxxxxxxxx")
        //println(rdd.count());
    if( !rdd.isEmpty )
    {
      println("yyyyyyyyyyyyyyyyyyyyyyy")
        val df = sqlContext.read.json(rdd);
        df.registerTempTable("data");

        val rules = rulesSource.rules();
        var resultsRDD : RDD[(String,String,Long,Long,Long,Long,Long,Long)]= sc.emptyRDD;
        rules.foreach { rule =>
        ...
        }

        sqlContext.dropTempTable("data")
    }
    else
    {
        println("-------");
        println("NO DATA");
        println("-------");
    }
}
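For context, eventsStream is the Kafka input DStream the job reads from. Its creation is not shown in the question; a minimal sketch of how such a stream is typically created with the receiver-based 0.8 connector (which matches the group.id / zookeeper.connect properties being verified in the log) might look like the following, where the ZooKeeper address, group id, topic name, and batch interval are placeholders, not values from the question:

    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import org.apache.spark.streaming.kafka.KafkaUtils

    val ssc = new StreamingContext(sc, Seconds(10))

    // Receiver-based stream of (key, message) pairs; keep only the JSON message value.
    val eventsStream = KafkaUtils
      .createStream(ssc, "zookeeper:2181", "mygroup", Map("events" -> 1))
      .map(_._2)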

Any ideas? Thanks.

UPDATE

My Spark job runs well in a Docker container with standalone Spark, but when it is submitted to the Spark cluster in the Kubernetes cluster, it gets stuck in Kafka streaming. No idea why.

The YAML file for the Spark master is from https://github.com/phatak-dev/kubernetes-spark/blob/master/spark-master.yaml:

apiVersion: extensions/v1beta1
kind: Deployment
metadata:
  labels:
    name: spark-master
  name: spark-master
spec:
  replicas: 1
  template:
    metadata:
      labels:
        name: spark-master
    spec:
      containers:
      - name: spark-master
        image: spark-2.1.0-bin-hadoop2.6
        imagePullPolicy: "IfNotPresent"
        ports:
        - containerPort: 7077
          protocol: TCP
        command:
          - "/bin/bash"
          - "-c"
          - "--"
        args:
          - './start-master.sh ; sleep infinity'
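For reference, the job is submitted against this master with a spark-submit command along these lines (a sketch: the master URL assumes a Kubernetes Service named spark-master in front of the deployment above, the jar path is a placeholder, and only the main class Myjob is known from the stack trace earlier):

    bin/spark-submit \
      --master spark://spark-master:7077 \
      --class Myjob \
      /path/to/myjob.jar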

Answer 1:

Logs will be helpful to diagnose the issue.

Essentially, you can't create or reference another RDD within an RDD operation, i.e. rdd1.map { rdd2.count() } is not valid.
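To illustrate the rule with hypothetical rdd1 and rdd2 (not from the question): compute the second RDD's result on the driver first, then use the plain value inside the closure.

    // Invalid: rdd2 is referenced inside a transformation of rdd1, so the
    // closure would try to use an RDD object on an executor and fail at runtime.
    // rdd1.map { x => x * rdd2.count() }

    // Valid: materialize the count on the driver and close over the plain value.
    val total = rdd2.count()
    rdd1.map { x => x * total }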

See how the RDD is converted to a DataFrame after importing the implicits from sqlContext:

    import sqlContext.implicits._

    eventsStream.foreachRDD { rdd =>
      println("yyyyyyyyyyyyyyyyyyyyyyy")

      val df = rdd.toDF()
      df.registerTempTable("data")
      ... // Your logic here.
      sqlContext.dropTempTable("data")
    }