I am trying to convert a program to run in parallel (multithreaded) with the excellent dask library. Here is the program I am working on converting:
Python PANDAS: Stack by Enumerated Date to Create Records Vectorized
import pandas as pd
import numpy as np
import dask.dataframe as dd
import dask.array as da
from io import StringIO
test_data = '''id,transaction_dt,units,measures
1,2018-01-01,4,30.5
1,2018-01-03,4,26.3
2,2018-01-01,3,12.7
2,2018-01-03,3,8.8'''
df_test = pd.read_csv(StringIO(test_data), sep=',')
df_test['transaction_dt'] = pd.to_datetime(df_test['transaction_dt'])
df_test = df_test.loc[np.repeat(df_test.index, df_test['units'])]
df_test['transaction_dt'] += pd.to_timedelta(df_test.groupby(level=0).cumcount(), unit='d')
df_test = df_test.reset_index(drop=True)
Expected results:
id,transaction_dt,measures
1,2018-01-01,30.5
1,2018-01-02,30.5
1,2018-01-03,30.5
1,2018-01-04,30.5
1,2018-01-03,26.3
1,2018-01-04,26.3
1,2018-01-05,26.3
1,2018-01-06,26.3
2,2018-01-01,12.7
2,2018-01-02,12.7
2,2018-01-03,12.7
2,2018-01-03,8.8
2,2018-01-04,8.8
2,2018-01-05,8.8
It occurred to me that this might be a good candidate to try to parallelize because the separate dask partitions should not need to know anything about each other to accomplish the required operations. Here is a naive representation of how I thought it might work:
dd_test = dd.from_pandas(df_test, npartitions=3)
dd_test = dd_test.loc[da.repeat(dd_test.index, dd_test['units'])]
dd_test['transaction_dt'] += dd_test.to_timedelta(dd.groupby(level=0).cumcount(), unit='d')
dd_test = dd_test.reset_index(drop=True)
So far I have been trying to work through the following errors or idiomatic differences:
1. "NotImplementedError: Only integer valued repeats supported." I have also tried converting the index into an int column/array, but I still run into the issue.
2. dask does not support the mutating operator "+=".
3. There is no dask .to_timedelta() method on the dataframe.
4. There is no dask .cumcount() (but I think .cumsum() is interchangeable?!)
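On the last point, a small pandas-only sketch may help show how the two differ. The toy frame and the `helper` column of ones are illustrative assumptions, not part of the real data. `cumcount()` numbers the rows in each group from 0, while a cumulative sum over a column of ones starts at 1, so the two are only interchangeable after subtracting 1:

```python
import pandas as pd

# Toy frame: three rows for id 1, two rows for id 2, plus a column of ones.
toy = pd.DataFrame({'id': [1, 1, 1, 2, 2], 'helper': 1})

# cumcount() is zero-based within each group.
counts = toy.groupby('id').cumcount()

# cumsum() over a column of ones is one-based within each group.
sums = toy.groupby('id')['helper'].cumsum()

print(counts.tolist())        # zero-based position within each id
print(sums.tolist())          # one-based position within each id
print((sums - 1).tolist())    # matches cumcount()
```

If the cumulative sum is used as a day offset without the subtraction, every date would be shifted one day later than in the expected results.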
If any dask experts out there can tell me whether there are fundamental impediments to this approach, or have any tips on implementation, that would be a great help!
Edit:
I think I have made a bit of progress on this since posting the question:
dd_test = dd.from_pandas(df_test, npartitions=3)
dd_test['helper'] = 1
dd_test = dd_test.loc[da.repeat(dd_test.index, dd_test['units'])]
dd_test['transaction_dt'] = dd_test['transaction_dt'] + (dd_test.groupby('id')['helper'].cumsum()).astype('timedelta64[D]')
dd_test = dd_test.reset_index(drop=True)
However, I am still stuck on the dask array repeats error. Any tips still welcome.