Filtering with dask read_parquet method gives unwanted results

Posted 2020-07-18 04:20

I am trying to read parquet files using the dask read_parquet method and the filters kwarg; however, it sometimes doesn't filter according to the given condition.

Example: creating and saving a data frame with a dates column

import pandas as pd
import numpy as np
import dask.dataframe as dd

nums  = range(1,6)
dates = pd.date_range('2018-07-01', periods=5, freq='1d')
df = pd.DataFrame({'dates':dates, 'nums': nums})

ddf = dd.from_pandas(df, npartitions=3).to_parquet('test_par', engine = 'fastparquet')

When I read and filter on the dates column from the 'test_par' folder, it doesn't seem to work:

filters=[('dates', '>', np.datetime64('2018-07-04'))]
df  = dd.read_parquet('test_par', engine='fastparquet', filters=filters).compute()

As you can see in the output, 2018-07-03 and 2018-07-04 are present even though the filter asks for dates greater than 2018-07-04.

+-------+------------+------+
|       | dates      | nums |
+-------+------------+------+
| index |            |      |
+-------+------------+------+
| 2     | 2018-07-03 | 3    |
+-------+------------+------+
| 3     | 2018-07-04 | 4    |
+-------+------------+------+
| 4     | 2018-07-05 | 5    |
+-------+------------+------+

Am I doing something wrong? Or should I report this on GitHub?

1 Answer

chillily · 2020-07-18 04:50

The filters keyword is a row-group-wise action (row-group is the parquet term for a set of data rows, analogous to a partition of a dataframe). It does not do any filtering within partitions.

When you use filters, you will be excluding partitions in which, according to the max/min statistics in the file, no rows can possibly match the given filter. For example, if you specify x>5, a partition with min=2, max=4 will be excluded, but one with min=2, max=6 will not, even though only some of its rows meet the filter.
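A minimal sketch of this pruning logic (the dictionaries and field names here are illustrative, not fastparquet's internal representation):

```python
# Sketch of row-group pruning for the filter x > 5, using the
# min/max statistics stored per row-group in the parquet footer.
row_groups = [
    {"name": "rg0", "min": 2, "max": 4},  # max < 5: no row can match -> pruned
    {"name": "rg1", "min": 2, "max": 6},  # max may exceed 5: kept, read WHOLE
]

kept = [rg for rg in row_groups if rg["max"] > 5]
print([rg["name"] for rg in kept])  # rg1 is read in full, rows with x <= 5 included
```

Note that the surviving row-group is loaded in its entirety; that is exactly why rows like 2018-07-03 still appear in the question's output.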

To actually filter the rows, you should still use the usual syntax

df[df.dates > np.datetime64('2018-07-04')]

in addition to filters, and view the use of filters as an optional optimisation. Without it, Dask would have to read even the partitions that contain no matching data and then apply the condition, producing empty results for those partitions. Better not to load them at all, if possible.
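Putting the two together for the data in the question (a sketch: the mask is shown on the pandas frame for brevity, but the same expression works on the dask dataframe before calling compute):

```python
import numpy as np
import pandas as pd

# Same data as in the question; this mask is what you would apply
# lazily in dask via ddf[ddf.dates > cutoff] alongside filters=.
df = pd.DataFrame({"dates": pd.date_range("2018-07-01", periods=5, freq="1d"),
                   "nums": range(1, 6)})

cutoff = np.datetime64("2018-07-04")
result = df[df.dates > cutoff]
print(result)  # only the 2018-07-05 row remains
```

With dask, filters= prunes whole row-groups at read time, and the boolean mask then removes the stragglers inside the row-groups that were kept.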
