Override dask scheduler to concurrently load data

Published 2019-07-13 03:28

Question:

I want to run graphs/futures on my distributed cluster which all have a 'load data' root task and then a bunch of training tasks that run on that data. A simplified version would look like this:

from dask.distributed import Client
client = Client(scheduler_ip)
load_data_future = client.submit(load_data_func, 'path/to/data/')
train_task_futures = [client.submit(train_func, load_data_future, params) 
                      for params in train_param_set]

Running this as above, the scheduler gets one worker to read the file and then transfers that data to the other workers. However, loading the data usually means reading from a large HDF5 file, which can be done concurrently, so I was wondering if there is a way to force all workers to read this file concurrently (i.e. they all compute the root task) instead of waiting for one worker to finish and then slowly transferring the data from that worker.

I know there is the client.run() method, which I can use to get all workers to read the file concurrently, but how would you then feed the data you've read into the downstream tasks?

I cannot use the dask data primitives to concurrently read HDF5 files because I need things like multi-indexes and grouping on multiple columns.

Answer 1:

I revisited this question and found a relatively simple solution, though it uses internal API methods and involves a blocking call to client.run(). Using the same variables as in the question:

from distributed import get_worker

client_id = client.id

def load_dataset():
    # Runs once on each worker via client.run()
    worker = get_worker()
    data = {'load_dataset-0': load_data_func('path/to/data')}
    # Store the data in the worker's local memory without reporting it
    info = worker.update_data(data=data, report=False)
    # Tell the scheduler which worker now holds the key and how large it is
    worker.scheduler.update_data(who_has={key: [worker.address] for key in data},
                                 nbytes=info['nbytes'], client=client_id)

client.run(load_dataset)
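
If you now run client.has_what() you should see that each worker holds the key load_dataset-0 (a sketch of the expected output; the worker addresses are illustrative):

>>> client.has_what()
{'tcp://127.0.0.1:34334': ('load_dataset-0',),
 'tcp://127.0.0.1:39193': ('load_dataset-0',)}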

To use this in downstream computations you can simply create a future for the key:

from distributed import Future
load_data_future = Future('load_dataset-0', client=client)

and this can be used with client.compute() or dask.delayed as usual. Indeed the final line from the example in the question would work fine:

train_task_futures = [client.submit(train_func, load_data_future, params) 
                      for params in train_param_set]
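
The same future also works with the delayed interface (a minimal sketch, reusing the placeholder names from the question):

import dask

train_tasks = [dask.delayed(train_func)(load_data_future, params)
               for params in train_param_set]
train_task_futures = client.compute(train_tasks)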

Bear in mind that this uses the internal API methods Worker.update_data and Scheduler.update_data, and that it works fine as of distributed.__version__ == 1.21.6 but could be subject to change in future releases.



Answer 2:

As of today (distributed.__version__ == 1.20.2) what you ask for is not possible. The closest thing would be to compute the data once and then replicate it to all workers explicitly:

from dask.distributed import wait

future = client.submit(load, path)
wait(future)              # block until the data has been loaded on one worker
client.replicate(future)  # copy the result to every worker
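
Once the data is replicated to every worker, the future feeds into the downstream tasks exactly as in the question:

train_task_futures = [client.submit(train_func, future, params)
                      for params in train_param_set]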

You may want to raise this as a feature request at https://github.com/dask/distributed/issues/new