Spark streaming is not working in Standalone clust

2019-06-17 09:29发布

问题:

I have written Kafka stream program using Scala and executing in Spark standalone cluster. Code works fine in my local. I have done Kafka , Cassandra and Spark setup in Azure VM. I have opened all inbound and outbound ports to avoid port blocking.

started Master

sbin>./start-master.sh

Started Slave

sbin# ./start-slave.sh spark://vm-hostname:7077

I have verified this status in Master WEB UI.

Submit Job

bin#./spark-submit --class x.y.StreamJob --master spark://vm-hostname:7077 /home/user/appl.jar

I noticed that Application added and displayed in Master WEB UI.

I have published few messages to topic and messages are not received and persisted to Cassandra DB.

I clicked the Application name on master web console and noticed that Streaming tab is not available in that application console page.

Why application is not working in VM and working good in local ?

How to debug the issue in VM ?

def main(args: Array[String]): Unit = {
    val spark = SparkHelper.getOrCreateSparkSession()
    val ssc = new StreamingContext(spark.sparkContext, Seconds(1))
    spark.sparkContext.setLogLevel("WARN")
    val kafkaStream = {
      val kafkaParams = Map[String, Object](
        "bootstrap.servers" -> 
                "vmip:9092",
        "key.deserializer" -> classOf[StringDeserializer],
        "value.deserializer" -> classOf[StringDeserializer],
        "group.id" -> "loc",
        "auto.offset.reset" -> "latest",
        "enable.auto.commit" -> (false: java.lang.Boolean)
      )

      val topics = Array("hello")
      val numPartitionsOfInputTopic = 3
      val streams = (1 to numPartitionsOfInputTopic) map {
        _ => KafkaUtils.createDirectStream[String, String]( ssc, PreferConsistent, Subscribe[String, String](topics, kafkaParams) )
      }
     streams
    }


    kafkaStream.foreach(rdd=> {
      rdd.foreachRDD(conRec=> {
        val offsetRanges = conRec.asInstanceOf[HasOffsetRanges].offsetRanges
        conRec.foreach(str=> {
          try {
            println(str.value().trim)
            CassandraHelper.saveItemEvent(str.value().trim)

          }catch {
            case ex: Exception => {
              println(ex.getMessage)
            }
          }
        })
        rdd.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
      })
      println("Read Msg")
    })
    println(" Spark parallel reader is ready !!!")
    ssc.start()
    ssc.awaitTermination()
  }

  def getSparkConf(): SparkConf = {
    val conf = new SparkConf(true)
      .setAppName("TestAppl")
      .set("spark.cassandra.connection.host", "vmip")
      .set("spark.streaming.stopGracefullyOnShutdown","true")
    .setMaster("spark://vm-hostname:7077")

    conf
  }

Version

scalaVersion := "2.11.8"
val sparkVersion = "2.2.0"
val connectorVersion = "2.0.7"


libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core" % sparkVersion %"provided",
  "org.apache.spark" %% "spark-sql" % sparkVersion  %"provided",
  "org.apache.spark" %% "spark-hive" % sparkVersion %"provided",
  "com.datastax.spark" %% "spark-cassandra-connector" % connectorVersion  ,
  "org.apache.kafka" %% "kafka" % "0.10.1.0",
  "org.apache.spark" %% "spark-streaming-kafka-0-10" % sparkVersion,
  "org.apache.spark" %% "spark-streaming" %  sparkVersion  %"provided",
)
mergeStrategy in assembly := {
  case PathList("org", "apache", "spark", "unused", "UnusedStubClass.class") => MergeStrategy.first
  case x => (mergeStrategy in assembly).value(x)
}

回答1:

To debug your issue, the first think would be to make sure that messages go through Kafka. To do so you need to have port 9092 open on your VM and try consuming directly from Kafka

bin/kafka-console-consumer.sh --bootstrap-server vmip:9092 --topic hello --from-beginning

from-beginning option will consume everything up to the max retention time you configured on your Kafka topic.

Check as well that you don't have 2 versions of Spark in your VM, and that you need to use "spark2-submit" to submit a Spark2 Job.