I have parquet data partitioned by date
& hour
, folder structure:
events_v3
-- event_date=2015-01-01
-- event_hour=2015-01-1
-- part10000.parquet.gz
-- event_date=2015-01-02
-- event_hour=5
-- part10000.parquet.gz
I have created a table raw_events
via spark but when I try to query, it scans all the directories for footer and that slows down the initial query, even if I am querying only one day worth of data.
query:
select * from raw_events where event_date='2016-01-01'
similar problem : http://mail-archives.apache.org/mod_mbox/spark-user/201508.mbox/%3CCAAswR-7Qbd2tdLSsO76zyw9tvs-Njw2YVd36bRfCG3DKZrH0tw@mail.gmail.com%3E ( but its old)
Log:
App > 16/09/15 03:14:03 main INFO HadoopFsRelation: Listing leaf files and directories in parallel under: s3a://bucket/events_v3/
and then it spawns 350 tasks since there are 350 days worth of data.
I have disabled schemaMerge
, and have also specified the schema to read as, so it can just go to the partition that I am looking at, why should it print all the leaf files ?
Listing leaf files with 2 executors take 10 minutes, and the query actual execution takes on 20 seconds
code sample:
val sparkSession = org.apache.spark.sql.SparkSession.builder.getOrCreate()
val df = sparkSession.read.option("mergeSchema","false").format("parquet").load("s3a://bucket/events_v3")
df.createOrReplaceTempView("temp_events")
sparkSession.sql(
"""
|select verb,count(*) from temp_events where event_date = "2016-01-01" group by verb
""".stripMargin).show()
To clarify Gaurav's answer, that code snipped is from Hadoop branch-2, Probably not going to surface until Hadoop 2.9 (see HADOOP-13208); and someone needs to update Spark to use that feature (which won't harm code using HDFS, just won't show any speedup there).
One thing to consider is: what makes a good file layout for Object Stores.
Finally, Apache Hadoop, Apache Spark and related projects are all open source. Contributions are welcome. That's not just the code, it's documentation, testing, and, for this performance stuff, testing against your actual datasets. Even giving us details about what causes problems (and your dataset layouts) is interesting.
As soon as spark is given a directory to read from it issues call to
listLeafFiles
(org/apache/spark/sql/execution/datasources/fileSourceInterfaces.scala). This in turn callsfs.listStatus
which makes an api call to get list of files and directories. Now for each directory this method is called again. This hapens recursively until no directories are left. This by design works good in a HDFS system. But works bad in s3 since list file is an RPC call. S3 on other had supports get all files by prefix, which is exactly what we need.So for example if we had above directory structure with 1 year worth of data with each directory for hour and 10 sub directory we would have , 365 * 24 * 10 = 87k api calls, this can be reduced to 138 api calls given that there are only 137000 files. Each s3 api calls return 1000 files.
Code:
org/apache/hadoop/fs/s3a/S3AFileSystem.java
/org/apache/spark/sql/execution/datasources/fileSourceInterfaces.scala