Using predicates to filter rows from pyarrow.parquet.ParquetDataset

Posted 2020-06-24 05:44

Question:

I have a parquet dataset stored on s3, and I would like to query specific rows from the dataset. I was able to do that using petastorm but now I want to do that using only pyarrow.

Here's my attempt:

import pyarrow.parquet as pq
import s3fs

fs = s3fs.S3FileSystem()

dataset = pq.ParquetDataset(
    'analytics.xxx', 
    filesystem=fs, 
    validate_schema=False, 
    filters=[('event_name', '=', 'SomeEvent')]
)

df = dataset.read_pandas().to_pandas()

But that returns a pandas DataFrame as if the filter didn't work, i.e. I get rows with various values of event_name. Is there something I'm missing, or something I misunderstood? I could filter after getting the pandas DataFrame, but that would use much more memory than needed.

Answer 1:

Currently, the filters functionality is only implemented at the file level, not yet at the row level.

So if you have a dataset as a collection of multiple, partitioned parquet files in a nested hierarchy (the type of partitioned dataset described here: https://arrow.apache.org/docs/python/parquet.html#partitioned-datasets-multiple-files), you can use the filters argument to read only a subset of the files.
However, you can't yet use it to read only a subset of the row groups within a single file (see https://issues.apache.org/jira/browse/ARROW-1796).
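As a minimal sketch (the table contents and the partition layout are assumptions, not the asker's actual data), writing the dataset with partition_cols=['event_name'] produces the hive-style layout that the filters argument can prune at the file level:

import pyarrow as pa
import pyarrow.parquet as pq
import s3fs

fs = s3fs.S3FileSystem()

# Hypothetical table containing the column we want to filter on
table = pa.table({
    'event_name': ['SomeEvent', 'OtherEvent'],
    'value': [1, 2],
})

# partition_cols creates one directory per value, e.g.
# analytics.xxx/event_name=SomeEvent/... and event_name=OtherEvent/...
pq.write_to_dataset(
    table,
    root_path='analytics.xxx',
    partition_cols=['event_name'],
    filesystem=fs,
)

# With that layout, the filter skips whole directories (files),
# not individual rows within a file
dataset = pq.ParquetDataset(
    'analytics.xxx',
    filesystem=fs,
    filters=[('event_name', '=', 'SomeEvent')],
)
df = dataset.read_pandas().to_pandas()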

That said, it would be nice to get an error message when specifying such an invalid filter. I opened an issue for that: https://issues.apache.org/jira/browse/ARROW-5572



Answer 2:

For Python 3.6+, AWS has a library called aws-data-wrangler (awswrangler) that helps with integration between pandas, S3, and Parquet, and it allows you to filter on partitioned S3 keys.

To install it, run:

pip install awswrangler

To reduce the amount of data you read, you can filter rows based on the partition columns of your Parquet dataset stored on S3. To filter rows where the partition column event_name has the value "SomeEvent", do the following.

For awswrangler < 1.0.0:

import awswrangler as wr

df = wr.pandas.read_parquet(
         path="s3://my-bucket/my/path/to/parquet-file.parquet",
         columns=["event_name"], 
         filters=[('event_name', '=', 'SomeEvent')]
)

For awswrangler >= 1.0.0:

import awswrangler as wr

df = wr.s3.read_parquet(
         path="s3://my-bucket/my/path/to/parquet-file.parquet",
         columns=["event_name"], 
         filters=[('event_name', '=', 'SomeEvent')]
)


Answer 3:

For anyone getting here from Google: you can now filter on rows in PyArrow when reading a Parquet file, regardless of whether you read it via pandas or pyarrow.parquet.

From the documentation:

filters (List[Tuple] or List[List[Tuple]] or None (default)) – Rows which do not match the filter predicate will be removed from scanned data. Partition keys embedded in a nested directory structure will be exploited to avoid loading files at all if they contain no matching rows. If use_legacy_dataset is True, filters can only reference partition keys and only a hive-style directory structure is supported. When setting use_legacy_dataset to False, also within-file level filtering and different partitioning schemes are supported.

Predicates are expressed in disjunctive normal form (DNF), like [[('x', '=', 0), ...], ...]. DNF allows arbitrary boolean logical combinations of single column predicates. The innermost tuples each describe a single column predicate. The list of inner predicates is interpreted as a conjunction (AND), forming a more selective and multiple column predicate. Finally, the most outer list combines these filters as a disjunction (OR).

Predicates may also be passed as List[Tuple]. This form is interpreted as a single conjunction. To express OR in predicates, one must use the (preferred) List[List[Tuple]] notation.
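As a short sketch (the s3 path and the second column, country, are placeholders, not part of the original question), a row-level filter and a DNF filter expressing an OR could look like:

import pyarrow.parquet as pq

# Row-level filtering with the new datasets API
table = pq.read_table(
    's3://my-bucket/analytics/',
    filters=[('event_name', '=', 'SomeEvent')],
    use_legacy_dataset=False,
)
df = table.to_pandas()

# DNF: (event_name == 'SomeEvent' AND country == 'DE') OR (event_name == 'OtherEvent')
filters = [
    [('event_name', '=', 'SomeEvent'), ('country', '=', 'DE')],
    [('event_name', '=', 'OtherEvent')],
]
table = pq.read_table(
    's3://my-bucket/analytics/',
    filters=filters,
    use_legacy_dataset=False,
)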