Distributing graphs across cluster nodes

Posted 2019-07-22 14:41

Question:

I'm making good progress with Dask.delayed. As a group, we've decided to put more time into working with graphs using Dask.

I have a question about distribution. I'm seeing the following behaviour on our cluster. I start up, say, 8 workers on each of 8 nodes, each with 4 threads. I then client.compute 8 graphs to create the simulated data for subsequent processing, and I want the 8 data sets generated one per node. However, what seems to happen, not unreasonably, is that the eight functions run on the first two nodes, and subsequent computations also run on those same nodes. Hence I see a lack of scaling. Over time, the other nodes disappear from the diagnostics workers page. Is this expected?

So I want to distribute the data-creation functions across nodes first. When I want to compute the graphs, I now do:

# Restrict the data-creation computations to the given workers/nodes
if nodes is not None:
    print("Computing graph_list on the following nodes: %s" % nodes)
    return client.compute(graph_list, sync=True, workers=nodes, **kwargs)
else:
    return client.compute(graph_list, sync=True, **kwargs)

This seems to set up correctly: the diagnostics progress bar shows that my data-creation functions are in memory, but they never start. If nodes is omitted, the computation proceeds as expected. This behaviour occurs both on the cluster and on my desktop.
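For reference, here is a minimal self-contained sketch of the pattern (the hostnames and the graph list are placeholders; the workers argument accepts hostnames or full worker addresses):

from dask import delayed
from dask.distributed import Client

client = Client("tcp://10.143.6.70:8786")  # scheduler address, as in the log below

# Stand-in for the real data-creation graphs
graph_list = [delayed(pow)(i, 2) for i in range(8)]

# Placeholder hostnames; each restricts tasks to workers on that host
nodes = ["10.143.6.71", "10.143.6.72"]

futures = client.compute(graph_list, sync=False, workers=nodes,
                         allow_other_workers=False)
results = client.gather(futures)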

More info: looking at the scheduler log, I do see communication failures.

more dask-ssh_2017-09-04_09\:52\:09/dask_scheduler_sand-6-70\:8786.log
distributed.scheduler - INFO - -----------------------------------------------
distributed.scheduler - INFO -   Scheduler at:    tcp://10.143.6.70:8786
distributed.scheduler - INFO -       bokeh at:              0.0.0.0:8787
distributed.scheduler - INFO -        http at:              0.0.0.0:9786
distributed.scheduler - INFO - Local Directory:    /tmp/scheduler-ny4ev7qh
distributed.scheduler - INFO - -----------------------------------------------
distributed.scheduler - INFO - Register tcp://10.143.6.73:36810
distributed.scheduler - INFO - Starting worker compute stream, tcp://10.143.6.73:36810
distributed.scheduler - INFO - Register tcp://10.143.6.71:46656
distributed.scheduler - INFO - Starting worker compute stream, tcp://10.143.6.71:46656
distributed.scheduler - INFO - Register tcp://10.143.7.66:42162
distributed.scheduler - INFO - Starting worker compute stream, tcp://10.143.7.66:42162
distributed.scheduler - INFO - Register tcp://10.143.7.65:35114
distributed.scheduler - INFO - Starting worker compute stream, tcp://10.143.7.65:35114
distributed.scheduler - INFO - Register tcp://10.143.6.70:43208
distributed.scheduler - INFO - Starting worker compute stream, tcp://10.143.6.70:43208
distributed.scheduler - INFO - Register tcp://10.143.7.67:45228
distributed.scheduler - INFO - Starting worker compute stream, tcp://10.143.7.67:45228
distributed.scheduler - INFO - Register tcp://10.143.6.72:36100
distributed.scheduler - INFO - Starting worker compute stream, tcp://10.143.6.72:36100
distributed.scheduler - INFO - Register tcp://10.143.7.68:41915
distributed.scheduler - INFO - Starting worker compute stream, tcp://10.143.7.68:41915
distributed.scheduler - INFO - Receive client connection: 5d1dab2a-914e-11e7-8bd1-180373ff6d8b
distributed.scheduler - INFO - Worker 'tcp://10.143.6.71:46656' failed from closed comm: Stream is closed
distributed.scheduler - INFO - Remove worker tcp://10.143.6.71:46656
distributed.scheduler - INFO - Removed worker tcp://10.143.6.71:46656
distributed.scheduler - INFO - Worker 'tcp://10.143.6.73:36810' failed from closed comm: Stream is closed
distributed.scheduler - INFO - Remove worker tcp://10.143.6.73:36810
distributed.scheduler - INFO - Removed worker tcp://10.143.6.73:36810
distributed.scheduler - INFO - Worker 'tcp://10.143.6.72:36100' failed from closed comm: Stream is closed
distributed.scheduler - INFO - Remove worker tcp://10.143.6.72:36100
distributed.scheduler - INFO - Removed worker tcp://10.143.6.72:36100
distributed.scheduler - INFO - Worker 'tcp://10.143.7.67:45228' failed from closed comm: Stream is closed
distributed.scheduler - INFO - Remove worker tcp://10.143.7.67:45228
distributed.scheduler - INFO - Removed worker tcp://10.143.7.67:45228
(arlenv) [hpccorn1@login-sand8 performance]$

Does this suggest any possible causes?

Thanks, Tim

Answer 1:

How Dask chooses to allocate tasks to workers is complex, and takes into account many issues such as load balancing, data transfer, and resource constraints. It can be hard to reason about where things will end up without a concrete and simple example.

One thing you can try is to submit all of your computations at once; this lets the scheduler make slightly more intelligent decisions, rather than seeing things one at a time.

So, you might try replacing code like this:

from dask.distributed import wait

futures = [client.compute(delayed_value) for delayed_value in L]
wait(futures)

with code like this:

futures = client.compute(L)
wait(futures)
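
A self-contained version of that second pattern, with a placeholder workload, might look like this:

from dask import delayed
from dask.distributed import Client, wait

client = Client("tcp://10.143.6.70:8786")  # or Client() for a local test cluster

L = [delayed(sum)(range(n)) for n in range(8)]  # placeholder workload

# One call with the whole list: the scheduler sees the full workload up front
futures = client.compute(L)
wait(futures)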

But honestly, I'd only give this a 30% chance of solving your problem. It's hard to know what is going on without diving much more deeply into your problem. If you can provide a very simple, reproducible code example, that would be best.
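
If it helps while putting one together, Client.who_has shows where results actually ended up (a sketch, again with a placeholder workload):

from dask import delayed
from dask.distributed import Client, wait

client = Client("tcp://10.143.6.70:8786")

futures = client.compute([delayed(abs)(-n) for n in range(8)])
wait(futures)

# Maps each result key to the worker(s) holding it, showing whether the
# work spread across nodes or piled up on one or two of them
print(client.who_has(futures))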