I have a Spark streaming / DStream app like this:
    // Function to create and set up a new StreamingContext
    def functionToCreateContext(): StreamingContext = {
      val ssc = new StreamingContext(...)   // new context
      val lines = ssc.socketTextStream(...) // create DStreams
      ssc.checkpoint(checkpointDirectory)   // set checkpoint directory
      ssc
    }

    // Get StreamingContext from checkpoint data or create a new one
    val context = StreamingContext.getOrCreate(checkpointDirectory, functionToCreateContext _)

    // Do additional setup on context that needs to be done,
    // irrespective of whether it is being started or restarted
    context. ...

    // Start the context
    context.start()
    context.awaitTermination()
My context uses a configuration file, from which I can pull items with methods like appConf.getString, so I actually use:
    val context = StreamingContext.getOrCreate(
      checkpointDirectory,
      () => createStreamContext(sparkConf, appConf))
where val sparkConf = new SparkConf()...
If I stop my app and change the configuration in the app file, the changes are not picked up unless I delete the contents of the checkpoint directory. For example, I would like to change spark.streaming.kafka.maxRatePerPartition or spark.windowDurationSecs dynamically. (EDIT: I kill the app, change the configuration file and then restart the app.) How can I change these settings dynamically, or enforce a configuration change, without trashing my checkpoint directory (which is about to include checkpoints for state info)?
If you dive into the code for StreamingContext.getOrCreate:
    def getOrCreate(
        checkpointPath: String,
        creatingFunc: () => StreamingContext,
        hadoopConf: Configuration = SparkHadoopUtil.get.conf,
        createOnError: Boolean = false
      ): StreamingContext = {
      val checkpointOption = CheckpointReader.read(
        checkpointPath, new SparkConf(), hadoopConf, createOnError)
      checkpointOption.map(new StreamingContext(null, _, null)).getOrElse(creatingFunc())
    }
You can see that if CheckpointReader finds checkpointed data in the checkpoint path, the context is rebuilt from that data, and the reader is handed new SparkConf() as a parameter, since the overload does not allow passing a custom-created SparkConf. By default, SparkConf loads any settings declared as system properties or found on the classpath:
    class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging {
      import SparkConf._

      /** Create a SparkConf that loads defaults from system properties and the classpath */
      def this() = this(true)
So one way of achieving what you want is, instead of creating a SparkConf object in the code, to pass the parameters to spark-submit via spark.driver.extraClassPath and spark.executor.extraClassPath.
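To see the default loading in isolation, here is a minimal sketch, assuming spark-core is on the classpath. The object name SparkConfDefaultsDemo and the value 500 are made up for illustration, and System.setProperty only stands in for whatever mechanism ends up setting the property on the driver JVM. It shows that a default-constructed SparkConf picks up spark.* system properties, while SparkConf(false) does not.

```scala
import org.apache.spark.SparkConf

// Hypothetical demo object; not part of the original application.
object SparkConfDefaultsDemo {
  def main(args: Array[String]): Unit = {
    val key = "spark.streaming.kafka.maxRatePerPartition"

    // Stand-in for a property that was set on the driver JVM from outside the code.
    System.setProperty(key, "500")

    // new SparkConf() is SparkConf(loadDefaults = true): spark.* system properties are loaded.
    val withDefaults = new SparkConf()
    println(withDefaults.getOption(key)) // prints Some(500)

    // With loadDefaults = false the system property is ignored.
    val withoutDefaults = new SparkConf(false)
    println(withoutDefaults.getOption(key)) // prints None
  }
}
```

Whether a value seen this way actually overrides what was stored in the checkpoint is a separate question, so it is worth checking the effective configuration of the restored context before relying on it.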
Do you create your StreamingContext the way the docs suggest, by using StreamingContext.getOrCreate, which takes a previous checkpointDirectory as an argument?
    // Function to create and set up a new StreamingContext
    def functionToCreateContext(): StreamingContext = {
      val ssc = new StreamingContext(...)   // new context
      val lines = ssc.socketTextStream(...) // create DStreams
      ssc.checkpoint(checkpointDirectory)   // set checkpoint directory
      ssc
    }

    // Get StreamingContext from checkpoint data or create a new one
    val context = StreamingContext.getOrCreate(checkpointDirectory, functionToCreateContext _)

    // Do additional setup on context that needs to be done,
    // irrespective of whether it is being started or restarted
    context. ...

    // Start the context
    context.start()
    context.awaitTermination()
You cannot add or update Spark configuration when you are restoring from a checkpoint directory. The Spark documentation describes the checkpointing behaviour:
When the program is being started for the first time, it will create a new StreamingContext, set up all the streams and then call start().
When the program is being restarted after failure, it will re-create a StreamingContext from the checkpoint data in the checkpoint directory.
So if you use a checkpoint directory, then on restart the job re-creates the StreamingContext from the checkpoint data, which still holds the old SparkConf.
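The first-start versus restart behaviour can be illustrated without Spark. The following is only a sketch with hypothetical stand-ins: Ctx plays the role of StreamingContext, and this getOrCreate mirrors just the shape of the real method (restore if a checkpoint exists, otherwise call the creating function). It shows why edits to the configuration file are never read on restart: the creating function is simply not invoked.

```scala
// Hypothetical stand-ins; none of these names come from Spark itself.
object GetOrCreateSketch {
  // Records where the settings of a "context" came from.
  final case class Ctx(source: String, maxRate: Int)

  // Same shape as StreamingContext.getOrCreate: use the checkpoint if present,
  // otherwise fall back to creatingFunc. getOrElse evaluates its argument lazily.
  def getOrCreate(checkpoint: Option[Ctx], creatingFunc: () => Ctx): Ctx =
    checkpoint.getOrElse(creatingFunc())

  def main(args: Array[String]): Unit = {
    var created = 0
    val fromConfigFile = () => { created += 1; Ctx("config file", maxRate = 500) }

    // First start: no checkpoint, so the creating function runs.
    println(getOrCreate(None, fromConfigFile)) // prints Ctx(config file,500)

    // Restart: a checkpoint exists, so the creating function is skipped
    // and the old settings win.
    println(getOrCreate(Some(Ctx("checkpoint", maxRate = 100)), fromConfigFile)) // prints Ctx(checkpoint,100)

    println(s"creatingFunc calls: $created") // prints creatingFunc calls: 1
  }
}
```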