Consider the following script, in which I test two ways of performing some calculations on generators obtained by itertools.tee:
#!/usr/bin/env python3

from sys import argv
from itertools import tee
from multiprocessing import Process

def my_generator():
    for i in range(5):
        print(i)
        yield i

def double(x):
    return 2 * x

def compute_double_sum(iterable):
    s = sum(map(double, iterable))
    print(s)

def square(x):
    return x * x

def compute_square_sum(iterable):
    s = sum(map(square, iterable))
    print(s)

g1, g2 = tee(my_generator(), 2)

try:
    processing_type = argv[1]
except IndexError:
    processing_type = "no_multi"

if processing_type == "multi":
    p1 = Process(target=compute_double_sum, args=(g1,))
    p2 = Process(target=compute_square_sum, args=(g2,))
    print("p1 starts")
    p1.start()
    print("p2 starts")
    p2.start()
    p1.join()
    print("p1 finished")
    p2.join()
    print("p2 finished")
else:
    compute_double_sum(g1)
    compute_square_sum(g2)
Here is what I obtain when I run the script in "normal" mode:
$ ./test_tee.py
0
1
2
3
4
20
30
And here in parallel mode:
$ ./test_tee.py multi
p1 starts
p2 starts
0
1
2
3
4
20
0
1
2
3
4
30
p1 finished
p2 finished
The initial generator is apparently somehow "copied" and executed twice. I would like to avoid this, because in my real application it seems to trigger a bug in one of the external libraries I'm using to build the initial generator (https://github.com/pysam-developers/pysam/issues/397), while still being able to perform the computations in parallel on the same generated values.
Is there a way to achieve what I want?
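For reference, here is a small sketch (not part of the script above, and assuming the fork start method, the Linux default) that tags each generated item with os.getpid(); it suggests that after the fork each child process inherits its own copy of the teed iterators and therefore re-runs the underlying generator body independently:
#!/usr/bin/env python3
# Sketch: check in which process the generator body actually runs.
# Assumes a fork-based start method (the default on Linux), as in the script above.
import os
from itertools import tee
from multiprocessing import Process

def my_generator():
    for i in range(5):
        # Expect this line to be printed by two different child pids.
        print("generating", i, "in pid", os.getpid())
        yield i

def consume(iterable):
    print("sum", sum(iterable), "in pid", os.getpid())

if __name__ == "__main__":
    g1, g2 = tee(my_generator(), 2)
    p1 = Process(target=consume, args=(g1,))
    p2 = Process(target=consume, args=(g2,))
    p1.start()
    p2.start()
    p1.join()
    p2.join()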
I found an alternative way of doing this here: https://stackoverflow.com/a/26873783/1878788.
In this approach we don't tee the generator any more. Instead, we duplicate its generated items and feed them to a composite function that performs both computations on a given item within a single process, while still taking advantage of multiprocessing by distributing the items across a Pool of workers (is this what is called a map/reduce approach?):
#!/usr/bin/env python3

from itertools import starmap
from multiprocessing import Pool
from functools import reduce
from operator import add

def my_generator():
    for i in range(5):
        print(i)
        yield i

def double(x):
    return 2 * x

def square(x):
    return x * x

def double_and_square(args_list):
    return (double(*args_list[0]), square(*args_list[1]))

def sum_tuples(tup1, tup2):
    return tuple(starmap(add, zip(tup1, tup2)))

with Pool(processes=5) as pool:
    results_generator = pool.imap_unordered(double_and_square, (((arg,), (arg,)) for arg in my_generator()))
    print(reduce(sum_tuples, results_generator))
This works on the toy example. I now have to figure out how to organize my computations similarly in my real application.
I tried to generalize this using a higher-order function (make_funcs_applier) to generate the composite function (apply_funcs), but I get the following error:
AttributeError: Can't pickle local object 'make_funcs_applier.<locals>.apply_funcs'
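Since make_funcs_applier itself isn't shown here, the following is only my reconstruction of the idea, but it reproduces the error: pickle serializes functions by qualified name, and a function defined inside another function ("make_funcs_applier.<locals>.apply_funcs") cannot be looked up at module level in the worker process.
# Hypothetical reconstruction of the failing higher-order function, just to
# show why multiprocessing rejects it (pickle looks functions up by their
# qualified name, which does not work for objects local to another function).
import pickle

def make_funcs_applier(funcs):
    def apply_funcs(args_list):
        return tuple(func(args) for func, args in zip(funcs, args_list))
    return apply_funcs

def double(x):
    return 2 * x

def square(x):
    return x * x

apply_funcs = make_funcs_applier((double, square))
# The same failure multiprocessing triggers when sending the callable to a worker:
pickle.dumps(apply_funcs)  # AttributeError: Can't pickle local object ...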
A more generalized attempt
Based on a suggestion in the comments, I tried to improve the above solution to be more reusable:
#!/usr/bin/env python3

"""This script tries to work around some limitations of multiprocessing."""

from itertools import repeat, starmap
from multiprocessing import Pool
from functools import reduce
from operator import add

# Doesn't work because local functions can't be pickled:
# def make_tuple_func(funcs):
#     def tuple_func(args_list):
#         return tuple(func(args) for func, args in zip(funcs, args_list))
#     return tuple_func
#
# test_tuple_func = make_tuple_func((plus_one, double, square))

class FuncApplier(object):
    """This kind of object can be used to group functions and call them on a
    tuple of arguments."""
    __slots__ = ("funcs", )

    def __init__(self, funcs):
        self.funcs = funcs

    def __len__(self):
        return len(self.funcs)

    def __call__(self, args_list):
        return tuple(func(args) for func, args in zip(self.funcs, args_list))

    def fork_args(self, args_list):
        """Take an arguments list and repeat it in an n-tuple."""
        return tuple(repeat(args_list, len(self)))

def sum_tuples(*tuples):
    """Element-wise sum of tuple items."""
    return tuple(starmap(add, zip(*tuples)))

# Can't define these functions in main:
# They wouldn't be pickleable.
def plus_one(x):
    return x + 1

def double(x):
    return 2 * x

def square(x):
    return x * x

def main():
    def my_generator():
        for i in range(5):
            print(i)
            yield i

    test_tuple_func = FuncApplier((plus_one, double, square))
    with Pool(processes=5) as pool:
        results_generator = pool.imap_unordered(
            test_tuple_func,
            (test_tuple_func.fork_args(args_list) for args_list in my_generator()))
        print("sum of x+1:\t%s\nsum of 2*x:\t%s\nsum of x*x:\t%s" % reduce(
            sum_tuples, results_generator))
    exit(0)

if __name__ == "__main__":
    exit(main())
Testing it:
$ ./test_fork.py
0
1
2
3
4
sum of x+1: 15
sum of 2*x: 20
sum of x*x: 30
There are still some annoying limitations for me, because I often define local functions in my code.
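One workaround I can think of (an assumption on my part, not something suggested in the answer below): if the per-item work mostly happens in C extension code or I/O that releases the GIL, multiprocessing.dummy provides a Pool with the same interface but backed by threads, so nothing is pickled and local functions are accepted:
#!/usr/bin/env python3
# Sketch: same Pool interface, but thread-based, so local closures are allowed.
# Only useful if the per-item work releases the GIL (C extensions, I/O).
from multiprocessing.dummy import Pool  # thread-backed drop-in for Pool
from functools import reduce
from itertools import starmap
from operator import add

def main():
    def my_generator():
        for i in range(5):
            yield i

    def local_tuple_func(args_list):  # a local closure: fine, never pickled
        return (args_list[0] + 1, 2 * args_list[1], args_list[2] ** 2)

    def sum_tuples(tup1, tup2):
        return tuple(starmap(add, zip(tup1, tup2)))

    with Pool(processes=5) as pool:
        results = pool.imap_unordered(
            local_tuple_func, ((i, i, i) for i in my_generator()))
        print(reduce(sum_tuples, results))  # expected: (15, 20, 30)

if __name__ == "__main__":
    main()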
The multiprocessing system imports your main module into each process it starts, so the module-level code is executed in each process.
You can avoid this by using the always-recommended

if __name__ == '__main__':

after your class and function definitions, so that the code for the main program only runs in the starting process. This is supposed to be a requirement only on Windows platforms, but it might be worth a try, since you are complaining that code is being run twice.
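For instance, a minimal sketch of that guard applied to your first script (same logic, slightly abbreviated) would look like this; whether it changes the duplicated output will depend on the start method in use:
#!/usr/bin/env python3
# Sketch: definitions stay at module level, the driver code moves under the
# guard so it only runs in the starting process.
from sys import argv
from itertools import tee
from multiprocessing import Process

def my_generator():
    for i in range(5):
        print(i)
        yield i

def compute_double_sum(iterable):
    print(sum(2 * x for x in iterable))

def compute_square_sum(iterable):
    print(sum(x * x for x in iterable))

if __name__ == "__main__":
    g1, g2 = tee(my_generator(), 2)
    processing_type = argv[1] if len(argv) > 1 else "no_multi"
    if processing_type == "multi":
        p1 = Process(target=compute_double_sum, args=(g1,))
        p2 = Process(target=compute_square_sum, args=(g2,))
        p1.start()
        p2.start()
        p1.join()
        p2.join()
    else:
        compute_double_sum(g1)
        compute_square_sum(g2)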