Suppose I have the following code, to generate a dummy dask dataframe:
import pandas as pd
import dask.dataframe as dd
pandas_dataframe = pd.DataFrame({'A': [0, 500, 1000], 'B': [-100, 200, 300], 'C': [0, 0, 1.0]})
test_data_frame = dd.from_pandas(pandas_dataframe, npartitions=1)
Ideally, I would like to know the recommended way to add another column to the dataframe, computing its content through a rolling window, in a lazy fashion.
I came up with the following approach:
import numpy as np
from dask import delayed

@delayed
def coupled_operation_example(dask_dataframe,
                              list_of_input_lbls,
                              fcn,
                              window_size,
                              init_value,
                              output_lbl):

    def preallocate_channel_data(vector_length, first_components):
        # Zero-filled output vector, seeded with the initial values.
        vector_out = np.zeros(vector_length)
        vector_out[0:len(first_components)] = first_components
        return vector_out

    def create_output_signal(relevant_data, fcn, window_size, initiated_vec):
        # To be written; fcn would be a function accepting the sliding window.
        raise NotImplementedError

    initiated_vec = preallocate_channel_data(len(dask_dataframe), init_value)
    relevant_data = dask_dataframe[list_of_input_lbls]
    my_output_signal = create_output_signal(relevant_data, fcn, window_size, initiated_vec)
I wrote this convinced that dask dataframes would allow some slicing; they do not. So my first option would be to extract the columns involved in the computation as numpy arrays, but then they would be evaluated eagerly, and I think the performance penalty would be significant. At the moment I create dask dataframes from h5 data using h5py, so everything is lazy until I write the output files.
Up to now I was only processing data within a single row, so I had been using:
test_data_frame.apply(fcn, axis=1, meta=float)
I do not think there is an equivalent functional approach for rolling windows; am I right? I would like something like Seq.windowed in F#, or its equivalent in Haskell. Any suggestions would be highly appreciated.
I have tried to solve it through a closure. I will post benchmarks on some data as soon as I have finalized the code. For now I have the following toy example, which seems to work, since the dask dataframe's apply method seems to preserve the row order.
Now the bad news: I have tried to abstract it out, passing the state around and using a rolling window of any width, through this new function:
Suppose we initialize the function with:
Try running it and be ready for surprises: the first element is wrong, possibly a pointer (reference) problem, given the odd result. Any insight?
Anyhow, if one passes primitive types, it seems to work.
Update:
the solution is to use copy:
Now the function behaves as expected; of course, it is necessary to adapt the update function and the coupled computation function if one needs more tightly coupled logic.
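To illustrate why copying helps, here is a minimal sketch in plain Python of the kind of aliasing that would produce odd results, assuming the state handed to the closure is a mutable object such as a list. The names `make_window_tracker` and `step` are hypothetical and are not the functions discussed above:

```python
from copy import copy


def make_window_tracker(init_window, use_copy):
    # State captured by the closure. Without a copy it is the very same
    # list object that the caller passed in.
    window = copy(init_window) if use_copy else init_window

    def step(value):
        # Slide the window in place: drop the oldest value, append the newest.
        window.pop(0)
        window.append(value)
        # Returning `window` itself hands out a reference to the shared state,
        # so later calls silently change results that were returned earlier.
        return copy(window) if use_copy else window

    return step


# Without copy: every result is the same list object, and the caller's
# initial state is mutated as well.
init_aliased = [0, 0]
aliased_step = make_window_tracker(init_aliased, use_copy=False)
results_aliased = [aliased_step(v) for v in [1, 2, 3]]

# With copy: each result is an independent snapshot of the window.
init_safe = [0, 0]
safe_step = make_window_tracker(init_safe, use_copy=True)
results_safe = [safe_step(v) for v in [1, 2, 3]]

print(results_aliased)  # [[2, 3], [2, 3], [2, 3]]
print(results_safe)     # [[0, 1], [1, 2], [2, 3]]
```

This also fits the observation that primitive types work: ints and floats are immutable, so they cannot be changed in place behind the closure's back. For nested state (for example a list of arrays), `copy.deepcopy` would probably be needed instead of a shallow `copy`.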