Can't access kafka.serializer.StringDecoder

Posted 2019-05-30 04:55

I have added the sbt packages for Kafka and Spark Streaming as follows:

"org.apache.spark" % "spark-streaming_2.10" % "1.6.1",
"org.apache.spark" % "spark-streaming-kafka_2.10" % "1.6.1"

However, when I try to use the Kafka direct stream, I can't access it:

val topics="CCN_TOPIC,GGSN_TOPIC"
val topicsSet = topics.split(",").toSet
val kafkaParams = Map[String, String]("metadata.broker.list" -> kafkaBrokers)

val messages = org.apache.spark.streaming.kafka.KafkaUtils.createDirectStream[String, String, kafka.serializer.StringDecoder, StringDecoder](
  ssc, kafkaParams, topicsSet)

The compiler doesn't recognize kafka.serializer.StringDecoder:

 object serializer is not a member of package org.apache.spark.streaming.kafka

EDIT: I also tried

 import  _root_.kafka.serializer

...but there is no StringDecoder in it.

3 Answers
Luminary・发光体
#2 · 2019-05-30 05:34

Apologies if I use the wrong terminology; I am not an expert in dependencies, linking, etc. The method below works on HDP 2.4.3.

1. Locating the correct Jar

You need to locate the correct Kafka jars in your environment.

The following shell script builds a list of every class in every Jar and dumps it to a file, which you can then search with egrep whenever you need to find the Jar containing a specific class.

I am using HDP, so I point it at the root directory of the HDP install. Your platform's jars might be elsewhere.

all_hdp_classes () {
  # List every class in every Jar under the HDP install,
  # printing one "jar : class" pair per line.
  find -L /usr/hdp/current -maxdepth 20 -name "*.jar" -print | while read -r line; do
    for i in $(jar tf "$line" | grep '\.class'); do
      echo "$line : $i"
    done
  done
}
all_hdp_classes > ~/all_hdp_classes

Once you have the list, you can search the file as follows:

egrep 'kafka' ~/all_hdp_classes | grep Decoder

NB: Ambari only installs the Kafka broker libraries on the node the Kafka broker runs on, so you need to search on that node or you won't find anything (or you will only find the spark-examples jar). Then copy the Jar onto the node(s) you run Spark on.

On the Kafka broker node, I found the following Jar which contains the StringDecoder class:

/usr/hdp/current/kafka-broker/libs/kafka_2.10-0.10.1.2.1.2.0-10.jar

Note that this is a different file from the kafka_2.11-0.10.0.0.jar suggested in another answer, which did not contain the Decoder class (at least in my environment).

Also check whether your Scala version is 2.10 or 2.11 and make sure the Jar matches it.
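
If you are unsure, one quick way to check is from inside spark-shell (the startup banner also prints this):

// Prints the Scala version the REPL runs on, e.g. "version 2.10.5"
scala.util.Properties.versionString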

2. Adding the Jar to your Spark classpath

Include the Jar in your Spark classpath.

The method depends on whether you want to update the classpath for a single spark-shell or spark-submit session only, or for all Spark sessions (e.g. using Ambari).

For a session of spark-shell:

spark-shell --jars /path/to/jar/on/this/node/kafka_2.10-0.10.1.2.1.2.0-10.jar

Note that the spark-streaming-kafka Maven dependency mentioned in the Spark documentation can cause a conflict during the import step, as explained later.

You can still go ahead and add it using the --packages option if you need it.

Example (Spark 1.6.2 and Scala 2.10; yours may differ):

spark-shell --packages org.apache.spark:spark-streaming-kafka_2.10:1.6.2 --jars kafka_2.10-0.10.1.2.1.2.0-10.jar

3. Importing into your session

import org.apache.spark.streaming._
import kafka.serializer.StringDecoder

You may get the following error:

error: object serializer is not a member of package org.apache.spark.streaming.kafka

The Maven package in my example above pulls in another kafka package (org.apache.spark.streaming.kafka), so after the wildcard import of org.apache.spark.streaming._ the bare name kafka resolves to that package instead of the top-level one.

To resolve, do as follows:

import org.apache.spark.streaming._
import _root_.kafka.serializer.StringDecoder
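
Putting it together, here is a minimal sketch of the direct-stream setup against the 0.8 direct API (the broker address and batch interval are placeholders):

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils
import _root_.kafka.serializer.StringDecoder // _root_ dodges the shadowed kafka package

object DirectStreamSketch {
  def main(args: Array[String]): Unit = {
    val ssc = new StreamingContext(new SparkConf().setAppName("DirectStreamSketch"), Seconds(10))

    val topicsSet = "CCN_TOPIC,GGSN_TOPIC".split(",").toSet
    // "broker1:9092" is a placeholder; use your own metadata.broker.list
    val kafkaParams = Map[String, String]("metadata.broker.list" -> "broker1:9092")

    val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
      ssc, kafkaParams, topicsSet)

    messages.map(_._2).print() // print message values for each batch

    ssc.start()
    ssc.awaitTermination()
  }
}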
smile是对你的礼貌
#3 · 2019-05-30 05:37

Please try this: add the file

kafka_2.11-0.10.0.0.jar

to your project dependencies. It should fix your error.
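
Since the question uses sbt, a sketch of the managed-dependency form for that jar (coordinates inferred from the file name; alternatively, drop the jar into the project's lib/ directory as an unmanaged dependency):

// build.sbt — adds the Kafka client library this answer refers to
libraryDependencies += "org.apache.kafka" % "kafka_2.11" % "0.10.0.0"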

叼着烟拽天下
#4 · 2019-05-30 05:41

One potential cause of this issue is importing org.apache.spark.streaming.kafka.KafkaUtils.createDirectStream before kafka.serializer.StringDecoder.

The correct order is:

import kafka.serializer.StringDecoder // first
import org.apache.spark.streaming.kafka.KafkaUtils.createDirectStream
