I have a Spark Streaming / DStream app like this:
// Function to create and setup a new StreamingContext
def functionToCreateContext(): StreamingContext = {
val ssc = new StreamingContext(...) // new context
val lines = ssc.socketTextStream(...) // create DStreams
...
ssc.checkpoint(checkpointDirectory) // set checkpoint directory
ssc
}
// Get StreamingContext from checkpoint data or create a new one
val context = StreamingContext.getOrCreate(checkpointDirectory, functionToCreateContext _)
// Do additional setup on context that needs to be done,
// irrespective of whether it is being started or restarted
context. ...
// Start the context
context.start()
context.awaitTermination()
My context uses a configuration file from which I can pull items with methods like appConf.getString. So what I actually use is:
val context = StreamingContext.getOrCreate(
    appConf.getString("spark.checkpointDirectory"),
    () => createStreamContext(sparkConf, appConf))
where val sparkConf = new SparkConf()...
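Roughly, createStreamContext is shaped like the sketch below. This is only a simplified illustration, assuming appConf is a Typesafe Config; apart from spark.checkpointDirectory, spark.windowDurationSecs and spark.streaming.kafka.maxRatePerPartition, the key names and the processing are placeholders, not the real code:
import com.typesafe.config.Config
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
// Simplified sketch of createStreamContext -- placeholder keys and processing.
def createStreamContext(sparkConf: SparkConf, appConf: Config): StreamingContext = {
  // Everything resolved here is captured by the checkpoint. On a restart that
  // restores from the checkpoint directory, this function is never called, so
  // later edits to the config file are not picked up.
  sparkConf.set("spark.streaming.kafka.maxRatePerPartition",
    appConf.getString("spark.streaming.kafka.maxRatePerPartition"))
  val ssc = new StreamingContext(sparkConf, Seconds(appConf.getLong("spark.batchDurationSecs")))
  val lines = ssc.socketTextStream(
    appConf.getString("spark.socketHost"), appConf.getInt("spark.socketPort"))
  lines.window(Seconds(appConf.getLong("spark.windowDurationSecs"))).print() // placeholder processing
  ssc.checkpoint(appConf.getString("spark.checkpointDirectory"))
  ssc
}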
If I stop my app and change the configuration in the app file, these changes are not picked up unless I delete the checkpoint directory contents. For example, I would like to change spark.streaming.kafka.maxRatePerPartition or spark.windowDurationSecs dynamically. (EDIT: I kill the app, change the configuration file and then restart the app.) How can I dynamically change these settings, or enforce a configuration change, without trashing my checkpoint directory (which is about to include checkpoints for state information)?
Do you create your StreamingContext the way the docs suggest, by using StreamingContext.getOrCreate, which takes a previous checkpointDirectory as an argument? See http://spark.apache.org/docs/latest/streaming-programming-guide.html#checkpointing
If we dive into the code for StreamingContext.getOrCreate (a paraphrase of the relevant overload is sketched below), we can see that if CheckpointReader finds checkpointed data in the checkpoint path, it uses new SparkConf() as a parameter, as the overload doesn't allow passing a custom-created SparkConf. By default, SparkConf will load any settings declared either as environment variables or passed on the classpath. So one way of achieving what you want is, instead of creating a SparkConf object in the code, to pass the parameters via spark.driver.extraClassPath and spark.executor.extraClassPath to spark-submit.
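For reference, the relevant overload looks roughly like this in the Spark source (a paraphrase from Spark 1.6/2.x, not a verbatim copy; details may differ in your version):
// Paraphrase of StreamingContext.getOrCreate from the Spark source (approximate).
def getOrCreate(
    checkpointPath: String,
    creatingFunc: () => StreamingContext,
    hadoopConf: Configuration = SparkHadoopUtil.get.conf,
    createOnError: Boolean = false): StreamingContext = {
  // Note the hard-coded `new SparkConf()`: there is no overload that lets you
  // pass your own SparkConf for reading the checkpoint.
  val checkpointOption =
    CheckpointReader.read(checkpointPath, new SparkConf(), hadoopConf, createOnError)
  // If checkpoint data exists, the context (and its SparkConf) is rebuilt from it;
  // only otherwise is your creating function invoked.
  checkpointOption.map(new StreamingContext(null, _, null)).getOrElse(creatingFunc())
}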
It cannot be done: adding or updating Spark configurations is not possible when you are restoring from a checkpoint directory. You can find Spark's checkpointing behaviour in the documentation: if you use a checkpoint directory, then on restart of the job it will re-create the StreamingContext from the checkpoint data, which will have the old SparkConf.
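A quick way to confirm this (a small sketch using the setting name from the question): after getOrCreate returns, inspect the configuration that is actually in effect; when the context was restored from a checkpoint, the value comes from the checkpointed SparkConf, not from the conf or config file you just changed.
// Print the value the restored (or newly created) context is actually using.
// After a restore from the checkpoint, this shows the old value regardless of
// what the edited config file or spark-submit arguments now say.
val effectiveRate = context.sparkContext.getConf
  .get("spark.streaming.kafka.maxRatePerPartition", "<not set>")
println(s"maxRatePerPartition in effect: $effectiveRate")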