slice function in DStream (Spark Streaming) does not work

Posted 2019-09-12 07:34

Question:

Spark Streaming provides a sliding window function to get the last k RDDs. However, I want to try the slice function instead, because I want to query the RDDs generated within a time range before the current time.

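from datetime import datetime, timedelta
# datamap is an existing DStream; ask for the RDDs of the last 30 seconds
delta = timedelta(seconds=30)
datates = datamap.slice(datetime.now()-delta,datetime.now())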

I get this error when I execute the code:

---------------------------------------------------------------------------
Py4JJavaError                             Traceback (most recent call last)
/home/hduser/spark-1.5.0/<ipython-input-1364-f8d325e33d4c> in <module>()
----> 1 datates = datamap.slice(datetime.now()-delta,datetime.now())

/home/hduser/spark-1.5.0/python/pyspark/streaming/dstream.pyc in slice(self, begin, end)
    411         `begin`, `end` could be datetime.datetime() or unix_timestamp
    412         """
--> 413         jrdds = self._jdstream.slice(self._jtime(begin), self._jtime(end))
    414         return [RDD(jrdd, self._sc, self._jrdd_deserializer) for jrdd in jrdds]
    415

/home/hduser/spark-1.5.0/python/pyspark/streaming/dstream.pyc in _jdstream(self)
    629
    630         jfunc = TransformFunction(self._sc, self.func, self.prev._jrdd_deserializer)
--> 631         dstream = self._sc._jvm.PythonTransformedDStream(self.prev._jdstream.dstream(), jfunc)
    632         self._jdstream_val = dstream.asJavaDStream()
    633         return self._jdstream_val

/home/hduser/spark-1.5.0/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py in __call__(self, *args)
    699         answer = self._gateway_client.send_command(command)
    700         return_value = get_return_value(answer, self._gateway_client, None,
--> 701                 self._fqn)
    702
    703         for temp_arg in temp_args:

/home/hduser/spark-1.5.0/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
    298                 raise Py4JJavaError(
    299                     'An error occurred while calling {0}{1}{2}.\n'.
--> 300                     format(target_id, '.', name), value)
    301             else:
    302                 raise Py4JError(

Py4JJavaError: An error occurred while calling None.org.apache.spark.streaming.api.python.PythonTransformedDStream.
: java.lang.IllegalStateException: Adding new inputs, transformations, and output operations after stopping a context is not supported
        at org.apache.spark.streaming.dstream.DStream.validateAtInit(DStream.scala:224)
        at org.apache.spark.streaming.dstream.DStream.<init>(DStream.scala:64)
        at org.apache.spark.streaming.api.python.PythonDStream.<init>(PythonDStream.scala:172)
        at org.apache.spark.streaming.api.python.PythonTransformedDStream.<init>(PythonDStream.scala:189)
        at sun.reflect.GeneratedConstructorAccessor80.newInstance(Unknown Source)
        at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
        at java.lang.reflect.Constructor.newInstance(Constructor.java:526)
        at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:234)
        at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:379)
        at py4j.Gateway.invoke(Gateway.java:214)
        at py4j.commands.ConstructorCommand.invokeConstructor(ConstructorCommand.java:79)
        at py4j.commands.ConstructorCommand.execute(ConstructorCommand.java:68)
        at py4j.GatewayConnection.run(GatewayConnection.java:207)
        at java.lang.Thread.run(Thread.java:745)

How can I solve this error? Thank you.

Answer 1:

Based on the error message,

"Adding new inputs, transformations, and output operations after stopping a context is not supported"

it looks like the StreamingContext had already been stopped when slice() was called, for example because ssc.stop() was used instead of ssc.awaitTermination(). Please provide more information about how the Spark Streaming Context (ssc) is set up in the program.
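
To illustrate the call order this error points to, here is a minimal sketch, not a confirmed fix. The helper name last_k_seconds is hypothetical, and the names lines and parse in the comments are placeholders for whatever the original program uses. The sketch assumes datamap was fully defined before ssc.start() and that the context has not been stopped. It also reads the clock once so both bounds of the range refer to the same instant.

```python
from datetime import datetime, timedelta


def last_k_seconds(dstream, seconds, now=None):
    """Return the RDDs that `dstream` generated in the last `seconds` seconds.

    Assumption: `dstream` belongs to a StreamingContext that has been
    started and not yet stopped.
    """
    # Read the clock once so begin and end are measured from the same instant.
    end = now if now is not None else datetime.now()
    begin = end - timedelta(seconds=seconds)
    # DStream.slice accepts datetime objects or unix timestamps.
    return dstream.slice(begin, end)


# Assumed order in the driver program (placeholder names, needs a live Spark):
#   datamap = lines.map(parse)            # define all transformations first
#   datamap.pprint()                      # register an output operation
#   ssc.start()                           # then start the context
#   rdds = last_k_seconds(datamap, 30)    # query while ssc is still running
#   ssc.awaitTermination()                # call ssc.stop() only when finished
```

If the traceback still appears with this order, the context was probably stopped earlier in the same session, in which case a new StreamingContext has to be created and the DStreams defined again on it.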