What is the “right” way to close a Dask LocalCluster?

Posted 2020-03-12 05:15

Question:

I am trying to use dask.distributed on my laptop with a LocalCluster, but I still have not found a way to let my application exit without raising warnings or triggering strange interactions with matplotlib (I am using the tkAgg backend).

For example, if I close the client and then the cluster, in that order, tk cannot properly remove the image from memory and I get the following error:

Traceback (most recent call last):
  File "/opt/Python-3.6.0/lib/python3.6/tkinter/__init__.py", line 3501, in __del__
    self.tk.call('image', 'delete', self.name)
RuntimeError: main thread is not in main loop

The following code reproduces the error:

from time import sleep
import numpy as np
import matplotlib.pyplot as plt
from dask.distributed import Client, LocalCluster

if __name__ == '__main__':
    cluster = LocalCluster(
        n_workers=2,
        processes=True,
        threads_per_worker=1
    )
    client = Client(cluster)

    x = np.linspace(0, 1, 100)
    y = x * x
    plt.plot(x, y)

    print('Computation complete! Stopping workers...')
    client.close()
    sleep(1)
    cluster.close()

    print('Execution complete!')

The sleep(1) line makes the problem more likely to appear; it does not occur on every execution.

Every other combination I tried for stopping the execution (not closing the client, not closing the cluster, not closing either) generates problems with tornado instead, usually the following:

tornado.application - ERROR - Exception in Future <Future cancelled> after timeout

What is the right combination to stop the local cluster and the client? Am I missing something?

These are the libraries that I am using:

  • python 3.6.0 and 3.7.0
  • tornado 5.1.1
  • dask 0.20.0
  • distributed 1.24.0
  • matplotlib 3.0.1

Thank you for your help!

Answer 1:

In our experience, the best way is to use a context manager, for example:

import numpy as np
import matplotlib.pyplot as plt
from dask.distributed import Client, LocalCluster 

if __name__ == '__main__':
    cluster = LocalCluster(
        n_workers=2,
        processes=True,
        threads_per_worker=1
    )
    with Client(cluster) as client:
        x = np.linspace(0, 1, 100)
        y = x * x
        plt.plot(x, y)
        print('Computation complete! Stopping workers...')

    print('Execution complete!')
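
Note that the cluster object in this snippet is never closed explicitly. LocalCluster is itself a context manager, so if you also want the cluster torn down automatically you can nest the two with statements; below is a minimal sketch of that variant (the next answer builds on the same idea):

import numpy as np
import matplotlib.pyplot as plt
from dask.distributed import Client, LocalCluster

if __name__ == '__main__':
    # Nested context managers: on exit, the client is closed first,
    # then the cluster (scheduler and workers) is shut down.
    with LocalCluster(n_workers=2, processes=True,
                      threads_per_worker=1) as cluster:
        with Client(cluster) as client:
            x = np.linspace(0, 1, 100)
            y = x * x
            plt.plot(x, y)
            print('Computation complete! Stopping workers...')
    print('Execution complete!')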


Answer 2:

Expanding on skibee's answer, here is a pattern I use. It sets up a temporary LocalCluster and then shuts it down. Very useful when different parts of your code must be parallelized in different ways (e.g. one needs threads and the other needs processes).

from dask.distributed import Client, LocalCluster
import multiprocessing as mp

with LocalCluster(
    n_workers=int(0.9 * mp.cpu_count()),
    processes=True,
    threads_per_worker=1,
    memory_limit='2GB',
    ip='tcp://localhost:9895',
) as cluster, Client(cluster) as client:
    pass  # Do something using 'client'

What's happening above:

  • A cluster is being spun up on your local machine (i.e. the one running the Python interpreter). The scheduler of this cluster is listening on port 9895.

  • The cluster is created and a number of workers are spun up. Each worker is a process, since I specified processes=True.

  • The number of workers spun up is 90% of the number of CPU cores, rounded down. So an 8-core machine will spawn 7 worker processes. This leaves at least one core free for SSH / Notebook server / other applications.

  • Each worker is limited to 2GB of RAM (memory_limit='2GB'). Having a temporary cluster allows you to spin up workers with different amounts of RAM for different workloads.

  • Once the with block exits, both client.close() and cluster.close() are called (context managers exit in reverse order, so the client is disconnected first). The latter closes the cluster, scheduler, nanny and all workers, while the former disconnects the client (created in your Python interpreter) from the cluster.

While the workers are processing, you can check whether the cluster is active by running lsof -i :9895. If there is no output, the cluster has closed.
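
Since the scheduler address is pinned to tcp://localhost:9895 above, you can also probe the cluster from Python instead of using lsof, by connecting a second client to that address; a minimal sketch (the 5-second timeout is an arbitrary choice):

from dask.distributed import Client

# Connecting by address fails with a timeout error if the cluster is gone.
probe = Client('tcp://localhost:9895', timeout=5)
print(list(probe.scheduler_info()['workers']))  # addresses of live workers
probe.close()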


Sample use-case: suppose you want to use a pre-trained ML model to predict on 1,000,000 examples.

The model is optimized/vectorized such that it can predict on 10K examples pretty fast, but 1M is slow. In such a case, a setup that works is to load multiple copies of the model from disk, and then use them to predict on chunks of the 1M examples.

Dask allows you to do this pretty easily and achieve a good speedup:

import numpy as np

def load_and_predict(input_data_chunk):
    model_path = '...' # On your disk, so accessible by all processes.
    model = some_library.load_model(model_path)  # 'some_library' is a placeholder for your ML library
    labels, scores = model.predict(input_data_chunk, ...)
    return np.array([labels, scores])

# (not shown) Load `input_data`, a list of your 1M examples.

import dask.array as DaskArray

da_input_data = DaskArray.from_array(input_data, chunks=(10_000,))

prediction_results = None
with LocalCluster(
    n_workers=int(0.9 * mp.cpu_count()),
    processes=True,
    threads_per_worker=1,
    memory_limit='2GB',
    ip='tcp://localhost:9895',
) as cluster, Client(cluster) as client:
    prediction_results = da_input_data.map_blocks(load_and_predict).compute()

# Combine prediction_results, which will be a list of NumPy arrays,
# each with labels, scores for 10,000 examples.
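
As a quick sanity check of the chunking above, here is a self-contained toy version with synthetic data (np.random.rand is just a stand-in for the real 1M examples):

import numpy as np
import dask.array as DaskArray

input_data = np.random.rand(1_000_000)  # synthetic stand-in, illustration only
da_input_data = DaskArray.from_array(input_data, chunks=(10_000,))

print(da_input_data.numblocks)      # (100,): one block per 10K-example chunk
print(da_input_data.chunks[0][:3])  # (10000, 10000, 10000)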

References:

  • Setting up a local cluster: https://distributed.dask.org/en/latest/local-cluster.html
  • Client close method: https://distributed.dask.org/en/latest/api.html#distributed.Client.close
  • Scheduler close method, which from my understanding is what is invoked by cluster.close(): https://distributed.dask.org/en/latest/scheduling-state.html#distributed.scheduler.Scheduler.close
  • with statement having multiple variables: https://stackoverflow.com/a/1073814/4900327