Dask broadcast not available during compute graph

Posted 2019-07-22 20:21

I am experimenting with Dask and want to ship a lookup pandas.DataFrame to all worker nodes. Unfortunately, it fails with:

TypeError: ("'Future' object is not subscriptable", 'occurred at index 0')

Using lookup.result()['foo'].iloc[2] instead of lookup['foo'].iloc[2] works, but for larger input dataframes it seems to get stuck at from_pandas again and again. It also seems strange that the future has to be blocked on manually, over and over again, for each row in the apply operation. Is there a way to block for the future only once per worker node? A naive improvement would be to use map_partitions, but that would only be feasible if the number of partitions is fairly small.

import pandas as pd
import dask.dataframe as dd
from dask.distributed import Client

client = Client()

df_first = pd.DataFrame({'foo':[1,2,3]})
df_second = pd.DataFrame({'bar':[1,2,3], 'baz':[1,2,3]})

df_first_scattered = client.scatter(df_first, broadcast=True)
df_second_dask = dd.from_pandas(df_second, npartitions=2)


def foo(row, lookup):
    # TODO some computation which relies on the lookup
    return lookup['foo'].iloc[2]

df_second_dask['foo'] = df_second_dask.apply(lambda x: foo(x, df_first_scattered), axis = 1, meta=('baz', 'int64'))
df_second_dask = df_second_dask.compute()
df_second_dask.head()

In fact, this naive dask implementation seems to be slower than plain pandas for larger problem instances. I suspect the slow execution performance is related to the issue raised above.

1 answer
Anthone
#2 · 2019-07-22 20:52

Instead of this:

df_second_dask['foo'] = df_second_dask.apply(lambda x: foo(x, df_first_scattered), axis = 1, meta=('baz', 'int64'))

Try this instead:

df_second_dask['foo'] = df_second_dask.apply(foo, args=[df_first_scattered], axis = 1, meta=('baz', 'int64'))

Previously the future was hidden inside a lambda function, so Dask could not find it and turn it into the underlying value. When the future is passed as an explicit argument instead, Dask can identify it for what it is and hand your function the real value.
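The same rule can be seen in isolation with client.submit, which resolves futures that are passed as explicit arguments. The following is a minimal sketch rather than part of the original code: the function name third_foo is made up for illustration, and Client(processes=False) is used only so that the example runs in a single process.

```python
import pandas as pd
from dask.distributed import Client

# In-process cluster so the sketch runs without spawning worker processes.
client = Client(processes=False)

lookup = pd.DataFrame({'foo': [1, 2, 3]})
lookup_future = client.scatter(lookup, broadcast=True)


def third_foo(lookup):
    # Hypothetical helper: on the worker, `lookup` is the real DataFrame.
    return int(lookup['foo'].iloc[2])


# The future is an explicit argument, so Dask substitutes its value.
value = client.submit(third_foo, lookup_future).result()

# Subscripting the future itself raises the TypeError from the question.
try:
    lookup_future['foo']
    error_message = None
except TypeError as exc:
    error_message = str(exc)

client.close()
```

Here value holds the third entry of the foo column, while error_message holds the "not subscriptable" complaint that appears whenever the raw future, rather than its result, reaches the indexing code.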
