I have a spark-streaming app which looks like this:
val message = KafkaUtils.createStream(...).map(_._2)
message.foreachRDD( rdd => {
  if (!rdd.isEmpty){
    val kafkaDF = sqlContext.read.json(rdd)
    kafkaDF.foreachPartition(
      i => {
        // one connection per partition, opened on the executor
        createConnection()
        i.foreach(
          row => {
            connection.sendToTable()
          }
        )
        closeConnection()
      }
    )
  }
})
I run it on a YARN cluster using:
spark-submit --master yarn-cluster --num-executors 3 --driver-memory 2g --executor-memory 2g --executor-cores 5....
When I log kafkaDF.rdd.partitions.size, the result is mostly '1' or '5'. I am confused: is it possible to control the number of partitions of my DataFrame? KafkaUtils.createStream doesn't seem to accept any parameters related to the number of partitions I want for the RDD. I tried kafkaDF.rdd.repartition( int ), but it doesn't seem to work either.
How can I achieve more parallelism in my code? If my approach is wrong, what is the correct way to achieve it?
In Spark Streaming, parallelism can be achieved in two areas: (a) the consumers/receivers (in your case the Kafka consumers), and (b) the processing (done by Spark).
By default, Spark Streaming assigns one core (i.e. one thread) to each consumer. So if you need to ingest more data, you need to create more consumers. Each consumer creates its own DStream. You can then union the DStreams to get one large stream.
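As a rough sketch of the multi-consumer approach, assuming the receiver-based spark-streaming-kafka (0.8) API that the question appears to use: the helper name and its parameters (zkQuorum, groupId, topic, numReceivers) are placeholders for illustration, not values taken from the question.

```scala
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.kafka.KafkaUtils

object MultiReceiver {
  // Hypothetical helper: every parameter here is a placeholder to fill in.
  def unionedStream(ssc: StreamingContext,
                    zkQuorum: String,
                    groupId: String,
                    topic: String,
                    numReceivers: Int): DStream[String] = {
    // Each createStream call starts one receiver, which occupies one core.
    val streams = (1 to numReceivers).map { _ =>
      KafkaUtils.createStream(ssc, zkQuorum, groupId, Map(topic -> 1)).map(_._2)
    }
    // Merge the per-receiver DStreams into a single DStream.
    ssc.union(streams)
  }
}
```

The returned stream can be used in place of the single createStream result. The app needs more cores than receivers, otherwise nothing is left to process the data.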
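Alternatively, instead of adding receivers/consumers, processing parallelism can be increased by repartitioning the input stream with inputStream.repartition(<number of partitions>), which redistributes the received data across that many partitions before further processing.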
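A minimal sketch of repartitioning the input stream, assuming a DStream[String] like the message stream in the question. The object name, function name, and numPartitions are illustrative only.

```scala
import org.apache.spark.streaming.dstream.DStream

object Repartitioning {
  // numPartitions is a placeholder; a small multiple of the cores
  // available for processing is a common starting point.
  def spread(messages: DStream[String], numPartitions: Int): DStream[String] =
    // repartition returns a NEW DStream; the original is left unchanged,
    // so downstream code must use the returned value.
    messages.repartition(numPartitions)
}
```

The same holds for RDDs and DataFrames: rdd.repartition(n) and kafkaDF.repartition(n) return new objects rather than modifying the existing one. Calling kafkaDF.rdd.repartition(n) and then continuing to work with kafkaDF therefore changes nothing, which may be why the attempt in the question appeared not to work.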
All the remaining cores available to the streaming app will be assigned to Spark.
So if you have N cores (defined via spark.cores.max in standalone mode; on YARN, --num-executors times --executor-cores) and you have C consumers, you are left with N-C cores available for Spark.
block interval = how long a consumer waits before it pushes the data it has collected as a Spark block (configured via spark.streaming.blockInterval). Each block becomes one partition of the batch RDD, so a batch holds roughly (batch interval / block interval) partitions per consumer.
Always keep in mind that Spark Streaming has two functions that run constantly: a set of threads that read the current micro-batch (the consumers), and a set of threads that process the previous micro-batch (Spark).
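The core and block-interval arithmetic can be sketched in plain Scala. It assumes each block becomes one partition of the batch RDD. The executor numbers come from the spark-submit line in the question. The 1-second batch interval is an assumption (the question does not state it), and 200 ms is the default value of spark.streaming.blockInterval.

```scala
object StreamingParallelism {
  // Cores left for processing once each receiver has taken one core.
  def processingCores(totalCores: Int, receivers: Int): Int =
    totalCores - receivers

  // Upper estimate of partitions per batch: one block per block interval, per receiver.
  def partitionsPerBatch(receivers: Int, batchIntervalMs: Long, blockIntervalMs: Long): Long =
    receivers * (batchIntervalMs / blockIntervalMs)

  def main(args: Array[String]): Unit = {
    val totalCores = 3 * 5 // --num-executors 3, --executor-cores 5
    val receivers = 1      // a single createStream call
    val batchMs = 1000L    // ASSUMPTION: batch interval is not given in the question
    val blockMs = 200L     // default spark.streaming.blockInterval

    println(s"cores for processing: ${processingCores(totalCores, receivers)}")
    println(s"partitions per batch: ${partitionsPerBatch(receivers, batchMs, blockMs)}")
  }
}
```

Under these assumptions 14 cores remain for processing while each batch has at most about 5 partitions, so most cores would sit idle. Lowering the block interval, adding receivers, or repartitioning raises the partition count. As far as I know, a block is only produced when data actually arrived during the interval, so quiet periods can yield fewer partitions than this estimate.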
For more performance tuning tips please refer to here, here and here.