Consider the following script, in which I test two ways of performing some calculations on generators obtained with itertools.tee:
#!/usr/bin/env python3
from sys import argv
from itertools import tee
from multiprocessing import Process


def my_generator():
    for i in range(5):
        print(i)
        yield i


def double(x):
    return 2 * x


def compute_double_sum(iterable):
    s = sum(map(double, iterable))
    print(s)


def square(x):
    return x * x


def compute_square_sum(iterable):
    s = sum(map(square, iterable))
    print(s)


g1, g2 = tee(my_generator(), 2)

try:
    processing_type = argv[1]
except IndexError:
    processing_type = "no_multi"

if processing_type == "multi":
    p1 = Process(target=compute_double_sum, args=(g1,))
    p2 = Process(target=compute_square_sum, args=(g2,))
    print("p1 starts")
    p1.start()
    print("p2 starts")
    p2.start()
    p1.join()
    print("p1 finished")
    p2.join()
    print("p2 finished")
else:
    compute_double_sum(g1)
    compute_square_sum(g2)
Here is what I obtain when I run the script in "normal" mode:
$ ./test_tee.py
0
1
2
3
4
20
30
And here in parallel mode:
$ ./test_tee.py multi
p1 starts
p2 starts
0
1
2
3
4
20
0
1
2
3
4
30
p1 finished
p2 finished
The initial generator is apparently "copied" somehow and executed twice. I would like to avoid this, because in my real application it seems to trigger a bug in one of the external libraries I use to build the initial generator (https://github.com/pysam-developers/pysam/issues/397), while still being able to do computations in parallel on the same generated values.
Is there a way to achieve what I want?
The multiprocessing system imports your main module into each process it starts, so the module-level code is executed in each process. You can avoid this by using the always-recommended

if __name__ == "__main__":

guard after your class and function definitions, so that the code for the main program only runs in the starting process. This is supposed to be a requirement only on Windows platforms, but it might be worth a try since you are complaining that code is being run twice.
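Applied to the script in the question, that change would look roughly like this (the imports and function definitions stay where they are; only the main-program code moves under the guard):

# Main-program code from the original script, now only run in the
# starting process.
if __name__ == "__main__":
    g1, g2 = tee(my_generator(), 2)

    try:
        processing_type = argv[1]
    except IndexError:
        processing_type = "no_multi"

    if processing_type == "multi":
        p1 = Process(target=compute_double_sum, args=(g1,))
        p2 = Process(target=compute_square_sum, args=(g2,))
        print("p1 starts")
        p1.start()
        print("p2 starts")
        p2.start()
        p1.join()
        print("p1 finished")
        p2.join()
        print("p2 finished")
    else:
        compute_double_sum(g1)
        compute_square_sum(g2)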
I found an alternative way of doing this here: https://stackoverflow.com/a/26873783/1878788.

In this approach we don't tee the generator any more. We just duplicate its generated items and feed them to a composite function that does the parallel treatment on the generated items within one process only, but we take advantage of multiprocessing by using a Pool (is this what is called a map/reduce approach?). This works on the toy example (a sketch is given below); I now have to figure out how to organize my computations similarly in the real application case.
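A minimal sketch of that approach; the helper name apply_both and the exact reduce step are my own reconstruction, not necessarily the code from the linked answer:

#!/usr/bin/env python3
from multiprocessing import Pool


def my_generator():
    for i in range(5):
        print(i)
        yield i


def double(x):
    return 2 * x


def square(x):
    return x * x


def apply_both(x):
    # Composite function: compute both results for one generated item.
    return double(x), square(x)


if __name__ == "__main__":
    with Pool(2) as pool:
        # The generator is consumed once, in the main process only;
        # the workers only receive the individual items.
        results = pool.imap(apply_both, my_generator())
        # "Reduce" step: sum each component separately.
        double_sum, square_sum = map(sum, zip(*results))
    print(double_sum)  # 20
    print(square_sum)  # 30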
I tried to generalize this using a higher-order function (make_funcs_applier) to generate the composite function (apply_funcs), but I get the following error:
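A sketch of what such a generalization might look like (a reconstruction, not the original code); the closure returned by make_funcs_applier is a local function, and local functions cannot be pickled, so sending it to the Pool workers raises a pickling error:

def make_funcs_applier(funcs):
    def apply_funcs(x):
        # Apply every function to one generated item.
        return tuple(f(x) for f in funcs)
    return apply_funcs

# Passing make_funcs_applier((double, square)) to pool.imap fails here:
# apply_funcs only exists inside make_funcs_applier, so the default
# pickler cannot serialize it for the worker processes.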
A more generalized attempt

Based on a suggestion in the comments, I tried to improve the above solution to be more re-usable:
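One way such a re-usable version might look is sketched below, assuming the key change is to avoid the unpicklable closure; the parallel_sums helper is my own naming, not necessarily what the original code used:

from functools import partial
from multiprocessing import Pool


def apply_funcs(funcs, x):
    # Composite function: apply every function to one generated item.
    return tuple(f(x) for f in funcs)


def make_funcs_applier(funcs):
    # A partial of a module-level function is picklable, unlike a closure,
    # so it can be sent to the Pool workers.
    return partial(apply_funcs, funcs)


def parallel_sums(funcs, iterable, nb_procs=2):
    # The iterable is consumed once, in the calling process; the workers
    # only see individual items, and each component is summed separately.
    with Pool(nb_procs) as pool:
        results = pool.imap(make_funcs_applier(funcs), iterable)
        return tuple(map(sum, zip(*results)))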
Testing it:
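A hypothetical test of the sketch above, reusing my_generator, double and square from the script in the question:

if __name__ == "__main__":
    # Expected sums for range(5): 2*(0+1+2+3+4) = 20 and 0+1+4+9+16 = 30.
    double_sum, square_sum = parallel_sums((double, square), my_generator())
    print(double_sum)  # 20
    print(square_sum)  # 30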
There are still some annoying limitations for me, because I quite often define local functions in my code.