Does Parquet predicate pushdown work on S3 using Spark?

Posted 2020-02-05 05:26

Just wondering if Parquet predicate pushdown also works on S3, not only HDFS. Specifically, if we use Spark (non-EMR).

Further explanation would be helpful, since this may require some understanding of how distributed file systems work.

5 Answers
小情绪 Triste *
#2 · 2020-02-05 05:53

Here are the keys I'd recommend for s3a work:

spark.sql.parquet.filterPushdown true
spark.sql.parquet.mergeSchema false
spark.hadoop.parquet.enable.summary-metadata false

spark.sql.orc.filterPushdown true
spark.sql.orc.splits.include.file.footer true
spark.sql.orc.cache.stripe.details.size 10000

spark.sql.hive.metastorePartitionPruning true
spark.speculation false
spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version 2
spark.hadoop.mapreduce.fileoutputcommitter.cleanup.skipped true
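
For reference, here is a minimal sketch (my own, not from the answer) of wiring these keys into a Spark 2.x SparkSession at startup; the same keys can equally go in spark-defaults.conf or be passed to spark-submit with --conf:

import org.apache.spark.sql.SparkSession

// Sketch only: applies the recommended s3a/Parquet/ORC keys when building the session.
val spark = SparkSession.builder()
  .appName("s3a-parquet-tuning")
  .config("spark.sql.parquet.filterPushdown", "true")
  .config("spark.sql.parquet.mergeSchema", "false")
  .config("spark.hadoop.parquet.enable.summary-metadata", "false")
  .config("spark.sql.orc.filterPushdown", "true")
  .config("spark.sql.orc.splits.include.file.footer", "true")
  .config("spark.sql.orc.cache.stripe.details.size", "10000")
  .config("spark.sql.hive.metastorePartitionPruning", "true")
  .config("spark.speculation", "false")
  .config("spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version", "2")
  .config("spark.hadoop.mapreduce.fileoutputcommitter.cleanup.skipped", "true")
  .getOrCreate()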
Bombasti
#3 · 2020-02-05 05:58

Yes. Filter pushdown does not depend on the underlying file system; it only depends on the spark.sql.parquet.filterPushdown setting and on the type of filter (not all filters can be pushed down).

See https://github.com/apache/spark/blob/v2.2.0/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala#L313 for the pushdown logic.
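
To illustrate the "not all filters can be pushed down" point (my own sketch, not from the answer): a plain column comparison appears under PushedFilters in the scan node, while the same predicate hidden inside a UDF is opaque to the Parquet reader and is not pushed down. The path below is hypothetical.

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.udf

val spark = SparkSession.builder().appName("pushdown-demo").getOrCreate()
import spark.implicits._

val df = spark.read.parquet("s3a://my-bucket/data.parquet") // placeholder path

// Plain comparison on a column: shows up as PushedFilters in the scan node.
df.filter($"id" < 100).explain()

// The same predicate wrapped in a UDF: opaque to Parquet, so no pushdown.
val lessThan100 = udf((i: Int) => i < 100)
df.filter(lessThan100($"id")).explain()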

倾城 Initia
#4 · 2020-02-05 06:08

I was wondering this myself, so I tested it out. We use EMR clusters and Spark 1.6.1.

  • I generated some dummy data in Spark and saved it as a Parquet file both locally and on S3.
  • I created multiple Spark jobs with different kinds of filters and column selections, and ran each test once against the local file and once against the S3 file (see the sketch after this list).
  • I then used the Spark History Server to see how much data each job had as input.
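
A rough sketch of what such a test could look like with the Spark 1.6-era API (the bucket name and paths below are placeholders, not from the original tests):

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

val sc = new SparkContext(new SparkConf().setAppName("pushdown-test"))
val sqlContext = new SQLContext(sc)
import sqlContext.implicits._

// Generate dummy data and write it as Parquet, locally and to S3.
val df = sc.parallelize(1 to 1000000).map(i => (i, s"name_$i")).toDF("id", "name")
df.write.parquet("file:///tmp/dummy.parquet")
df.write.parquet("s3a://my-bucket/dummy.parquet") // placeholder bucket

// Run the same job against both copies, then compare "Input Size"
// for each job in the Spark History Server.
for (path <- Seq("file:///tmp/dummy.parquet", "s3a://my-bucket/dummy.parquet")) {
  val n = sqlContext.read.parquet(path)
    .select("id")           // column pruning
    .filter($"id" < 100)    // candidate for predicate pushdown
    .count()
  println(s"$path -> $n matching rows")
}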

Results:

  • For the local Parquet file: column selections and filters were pushed down to the read, since the input size shrank whenever the job contained filters or column selections.
  • For the S3 Parquet file: the input size was always the same as for a job that processed all of the data. None of the filters or column selections were pushed down to the read; the Parquet file was always loaded completely from S3, even though the query plan (.queryExecution.executedPlan, see the snippet below) showed that the filters were pushed down.
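
The plan inspection mentioned above looks roughly like this (a sketch; the path is a placeholder):

// Inspect the physical plan for pushed filters.
val plan = sqlContext.read.parquet("s3a://my-bucket/dummy.parquet")
  .filter($"id" < 100)
  .queryExecution.executedPlan
println(plan) // the Parquet scan node lists e.g. "PushedFilters: [LessThan(id,100)]"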

I will add more details about the tests and results when I have time.

姐就是有狂的资本
#5 · 2020-02-05 06:08

Recently I tried this with Spark 2.4, and it seems that predicate pushdown works with S3.

This is the Spark SQL query:

explain select * from default.my_table where month = '2009-04' and site = 'http://jdnews.com/sports/game_1997_jdnsports__article.html/play_rain.html' limit 100;

And here is the relevant part of the output:

PartitionFilters: [isnotnull(month#6), (month#6 = 2009-04)], PushedFilters: [IsNotNull(site), EqualTo(site,http://jdnews.com/sports/game_1997_jdnsports__article.html/play_ra...

This clearly shows that PushedFilters is not empty.

Note: the table used was created on top of AWS S3.
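
For reference, the same check can be run from code (a sketch assuming an existing SparkSession named spark):

spark.sql("""
  SELECT * FROM default.my_table
  WHERE month = '2009-04'
    AND site = 'http://jdnews.com/sports/game_1997_jdnsports__article.html/play_rain.html'
  LIMIT 100
""").explain(true) // the extended plan prints PartitionFilters and PushedFilters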

成全新的幸福
#6 · 2020-02-05 06:12

Spark uses the same Parquet libraries whether the data is on HDFS or S3, so the same logic works. (And in Spark 1.6 they even added a faster shortcut for flat-schema Parquet files.)
