Parallelise nested for-loop in IPython

2019-02-10 06:48发布

问题:

I have a nested for loop in my python code that looks something like this:

results = []

for azimuth in azimuths:
    for zenith in zeniths:
        # Do various bits of stuff
        # Eventually get a result
        results.append(result)

I'd like to parallelise this loop on my 4 core machine to speed it up. Looking at the IPython parallel programming documentation (http://ipython.org/ipython-doc/dev/parallel/parallel_multiengine.html#quick-and-easy-parallelism) it seems that there is an easy way to use map to parallelise iterative operations.

However, to do that I need to have the code inside the loop as a function (which is easy to do), and then map across this function. The problem I have is that I can't get an array to map this function across. itertools.product() produces an iterator which I can't seem to use the map function with.

Am I barking up the wrong tree by trying to use map here? Is there a better way to do it? Or is there some way to use itertools.product and then do parallel execution with a function mapped across the results?

回答1:

To parallelize every call, you just need to get a list for each argument. You can use itertools.product + zip to get this:

allzeniths, allazimuths = zip(*itertools.product(zeniths, azimuths))

Then you can use map:

amr = dview.map(f, allzeniths, allazimuths)

To go a bit deeper into the steps, here's an example:

zeniths = range(1,4)
azimuths = range(6,8)

product = list(itertools.product(zeniths, azimuths))
# [(1, 6), (1, 7), (2, 6), (2, 7), (3, 6), (3, 7)]

So we have a "list of pairs", but what we really want is a single list for each argument, i.e. a "pair of lists". This is exactly what the slightly weird zip(*product) syntax gets us:

allzeniths, allazimuths = zip(*itertools.product(zeniths, azimuths))

print allzeniths
# (1, 1, 2, 2, 3, 3)
print allazimuths
# (6, 7, 6, 7, 6, 7)

Now we just map our function onto those two lists, to parallelize nested for loops:

def f(z,a):
    return z*a

view.map(f, allzeniths, allazimuths)

And there's nothing special about there being only two - this method should extend to an arbitrary number of nested loops.



回答2:

I assume you are using IPython 0.11 or later. First of all define a simple function.

def foo(azimuth, zenith):
    # Do various bits of stuff
    # Eventually get a result
    return result

then use IPython's fine parallel suite to parallelize your problem. first start a controller with 5 engines attached (#CPUs + 1) by starting a cluster in a terminal window (if you installed IPython 0.11 or later this program should be present):

ipcluster start -n 5

In your script connect to the controller and transmit all your tasks. The controller will take care of everything.

from IPython.parallel import Client

c = Client()   # here is where the client establishes the connection
lv = c.load_balanced_view()   # this object represents the engines (workers)

tasks = []
for azimuth in azimuths:
    for zenith in zeniths:
        tasks.append(lv.apply(foo, azimuth, zenith))

result = [task.get() for task in tasks]  # blocks until all results are back


回答3:

I'm not really familiar with IPython, but an easy solution would seem to be to parallelize the outer loop only.

def f(azimuth):
    results = []
    for zenith in zeniths:
        #compute result
        results.append(result)
    return results

allresults = map(f, azimuths)


回答4:

If you actually want to run your code in parallel, use concurrent.futures

import itertools
import concurrent.futures

def _work_horse(azimuth, zenith):
    #DO HEAVY WORK HERE
    return result

futures = []
with concurrent.futures.ProcessPoolExecutor() as executor:
    for arg_set in itertools.product(zeniths, azimuths):
        futures.append(executor.submit(_work_horse, *arg_set))
executor.shutdown(wait=True)

# Will time out after one hour.
results = [future.result(3600) for future in futures]


回答5:

If you want to keep the structure of your loop, you can try using Ray (docs), which is a framework for writing parallel and distributed Python. The one requirement is that you would have to separate out the work that can be parallelized into its own function.

You can import Ray like this:

import ray

# Start Ray. This creates some processes that can do work in parallel.
ray.init()

Then, your script would look like this:

# Add this line to signify that the function can be run in parallel (as a
# "task"). Ray will load-balance different `work` tasks automatically.
@ray.remote
def work(azimuth, zenith):
  # Do various bits of stuff
  # Eventually get a result
  return result

results = []

for azimuth in azimuths:
    for zenith in zeniths:
        # Store a future, which represents the future result of `work`.
        results.append(work.remote(azimuth, zenith))

# Block until the results are ready with `ray.get`.
results = ray.get(results)