I am reading from a Kafka queue using Spark Structured Streaming. After reading from Kafka, I apply a filter to the DataFrame and save the filtered DataFrame as Parquet files. This generates many empty Parquet files. Is there any way to stop writing empty files?
df = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", KafkaServer) \
    .option("subscribe", KafkaTopics) \
    .load()
# Kafka delivers the payload as bytes; cast it to a string first
Transaction_DF = df.selectExpr("CAST(value AS STRING)")
# zip_extract is the asker's own function (definition not shown)
decompDF = Transaction_DF.select(zip_extract("value").alias("decompress"))
filterDF = decompDF.filter(.....)
query = filterDF.writeStream \
    .format("parquet") \
    .option("path", outputpath) \
    .option("checkpointLocation", RawXMLCheckpoint) \
    .start()
If you are using YARN client mode, setting the number of executor cores to 1 will solve the problem. This means that each executor runs only one task at a time.
Yes, but you would rather not do it.
The reason for many empty parquet files is that Spark SQL (the underlying infrastructure for Structured Streaming) tries to guess the number of partitions to load a dataset (with records from Kafka per batch) and does this "poorly", i.e. many partitions have no data.
When you save a partition with no data you will get an empty file.
You can use the repartition or coalesce operators to set the proper number of partitions and reduce (or even completely avoid) empty files. See the Dataset API.
Why would you not do it?
repartition and coalesce may incur performance degradation: repartition adds an extra step of shuffling the data between partitions (and possibly nodes in your Spark cluster), while coalesce avoids a full shuffle but can reduce the parallelism of the stages that feed it. That can be expensive and not worth doing (hence I said that you would rather not do it).
You may then be asking yourself how to know the right number of partitions. That's a very good question in any Spark project. The answer is fairly simple (and obvious once you understand how Spark does the processing): "Know your data", so you can calculate how many partitions is exactly right.
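As a rough illustration of the advice above, here is a minimal sketch that estimates a partition count and coalesces the streaming DataFrame before writing. The helper names partitions_for and start_coalesced_query are hypothetical, and the 128 MiB target file size is only an assumed rule of thumb, not something Spark prescribes. The functions only call methods on the DataFrame passed in (coalesce, writeStream, format, option, start), so no extra imports from Spark are needed.

```python
import math


def partitions_for(expected_bytes_per_batch, target_file_bytes=128 * 1024 * 1024):
    """Estimate how many output partitions one micro-batch needs (at least 1).

    The default target file size is an assumption; tune it to your data.
    """
    if expected_bytes_per_batch <= 0:
        return 1
    return max(1, math.ceil(expected_bytes_per_batch / target_file_bytes))


def start_coalesced_query(filtered_df, output_path, checkpoint_path, num_partitions=1):
    """Reduce the number of partitions, then start the Parquet sink."""
    return (
        filtered_df
        .coalesce(num_partitions)  # merges existing partitions, no full shuffle
        .writeStream
        .format("parquet")
        .option("path", output_path)
        .option("checkpointLocation", checkpoint_path)
        .start()
    )
```

With the names from the question, this might be called as query = start_coalesced_query(filterDF, outputpath, RawXMLCheckpoint, partitions_for(estimated_bytes)), where estimated_bytes is your own estimate of the data volume per batch. Whether this removes every empty file depends on your data and Spark version.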
You can try repartitionByRange(column). I used it when writing a DataFrame to HDFS, and it solved my empty-file problem.
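A minimal sketch of that approach for a batch DataFrame could look like the following. The function name write_range_partitioned and its parameters are hypothetical; repartitionByRange(numPartitions, *cols) is available in PySpark 2.4 and later. I have not verified this on a streaming DataFrame, so treat it as a batch-only illustration.

```python
def write_range_partitioned(df, column, output_path, num_partitions=1):
    """Range-partition a batch DataFrame by `column`, then append it as Parquet."""
    (
        df.repartitionByRange(num_partitions, column)  # rows are split into sorted ranges
        .write
        .mode("append")
        .parquet(output_path)
    )
```

Range partitioning samples the column to choose range boundaries, so the rows tend to be spread over all requested partitions, provided the column has enough distinct values.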
I recommend using repartition(partitioningColumns) on the DataFrame (or Dataset) and, after that, partitionBy(partitioningColumns) on the writeStream operation to avoid writing empty files.
Reason: if you have a lot of data, the bottleneck is often Spark's read performance when there are many small (or even empty) files and no partitioning. So you should definitely make use of file/directory partitioning (which is not the same as RDD partitioning). This is especially a problem when using AWS S3. The partitioningColumns should match your common queries when reading the data, such as timestamp/day, message type/Kafka topic, ...
See also the partitionBy documentation at http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.DataFrameWriter
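The recommendation in this answer can be sketched roughly as follows. The function name start_partitioned_query and its parameters are hypothetical, and the column names in the usage line are placeholders, since the question does not show the schema.

```python
def start_partitioned_query(df, partitioning_columns, output_path, checkpoint_path):
    """Repartition by the given columns, then write Parquet into matching directories."""
    return (
        df.repartition(*partitioning_columns)   # in-memory partitioning (causes a shuffle)
        .writeStream
        .format("parquet")
        .partitionBy(*partitioning_columns)     # on-disk layout: one directory per value
        .option("path", output_path)
        .option("checkpointLocation", checkpoint_path)
        .start()
    )
```

For example, query = start_partitioned_query(filterDF, ["day", "topic"], outputpath, RawXMLCheckpoint) would assume that filterDF has columns named day and topic. Using the same columns in both calls means each task tends to hold complete groups of values, so it writes fewer and larger files per directory.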