Python Multiprocessing with Distributed Cluster

2020-02-02 07:19发布

问题:

I am looking for a python package that can do multiprocessing not just across different cores within a single computer, but also with a cluster distributed across multiple machines. There are a lot of different python packages for distributed computing, but most seem to require a change in code to run (for example a prefix indicating that the object is on a remote machine). Specifically, I would like something as close as possible to the multiprocessing pool.map function. So, for example, if on a single machine the script is:

from multiprocessing import Pool
pool = Pool(processes = 8)
resultlist = pool.map(function, arglist)

Then the pseudocode for a distributed cluster would be:

from distprocess import Connect, Pool, Cluster

pool1 = Pool(processes = 8)
c = Connect(ipaddress)
pool2 = c.Pool(processes = 4)
cluster = Cluster([pool1, pool2])
resultlist = cluster.map(function, arglist)

回答1:

If you want a very easy solution, there isn't one.

However, there is a solution that has the multiprocessing interface -- pathos -- which has the ability to establish connections to remote servers through a parallel map, and to do multiprocessing.

If you want to have a ssh-tunneled connection, you can do that… or if you are ok with a less secure method, you can do that too.

>>> # establish a ssh tunnel
>>> from pathos.core import connect
>>> tunnel = connect('remote.computer.com', port=1234)
>>> tunnel       
Tunnel('-q -N -L55774:remote.computer.com:1234 remote.computer.com')
>>> tunnel._lport
55774
>>> tunnel._rport
1234
>>> 
>>> # define some function to run in parallel
>>> def sleepy_squared(x):
...   from time import sleep
...   sleep(1.0)
...   return x**2
... 
>>> # build a pool of servers and execute the parallel map
>>> from pathos.pp import ParallelPythonPool as Pool
>>> p = Pool(8, servers=('localhost:55774',))
>>> p.servers
('localhost:55774',)
>>> y = p.map(sleepy_squared, x)
>>> y
[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]

Or, instead you could configure for a direct connection (no ssh)

>>> p = Pool(8, servers=('remote.computer.com:5678',))
# use an asynchronous parallel map
>>> res = p.amap(sleepy_squared, x)
>>> res.get()
[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]

It's all a bit finicky, for the remote server to work, you have to start a server running on remote.computer.com at the specified port beforehand -- and you have to make sure that both the settings on your localhost and the remote host are going to allow either the direct connection or the ssh-tunneled connection. Plus, you need to have the same version of pathos and of the pathos fork of pp running on each host. Also, for ssh, you need to have ssh-agent running to allow password-less login with ssh.

But then, hopefully it all works… if your function code can be transported over to the remote host with dill.source.importable.

FYI, pathos is long overdue a release, and basically, there are a few bugs and interface changes that need to be resolved before a new stable release is cut.



回答2:

I'd suggest taking a look at Ray, which aims to do exactly that.

Ray uses the same syntax to parallelize code in the single machine multicore setting as it does in the distributed setting. If you're willing to use a for loop instead of a map call, then your example would look like the following.

import ray
import time

ray.init()

@ray.remote
def function(x):
    time.sleep(0.1)
    return x

arglist = [1, 2, 3, 4]

result_ids = [function.remote(x) for x in arglist]
resultlist = ray.get(result_ids)

That will run four tasks in parallel using however many cores you have locally. To run the same example on a cluster, the only line that would change would be the call to ray.init(). The relevant documentation can be found here.

Note that I'm helping to develop Ray.



回答3:

A little late to the party here, but since I was also looking for a similar solution, and this question is still not marked as answered, I thought I would contribute my findings.

I ended up using SCOOP. It provides a parallel map implementation that can work across multiple cores, across multiple hosts. It can also fall back to Python's serial map function if desired during invocation.

From SCOOP's introduction page, it cites the following features:

SCOOP features and advantages over futures, multiprocessing and similar modules are as follows:

  • Harness the power of multiple computers over network;
  • Ability to spawn multiple tasks inside a task;
  • API compatible with PEP-3148;
  • Parallelizing serial code with only minor modifications;
  • Efficient load-balancing.

It does have some quirks (functions/classes must be pickleable), and the setup to get things running smoothly across multiple hosts can be tedious if they don't all share the same filesystem schema, but overall I'm quite happy with the results. For our purposes, doing quite a bit of Numpy & Cython, it provides excellent performance.

Hope this helps.



回答4:

Have you looked to disco?

Features:

  • Map / Reduce paradigm
  • Python programming
  • Distributed shared disk
  • ssh underlaying transport
  • web and console interfaces
  • easy to add/block/delete a node
  • master launch slaves nodes without user intervention
  • slaves nodes are automatically restarted in case of failure
  • nice documentation. Following the Install Guide I was able to launch a 2-machine cluster in a few minutes (the only thing I need to do was creating $DISCO_HOME/root folder in order to connect to the WebUI, I guess due of log file error creation).

A simple example from disco's documentation:

from disco.core import Job, result_iterator

def map(line, params):
    for word in line.split():
        yield word, 1

def reduce(iter, params):
    from disco.util import kvgroup
    for word, counts in kvgroup(sorted(iter)):
        yield word, sum(counts)

if __name__ == '__main__':
    job = Job().run(input=["http://discoproject.org/media/text/chekhov.txt"],
                    map=map,
                    reduce=reduce)
    for word, count in result_iterator(job.wait(show=True)):
        print(word, count)