I have a multiprocessing job where I'm queuing read-only numpy arrays as part of a producer-consumer pipeline. Currently they're being pickled, because that is the default behaviour of multiprocessing.Queue, and it slows down performance.
Is there any pythonic way to pass references to shared memory instead of pickling the arrays?
Unfortunately the arrays are being generated after the consumer is started, and there is no easy way around that. (So the global variable approach would be ugly...).
[Note that in the following code we are not expecting h(x0) and h(x1) to be computed in parallel. Instead we see h(x0) and g(h(x1)) computed in parallel (like pipelining in a CPU).]
from multiprocessing import Process, Queue
import numpy as np

class __EndToken(object):
    # Sentinel marking the end of a stream.
    pass

def parallel_pipeline(buffer_size=50):
    # Decorator: drive the wrapped generator in a separate process that
    # feeds its results into a bounded queue.
    def parallel_pipeline_with_args(f):
        def consumer(xs, q):
            for x in xs:
                q.put(x)
            q.put(__EndToken())

        def parallel_generator(f_xs):
            q = Queue(buffer_size)
            consumer_process = Process(target=consumer, args=(f_xs, q))
            consumer_process.start()
            while True:
                x = q.get()
                if isinstance(x, __EndToken):
                    break
                yield x

        def f_wrapper(xs):
            return parallel_generator(f(xs))

        return f_wrapper
    return parallel_pipeline_with_args

@parallel_pipeline(3)
def f(xs):
    for x in xs:
        yield x + 1.0

@parallel_pipeline(3)
def g(xs):
    for x in xs:
        yield x * 3

@parallel_pipeline(3)
def h(xs):
    for x in xs:
        yield x * x

def xs():
    for i in range(1000):
        yield np.random.uniform(0, 1, (500, 2000))

if __name__ == "__main__":
    rs = f(g(h(xs())))
    for r in rs:
        print(r)
Check out the Pathos multiprocessing project, which avoids the standard multiprocessing reliance on pickling. This should let you get around the inefficiencies of pickling and give you access to common memory for read-only shared resources. Note that while Pathos is nearing deployment in a full pip package, in the interim I'd recommend installing with pip install git+https://github.com/uqfoundation/pathos
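
For illustration, here is a minimal usage sketch, assuming pathos's ProcessingPool API (the lambda and array sizes below are illustrative, not code from the question):

from pathos.multiprocessing import ProcessingPool
import numpy as np

if __name__ == "__main__":
    # dill-based serialization lets pathos map lambdas and closures,
    # which the standard multiprocessing pickler rejects.
    pool = ProcessingPool(4)
    arrays = [np.random.uniform(0, 1, (500, 2000)) for _ in range(8)]
    print(pool.map(lambda a: (a * a).sum(), arrays))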
Sharing memory between threads or processes
Use threading instead of multiprocessing
Since you're using numpy, you can take advantage of the fact that the global interpreter lock is released during numpy computations. This means you can do parallel processing with standard threads and shared memory, instead of multiprocessing and inter-process communication. Here's a version of your code, tweaked to use threading.Thread and queue.Queue instead of multiprocessing.Process and multiprocessing.Queue. This passes a numpy ndarray through a queue without pickling it. On my computer, this runs about 3 times faster than your code. (However, it's only about 20% faster than the serial version of your code; I have suggested some other approaches further down.)
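
The code for this variant did not survive in this copy of the answer, so the following is a hedged reconstruction: the question's decorator with the process and queue swapped for their threading equivalents (module names per Python 3).

import threading
import queue

class __EndToken(object):
    # Sentinel marking the end of a stream.
    pass

def parallel_pipeline(buffer_size=50):
    def parallel_pipeline_with_args(f):
        def consumer(xs, q):
            for x in xs:
                q.put(x)  # only a reference is queued; nothing is pickled
            q.put(__EndToken())

        def parallel_generator(f_xs):
            q = queue.Queue(buffer_size)
            t = threading.Thread(target=consumer, args=(f_xs, q))
            t.daemon = True
            t.start()
            while True:
                x = q.get()
                if isinstance(x, __EndToken):
                    break
                yield x

        def f_wrapper(xs):
            return parallel_generator(f(xs))

        return f_wrapper
    return parallel_pipeline_with_args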
Store numpy arrays in shared memory
Another option, close to what you requested, would be to continue using the multiprocessing package but pass data between processes using arrays stored in shared memory. The code below creates a new ArrayQueue class to do that. The ArrayQueue object should be created before spawning subprocesses. It creates and manages a pool of numpy arrays backed by shared memory. When a result array is pushed onto the queue, ArrayQueue copies the data from that array into an existing shared-memory array, then passes the id of the shared-memory array through the queue. This is much faster than sending the whole array through the queue, since it avoids pickling the arrays. It has similar performance to the threaded version above (about 10% slower), and may scale better if the global interpreter lock is an issue (i.e., if you run a lot of Python code in the functions).
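
The ArrayQueue code itself is missing from this copy of the answer; below is a hedged sketch of the mechanism described above, assuming every queued array has the same shape and dtype (class and method names are illustrative):

import multiprocessing as mp
from multiprocessing import sharedctypes
import numpy as np

class ArrayQueue(object):
    # Create this object *before* starting worker processes and pass it
    # to them, so the shared buffers are inherited.
    def __init__(self, template, maxsize):
        self.shape = template.shape
        self.dtype = template.dtype
        # Fixed pool of shared-memory buffers, one per queue slot.
        self.buffers = [sharedctypes.RawArray('b', template.nbytes)
                        for _ in range(maxsize)]
        self.free = mp.Queue(maxsize)   # ids of buffers ready for reuse
        self.ready = mp.Queue(maxsize)  # ids of buffers holding results
        for i in range(maxsize):
            self.free.put(i)

    def _view(self, i):
        # numpy view onto shared buffer i; no data is copied here.
        return np.frombuffer(self.buffers[i],
                             dtype=self.dtype).reshape(self.shape)

    def put(self, array):
        i = self.free.get()         # block until some buffer is free
        self._view(i)[...] = array  # copy the data into shared memory
        self.ready.put(i)           # only the small integer id is queued

    def get(self):
        i = self.ready.get()
        out = self._view(i).copy()  # copy out so the slot can be recycled
        self.free.put(i)
        return out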
Parallel processing of samples instead of functions
The code above is only about 20% faster than a single-threaded version (12.2s vs. 14.8s for the serial version). That is because each function is run in a single thread or process, and most of the work is done by xs(). The execution time for the example above is nearly the same as if you just ran

%time sum(1 for x in xs())

If your real project has many more intermediate functions and/or they are more complex than the ones you showed, then the workload may be distributed better among processors, and this may not be a problem. However, if your workload really does resemble the code you provided, then you may want to refactor your code to allocate one sample to each thread instead of one function to each thread. That would look like the code sketched below (both threading and multiprocessing versions are shown):
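
That per-sample code is also missing here; what follows is a hedged sketch of the idea, with the three example functions collapsed into one per-sample call and the sample generated inside the worker so the expensive xs() work is parallelized too:

from multiprocessing import Pool
from multiprocessing.pool import ThreadPool
import numpy as np

def one_sample(i):
    # h, g, and f applied serially to one sample: ((x * x) * 3) + 1.0.
    # (With forked workers, reseed np.random per process in real code,
    # or every worker will draw identical samples.)
    x = np.random.uniform(0, 1, (500, 2000))
    return (x * x * 3) + 1.0

if __name__ == "__main__":
    # Threaded version: shared address space, nothing is pickled.
    with ThreadPool(4) as tp:
        for r in tp.imap_unordered(one_sample, range(1000)):
            pass
    # Multiprocessing version: every returned array is pickled back.
    with Pool(4) as pool:
        for r in pool.imap_unordered(one_sample, range(1000)):
            pass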
The threaded version of this code is only slightly faster than the first example I gave, and only about 30% faster than the serial version. That's not as much of a speedup as I would have expected; maybe Python is still getting partly bogged down by the GIL?
The multiprocessing version performs significantly faster than your original multiprocessing code, primarily because all the functions get chained together in a single process, rather than queueing (and pickling) intermediate results. However, it is still slower than the serial version, because all the result arrays have to get pickled (in the worker process) and unpickled (in the main process) before being returned by imap_unordered. If you can arrange for your pipeline to return aggregate results instead of the complete arrays, then you can avoid that pickling overhead, and the multiprocessing version becomes the fastest: about 43% faster than the serial version.
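
For instance, a hedged variant of the per-sample worker sketched above that returns only a scalar, so almost nothing has to be pickled on the way back:

def one_sample_sum(i):
    x = np.random.uniform(0, 1, (500, 2000))
    return ((x * x * 3) + 1.0).sum()  # scalar result: negligible pickling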
OK, now for the sake of completeness, here's a version of the second example that uses multiprocessing with your original generator functions instead of the finer-scale functions shown above. This uses some tricks to spread the samples among multiple processes, which may make it unsuitable for many workflows. But using generators does seem to be slightly faster than using the finer-scale functions, and this method can get you up to a 54% speedup vs. the serial version. However, that is only possible if you don't need to return the full arrays from the worker functions.
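
That code is likewise missing from this copy; here is a hedged sketch of one way to spread the samples among processes while keeping the generator style (each worker runs the whole serial pipeline over its own slice of sample indices and returns only an aggregate; all names are illustrative):

from multiprocessing import Pool
import numpy as np

def pipeline_slice(bounds):
    start, stop = bounds
    def xs_slice():
        for i in range(start, stop):
            yield np.random.uniform(0, 1, (500, 2000))
    def h(xs):
        for x in xs:
            yield x * x
    def g(xs):
        for x in xs:
            yield x * 3
    def f(xs):
        for x in xs:
            yield x + 1.0
    # The serial generator pipeline runs entirely inside this worker;
    # only a scalar crosses the process boundary.
    return sum(r.sum() for r in f(g(h(xs_slice()))))

if __name__ == "__main__":
    bounds = [(i, i + 250) for i in range(0, 1000, 250)]
    with Pool(4) as pool:
        print(sum(pool.map(pipeline_slice, bounds)))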
Your example does not seem to run on my computer, although that may have to do with the fact that I'm running Windows (there are issues pickling anything not in the __main__ namespace, including anything decorated)... would something like the pack/unpack approach below help? (You would have to put pack and unpack inside each of f(), g(), and h().)

Note: I'm not sure this would actually be any faster... just a stab at what others have suggested.
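
The original pack/unpack helpers did not survive in this copy of the answer; the following is a hedged guess at the idea — flatten each array to raw bytes plus metadata before queueing and rebuild it on the other side, sidestepping numpy's pickle path. Each of f(), g(), and h() would unpack every incoming item and pack every outgoing one.

import numpy as np

def pack(a):
    # Queue a (bytes, dtype, shape) triple instead of the ndarray itself.
    return a.tobytes(), a.dtype, a.shape

def unpack(packed):
    data, dtype, shape = packed
    # frombuffer on bytes yields a read-only array, which is fine here
    # since the question's arrays are read-only anyway.
    return np.frombuffer(data, dtype=dtype).reshape(shape)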