Can I use functions imported from .py files in Dask?

Posted 2020-06-11 08:07

Question:

I have a question about serialization and imports.

  • Should functions have their own imports, like I've seen done with PySpark?
  • Is the following just plain wrong? Does mod.py need to be a conda/pip package? mod.py was written to a shared filesystem.

In [1]: from distributed import Executor

In [2]: e = Executor('127.0.0.1:8786')

In [3]: e
Out[3]: <Executor: scheduler="127.0.0.1:8786" processes=2 cores=2>

In [4]: import socket

In [5]: e.run(socket.gethostname)
Out[5]: {'172.20.12.7:53405': 'n1015', '172.20.12.8:53779': 'n1016'}

In [6]: %%file mod.py
   ...: def hostname():
   ...:     return 'the hostname'
   ...: 
Overwriting mod.py

In [7]: import mod

In [8]: mod.hostname()
Out[8]: 'the hostname'

In [9]: e.run(mod.hostname)
distributed.utils - ERROR - No module named 'mod'

Answer 1:

Quick Answer

Upload your mod.py file to all of your workers. You can do this using whatever mechanism you used to set up dask.distributed, or you can use the upload_file method:

e.upload_file('mod.py')
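
After the upload each worker can import mod from the directory where upload_file dropped it, so the call that failed above should generally work; roughly (worker addresses will of course differ on your cluster):

e.run(mod.hostname)
# now returns {'<worker-address>': 'the hostname', ...} instead of raising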

Alternatively, if your function is defined in your IPython session, rather than being part of a module, it will be sent along without a problem.
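
For example, a function defined right in the session works without uploading anything; this is just an illustrative sketch:

def interactive_hostname():
    # Defined in the session rather than in a module, so its code is
    # bundled and shipped to the workers instead of being looked up by
    # module name on each worker's filesystem.
    import socket
    return socket.gethostname()

e.run(interactive_hostname)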

Long Answer

This all has to do with how functions get serialized in Python. Functions from modules are serialized by their module name and function name:

In [1]: from math import sin

In [2]: import pickle

In [3]: pickle.dumps(sin)
Out[3]: b'\x80\x03cmath\nsin\nq\x00.'

So if the client machine wants to refer to the math.sin function, it sends along this bytestring (which, you'll notice, has 'math' and 'sin' buried among the other bytes) to the worker machine. The worker looks at this bytestring and says, "OK great, the function I want is in such-and-such a module, let me go and find that in my local file system." If the module isn't present, it raises an error, much like the one you received above.

For dynamically created functions (functions that you make in IPython), a completely different approach is used: all of the function's code is bundled up and sent along. This approach generally works fine.
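
You can see the difference locally with cloudpickle, the library dask.distributed relies on for functions it cannot pickle by reference; a minimal sketch, run on one machine rather than the cluster:

import cloudpickle

def greet():
    return 'hello'

blob = cloudpickle.dumps(greet)     # payload includes the function's bytecode
restored = cloudpickle.loads(blob)  # no 'greet' module file is needed to rebuild it
restored()                          # -> 'hello'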

Generally speaking, Dask assumes that the workers and the client all have the same software environment. Typically this is handled by whoever sets up your cluster, using some other tool like Docker. Methods like upload_file are there to fill in the gaps when you have files or scripts that are updated more frequently.



Answer 2:

To run an imported function on your cluster when it is not available in the workers' environment, you can also create a local function from the imported function. This local function will then be pickled by cloudpickle. In Python 2 you can achieve this with new.function (see the new module). In Python 3 this could be achieved with the types module, but I haven't tried it.

Your example above would then look like:

In [3]: import mod

In [4]: import new

In [5]: def remote(func):
   ...:     return new.function(func.func_code, func.func_globals, closure=func.func_closure)
   ...:

In [6]: e.run(remote(mod.hostname))
Out[6]: {'tcp://10.0.2.15:44208': 'the hostname'}
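
For Python 3, a possible equivalent (untested, as noted above, so treat it as a sketch) uses types.FunctionType and the dunder attributes that replaced func_code and friends:

import types
import mod

def remote(func):
    # Rebuild the function from its code object so cloudpickle ships the
    # code itself instead of a 'mod.hostname' module/name reference.
    return types.FunctionType(
        func.__code__,
        func.__globals__,
        name=func.__name__,
        argdefs=func.__defaults__,
        closure=func.__closure__,
    )

e.run(remote(mod.hostname))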


Answer 3:

Adding the directory of the module to PYTHONPATH worked for me.
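
If you would rather not restart the workers, a rough equivalent from the client is to extend sys.path on every worker; the directory below is a placeholder for wherever mod.py lives on your shared filesystem:

import sys

def add_to_path(path='/shared/filesystem/modules'):  # hypothetical path
    # Same effect as putting this directory on PYTHONPATH before the
    # workers start: later imports of mod on the workers will find it.
    if path not in sys.path:
        sys.path.insert(0, path)
    return sys.path

e.run(add_to_path)
e.run(mod.hostname)  # workers should now import mod from the shared directory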