Managing worker memory on a Dask LocalCluster

Posted 2020-08-20 06:38

Question:

I am trying to load a dataset with dask but when it is time to compute my dataset I keep getting problems like this:

WARNING - Worker exceeded 95% memory budget. Restarting.

I am just working on my local machine, initiating dask as follows:

from dask.distributed import Client  # import needed for this snippet to run

if __name__ == '__main__':
    libmarket.config.client = Client()  # use dask.distributed by default

Now in my error messages I keep seeing a reference to a 'memory_limit=' keyword parameter. However, I've searched the dask documentation thoroughly and I can't figure out how to increase the bloody worker memory limit in a single-machine configuration. I have 256GB of RAM, and I'm dropping the majority of the columns (from a 20GB CSV file) before converting the result back into a pandas dataframe, so I know it will fit in memory. I just need to increase the per-worker memory limit from my code (not using dask-worker) so that I can process it.

Please, somebody help me.

Answer 1:

The argument memory_limit can be provided to the __init__() methods of both Client and LocalCluster.

general remarks

Calling Client() without arguments is a shortcut for first creating a LocalCluster() and then passing that cluster to Client (see Dask: Single Machine). When Client is called without an instance of LocalCluster, all arguments of LocalCluster.__init__() can be provided to the initialization call of Client instead. This is why the argument memory_limit (like other arguments such as n_workers) is not documented in the API documentation of the Client class.

However, the argument memory_limit does not seem to be properly documented in the API documentation of LocalCluster either (see Dask GitHub issue #4118).

solution

A working example would be the following. I added some more arguments which might be useful for people finding this question/answer. Note that memory_limit applies per worker, so the configuration below allows each of the four workers to use up to 64GB (256GB in total).

# load/import classes
from dask.distributed import Client, LocalCluster

# set up cluster and workers
cluster = LocalCluster(n_workers=4, 
                       threads_per_worker=1,
                       memory_limit='64GB')
client = Client(cluster)

# have a look at your workers (the client renders a summary in Jupyter)
client

# do some work
## ... 

# close workers and cluster
client.close()
cluster.close()
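
While the cluster is still running (i.e., in the "do some work" stage above), you can check that the limit was actually applied by inspecting the scheduler's view of the workers. This is a small sketch, assuming the worker entries returned by client.scheduler_info() expose a memory_limit field, as recent dask.distributed versions do:

# sketch: print each worker's memory limit as seen by the scheduler
info = client.scheduler_info()
for address, worker in info['workers'].items():
    print(address, worker['memory_limit'])  # limit in bytes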

The shortcut would be

# load/import classes
from dask.distributed import Client

# set up cluster and workers
client = Client(n_workers=4, 
                threads_per_worker=1,
                memory_limit='64GB')

# have a look at your workers (the client renders a summary in Jupyter)
client

# do some work
## ... 

# close workers and cluster
client.close()
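
The warning from the question ("Worker exceeded 95% memory budget. Restarting.") is triggered by the worker memory thresholds in the distributed configuration, which are fractions of memory_limit (target, spill, pause, terminate). If you want to tune when workers spill to disk or get restarted, rather than only raising the limit, you can set these fractions with dask.config before creating the cluster. A minimal sketch, assuming the standard dask.distributed configuration keys:

import dask
from dask.distributed import Client

# adjust the fractions of memory_limit at which workers react
dask.config.set({
    'distributed.worker.memory.target': 0.80,     # start spilling to disk
    'distributed.worker.memory.spill': 0.90,      # spill more aggressively
    'distributed.worker.memory.pause': 0.95,      # pause worker threads
    'distributed.worker.memory.terminate': False  # never restart on memory use
})

client = Client(n_workers=4, threads_per_worker=1, memory_limit='64GB')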

further reading

  • https://distributed.dask.org/en/latest/local-cluster.html
  • https://github.com/dask/dask/issues/4118