One of the great benefits of the Parquet data storage format is that it's columnar. If I've got a 'wide' dataset with hundreds of columns, but my query only touches a few of those, then it's possible to read only the data that stores those few columns, and skip the rest.
Presumably this feature works by reading a bit of metadata at the head of a parquet file that indicates the locations on the filesystem for each column. The reader can then seek on disk to read in only the necessary columns.
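For concreteness, here's a minimal sketch of the kind of query I mean; the bucket, path, and column names are made up:

```scala
// A sketch (made-up path and columns): with a columnar format, selecting two
// columns out of hundreds should let the reader skip the byte ranges that
// store every other column.
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("column-pruning-demo").getOrCreate()

val wide = spark.read.parquet("s3a://my-bucket/wide-table/")  // hundreds of columns
val narrow = wide.select("user_id", "event_time")             // only these should be read
narrow.show(5)
```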
Does anyone know whether Spark's default parquet reader correctly implements this kind of selective seeking on S3? I think S3 supports it, but there's a big difference between theoretical support and an implementation that properly exploits that support.
This needs to be broken down:

1. Does Parquet attempt to selectively read only those columns, using the Hadoop FileSystem seek() + read() or readFully(position, buffer, length) calls? Yes.
2. Does the S3A connector handle that seek-heavy access pattern efficiently? On Hadoop 2.8+, yes, once you set spark.hadoop.fs.s3a.experimental.fadvise=random to trigger random access.

Hadoop 2.7 and earlier handle the aggressive seek()ing around the file badly: they always initiate a GET from the current offset to the end of the file, get surprised by the next seek, have to abort that connection, and reopen a new TCP/HTTPS 1.1 connection (slow, CPU heavy), again and again. Random IO mode hurts bulk loading of things like .csv.gz, but it is critical to getting ORC/Parquet performance.
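As a sketch of how that property can be wired in (the application name is a placeholder; passing it via spark-submit --conf works equally well):

```scala
// A sketch: passing the S3A random-IO hint through Spark's configuration.
// spark.hadoop.* properties are copied into the Hadoop Configuration, so this
// reaches the S3A connector as fs.s3a.experimental.fadvise=random.
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("parquet-on-s3a")  // placeholder name
  .config("spark.hadoop.fs.s3a.experimental.fadvise", "random")
  .getOrCreate()
```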
You don't get the speedup with Hadoop 2.7's hadoop-aws JAR. If you need it, you have to update the hadoop-*.jar files and their dependencies, or build Spark from scratch against Hadoop 2.8.
Note that Hadoop 2.8+ also has a nice little feature: if you call toString() on an S3A filesystem client in a log statement, it prints out all the filesystem IO stats, including how much data was discarded in seeks, aborted TCP connections &c. It helps you work out what's going on.

2018-04-13 warning: do not try to drop the Hadoop 2.8+ hadoop-aws JAR onto the classpath along with the rest of the Hadoop 2.7 JAR set and expect to see any speedup. All you will see are stack traces. You need to update all the Hadoop JARs and their transitive dependencies.

No, predicate pushdown is not fully supported. This, of course, depends on your specific use case, Spark version, and S3 connector type and version.
In order to check your specific use case, you can enable the DEBUG log level in Spark and run your query. Then you can see whether there are "seeks" during the S3 (HTTP) requests, as well as how many requests were actually sent. Something like this:
17/06/13 05:46:50 DEBUG wire: http-outgoing-1 >> "GET /test/part-00000-b8a8a1b7-0581-401f-b520-27fa9600f35e.snappy.parquet HTTP/1.1[\r][\n]"
....
17/06/13 05:46:50 DEBUG wire: http-outgoing-1 << "Content-Range: bytes 0-7472093/7472094[\r][\n]"
....
17/06/13 05:46:50 DEBUG wire: http-outgoing-1 << "Content-Length: 7472094[\r][\n]"
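For reference, one way to turn that logging on programmatically is the log4j 1.x API bundled with Spark 2.x; this is only a sketch, and it assumes the S3 connector on your classpath uses the unshaded Apache HttpClient whose "wire" logger produced the lines above:

```scala
// A sketch, assuming Spark 2.x's bundled log4j 1.x and an unshaded Apache
// HttpClient on the classpath. The same effect can be had by editing
// conf/log4j.properties instead.
import org.apache.log4j.{Level, Logger}

Logger.getLogger("org.apache.http.wire").setLevel(Level.DEBUG)  // request/response bytes
Logger.getLogger("org.apache.http").setLevel(Level.DEBUG)       // connection management
```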
Here's an example of an issue report that was opened recently due to the inability of Spark 2.1 to calculate COUNT(*) over all the rows in a dataset based on the metadata stored in the Parquet file: https://issues.apache.org/jira/browse/SPARK-21074

Spark's Parquet reader is just like any other InputFormat:
1. None of the InputFormats has anything special for S3. They can read from LocalFileSystem, HDFS and S3; no special optimization is done for any of them.
2. The Parquet InputFormat, depending on the columns you ask for, will selectively read those columns for you.
3. If you want to be dead sure (although predicate pushdown does work in the latest Spark versions), manually select the columns and write the transformations and actions yourself, as in the sketch below, instead of depending on SQL.
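Here's a sketch of that approach with the plain DataFrame API; the paths and column names are made up, and this is an illustration rather than a claim about what the optimizer will or won't push down:

```scala
// A sketch of explicitly pruning columns and filtering early with the
// DataFrame API instead of relying on SQL. Paths and columns are made up.
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("manual-pruning").getOrCreate()
import spark.implicits._

val events = spark.read.parquet("s3a://my-bucket/events/")
  .select("user_id", "event_time")        // explicit column pruning
  .filter($"event_time" >= "2017-01-01")  // a filter the reader may push down

events.write.parquet("s3a://my-bucket/events-narrow/")
```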
DISCLAIMER: I don't have a definitive answer and don't want to act as an authoritative source either, but have spent some time on parquet support in Spark 2.2+ and am hoping that my answer can help us all to get closer to the right answer.
I use Spark 2.3.0-SNAPSHOT that I built today right from the master.
The parquet data source format is handled by ParquetFileFormat, which is a FileFormat.

If I'm correct, the reading part is handled by the buildReaderWithPartitionValues method (which overrides FileFormat's). buildReaderWithPartitionValues is used exclusively when the FileSourceScanExec physical operator is requested for so-called input RDDs (which are actually a single RDD that generates internal rows) when WholeStageCodegenExec is executed.

With that said, I think that reviewing what buildReaderWithPartitionValues does may get us closer to the final answer. When you look at the line you can rest assured that we're on the right track.
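One low-tech way to watch FileSourceScanExec from the outside is to look at the physical plan of a pruning/filtering query. This is only a sketch with made-up names, it assumes an existing SparkSession, and the exact plan text varies by Spark version:

```scala
// A sketch: the physical plan of a Parquet scan shows the scan node
// (FileSourceScanExec, printed as "FileScan parquet") together with its
// ReadSchema and PushedFilters entries.
val q = spark.read.parquet("s3a://my-bucket/wide-table/")
  .select("user_id")
  .where("user_id > 100")

q.explain()  // inspect ReadSchema / PushedFilters in the output
```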
That code path depends on the spark.sql.parquet.filterPushdown Spark property, which is turned on by default. That leads us to parquet-hadoop's ParquetInputFormat.setFilterPredicate, iff the filters are defined.
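For what it's worth, that property can be checked or toggled on a live session; a small sketch, assuming an existing SparkSession named spark:

```scala
// A sketch: inspecting and toggling the filter-pushdown flag at runtime.
spark.conf.get("spark.sql.parquet.filterPushdown")           // "true" by default
spark.conf.set("spark.sql.parquet.filterPushdown", "false")  // e.g. to compare scan behaviour
spark.conf.set("spark.sql.parquet.filterPushdown", "true")
```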
The code gets more interesting a bit later, when the filters are used as the code falls back to parquet-mr (rather than using the so-called vectorized parquet decoding reader). That's the part I don't really understand (except for what I can see in the code).
Please note that the vectorized parquet decoding reader is controlled by the spark.sql.parquet.enableVectorizedReader Spark property, which is turned on by default.

TIP: To know what part of the if expression is used, enable the DEBUG logging level for the org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat logger.

In order to see all the pushed-down filters, you could turn on the INFO logging level for the org.apache.spark.sql.execution.FileSourceScanExec logger. You should then see the pushed-down filters in the logs.

I do hope that, even if this isn't close to being a definitive answer, it has helped a little, and that someone picks it up where I left off to make it one soon. Hope dies last :)