Programatically creating dstreams in apache spark

2019-09-14 15:03发布

问题:

I am writing some self contained integration tests around Apache Spark Streaming. I want to test that my code can ingest all kinds of edge cases in my simulated test data. When I was doing this with regular RDDs (not streaming). I could use my inline data and call "parallelize" on it to turn it into a spark RDD. However, I can find no such method for creating destreams. Ideally I would like to call some "push" function once in a while and have the tupple magically appear in my dstream. ATM I'm doing this by using Apache Kafka: I create a temp queue, and I write to it. But this seems like overkill. I'd much rather create the test-dstream directly from my test data without having to use Kafka as a mediator.

回答1:

For testing purpose, you can create an input stream from a queue of RDDs. Pushing more RDDs in the queue will simulate having processed more events in the batch interval.

val sc = SparkContextHolder.sc
val ssc = new StreamingContext(sc, Seconds(1))
val inputData: mutable.Queue[RDD[Int]] = mutable.Queue()
val inputStream: InputDStream[Int] = ssc.queueStream(inputData)

inputData += sc.makeRDD(List(1, 2)) // Emulate the RDD created during the first batch interval
inputData += sc.makeRDD(List(3, 4)) // 2nd batch interval
// etc

val result = inputStream.map(x => x*x)
result.foreachRDD(rdd => assertSomething(rdd))
ssc.start() // Don't forget to start the streaming context


回答2:

In addition to Raphael solution I think you like to also either can process one batch a time or everything available approach. You need to set oneAtATime flag accordingly on queustream's optional method argument as shown below:

val slideDuration = Milliseconds(100)
val conf = new SparkConf().setAppName("NetworkWordCount").setMaster("local[8]")
val sparkSession: SparkSession = SparkSession.builder.config(conf).getOrCreate()
val sparkContext: SparkContext = sparkSession.sparkContext
val queueOfRDDs = mutable.Queue[RDD[String]]()


val streamingContext: StreamingContext = new StreamingContext(sparkContext, slideDuration)
val rddOneQueuesAtATimeDS: DStream[String] = streamingContext.queueStream(queueOfRDDs, oneAtATime = true)
val rddFloodOfQueuesDS: DStream[String] = streamingContext.queueStream(queueOfRDDs, oneAtATime = false)

rddOneQueuesAtATimeDS.print(120)
rddFloodOfQueuesDS.print(120)

streamingContext.start()


for (i <- (1 to 10)) {
  queueOfRDDs += sparkContext.makeRDD(simplePurchase(i))
  queueOfRDDs += sparkContext.makeRDD(simplePurchase((i + 3) * (i + 3)))
  Thread.sleep(slideDuration.milliseconds)
}

Thread.sleep(1000L)


回答3:

I found this base example: https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/examples/streaming/CustomReceiver.scala

The key here is calling the "store" command. Replace the contents of store with whatever you want.