I am trying to use dask.distributed on my laptop with a LocalCluster, but I have not yet found a way to let my application exit without raising warnings or triggering strange interactions with matplotlib (I am using the tkAgg backend).
For example, if I close the client and then the cluster, in that order, Tk cannot properly remove the image from memory and I get the following error:
```
Traceback (most recent call last):
  File "/opt/Python-3.6.0/lib/python3.6/tkinter/__init__.py", line 3501, in __del__
    self.tk.call('image', 'delete', self.name)
RuntimeError: main thread is not in main loop
```
For example, the following code generates this error:
```python
from time import sleep

import numpy as np
import matplotlib.pyplot as plt
from dask.distributed import Client, LocalCluster

if __name__ == '__main__':
    cluster = LocalCluster(
        n_workers=2,
        processes=True,
        threads_per_worker=1
    )
    client = Client(cluster)

    x = np.linspace(0, 1, 100)
    y = x * x
    plt.plot(x, y)

    print('Computation complete! Stopping workers...')
    client.close()
    sleep(1)
    cluster.close()

    print('Execution complete!')
```
The `sleep(1)` line makes the problem more likely to appear, as it does not occur on every execution.
Every other combination I tried to stop the execution (not closing the client, not closing the cluster, not closing either) generates problems with tornado instead, usually the following:

```
tornado.application - ERROR - Exception in Future <Future cancelled> after timeout
```
What is the right combination to stop the local cluster and the client? Am I missing something?
These are the libraries that I am using:
- python 3.6.0 / 3.7.0
- tornado 5.1.1
- dask 0.20.0
- distributed 1.24.0
- matplotlib 3.0.1
Thank you for your help!
From our experience, the best way is to use a context manager, for example:
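A minimal sketch of that pattern, using the same cluster settings as the question (the squared-number computation is only illustrative):

```python
from dask.distributed import Client, LocalCluster

if __name__ == '__main__':
    # Entering the `with` block starts the cluster and connects the client;
    # leaving it calls client.close() and cluster.close() in the right order,
    # so no manual teardown (or sleep) is needed.
    with LocalCluster(n_workers=2, processes=True, threads_per_worker=1) as cluster, \
         Client(cluster) as client:
        result = client.submit(lambda x: x * x, 4).result()
        print(result)  # 16
    # Both the client and the cluster are already shut down here.
```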
Expanding on skibee's answer, here is a pattern I use. It sets up a temporary LocalCluster and then shuts it down. Very useful when different parts of your code must be parallelized in different ways (e.g. one needs threads and the other needs processes).
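A sketch of that pattern, reconstructed from the explanation that follows (the port, worker count, and memory limit come from the bullet points below; the squared-sum workload is illustrative):

```python
import os

from dask.distributed import Client, LocalCluster

if __name__ == '__main__':
    # 90% of the CPU cores, rounded down, leaving at least one core free.
    n_workers = max(1, int(0.9 * (os.cpu_count() or 1)))

    with LocalCluster(
        scheduler_port=9895,    # the scheduler listens on this port
        processes=True,         # one process per worker
        n_workers=n_workers,
        threads_per_worker=1,
        memory_limit='2GB',     # per-worker RAM
    ) as cluster, Client(cluster) as client:
        # Illustrative workload: sum of squares computed across the workers.
        futures = client.map(lambda x: x * x, range(10))
        total = sum(client.gather(futures))
    # Cluster and client are both closed once the `with` block exits.
    print(total)  # 285
```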
What's happening above:

- A cluster is spun up on your local machine (i.e. the one running the Python interpreter). The scheduler of this cluster is listening on port 9895.
- The cluster is created and a number of workers are spun up. Each worker is a process, since I specified `processes=True`.
- The number of workers spun up is 90% of the number of CPU cores, rounded down. So an 8-core machine will spawn 7 worker processes. This leaves at least one core free for SSH / notebook server / other applications.
- Each worker is initialized with 2GB of RAM. Having a temporary cluster allows you to spin up workers with a different amount of RAM for different workloads.
- Once the `with` block exits, both `cluster.close()` and `client.close()` are called. The first closes the cluster, scheduler, nanny and all workers; the second disconnects the client (created on your Python interpreter) from the cluster.
- While the workers are processing, you can check whether the cluster is active with `lsof -i :9895`. If there is no output, the cluster has closed.

Sample use-case: suppose you want to use a pre-trained ML model to predict on 1,000,000 examples.
The model is optimized/vectorized such that it can predict on 10K examples pretty fast, but 1M is slow. In such a case, a setup that works is to load multiple copies of the model from disk, and then use them to predict on chunks of the 1M examples.
Dask allows you to do this pretty easily and achieve a good speedup:
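A sketch of that setup, assuming a temporary cluster as above. The `Model` class and `load_model` function are stand-ins for your real model-loading code, not names from the answer, and the data is scaled down from 1M for the example:

```python
import numpy as np
from dask.distributed import Client, LocalCluster

# Stand-in for a real model loaded from disk; `load_model` and
# `Model.predict` are placeholders for your own code.
class Model:
    def predict(self, batch):
        return batch * 2.0

def load_model():
    return Model()

def predict_chunk(chunk):
    model = load_model()   # each task loads its own copy of the model
    return model.predict(chunk)

if __name__ == '__main__':
    examples = np.arange(10_000)            # scaled down from 1M for the sketch
    chunks = np.array_split(examples, 10)   # chunk sizes the model handles fast

    with LocalCluster(n_workers=2, processes=True, threads_per_worker=1) as cluster, \
         Client(cluster) as client:
        # Predict on all chunks in parallel, then reassemble the results.
        futures = client.map(predict_chunk, chunks)
        predictions = np.concatenate(client.gather(futures))

    print(len(predictions))  # 10000
```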
References:

- `Client.close` method: https://distributed.dask.org/en/latest/api.html#distributed.Client.close
- Scheduler `close` method, which from my understanding is what is invoked by `cluster.close()`: https://distributed.dask.org/en/latest/scheduling-state.html#distributed.scheduler.Scheduler.close
- `with` statement having multiple variables: https://stackoverflow.com/a/1073814/4900327