Spark Streaming: How can I add more partitions to

Posted 2019-04-01 19:56

I have a spark-streaming app which looks like this:

val message = KafkaUtils.createStream(...).map(_._2)

message.foreachRDD( rdd => {

  if (!rdd.isEmpty){
    val kafkaDF = sqlContext.read.json(rdd)

    kafkaDF.foreachPartition(
      i =>{
        createConnection()
        i.foreach(
          row =>{
            connection.sendToTable()
          }
        )
        closeConnection()
      }
    )
  }
})

And, I run it on a yarn cluster using

spark-submit --master yarn-cluster --num-executors 3 --driver-memory 2g --executor-memory 2g --executor-cores 5....

When I log kafkaDF.rdd.partitions.size, the result is mostly '1' or '5'. I am confused: is it possible to control the number of partitions of my DataFrame? KafkaUtils.createStream doesn't seem to accept any parameters related to the number of partitions I want for the RDD. I tried kafkaDF.rdd.repartition( int ), but it doesn't seem to work either.

How can I achieve more parallelism in my code? If my approach is wrong, what is the correct way to achieve it?

1 answer
forever°为你锁心
#2 · 2019-04-01 20:26

In Spark Streaming, parallelism can be achieved in two areas: (a) the consumers/receivers (in your case the Kafka consumers), and (b) the processing (done by Spark).

By default, Spark Streaming assigns one core (i.e., one thread) to each consumer. So if you need to ingest more data, you need to create more consumers. Each consumer creates its own DStream. You can then union the DStreams to get one larger stream.

// A basic example with two threads for consumers
val messageStream1 = KafkaUtils.createStream(...) // say, reading topic A
val messageStream2 = KafkaUtils.createStream(...) // and this one reading topic B

val combineStream = messageStream1.union(messageStream2)

Alternatively, instead of adding receivers/consumers, you can repartition the input stream, which redistributes the received data across more partitions before it is processed:

inputStream.repartition(<number of partitions>)
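
Putting the two options together, here is a minimal sketch rather than a drop-in implementation. It assumes the receiver-based Kafka 0.8 integration (the spark-streaming-kafka artifact) on the classpath; the ZooKeeper address, consumer group, topic name, batch interval, receiver count and partition count are all hypothetical placeholders.

```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

object MultiReceiverSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("multi-receiver-sketch")
    val ssc = new StreamingContext(conf, Seconds(10)) // assumed batch interval

    val zkQuorum = "zk-host:2181"        // hypothetical ZooKeeper quorum
    val groupId = "my-consumer-group"    // hypothetical consumer group
    val topics = Map("my-topic" -> 1)    // hypothetical topic -> consumer threads per receiver
    val numReceivers = 3                 // each receiver occupies one core

    // One input DStream per receiver, keeping only the message value.
    val streams = (1 to numReceivers).map { _ =>
      KafkaUtils.createStream(ssc, zkQuorum, groupId, topics).map(_._2)
    }

    // Merge the per-receiver streams, then spread each batch over more partitions.
    val unified = ssc.union(streams)
    val repartitioned = unified.repartition(12) // assumed target partition count

    repartitioned.foreachRDD { rdd =>
      println(s"partitions in this batch: ${rdd.partitions.length}")
    }

    ssc.start()
    ssc.awaitTermination()
  }
}
```

Note that repartition returns a new DStream (or a new RDD, when called on an RDD) and leaves the original untouched, so the result has to be the one you keep processing. This may be why calling kafkaDF.rdd.repartition(...) without using its return value appeared to have no effect.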

All the remaining cores available to the streaming app will be assigned to Spark.

So if you have N cores (defined via spark.cores.max) and C consumers, you are left with N-C cores available for Spark processing.
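
To make the core accounting concrete, here is a small sketch in plain Scala. It assumes that on YARN the total core count is num-executors x executor-cores (3 x 5 in the question's spark-submit line) and that the app has a single receiver, since the question calls createStream once. The object and function names are illustrative only.

```scala
object CoreBudget {
  // Cores left for processing once each receiver has taken one core.
  def processingCores(totalCores: Int, receivers: Int): Int = {
    require(receivers >= 0 && receivers <= totalCores,
      "receivers must fit within the available cores")
    totalCores - receivers
  }

  def main(args: Array[String]): Unit = {
    val totalCores = 3 * 5 // --num-executors 3 --executor-cores 5
    println(processingCores(totalCores, receivers = 1)) // prints 14
    println(processingCores(totalCores, receivers = 3)) // prints 12
  }
}
```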

#Partitions ≈ #Consumers x (batch duration / block interval)

block interval = how long a consumer waits before it pushes the data it has received as a Spark block (set via the configuration property spark.streaming.blockInterval).
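
As a worked example of the formula, the sketch below plugs in a few values. The 200 ms block interval is Spark's default for spark.streaming.blockInterval; the batch durations are assumptions, because the question does not state one. Treat the result as an approximation: a receiver that got little or no data during a block interval may produce fewer blocks, which would be consistent with the question sometimes seeing a single partition.

```scala
object PartitionEstimate {
  // Approximate partitions per batch: receivers * (batch duration / block interval).
  def estimate(receivers: Int, batchDurationMs: Long, blockIntervalMs: Long): Long = {
    require(blockIntervalMs > 0, "block interval must be positive")
    receivers * (batchDurationMs / blockIntervalMs)
  }

  def main(args: Array[String]): Unit = {
    println(estimate(receivers = 1, batchDurationMs = 1000, blockIntervalMs = 200))  // prints 5
    println(estimate(receivers = 3, batchDurationMs = 1000, blockIntervalMs = 200))  // prints 15
    println(estimate(receivers = 1, batchDurationMs = 10000, blockIntervalMs = 200)) // prints 50
  }
}
```

With one receiver, an assumed 1-second batch and the default block interval, the estimate is 5, matching one of the values reported in the question. Adding receivers or lowering the block interval raises the count.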

Always keep in mind that Spark Streaming runs two activities concurrently: one set of threads (the consumers) reads the current micro-batch, while another set of threads (Spark) processes the previous micro-batch.

For more performance tuning tips please refer to here, here and here.
