How to run generator code in parallel?

Posted 2019-06-25 00:16

Question:

I have code like this:

def generator():
    while True:
        x = slow_calculation()  # hypothetical placeholder for the slow calculation
        yield x

I would like to move the slow calculation to separate process(es).

I'm working in Python 3.6, so I have concurrent.futures.ProcessPoolExecutor. It's just not obvious how to parallelize a generator with it.

The difference from a regular concurrent scenario using map is that there is nothing to map over here (the generator runs forever), and we don't want all the results at once: we want to queue them up and wait until the queue is not full before calculating more results.

I don't have to use concurrent.futures; multiprocessing is fine too. It's a similar problem: it's not obvious how to use it inside a generator.
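A minimal sketch of the bounded-queue behaviour described above, using plain multiprocessing (this is an editor's illustration, not part of the original thread): worker processes compute results forever and `put()` them into a `multiprocessing.Queue` created with a `maxsize`, so they block while the queue is full, and the generator just `get()`s from the queue. `slow_calculation`, `_worker` and `parallel_generator` are hypothetical names, the optional `ctx` argument selects a start method via `multiprocessing.get_context()`, and results arrive in whatever order the workers finish them.

```python
import multiprocessing as mp
import time


def slow_calculation(n):
    # Hypothetical stand-in for the real slow work.
    time.sleep(0.01)
    return n * n


def _worker(queue, worker_id, n_workers):
    # Each worker computes results forever. queue.put() blocks while the
    # queue is full, so workers pause until the consumer takes an item.
    n = worker_id
    while True:
        queue.put(slow_calculation(n))
        n += n_workers


def parallel_generator(n_workers=4, maxsize=8, ctx=None):
    if ctx is None:
        ctx = mp.get_context()  # the platform's default start method
    queue = ctx.Queue(maxsize=maxsize)
    workers = [
        ctx.Process(target=_worker, args=(queue, i, n_workers), daemon=True)
        for i in range(n_workers)
    ]
    for w in workers:
        w.start()
    try:
        while True:
            yield queue.get()  # blocks until some worker has a result
    finally:
        # Runs when the generator is closed or garbage-collected.
        for w in workers:
            w.terminate()
            w.join()
```

Calling `close()` on the generator runs the `finally` block and stops the workers. Where the start method is spawn (Windows, and macOS since Python 3.8), create the generator under an `if __name__ == "__main__":` guard. Note that every result is still pickled on its way through the queue.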

Slight twist: each value returned by the generator is a large numpy array (10 megabytes or so). How do I transfer that without pickling and unpickling? I've seen the docs for multiprocessing.Array but it's not totally obvious how to transfer a numpy array using that.
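A minimal sketch of one way `multiprocessing.Array` can carry a numpy array, assuming the shape and `float64` dtype are fixed and known in advance: the parent allocates the shared buffer, the child writes into a numpy view of it made with `np.frombuffer`, and the parent reads the same memory through its own view. Only a small handle to the shared block is passed to the child, not the 10 MB of data. `SHAPE`, `as_numpy`, `producer` and `compute_in_child` are hypothetical names.

```python
import multiprocessing as mp

import numpy as np

SHAPE = (1000, 1250)  # 1.25M float64 values, about 10 MB (illustrative size)
N = SHAPE[0] * SHAPE[1]


def as_numpy(shared):
    # Wrap the shared ctypes buffer in a numpy array without copying it.
    return np.frombuffer(shared, dtype=np.float64).reshape(SHAPE)


def producer(shared, value):
    # Runs in the child: write the result directly into shared memory.
    arr = as_numpy(shared)
    arr[:] = value


def compute_in_child(value, ctx=None):
    if ctx is None:
        ctx = mp.get_context()
    # lock=False gives a raw shared array; join() below is the only sync.
    shared = ctx.Array("d", N, lock=False)
    p = ctx.Process(target=producer, args=(shared, value))
    p.start()
    p.join()
    # The parent sees the child's writes through its own zero-copy view.
    return as_numpy(shared)
```

For a stream of results you would need extra bookkeeping, for example a pool of preallocated buffers plus a queue that passes the index of each filled slot. On Python 3.8 and later, `multiprocessing.shared_memory.SharedMemory` is another option.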

Answer 1:

In this type of situation I usually use the joblib library. It is a parallel computation framework based on multiprocessing, and it supports memmapping precisely for cases where you have to handle large numpy arrays. I think it is worth checking out.

joblib's documentation may not be explicit enough on this point, since it only shows examples with for loops. Since you want to use a generator, I should point out that it does work with generators. An example that would achieve what you want is the following:

from joblib import Parallel, delayed

def my_long_running_job(x):
    # do something with x; returning x unchanged is only a placeholder
    return x

# n_jobs sets the number of workers; you can customize it
results = Parallel(n_jobs=4)(delayed(my_long_running_job)(x) for x in generator())
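One caveat worth spelling out: by default a `Parallel(...)` call returns a list of all results, so it only finishes once its input is exhausted, and with the question's endless generator it would never return. A minimal sketch that bounds the input with `itertools.islice`, where `generator()` and `my_long_running_job` are hypothetical stand-ins:

```python
from itertools import count, islice

from joblib import Parallel, delayed


def generator():
    # Hypothetical stand-in for the question's endless generator.
    for i in count():
        yield i


def my_long_running_job(x):
    # Hypothetical processing step.
    return x * x


# Parallel collects everything into a list, so feed it a finite slice.
results = Parallel(n_jobs=4)(
    delayed(my_long_running_job)(x) for x in islice(generator(), 20)
)
```

The results come back in the same order as the inputs.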

Edit: I don't know what kind of processing you want to do, but if it releases the GIL you could also consider using threads. Threads share memory, so you avoid transferring large numpy arrays between processes while still getting true parallelism.
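A minimal sketch of the thread-based variant, assuming the slow work is numpy code that releases the GIL (large matrix products usually do): a `ThreadPoolExecutor` keeps at most `prefetch` calculations in flight, and the generator yields results in submission order. The arrays never leave the process, so nothing is pickled. `slow_calculation` and `threaded_generator` are hypothetical names.

```python
from collections import deque
from concurrent.futures import ThreadPoolExecutor

import numpy as np


def slow_calculation(seed):
    # Hypothetical stand-in: a large matrix product, which numpy
    # typically runs with the GIL released.
    rng = np.random.default_rng(seed)
    a = rng.standard_normal((500, 500))
    return a @ a.T


def threaded_generator(n_workers=4, prefetch=8):
    with ThreadPoolExecutor(max_workers=n_workers) as pool:
        # Keep at most `prefetch` calculations submitted at any time.
        pending = deque(pool.submit(slow_calculation, i) for i in range(prefetch))
        seed = prefetch
        try:
            while True:
                result = pending.popleft().result()  # oldest first
                pending.append(pool.submit(slow_calculation, seed))
                seed += 1
                yield result
        finally:
            # On close(), drop queued work that has not started yet.
            for future in pending:
                future.cancel()
```

Swapping `ThreadPoolExecutor` for `concurrent.futures.ProcessPoolExecutor` gives the same bounded pattern with processes, at the cost of pickling each result and requiring `slow_calculation` to live at module level.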