I'm a newbie in the Spark world and struggling with some concepts.
How does parallelism happen when using Spark Structured Streaming sourcing from Kafka?
Let's consider the following code snippet:
SparkSession spark = SparkSession
    .builder()
    .appName("myApp")
    .getOrCreate();

Dataset<VideoEventData> ds = spark
    .readStream()
    .format("kafka")
    ...

gDataset = ds.groupByKey(...)

pDataset = gDataset.mapGroupsWithState(
    ...
    /* process each key's values */
    loop values
        if value is valid -> save key/value result in HDFS
    ...
)

StreamingQuery query = pDataset.writeStream()
    .outputMode("update")
    .format("console")
    .start();

// await termination
query.awaitTermination();
I've read that parallelism is related to the number of data partitions, and that the number of partitions for a Dataset is based on the spark.sql.shuffle.partitions parameter.
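For reference, here is a minimal sketch of how that parameter can be set when building the session (standard builder API; the value 5 just matches the example below):

import org.apache.spark.sql.SparkSession;

// Minimal sketch: spark.sql.shuffle.partitions can be set on the session builder
// (5 is only an example value).
SparkSession spark = SparkSession
    .builder()
    .appName("myApp")
    .config("spark.sql.shuffle.partitions", "5")
    .getOrCreate();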
For every batch (pull from Kafka), will the pulled items be divided among the number of spark.sql.shuffle.partitions? For example, with spark.sql.shuffle.partitions=5 and Batch1=100 rows, will we end up with 5 partitions of 20 rows each?

Considering the code snippet provided, do we still benefit from Spark's parallelism given the groupByKey followed by the mapGroups/mapGroupsWithState functions?
UPDATE:
Inside gDataset.mapGroupsWithState is where I process each key's values and store the result in HDFS. The output sink is therefore only used to print some stats to the console.
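To make that concrete, here is a simplified sketch of the state function in Java. Only the groupByKey/mapGroupsWithState signatures and the Encoders come from the Spark API; VideoEventData is the class from the snippet, while getCameraId, SessionState, ProcessedStats, isValid and saveToHdfs are placeholder names, not the real code:

import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.api.java.function.MapGroupsWithStateFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.KeyValueGroupedDataset;

// Group the stream by a key extracted from each event (getCameraId is a placeholder).
KeyValueGroupedDataset<String, VideoEventData> gDataset =
    ds.groupByKey((MapFunction<VideoEventData, String>) VideoEventData::getCameraId, Encoders.STRING());

// For each key, iterate over the values of the current micro-batch; valid values
// are written to HDFS and only a small stats object is returned to the sink.
Dataset<ProcessedStats> pDataset = gDataset.mapGroupsWithState(
    (MapGroupsWithStateFunction<String, VideoEventData, SessionState, ProcessedStats>)
        (key, values, state) -> {
            int valid = 0;
            while (values.hasNext()) {
                VideoEventData value = values.next();
                if (isValid(value)) {          // placeholder validation helper
                    saveToHdfs(key, value);    // placeholder helper writing the key/value result to HDFS
                    valid++;
                }
            }
            return new ProcessedStats(key, valid);  // placeholder stats bean for the console sink
        },
    Encoders.bean(SessionState.class),
    Encoders.bean(ProcessedStats.class));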
They will be divided once they reach groupByKey, which is a shuffle boundary. When you first retrieve the data, the number of partitions will be equal to the number of Kafka partitions.

As for whether you still leverage Spark's parallelism with groupByKey and mapGroups/mapGroupsWithState: generally yes, but it also depends on how you set up your Kafka topic. Although not visible to you from the code, Spark will internally split the different stages of the processing into smaller tasks and distribute them among the available executors in the cluster. If your Kafka topic has only 1 partition, then prior to groupByKey your internal stream will contain a single partition, which won't be parallelized but executed on a single executor. As long as your Kafka partition count is greater than 1, your processing will be parallel. After the shuffle boundary, Spark will re-partition the data to contain the number of partitions specified by spark.sql.shuffle.partitions.
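To illustrate where each number comes from, here is a sketch of the read path, assuming a hypothetical topic named "video-events" with 3 Kafka partitions, a broker on localhost:9092, the SparkSession from your snippet, and spark.sql.shuffle.partitions=5:

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

// 3 Kafka partitions -> 3 input partitions, so everything before groupByKey
// runs as 3 tasks per micro-batch.
Dataset<Row> raw = spark
    .readStream()
    .format("kafka")
    .option("kafka.bootstrap.servers", "localhost:9092")  // placeholder broker address
    .option("subscribe", "video-events")                   // placeholder topic with 3 partitions
    .load();

// After groupByKey (the shuffle boundary), mapGroupsWithState runs as
// spark.sql.shuffle.partitions tasks, i.e. 5 with the setting above,
// regardless of how many Kafka partitions fed the batch.

Note also that the post-shuffle split is by hash of the key, so with 100 rows and 5 shuffle partitions you won't necessarily get exactly 20 rows per partition; the distribution depends on the keys.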