streaming.StreamingContext: Error starting the context

Posted 2019-09-10 17:14

Question:

I was trying to run a sample Spark Streaming program, but I get this error:

16/06/02 15:25:42 ERROR streaming.StreamingContext: Error starting the context, marking it as stopped
java.lang.IllegalArgumentException: requirement failed: No output operations registered, so nothing to execute
at scala.Predef$.require(Predef.scala:233)
at org.apache.spark.streaming.DStreamGraph.validate(DStreamGraph.scala:161)
at org.apache.spark.streaming.StreamingContext.validate(StreamingContext.scala:542)
at org.apache.spark.streaming.StreamingContext.liftedTree1$1(StreamingContext.scala:601)
at org.apache.spark.streaming.StreamingContext.start(StreamingContext.scala:600)
at org.apache.spark.streaming.api.java.JavaStreamingContext.start(JavaStreamingContext.scala:624)
at com.streams.spark_consumer.SparkConsumer.main(SparkConsumer.java:56)
Exception in thread "main" java.lang.IllegalArgumentException: requirement failed: No output operations registered, so nothing to execute
at scala.Predef$.require(Predef.scala:233)
at org.apache.spark.streaming.DStreamGraph.validate(DStreamGraph.scala:161)
at org.apache.spark.streaming.StreamingContext.validate(StreamingContext.scala:542)
at org.apache.spark.streaming.StreamingContext.liftedTree1$1(StreamingContext.scala:601)
at org.apache.spark.streaming.StreamingContext.start(StreamingContext.scala:600)
at org.apache.spark.streaming.api.java.JavaStreamingContext.start(JavaStreamingContext.scala:624)
at com.streams.spark_consumer.SparkConsumer.main(SparkConsumer.java:56)

My code is given below. I know there are a few unused imports; I was doing something else and getting the same error, so I modified that code to run the sample program given on the Spark Streaming website:

package com.streams.spark_consumer;

import java.util.HashMap;
import java.util.HashSet;
import java.util.Arrays;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.regex.Pattern;

import scala.Tuple2;

import kafka.serializer.StringDecoder;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.*;
import org.apache.spark.streaming.api.java.*;
import org.apache.spark.streaming.kafka.KafkaUtils;
import org.apache.spark.streaming.Durations;
import org.apache.spark.api.java.JavaSparkContext;

public class SparkConsumer {

    private static final Pattern SPACE = Pattern.compile(" ");

    public static void main(String[] args) throws Exception {

        System.out.println("Han chal raha hai");  // just to know if this part of the code is executed
        SparkConf conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount");
        JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(1));

        System.out.println("Han bola na chal raha hau chutiye 1"); // just to know if this part of the code is executed

        JavaReceiverInputDStream<String> lines = jssc.socketTextStream("localhost", 9999);

        JavaDStream<String> words = lines.flatMap(
            new FlatMapFunction<String, String>() {
                public Iterable<String> call(String x) {
                    return Arrays.asList(x.split(" "));
                }
            });

        JavaPairDStream<String, Integer> pairs = words.mapToPair(
            new PairFunction<String, String, Integer>() {
                public Tuple2<String, Integer> call(String s) {
                    return new Tuple2<String, Integer>(s, 1);
                }
            });

        JavaPairDStream<String, Integer> wordCounts = pairs.reduceByKey(
            new Function2<Integer, Integer, Integer>() {
                public Integer call(Integer i1, Integer i2) {
                    return i1 + i2;
                }
            });

        jssc.start();
        jssc.awaitTermination();
    }

}

Can anybody help me out with this? I am using a local master. Even so, I have tried starting and stopping a master (and slaves); I don't know why that would help, but just in case, I have already tried it.

Answer 1:

According to the Spark documentation:

Since the output operations actually allow the transformed data to be consumed by external systems, they trigger the actual execution of all the DStream transformations (similar to actions for RDDs).

So use any of the output operations after your transformations (a minimal fix for the code above is sketched after this list):

print()
foreachRDD(func)
saveAsObjectFiles(prefix, [suffix])
saveAsTextFiles(prefix, [suffix])
saveAsHadoopFiles(prefix, [suffix])
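
For example, a minimal sketch of the fix, using the variable names from the code in the question (wordCounts and jssc), is to register one output operation on the final DStream before starting the context:

// Register an output operation so the DStream graph has something to execute;
// any single output operation (print, saveAsTextFiles, foreachRDD, ...) is enough.
wordCounts.print();   // prints the first elements of each batch to the driver console

// Only start the context after the output operation has been registered.
jssc.start();
jssc.awaitTermination();

With at least one output operation registered, jssc.start() no longer fails the "No output operations registered" requirement check.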