I want to run graphs/futures on my distributed cluster, all of which have a 'load data' root task and then a bunch of training tasks that run on that data. A simplified version would look like this:
from dask.distributed import Client
client = Client(scheduler_ip)
load_data_future = client.submit(load_data_func, 'path/to/data/')
train_task_futures = [client.submit(train_func, load_data_future, params)
for params in train_param_set]
Running this as above, the scheduler gets one worker to read the file, then it spills that data to disk to share it with the other workers. However, loading the data usually means reading from a large HDF5 file, which can be done concurrently, so I was wondering if there is a way to force all workers to read this file concurrently (i.e. they all compute the root task) instead of having them wait for one worker to finish and then slowly transfer the data from that worker.
I know there is the client.run() method, which I can use to get all workers to read the file concurrently, but how would you then get the data you've read to feed into the downstream tasks?
I cannot use the dask data primitives to concurrently read HDF5 files because I need things like multi-indexes and grouping on multiple columns.
Revisited this question and found a relatively simple solution, though it uses internal API methods and involves a blocking call to client.run(). Using the same variables as in the question, the idea is to run a function on every worker that loads the data locally and registers it under a fixed key. Now if you run client.has_what() you should see that each worker holds the key load_dataset-0. To use this in downstream computations you can simply create a future for the key, and this can be used with client.compute() or dask.delayed as usual. Indeed, the final line from the example in the question would work fine as-is. Bear in mind that this uses the internal API methods Worker.update_data and Scheduler.update_data, and works fine as of distributed.__version__ == 1.21.6, but could be subject to change in future releases.

As of today (distributed.__version__ == 1.20.2) what you ask for is not possible. The closest thing would be to compute once and then replicate the data explicitly. You may want to raise this as a feature request at https://github.com/dask/distributed/issues/new
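A minimal, self-contained sketch of the first solution above. A local in-process client and a toy load_data_func stand in for the real Client(scheduler_ip) and HDF5 loader, the key name load_dataset-0 is arbitrary, and Worker.update_data is internal API whose exact signature may differ between versions:

```python
from dask.distributed import Client, Future, get_worker

# Local stand-in for Client(scheduler_ip) from the question.
client = Client(processes=False, n_workers=2)

def load_data_func(path):
    # Toy stand-in for the real concurrent HDF5 loader.
    return list(range(10))

def load_dataset():
    # Runs on every worker: each one loads the file itself, then registers
    # the result under a fixed key via the internal Worker.update_data API.
    worker = get_worker()
    data = load_data_func('path/to/data/')
    worker.update_data(data={'load_dataset-0': data})

# Blocking call: all workers read the file concurrently.
client.run(load_dataset)

# A future pointing at the key that every worker now holds; it can be
# passed to client.submit / dask.delayed like any other future.
load_data_future = Future('load_dataset-0', client)
```

After this, the train_task_futures list comprehension from the question can be submitted unchanged, with load_data_future now backed by worker-local copies of the data.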
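A sketch of the compute-once-then-replicate fallback from the second answer, again with a local client and toy loader standing in for the real cluster and HDF5 reader:

```python
from dask.distributed import Client, wait

# Local stand-in for Client(scheduler_ip) from the question.
client = Client(processes=False, n_workers=2)

def load_data_func(path):
    # Toy stand-in for the real HDF5 loader.
    return list(range(10))

# Compute the root task once, on a single worker...
load_data_future = client.submit(load_data_func, 'path/to/data/')
wait(load_data_future)

# ...then copy its result onto every worker explicitly.
client.replicate([load_data_future])
```

This still pays the one-worker load plus transfer cost the question complains about, but the transfer happens once, up front, rather than lazily as each training task is scheduled.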