I am experimenting with Dask and want to ship a lookup pandas.DataFrame
to all worker nodes. Unfortunately, accessing the scattered frame inside an apply fails with:
TypeError: ("'Future' object is not subscriptable", 'occurred at index 0')
When I use lookup.result()['foo'].iloc[2] instead of lookup['foo'].iloc[2], it works fine, but for larger instances of the input dataframe it seems to get stuck at from_pandas again and again. It also seems strange that the future needs to be blocked on manually, over and over again for each row of the apply operation. Is there a way to block for the future only once per worker node? A naive improvement could be to use map_partitions (a sketch follows at the end), but this would only be feasible if the number of partitions is fairly small.
import pandas as pd
import dask.dataframe as dd
from dask.distributed import Client
client = Client()
df_first = pd.DataFrame({'foo':[1,2,3]})
df_second = pd.DataFrame({'bar':[1,2,3], 'baz':[1,2,3]})
df_first_scattered = client.scatter(df_first, broadcast=True)
df_second_dask = dd.from_pandas(df_second, npartitions=2)
def foo(row, lookup):
    # TODO: some computation which relies on the lookup
    return lookup['foo'].iloc[2]  # fails: 'Future' object is not subscriptable
df_second_dask['foo'] = df_second_dask.apply(lambda x: foo(x, df_first_scattered), axis=1, meta=('foo', 'int64'))
df_second_dask = df_second_dask.compute()
df_second_dask.head()
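For reference, the variant that works only changes the body of foo, at the cost of blocking on the future for every row:

def foo(row, lookup):
    # resolving the scattered future manually avoids the TypeError,
    # but blocks on the future once per row of the apply
    return lookup.result()['foo'].iloc[2]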
In fact, this naive Dask implementation seems to be slower than plain pandas for larger problem instances. I suspect the poor performance is related to the issue raised above.
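The map_partitions variant I have in mind would look roughly like this (a sketch only; the helper name add_foo and the explicit meta are mine, and the isinstance check assumes the lookup may arrive on the worker either as a Future or already resolved):

import pandas as pd
from dask.distributed import Future

def add_foo(partition, lookup):
    # resolve the scattered future once per partition instead of once per row
    if isinstance(lookup, Future):
        lookup = lookup.result()
    partition = partition.copy()
    partition['foo'] = lookup['foo'].iloc[2]
    return partition

meta = pd.DataFrame({'bar': pd.Series(dtype='int64'),
                     'baz': pd.Series(dtype='int64'),
                     'foo': pd.Series(dtype='int64')})
df_second_dask = df_second_dask.map_partitions(add_foo, df_first_scattered, meta=meta)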