Mid-Stream Changing Configuration with Check-Point

Posted 2020-07-22 18:10

Question:

I have a Spark streaming / DStream app like this:

// Function to create and setup a new StreamingContext
def functionToCreateContext(): StreamingContext = {
  val ssc = new StreamingContext(...)   // new context
  val lines = ssc.socketTextStream(...) // create DStreams
  ...
  ssc.checkpoint(checkpointDirectory)   // set checkpoint directory
  ssc
}

// Get StreamingContext from checkpoint data or create a new one
val context = StreamingContext.getOrCreate(checkpointDirectory, functionToCreateContext _)

// Do additional setup on context that needs to be done,
// irrespective of whether it is being started or restarted
context. ...

// Start the context
context.start()
context.awaitTermination()

My context uses a configuration file from which I can pull items with methods like appConf.getString. So I actually use:

val context = StreamingContext.getOrCreate(
    appConf.getString("spark.checkpointDirectory"), 
    () => createStreamContext(sparkConf, appConf))

where val sparkConf = new SparkConf()....
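
For reference, createStreamContext is shaped roughly like this (a sketch only; the config keys are the ones mentioned above, the rest of the body is illustrative and assumes appConf is a Typesafe Config):

import com.typesafe.config.Config
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

def createStreamContext(sparkConf: SparkConf, appConf: Config): StreamingContext = {
  // Window/batch duration read from the app config file (illustrative key)
  val windowSecs = appConf.getString("spark.windowDurationSecs").toLong
  val ssc = new StreamingContext(sparkConf, Seconds(windowSecs))
  // ... create Kafka/socket DStreams, windowed and stateful operations, etc.
  ssc.checkpoint(appConf.getString("spark.checkpointDirectory"))
  ssc
}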

If I stop my app and change the configuration in the app file, these changes are not picked up unless I delete the checkpoint directory contents. For example, I would like to change spark.streaming.kafka.maxRatePerPartition or spark.windowDurationSecs dynamically. (EDIT: I kill the app, change the configuration file and then restart the app.) How can I dynamically change these settings, or force a configuration change, without trashing my checkpoint directory (which will soon also hold checkpoints for state info)?

Answer 1:

How can I do dynamically change these settings or enforce a configuration change without trashing my checkpoint directory?

If you dive into the code for StreamingContext.getOrCreate:

def getOrCreate(
    checkpointPath: String,
    creatingFunc: () => StreamingContext,
    hadoopConf: Configuration = SparkHadoopUtil.get.conf,
    createOnError: Boolean = false
  ): StreamingContext = {
    val checkpointOption = CheckpointReader.read(
      checkpointPath, new SparkConf(), hadoopConf, createOnError)
    checkpointOption.map(new StreamingContext(null, _, null)).getOrElse(creatingFunc())
}

You can see that if CheckpointReader finds checkpointed data at the checkpoint path, it reads it using a plain new SparkConf(), as this overload doesn't allow passing a custom SparkConf. By default, such a SparkConf loads any settings declared as Java system properties, plus defaults found on the classpath:

class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging {

  import SparkConf._

  /** Create a SparkConf that loads defaults from system properties and the classpath */
  def this() = this(true)

So one way of achieving what you want is, instead of creating a SparkConf object in the code, to pass the parameters via spark.driver.extraClassPath and spark.executor.extraClassPath to spark-submit.
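
As a minimal illustration of why that works (the property name and value here are just examples): any spark.* setting present as a Java system property, e.g. supplied via spark-submit --conf or a -D JVM flag, is picked up by a freshly constructed SparkConf:

import org.apache.spark.SparkConf

object SparkConfDefaultsDemo {
  def main(args: Array[String]): Unit = {
    // Simulate a property supplied outside the code (e.g. via spark-submit or -D...)
    sys.props("spark.streaming.kafka.maxRatePerPartition") = "100"

    // new SparkConf() uses loadDefaults = true, so it reads every spark.* system property
    val conf = new SparkConf()
    println(conf.get("spark.streaming.kafka.maxRatePerPartition")) // prints 100
  }
}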



Answer 2:

Do you create your Streaming Context the way the docs suggest, by using StreamingContext.getOrCreate, which takes a previous checkpointDirectory as an argument?

// Function to create and setup a new StreamingContext
def functionToCreateContext(): StreamingContext = {
    val ssc = new StreamingContext(...)   // new context
    val lines = ssc.socketTextStream(...) // create DStreams
    ...
    ssc.checkpoint(checkpointDirectory)   // set checkpoint directory
    ssc
}

// Get StreamingContext from checkpoint data or create a new one
val context = StreamingContext.getOrCreate(checkpointDirectory, functionToCreateContext _)

// Do additional setup on context that needs to be done,
// irrespective of whether it is being started or restarted
context. ...

// Start the context
context.start()
context.awaitTermination()

http://spark.apache.org/docs/latest/streaming-programming-guide.html#checkpointing



Answer 3:

Adding or updating Spark configurations cannot be done when you are restoring from a checkpoint directory. You can find Spark's checkpointing behaviour in the documentation:

When the program is being started for the first time, it will create a new StreamingContext, set up all the streams and then call start(). When the program is being restarted after failure, it will re-create a StreamingContext from the checkpoint data in the checkpoint directory.

So if you use a checkpoint directory, then on restart the job will re-create a StreamingContext from the checkpoint data, which will carry the old SparkConf.
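
A short sketch of what that means in practice (the checkpoint path and property key are illustrative, and createStreamContext is the function from the question):

import org.apache.spark.streaming.StreamingContext

val ssc = StreamingContext.getOrCreate(
  "/tmp/app-checkpoints",                          // illustrative checkpoint dir
  () => createStreamContext(sparkConf, appConf))   // only invoked when no checkpoint exists

// On the first run this prints whatever the config file said at that time.
// On a restart from the checkpoint it still prints that original value,
// even if the config file has been changed in the meantime.
println(ssc.sparkContext.getConf.get(
  "spark.streaming.kafka.maxRatePerPartition", "not set"))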