I was trying to run a sample spark streaming code. but I get this error:
16/06/02 15:25:42 ERROR streaming.StreamingContext: Error starting the context, marking it as stopped
java.lang.IllegalArgumentException: requirement failed: No output operations registered, so nothing to execute
at scala.Predef$.require(Predef.scala:233)
at org.apache.spark.streaming.DStreamGraph.validate(DStreamGraph.scala:161)
at org.apache.spark.streaming.StreamingContext.validate(StreamingContext.scala:542)
at org.apache.spark.streaming.StreamingContext.liftedTree1$1(StreamingContext.scala:601)
at org.apache.spark.streaming.StreamingContext.start(StreamingContext.scala:600)
at org.apache.spark.streaming.api.java.JavaStreamingContext.start(JavaStreamingContext.scala:624)
at com.streams.spark_consumer.SparkConsumer.main(SparkConsumer.java:56)
Exception in thread "main" java.lang.IllegalArgumentException: requirement failed: No output operations registered, so nothing to execute
at scala.Predef$.require(Predef.scala:233)
at org.apache.spark.streaming.DStreamGraph.validate(DStreamGraph.scala:161)
at org.apache.spark.streaming.StreamingContext.validate(StreamingContext.scala:542)
at org.apache.spark.streaming.StreamingContext.liftedTree1$1(StreamingContext.scala:601)
at org.apache.spark.streaming.StreamingContext.start(StreamingContext.scala:600)
at org.apache.spark.streaming.api.java.JavaStreamingContext.start(JavaStreamingContext.scala:624)
at com.streams.spark_consumer.SparkConsumer.main(SparkConsumer.java:56)
My code is given below. I know there are a few unused imports, because I was doing something else and getting the same error so I modified the same code to run the sample program given on the spark streaming website:
package com.streams.spark_consumer;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Arrays;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.regex.Pattern;
import scala.Tuple2;
import kafka.serializer.StringDecoder;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.*;
import org.apache.spark.streaming.api.java.*;
import org.apache.spark.streaming.kafka.KafkaUtils;
import org.apache.spark.streaming.Durations;
import org.apache.spark.api.java.JavaSparkContext;
public class SparkConsumer {
private static final Pattern SPACE = Pattern.compile(" ");
public static void main(String[] args) throws Exception {
System.out.println("Han chal raha hai"); //just to know if this part of the code is executed
SparkConf conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount");
JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(1));
System.out.println("Han bola na chal raha hau chutiye 1"); //just to know if this part of the code is executed
JavaReceiverInputDStream<String> lines = jssc.socketTextStream("localhost", 9999);
JavaDStream<String> words = lines.flatMap(
new FlatMapFunction<String, String>() {
public Iterable<String> call(String x) {
return Arrays.asList(x.split(" "));
}
});
JavaPairDStream<String, Integer> pairs = words.mapToPair(
new PairFunction<String, String, Integer>() {
public Tuple2<String, Integer> call(String s) {
return new Tuple2<String, Integer>(s, 1);
}
});
JavaPairDStream<String, Integer> wordCounts = pairs.reduceByKey(
new Function2<Integer, Integer, Integer>() {
public Integer call(Integer i1, Integer i2) {
return i1 + i2;
}
});
jssc.start();
jssc.awaitTermination();
}
}
Can anybody help me out with this? I am using local master, even then I have tried starting a master and stoping a master (also slaves), I didn't know why that might help but just in case, I have already tried that.