My use case is the following: I write RDDs to files with saveAsTable (so to ORC files). Each save creates a new file, so 1,000,000 writes give me 1,000,000 ORC files. I know it is natural that new ORC file(s) are created for each RDD. However, I don't understand why querying them from the Thrift Server is so slow.
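
Roughly, the write pattern looks like this (a simplified sketch; the table name, schema, and batch source are placeholders, not my actual code):

```scala
import org.apache.spark.sql.{SaveMode, SparkSession}

val spark = SparkSession.builder().enableHiveSupport().getOrCreate()
import spark.implicits._

// Placeholder batches standing in for the real stream of RDDs.
val batches = Seq(Seq((1L, "a")), Seq((2L, "b")))

// Each append is a separate save, so every iteration produces at least
// one new ORC file under the table's directory.
for (batch <- batches) {
  spark.sparkContext.parallelize(batch)
    .toDF("id", "value")
    .write
    .mode(SaveMode.Append)
    .format("orc")
    .saveAsTable("my_table")
}
```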
My question is: how can I explain this strange behaviour?
For example, SELECT COUNT(*) on 1,000,000 rows (i.e. spread over those 1,000,000 files) takes about 1 minute (!). However, when I save the same 1,000,000 rows to a single file, the same query runs in about 50 ms. I would like to understand this difference. After all, 1,000,000 files is a small number.
The high-level execution plan of your count action looks like this (assuming your files live in a distributed file system; I'll use HDFS as the example):

- Request file metadata from the HDFS NameNode
- Load the HDFS blocks into the executors
- Count the rows in each partition (using the ORC metadata or by scanning the data, depending on the implementation) and sum the partial counts (see the sketch below)
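
To make the last step concrete, a distributed count boils down to a per-partition count followed by a sum of the partial results; a minimal sketch (not Spark's literal implementation) looks like this:

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().getOrCreate()
val sc = spark.sparkContext

// Dummy data standing in for the rows read from the ORC files.
val rdd = sc.parallelize(1 to 1000000, numSlices = 100)

// Each partition reports its own row count...
val partialCounts = rdd.mapPartitions(rows => Iterator.single(rows.size.toLong))

// ...and the partial counts are summed on the driver.
val total = partialCounts.reduce(_ + _)
println(s"count = $total")
```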
Some estimations: 1,000,000 files require the same number of requests to the NameNode to resolve the physical location of the data blocks. That is done in under 60 s (< 0.06 ms per request), which is a pretty good job by the NameNode. The rest of the time Spark spends loading data into memory (if needed) and/or reading the statistics from the ORC metadata. So I would profile the NameNode first (or the equivalent service if you use S3 or another store) - it is the prime candidate for the bottleneck.
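
If you want to confirm how much metadata the NameNode has to serve, you can list the table's directory directly (a sketch; the warehouse path is a placeholder, point it at wherever your table actually lives):

```scala
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().enableHiveSupport().getOrCreate()

// Placeholder location - adjust to the real table directory in HDFS.
val tableDir = new Path("/user/hive/warehouse/my_table")

val fs = FileSystem.get(spark.sparkContext.hadoopConfiguration)
// Every file listed here means extra NameNode metadata and an extra
// (tiny) split for Spark to schedule when the table is scanned.
val fileCount = fs.listStatus(tableDir).count(_.isFile)
println(s"Files under $tableDir: $fileCount")
```

From the ORC documentation: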
> Compared with RCFile format, for example, ORC file format has many advantages such as:
>
> - a single file as the output of each task, which reduces the NameNode's load
While ORC tries to reduce the number of files, your code does the opposite. And further:
> The default stripe size is 250 MB. Large stripe sizes enable large, efficient reads from HDFS.
>
> The file footer contains a list of stripes in the file, the number of rows per stripe, and each column's data type. It also contains column-level aggregates count, min, max, and sum.
So simple statistics like count are pre-calculated and should not be a performance issue. You can try to solve the issue by brute force, simply adding memory and CPU to the HDFS NameNode, but I think it's more reasonable to keep the number of files modest. If your data comes from a streaming source, you can create some kind of compaction job that merges the small files into large ones and run it periodically (sketched below). Or, as an alternative, you can read from the source once every 2-5 minutes, if such a delay is acceptable for your use case.
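
A minimal version of such a compaction job could look like this (the table names and the target partition count are placeholders; tune them so each output file ends up at roughly a few hundred MB):

```scala
import org.apache.spark.sql.{SaveMode, SparkSession}

val spark = SparkSession.builder().enableHiveSupport().getOrCreate()

// Read the fragmented table, shrink it to a small number of partitions,
// and rewrite it as a handful of large ORC files.
spark.table("my_table")
  .repartition(8)                      // aim for a few hundred MB per output file
  .write
  .mode(SaveMode.Overwrite)
  .format("orc")
  .saveAsTable("my_table_compacted")   // swap/rename afterwards as needed
```

Once the compacted table is verified, you can point the Thrift Server queries at it (or swap it in place of the original table).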