Python PANDAS: Converting from pandas/numpy to dask

Posted 2020-07-09 10:02

Question:

I am trying to convert a program so that it can run in parallel using the excellent dask library. Here is the program I am converting:

Python PANDAS: Stack by Enumerated Date to Create Records Vectorized

import pandas as pd
import numpy as np
import dask.dataframe as dd
import dask.array as da
from io import StringIO

test_data = '''id,transaction_dt,units,measures
               1,2018-01-01,4,30.5
               1,2018-01-03,4,26.3
               2,2018-01-01,3,12.7
               2,2018-01-03,3,8.8'''

df_test = pd.read_csv(StringIO(test_data), sep=',')
df_test['transaction_dt'] = pd.to_datetime(df_test['transaction_dt'])

df_test = df_test.loc[np.repeat(df_test.index, df_test['units'])]
df_test['transaction_dt'] += pd.to_timedelta(df_test.groupby(level=0).cumcount(), unit='d')
df_test = df_test.reset_index(drop=True)

Expected results:

id,transaction_dt,measures
1,2018-01-01,30.5
1,2018-01-02,30.5
1,2018-01-03,30.5
1,2018-01-04,30.5
1,2018-01-03,26.3
1,2018-01-04,26.3
1,2018-01-05,26.3
1,2018-01-06,26.3
2,2018-01-01,12.7
2,2018-01-02,12.7
2,2018-01-03,12.7
2,2018-01-03,8.8
2,2018-01-04,8.8
2,2018-01-05,8.8 

It occurred to me that this might be a good candidate to try to parallelize because the separate dask partitions should not need to know anything about each other to accomplish the required operations. Here is a naive representation of how I thought it might work:

dd_test = dd.from_pandas(df_test, npartitions=3)

dd_test = dd_test.loc[da.repeat(dd_test.index, dd_test['units'])]
dd_test['transaction_dt'] += dd_test.to_timedelta(dd.groupby(level=0).cumcount(), unit='d')
dd_test = dd_test.reset_index(drop=True)

So far I have been trying to work through the following errors or idiomatic differences:

1. "NotImplementedError: Only integer valued repeats supported." I have also tried converting the index into an int column/array, but I still run into the same error.

2. dask does not support the mutating operator "+=".

3. There is no dask .to_timedelta() method on the dataframe.

4. There is no dask .cumcount() (but I think .cumsum() is interchangeable?!)
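On point 4, a small pandas-only check may help. It is a minimal sketch on a made-up five-row frame (the `df` and `helper` names here are only for illustration): `cumcount()` numbers rows from 0 within each group, while `cumsum()` over a column of ones starts at 1, so the two differ by one.

```python
import pandas as pd

# Toy frame for illustration only; not the data from the question.
df = pd.DataFrame({"id": [1, 1, 1, 2, 2]})
df["helper"] = 1

# cumcount numbers the rows of each group starting at 0.
counts = df.groupby("id").cumcount()
# cumsum over a column of ones starts at 1 within each group.
sums = df.groupby("id")["helper"].cumsum()

print(counts.tolist())  # [0, 1, 2, 0, 1]
print(sums.tolist())    # [1, 2, 3, 1, 2]
```

So a cumsum-based replacement needs a `- 1` to reproduce the same day offsets, and it has to be grouped by the same key as the original cumcount.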

If there are any dask experts out there who can tell me whether there are fundamental impediments that would preclude this approach, or who have any tips on implementation, that would be a great help!

Edit:

I think I have made a bit of progress on this since posting the question:

dd_test = dd.from_pandas(df_test, npartitions=3)
dd_test['helper'] = 1

dd_test = dd_test.loc[da.repeat(dd_test.index, dd_test['units'])]
dd_test['transaction_dt'] = dd_test['transaction_dt'] + (dd_test.groupby('id')['helper'].cumsum()).astype('timedelta64[D]')
dd_test = dd_test.reset_index(drop=True)

However, I am still stuck on the dask array repeats error. Any tips still welcome.

Answer 1:

Not sure if this is exactly what you are looking for, but I replaced da.repeat with np.repeat, explicitly cast dd_test.index and dd_test['units'] to numpy arrays, and added dd_test['transaction_dt'].astype('M8[us]') to your timedelta calculation.

df_test = pd.read_csv(StringIO(test_data), sep=',')

dd_test = dd.from_pandas(df_test, npartitions=3)
dd_test['helper'] = 1

dd_test = dd_test.loc[np.repeat(np.array(dd_test.index),
                                np.array(dd_test['units']))]
dd_test['transaction_dt'] = dd_test['transaction_dt'].astype('M8[us]') + (dd_test.groupby('id')['helper'].cumsum()).astype('timedelta64[D]')
dd_test = dd_test.reset_index(drop=True)

df_expected = dd_test.compute()