is Parquet predicate pushdown works on S3 using Sp

2020-02-05 05:57发布

问题:

Just wondering if Parquet predicate pushdown also works on S3, not only HDFS. Specifically if we use Spark (non EMR).

Further explanation might be helpful since it might involve understanding on distributed file system.

回答1:

Yes. Filter pushdown does not depend on the underlying file system. It only depends on the spark.sql.parquet.filterPushdown and the type of filter (not all filters can be pushed down).

See https://github.com/apache/spark/blob/v2.2.0/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala#L313 for the pushdown logic.



回答2:

I was wondering this myself so I just tested it out. We use EMR clusters and Spark 1.6.1 .

  • I generated some dummy data in Spark and saved it as a parquet file locally as well as on S3.
  • I created multiple Spark jobs with different kind of filters and column selections. I ran these tests once for the local file and once for the S3 file.
  • I then used the Spark History Server to see how much data each job had as input.

Results:

  • For the local parquet file: The results showed that the column selection and filters were pushed down to the read as the input size was reduced when the job contained filters or column selection.
  • For the S3 parquet file: The input size was always the same as the Spark job that processed all of the data. None of the filters or column selections were pushed down to the read. The parquet file was always completely loaded from S3. Even though the query plan (.queryExecution.executedPlan) showed that the filters were pushed down.

I will add more details about the tests and results when I have time.



回答3:

Here's the keys I'd recommend for s3a work

spark.sql.parquet.filterPushdown true
spark.sql.parquet.mergeSchema false
spark.hadoop.parquet.enable.summary-metadata false

spark.sql.orc.filterPushdown true
spark.sql.orc.splits.include.file.footer true
spark.sql.orc.cache.stripe.details.size 10000

spark.sql.hive.metastorePartitionPruning true
spark.speculation false
spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version 2
spark.hadoop.mapreduce.fileoutputcommitter.cleanup.skipped true


回答4:

Spark uses the HDFS parquet & s3 libraries so the same logic works. (and in spark 1.6 they've added even a faster shortcut for flat schema parquet files)



回答5:

Recently I tried this with Spark 2.4 and seems like Pushdown predicate works with s3.

This is the spark sql query:

explain select * from default.my_table where month = '2009-04' and site = 'http://jdnews.com/sports/game_1997_jdnsports__article.html/play_rain.html' limit 100;

And here is the part of output:

PartitionFilters: [isnotnull(month#6), (month#6 = 2009-04)], PushedFilters: [IsNotNull(site), EqualTo(site,http://jdnews.com/sports/game_1997_jdnsports__article.html/play_ra...

Which clearly stats that PushedFilters is not empty.

Note: The used table was created on top of AWS S3