I am getting a runtime error when running the Java code below.
Do I need to include any dependencies for logging, such as log4j?
Why doesn't this error appear at compile time? That would make it much easier to catch.
Here is my Java code:
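import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka010.ConsumerStrategies;
import org.apache.spark.streaming.kafka010.KafkaUtils;
import org.apache.spark.streaming.kafka010.LocationStrategies;
import scala.Tuple2;

SparkConf sparkConf = new SparkConf().setAppName("JavaKafkaWordCount11").setMaster("local[*]");
sparkConf.set("spark.streaming.concurrentJobs", "3");
// Create the context with a 3-second batch interval
JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, new Duration(3000));
Map<String, Object> kafkaParams = new HashMap<>();
kafkaParams.put("bootstrap.servers", "x.xx.xxx.xxx:9092");
kafkaParams.put("key.deserializer", StringDeserializer.class);
kafkaParams.put("value.deserializer", StringDeserializer.class);
kafkaParams.put("group.id", "use_a_separate_group_id_for_each_stream");
kafkaParams.put("auto.offset.reset", "latest");
kafkaParams.put("enable.auto.commit", true);
Collection<String> topics = Arrays.asList("topicName");
final JavaInputDStream<ConsumerRecord<String, String>> stream = KafkaUtils.createDirectStream(jssc,
        LocationStrategies.PreferConsistent(),
        ConsumerStrategies.<String, String>Subscribe(topics, kafkaParams));
stream.mapToPair(new PairFunction<ConsumerRecord<String, String>, String, String>() {
    @Override
    public Tuple2<String, String> call(ConsumerRecord<String, String> record) throws Exception {
        System.out.println("file data");
        return new Tuple2<>(record.key(), record.value());
    }
}).print(); // an output operation must be registered before start()
jssc.start();
jssc.awaitTermination();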
Dependencies I am using:
<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
<version>1.2.17</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.10</artifactId>
<version>1.6.2</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-kafka_2.10</artifactId>
<version>1.6.2</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-mllib_2.10</artifactId>
<version>1.6.2</version>
</dependency>
<!-- <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka_2.10</artifactId>
<version>0.10.0.1</version> </dependency> -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-kafka-0-10_2.10</artifactId>
<version>2.0.0</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-twitter_2.10</artifactId>
<version>1.6.2</version>
</dependency>
<dependency>
<groupId>org.restlet.jee</groupId>
<artifactId>org.restlet</artifactId>
<version>2.0.10</version>
</dependency>
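The list above mixes Spark artifacts at 1.6.2 with spark-streaming-kafka-0-10_2.10 at 2.0.0. The 0-10 connector is built against Spark 2.0, where the logging trait lives at org.apache.spark.internal.Logging; spark-core 1.6.2 only has org.apache.spark.Logging, which matches the missing class in the trace. As a rule of thumb, every org.apache.spark artifact should share one Spark version and one Scala suffix. The sketch below is a hypothetical helper (SparkDependencyCheck, sparkVersions and scalaSuffixes are illustrative names, not part of any Spark or Maven API) that takes coordinates copied by hand as "groupId:artifactId:version" strings and reports which versions and suffixes are in use.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical helper: scans hand-copied Maven coordinates of the form
// "groupId:artifactId:version" and reports which Spark versions and Scala
// binary suffixes (the "_2.10" part of the artifactId) are in use.
public class SparkDependencyCheck {

    /** Distinct versions among org.apache.spark artifacts, sorted. */
    static Set<String> sparkVersions(List<String> coordinates) {
        Set<String> versions = new TreeSet<>();
        for (String c : coordinates) {
            String[] parts = c.split(":");
            if (parts.length == 3 && parts[0].equals("org.apache.spark")) {
                versions.add(parts[2]);
            }
        }
        return versions;
    }

    /** Distinct Scala suffixes (e.g. "2.10") among org.apache.spark artifacts, sorted. */
    static Set<String> scalaSuffixes(List<String> coordinates) {
        Set<String> suffixes = new TreeSet<>();
        for (String c : coordinates) {
            String[] parts = c.split(":");
            int underscore = parts.length == 3 ? parts[1].lastIndexOf('_') : -1;
            if (parts.length == 3 && parts[0].equals("org.apache.spark") && underscore >= 0) {
                suffixes.add(parts[1].substring(underscore + 1));
            }
        }
        return suffixes;
    }

    public static void main(String[] args) {
        // Coordinates copied from the pom.xml in the question
        List<String> deps = Arrays.asList(
            "org.apache.spark:spark-core_2.10:1.6.2",
            "org.apache.spark:spark-streaming-kafka_2.10:1.6.2",
            "org.apache.spark:spark-mllib_2.10:1.6.2",
            "org.apache.spark:spark-streaming-kafka-0-10_2.10:2.0.0",
            "org.apache.spark:spark-streaming-twitter_2.10:1.6.2",
            "org.restlet.jee:org.restlet:2.0.10");
        Set<String> versions = sparkVersions(deps);
        System.out.println("Spark versions: " + versions);             // [1.6.2, 2.0.0]
        System.out.println("Scala suffixes: " + scalaSuffixes(deps));  // [2.10]
        if (versions.size() > 1) {
            System.out.println("Mixed Spark versions: align every org.apache.spark artifact.");
        }
    }
}
```

Here the check reports two Spark versions, so one plausible fix is to move all Spark artifacts to the same 2.x version as the Kafka 0-10 connector (or drop that connector and stay on 1.6.2 with spark-streaming-kafka_2.10); which one fits depends on the Kafka broker and cluster versions you target.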
I get the following error:
Exception in thread "main" java.lang.NoClassDefFoundError: org.apache.spark.internal.Logging
at java.lang.ClassLoader.defineClassImpl(Native Method)
at java.lang.ClassLoader.defineClass(ClassLoader.java:346)
at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:154)
at java.net.URLClassLoader.defineClass(URLClassLoader.java:727)
at java.net.URLClassLoader.access$400(URLClassLoader.java:95)
at java.net.URLClassLoader$ClassFinder.run(URLClassLoader.java:1182)
at java.security.AccessController.doPrivileged(AccessController.java:686)
at java.net.URLClassLoader.findClass(URLClassLoader.java:602)
at java.lang.ClassLoader.loadClassHelper(ClassLoader.java:846)
at java.lang.ClassLoader.loadClass(ClassLoader.java:825)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:325)
at java.lang.ClassLoader.loadClass(ClassLoader.java:805)
at org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe(ConsumerStrategy.scala)
at spark.KafkaConsumerDirectStream.main(KafkaConsumerDirectStream.java:45)
Caused by: java.lang.ClassNotFoundException: org.apache.spark.internal.Logging
at java.net.URLClassLoader.findClass(URLClassLoader.java:607)
at java.lang.ClassLoader.loadClassHelper(ClassLoader.java:846)
at java.lang.ClassLoader.loadClass(ClassLoader.java:825)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:325)
at java.lang.ClassLoader.loadClass(ClassLoader.java:805)
... 14 more
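On the compile-time question: javac only needs the classes your own source refers to (KafkaUtils, ConsumerStrategies and so on), so it does not necessarily check every class those library classes depend on internally. The JVM loads classes lazily, which is why the missing org.apache.spark.internal.Logging only surfaces when ConsumerStrategies is first loaded, at KafkaConsumerDirectStream.java:45 in the trace. A minimal sketch of a runtime classpath check, assuming it is run with the same classpath as the failing job; ClasspathProbe and isOnClasspath are hypothetical names:

```java
// Hypothetical diagnostic: reports whether named classes can be loaded
// from the current runtime classpath, without initializing them.
public class ClasspathProbe {

    /** True if the class can be loaded by this class's loader. */
    static boolean isOnClasspath(String className) {
        try {
            // initialize=false: load the class but do not run its static initializers
            Class.forName(className, false, ClasspathProbe.class.getClassLoader());
            return true;
        } catch (ClassNotFoundException | LinkageError e) {
            // LinkageError covers NoClassDefFoundError, e.g. when a supertype is missing
            return false;
        }
    }

    public static void main(String[] args) {
        String[] names = {
            "org.apache.spark.Logging",          // location in Spark 1.x
            "org.apache.spark.internal.Logging"  // location in Spark 2.x, used by kafka010
        };
        for (String name : names) {
            System.out.println(name + " -> " + (isOnClasspath(name) ? "found" : "missing"));
        }
    }
}
```

With the dependencies from the question, the second name would be expected to report "missing", matching the ClassNotFoundException above.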
The problem was resolved by arranging the dependencies mentioned above in the following order:
Use import org.apache.spark.Logging. It is available in Spark 1.5.2 and later, so I would recommend using Spark 1.5.2 or a later version.
Which Spark version are you using?
For the Twitter connector, which was moved out of Spark itself in 2.0, there is another dependency that solves this problem and is compatible with Spark 2.x.
For SBT, use this dependency:
"org.apache.bahir" %% "spark-streaming-twitter" % "2.0.0"