How to avoid empty files while writing parquet fil

2020-04-28 11:26发布

I am reading from Kafka queue using Spark Structured Streaming. After reading from Kafka I am applying filter on the dataframe. I am saving this filtered dataframe into a parquet file. This is generating many empty parquet files. Is there any way I can stop writing an empty file?

df = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", KafkaServer) \
    .option("subscribe", KafkaTopics) \
    .load()

Transaction_DF = df.selectExpr("CAST(value AS STRING)")

decompDF = Transaction_DF.select(zip_extract("value").alias("decompress"))
filterDF = decomDF.filter(.....) 

query = filterDF .writeStream \
    .option("path", outputpath) \
    .option("checkpointLocation", RawXMLCheckpoint) \
    .start()

4条回答
欢心
2楼-- · 2020-04-28 11:43

If you are using yarn client mode, then setting the num of executor cores to 1 will solve the problem. This means that only 1 task will be run at any time per executor.

查看更多
乱世女痞
3楼-- · 2020-04-28 11:48

Is there any way I can stop writing an empty file.

Yes, but you would rather not do it.

The reason for many empty parquet files is that Spark SQL (the underlying infrastructure for Structured Streaming) tries to guess the number of partitions to load a dataset (with records from Kafka per batch) and does this "poorly", i.e. many partitions have no data.

When you save a partition with no data you will get an empty file.

You can use repartition or coalesce operators to set the proper number of partitions and reduce (or even completely avoid) empty files. See Dataset API.

Why would you not do it? repartition and coalesce may incur performance degradation due to the extra step of shuffling the data between partitions (and possibly nodes in your Spark cluster). That can be expensive and not worth doing it (and hence I said that you would rather not do it).

You may then be asking yourself, how to know the right number of partitions? And that's a very good question in any Spark project. The answer is fairly simple (and obvious if you understand what and how Spark does the processing): "Know your data" so you can calculate how many is exactly right.

查看更多
Anthone
4楼-- · 2020-04-28 11:54

you can try with repartitionByRange(column)..

I used this while writing dataframe to HDFS .. It solved my empty file creation issue.

查看更多
家丑人穷心不美
5楼-- · 2020-04-28 12:03

I recommend using repartition(partitioningColumns) on the Dataframe resp. Dataset and after that partitionBy(partitioningColumns) on the writeStream operation to avoid writing empty files.

Reason: The bottleneck if you have a lot of data is often the read performance with Spark if you have a lot of small (or even empty) files and no partitioning. So you should definitely make use of the file/directory partitioning (which is not the same as RDD partitioning). This is especially a problem when using AWS S3. The partitionColumns should fit your common queries when reading the data like timestamp/day, message type/Kafka topic, ...

See also the partitionBy documentation on http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.DataFrameWriter

Partitions the output by the given columns on the file system. If specified, the output is laid out on the file system similar to Hive's partitioning scheme. As an example, when we partition a dataset by year and then month, the directory layout would look like:

year=2016/month=01/, year=2016/month=02/

Partitioning is one of the most widely used techniques to optimize physical data layout. It provides a coarse-grained index for skipping unnecessary data reads when queries have predicates on the partitioned columns. In order for partitioning to work well, the number of distinct values in each column should typically be less than tens of thousands.

This is applicable for all file-based data sources (e.g. Parquet, JSON) staring Spark 2.1.0.

查看更多
登录 后发表回答