I have added the sbt packages for Kafka and Spark Streaming as follows:
"org.apache.spark" % "spark-streaming_2.10" % "1.6.1",
"org.apache.spark" % "spark-streaming-kafka_2.10" % "1.6.1"
However, when I want to use the Kafka direct stream, I can't access it:
val topics="CCN_TOPIC,GGSN_TOPIC"
val topicsSet = topics.split(",").toSet
val kafkaParams = Map[String, String]("metadata.broker.list" -> kafkaBrokers)
val messages = org.apache.spark.streaming.kafka.KafkaUtils.createDirectStream[String, String, kafka.serializer.StringDecoder, kafka.serializer.StringDecoder](
  ssc, kafkaParams, topicsSet)
The compiler doesn't recognize kafka.serializer.StringDecoder:
object serializer is not a member of package org.apache.spark.streaming.kafka
EDIT: I also tried
import _root_.kafka.serializer
...but there is no StringDecoder in that package.
Apologies if I use the wrong terminology; I am not an expert in dependencies, linking, etc. The method below works on HDP 2.4.3.
1. Locating the correct Jar
You need to locate the correct Kafka jars on your environment.
The following shell script is useful for creating a general list of all classes in all Jars and dumping them to a file (which can then be searched with egrep when you need to find Jars for specific classes).
I am using HDP so I specified the root directory of the HDP install as the point to search for jars. Your platform jars might be elsewhere.
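A minimal sketch of such a script, assuming an HDP-style install root under /usr/hdp (the search path and output file are assumptions; adjust them for your platform):
# List every class in every jar under the install root and dump the result to a file
for jar in $(find /usr/hdp -name "*.jar"); do
  echo "JAR: $jar"
  unzip -l "$jar" | grep "\.class"
done > /tmp/jar_classes.txt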
Once you have the list, you can search the file as follows:
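For example, assuming the dump file from the sketch above:
egrep -i "stringdecoder" /tmp/jar_classes.txt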
On the Kafka broker node, I found the following Jar which contains the StringDecoder class:
Note that this is a different file to kafka_2.11-0.10.0.0.jar as suggested in another answer, which did not contain the Decoder class (at least, in my environment).
Also check whether your Scala version is 2.10 or 2.11 and make sure the Jar aligns with it.
2. Adding the Jar to your Spark classpath
Include the Jar in your Spark classpath.
The method depends on whether you want to update the classpath for a single spark-shell or spark-submit session only, or for all Spark sessions (e.g. using Ambari).
For a session of spark-shell:
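A sketch of what that looks like; the jar path below is an assumption based on a typical HDP layout, so substitute the actual path you found in step 1:
spark-shell --jars /usr/hdp/current/kafka-broker/libs/kafka_2.10-<version>.jar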
Note that the spark-streaming-kafka maven dependency mentioned in the Spark documentation can cause a conflict during the import step, as explained later.
You can still go ahead and add it using the --packages option if you need it.
Example (Spark 1.6.2 and scala 2.10, yours may differ):
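Something along these lines, using the standard spark-streaming-kafka coordinate for those versions (adjust to match your Spark and Scala versions):
spark-shell --packages org.apache.spark:spark-streaming-kafka_2.10:1.6.2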
3. Importing into your session
You may get the following error:
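Likely the same error reported in the question, something like:
error: object serializer is not a member of package org.apache.spark.streaming.kafka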
The maven package in my example above includes another kafka package, which is therefore imported as part of "org.apache.spark.streaming._"
To resolve, do as follows:
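A sketch of the fix, based on the shadowing described above: qualify the import from the root package so that the kafka sub-package of org.apache.spark.streaming cannot shadow the top-level kafka package.
// force resolution from the top-level kafka package rather than org.apache.spark.streaming.kafka
import _root_.kafka.serializer.StringDecoder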
Please try this: add the file
to your project dependencies. It should fix your error.
One potential cause of this issue is that you import
import org.apache.spark.streaming.kafka.KafkaUtils.createDirectStream
before
import kafka.serializer.StringDecoder
The correct order is:
import kafka.serializer.StringDecoder // first
import org.apache.spark.streaming.kafka.KafkaUtils.createDirectStream