I am trying to load a dataset with Dask, but when it is time to compute it I keep getting problems like this:
WARNING - Worker exceeded 95% memory budget. Restarting.
I am just working on my local machine, initializing Dask as follows:
if __name__ == '__main__':
    libmarket.config.client = Client()  # use dask.distributed by default
Now in my error messages I keep seeing a reference to a memory_limit= keyword parameter. However, I've searched the Dask documentation thoroughly and I can't figure out how to increase the bloody worker memory limit in a single-machine configuration. I have 256GB of RAM, and I'm removing the majority of the columns from the future (a 20GB CSV file) before converting it back into a pandas DataFrame, so I know it will fit in memory. I just need to increase the per-worker memory limit from my code (not using dask-worker) so that I can process it.
Please, somebody help me.
The argument memory_limit can be provided to the __init__() functions of both Client and LocalCluster.
general remarks
Just calling Client() is a shortcut for first calling LocalCluster() and then Client with the created cluster (see Dask: Single Machine). When Client is called without an instance of LocalCluster, all possible arguments of LocalCluster.__init__() can be provided to the initialization call of Client. Therefore, arguments such as memory_limit and n_workers are not documented in the API documentation of the Client class.
However, the argument memory_limit does not seem to be properly documented in the API documentation of LocalCluster either (see Dask GitHub issue #4118).
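If you want to verify what limit each worker actually received, one way (a minimal sketch; as far as I know, Client.scheduler_info() reports per-worker metadata, including memory_limit in bytes) is:
# check the memory limit each worker actually received
from dask.distributed import Client

client = Client(n_workers=2, memory_limit='4GB')  # small example values
for address, worker in client.scheduler_info()['workers'].items():
    print(address, worker['memory_limit'])  # reported in bytes
client.close()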
solution
A working example would be the following. I added some more arguments that might be useful for people finding this question/answer. Note that memory_limit is a per-worker limit, so four workers at 64GB each can use up to 256GB in total.
# load/import classes
from dask.distributed import Client, LocalCluster
# set up cluster and workers
cluster = LocalCluster(n_workers=4,
                       threads_per_worker=1,
                       memory_limit='64GB')  # limit per worker, not total
client = Client(cluster)
# have a look at your workers
client
# do some work
## ...
# close workers and cluster
client.close()
cluster.close()
The shortcut would be:
# load/import classes
from dask.distributed import Client
# set up cluster and workers
client = Client(n_workers=4,
threads_per_worker=1,
memory_limit='64GB')
# have a look at your workers
client
# do some work
## ...
# close workers and cluster
client.close()
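Applied to the workflow in the question, a rough sketch might look as follows (the file name and column names are placeholders):
# read the large CSV lazily, keep only the needed columns, then collect
import dask.dataframe as dd
from dask.distributed import Client

client = Client(n_workers=4, threads_per_worker=1, memory_limit='64GB')
df = dd.read_csv('my_large_file.csv')   # placeholder for the 20GB file
df = df[['col_a', 'col_b']]             # placeholder column selection
pdf = df.compute()                      # materialize as a pandas DataFrame
client.close()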
further reading
- https://distributed.dask.org/en/latest/local-cluster.html
- https://github.com/dask/dask/issues/4118