I have written a Kafka streaming program in Scala and am running it on a Spark standalone cluster. The code works fine on my local machine. I have set up Kafka, Cassandra and Spark on an Azure VM and opened all inbound and outbound ports to avoid port blocking.
Started the master:
sbin>./start-master.sh
Started the slave:
sbin# ./start-slave.sh spark://vm-hostname:7077
I verified the status of both in the master web UI.
Submitted the job:
bin#./spark-submit --class x.y.StreamJob --master spark://vm-hostname:7077 /home/user/appl.jar
I noticed that the application was added and is displayed in the master web UI.
I published a few messages to the topic, but they are not received and not persisted to the Cassandra DB.
I clicked the application name in the master web console and noticed that the Streaming tab is not available on that application's console page.
Why does the application not work on the VM when it works fine locally?
How can I debug the issue on the VM?
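import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010.{CanCommitOffsets, HasOffsetRanges, KafkaUtils}
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent

def main(args: Array[String]): Unit = {
  val spark = SparkHelper.getOrCreateSparkSession()
  val ssc = new StreamingContext(spark.sparkContext, Seconds(1))
  spark.sparkContext.setLogLevel("WARN")

  // One direct stream per partition of the input topic.
  val kafkaStreams = {
    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "vmip:9092",
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "loc",
      "auto.offset.reset" -> "latest",
      "enable.auto.commit" -> (false: java.lang.Boolean)
    )
    val topics = Array("hello")
    val numPartitionsOfInputTopic = 3
    (1 to numPartitionsOfInputTopic).map { _ =>
      KafkaUtils.createDirectStream[String, String](
        ssc, PreferConsistent, Subscribe[String, String](topics, kafkaParams))
    }
  }

  kafkaStreams.foreach { stream =>
    stream.foreachRDD { rdd =>
      val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
      // This closure runs on the executors, so the println output ends up
      // in the executor stdout logs, not in the driver console.
      rdd.foreach { record =>
        try {
          println(record.value().trim)
          CassandraHelper.saveItemEvent(record.value().trim)
        } catch {
          case ex: Exception => println(ex.getMessage)
        }
      }
      // Offsets are committed on the stream once the batch has been processed.
      stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
    }
    println("Read Msg")
  }

  println(" Spark parallel reader is ready !!!")
  ssc.start()
  ssc.awaitTermination()
}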
def getSparkConf(): SparkConf = {
  val conf = new SparkConf(true)
    .setAppName("TestAppl")
    .set("spark.cassandra.connection.host", "vmip")
    .set("spark.streaming.stopGracefullyOnShutdown", "true")
    .setMaster("spark://vm-hostname:7077")
  conf
}
Versions
scalaVersion := "2.11.8"
val sparkVersion = "2.2.0"
val connectorVersion = "2.0.7"
libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core" % sparkVersion % "provided",
  "org.apache.spark" %% "spark-sql" % sparkVersion % "provided",
  "org.apache.spark" %% "spark-hive" % sparkVersion % "provided",
  "com.datastax.spark" %% "spark-cassandra-connector" % connectorVersion,
  "org.apache.kafka" %% "kafka" % "0.10.1.0",
  "org.apache.spark" %% "spark-streaming-kafka-0-10" % sparkVersion,
  "org.apache.spark" %% "spark-streaming" % sparkVersion % "provided"
)
mergeStrategy in assembly := {
  case PathList("org", "apache", "spark", "unused", "UnusedStubClass.class") => MergeStrategy.first
  case x => (mergeStrategy in assembly).value(x)
}
To debug your issue, the first thing to do is make sure that messages actually go through Kafka. To do so, port 9092 must be open on your VM, and you should try consuming directly from Kafka, for example with the console consumer.
The --from-beginning option replays every message still retained on the topic, that is, everything within the retention time you configured on your Kafka topic.
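A minimal sketch of that check, assuming the broker address and topic from the question (vmip:9092, hello). The KAFKA_HOME default of /opt/kafka is a hypothetical install location, so point it at wherever Kafka actually lives on the VM:

```shell
# Values taken from the question's code; adjust them to your setup.
BROKER="vmip:9092"
TOPIC="hello"
# Hypothetical install location, override by exporting KAFKA_HOME.
KAFKA_HOME="${KAFKA_HOME:-/opt/kafka}"

CONSUMER="$KAFKA_HOME/bin/kafka-console-consumer.sh"

if [ -x "$CONSUMER" ]; then
  # Reads the topic from the earliest retained offset; stop with Ctrl+C.
  "$CONSUMER" --bootstrap-server "$BROKER" --topic "$TOPIC" --from-beginning
else
  echo "kafka-console-consumer.sh not found under $KAFKA_HOME/bin"
fi
```

If the published messages show up here, Kafka is reachable and the problem is more likely on the Spark side; if nothing shows up, look at the producer, the broker's advertised listeners, or the network rules first.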
Also check whether two versions of Spark are installed on your VM; if so, you may need to use "spark2-submit" instead of "spark-submit" to submit a Spark 2 job.
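One way to see which launchers are installed is to look them up on the PATH. This is only a sketch: check_launcher is a helper name made up for this example, and spark2-submit will only exist on distributions that ship a separate Spark 2 wrapper:

```shell
# Hypothetical helper: report where a launcher script resolves on the PATH.
check_launcher() {
  if command -v "$1" >/dev/null 2>&1; then
    echo "$1: $(command -v "$1")"
  else
    echo "$1: not on PATH"
  fi
}

for launcher in spark-submit spark2-submit; do
  check_launcher "$launcher"
done
```

For any launcher that is found, running it with --version prints the Spark version it belongs to, which should match the 2.2.0 the job was built against.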