Understanding Spark Structured Streaming Parallelism

Posted 2019-06-21 20:25

I'm new to Spark and struggling with some concepts.

How does parallelism work when using Spark Structured Streaming with Kafka as the source?

Consider the following code snippet:

SparkSession spark = SparkSession
          .builder()
          .appName("myApp")
          .getOrCreate();   

Dataset<VideoEventData> ds = spark
  .readStream()
  .format("kafka")
  ...

gDataset = ds.groupByKey(...);

pDataset = gDataset.mapGroupsWithState(
      ...
      /* process each key and its values:
           for each value:
             if the value is valid, save the key/value result to HDFS */
      ...
);

StreamingQuery query = pDataset.writeStream()
          .outputMode("update")
          .format("console")
          .start();

//await
query.awaitTermination();

I've read that parallelism is related to the number of data partitions, and that the number of partitions for a Dataset is determined by the spark.sql.shuffle.partitions parameter.

  1. For every batch (pull from Kafka), will the pulled items be divided among spark.sql.shuffle.partitions partitions? For example, with spark.sql.shuffle.partitions=5 and Batch1=100 rows, will we end up with 5 partitions of 20 rows each?

  2. Given the code snippet above, do we still benefit from Spark parallelism when groupByKey is followed by mapGroups/mapGroupsWithState?

UPDATE:

Inside gDataset.mapGroupsWithState is where I process each key and its values and store the result in HDFS. The output sink is therefore used only to print some stats to the console.

1 answer
来,给爷笑一个
#2 · 2019-06-21 20:44

For every batch (pull from Kafka), will the pulled items be divided among the number of spark.sql.shuffle.partitions?

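They are divided once they reach groupByKey, which is a shuffle boundary. When the data is first read, the number of partitions equals, by default, the number of Kafka topic partitions. After the shuffle, rows are assigned to partitions by the hash of their key, so all rows with the same key land in the same partition, and 100 rows are not necessarily split into 20 per partition.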
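
To make the effect of the shuffle concrete, here is a minimal sketch in plain Java (no Spark dependency) of how a hash-based shuffle spreads rows over partitions. The class ShufflePartitionSketch, its methods, and the camera-N keys are hypothetical, and String.hashCode only stands in for Spark's internal hash function, so real partition assignments will differ. What carries over is that rows sharing a key always go to the same partition, so 100 rows over 5 partitions need not come out as 20 each.

```java
import java.util.Arrays;

public class ShufflePartitionSketch {

    // Illustrative stand-in for hash partitioning: map a key to one of
    // numPartitions buckets. Spark uses its own hash function internally,
    // so the actual assignments in a real job will differ from these.
    static int partitionFor(String key, int numPartitions) {
        return Math.floorMod(key.hashCode(), numPartitions);
    }

    // Count how many rows land in each partition after the simulated shuffle.
    static int[] rowsPerPartition(String[] keys, int numPartitions) {
        int[] counts = new int[numPartitions];
        for (String key : keys) {
            counts[partitionFor(key, numPartitions)]++;
        }
        return counts;
    }

    public static void main(String[] args) {
        // 100 rows spread over 3 hypothetical grouping keys.
        String[] keys = new String[100];
        for (int i = 0; i < keys.length; i++) {
            keys[i] = "camera-" + (i % 3);
        }
        // Simulates spark.sql.shuffle.partitions=5: there are 5 partitions,
        // but only 3 distinct keys, so at most 3 partitions receive rows.
        System.out.println(Arrays.toString(rowsPerPartition(keys, 5)));
    }
}
```

With only three distinct keys, at least two of the five partitions stay empty regardless of the hash function, which is why the number of distinct keys matters as much as spark.sql.shuffle.partitions.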

Given the code snippet provided, do we still benefit from Spark parallelism when groupByKey is followed by mapGroups/mapGroupsWithState?

Generally yes, but it also depends on how you set up your Kafka topic. Although it is not visible in the code, Spark internally splits each stage into smaller tasks, one per partition, and distributes them among the available executors in the cluster. If your Kafka topic has only 1 partition, then prior to groupByKey your stream contains a single partition, which is not parallelized but processed by a single task on a single executor. As long as your Kafka partition count is greater than 1 (and more than one executor core is available), that stage runs in parallel. After the shuffle boundary, Spark repartitions the data into the number of partitions specified by spark.sql.shuffle.partitions.
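
As a rough illustration of the reasoning above, the following plain-Java sketch estimates how many tasks can run at once in each stage. The class StageParallelismSketch, the method concurrentTasks, and the cluster numbers are all hypothetical, and the sketch assumes one core per task (Spark's default for spark.task.cpus); it is back-of-the-envelope arithmetic, not a model of Spark's scheduler.

```java
public class StageParallelismSketch {

    // Tasks that can run at the same time in one stage: one task per
    // partition, capped by the total number of executor cores
    // (assumption: each task occupies exactly one core).
    static int concurrentTasks(int partitions, int totalExecutorCores) {
        return Math.min(partitions, totalExecutorCores);
    }

    public static void main(String[] args) {
        int kafkaPartitions = 1;   // hypothetical topic with a single partition
        int shufflePartitions = 5; // value of spark.sql.shuffle.partitions
        int totalCores = 4;        // hypothetical: 2 executors with 2 cores each

        // Stage before groupByKey: one task per Kafka partition.
        System.out.println("pre-shuffle concurrent tasks: "
                + concurrentTasks(kafkaPartitions, totalCores));

        // Stage after groupByKey: one task per shuffle partition.
        System.out.println("post-shuffle concurrent tasks: "
                + concurrentTasks(shufflePartitions, totalCores));
    }
}
```

With these numbers the read stage runs as a single task, while the post-shuffle stage runs four tasks at once and the fifth waits for a free core.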
