我使用multiprocessing.Pool()
并行有些沉重的计算。
目标函数返回大量的数据(一个巨大的名单)。 我跑出来的RAM。
无multiprocessing
,我只是改变目标函数为发电机,通过yield
荷兰国际集团接连产生的元素之一,因为他们计算。
我明白多不支持发生器 - 等待整个输出,并返回它一次,对吗? 没有屈服。 有没有一种方法,使Pool
工人只要他们成为可用的产量数据,无需构建在RAM中的整个结果阵列?
简单的例子:
def target_fnc(arg):
result = []
for i in xrange(1000000):
result.append('dvsdbdfbngd') # <== would like to just use yield!
return result
def process_args(some_args):
pool = Pool(16)
for result in pool.imap_unordered(target_fnc, some_args):
for element in result:
yield element
这是Python 2.7版。
这听起来像一个理想的使用情况下的队列: http://docs.python.org/2/library/multiprocessing.html#exchanging-objects-between-processes
只需从汇集的工人养活你的结果到队列中,在主咽下。
请注意,您仍可能会遇到内存压力问题,除非你排排队几乎一样快,工人们正在填充它。 你可以限制队列大小在这种情况下,汇集工人将在阻止(,将适合在队列中的最大对象数) queue.put
语句,直到空间在队列中可用。 这将使在天花板上的内存使用情况。 但是 ,如果你这样做,可能是时候重新考虑是否需要在所有的汇集和/或是否可能是有意义的使用更少的工人。
如果你的任务可以返回成块的数据......它们可以被分解成更小的任务,每个返回单个块? 显然,这是不可能的。 如果实在不行,你必须使用一些其他的机制(如Queue
,如罗兰·艾布拉姆斯建议)。 但是,当它是 ,它可能是其他原因更好的解决方案,以及解决这一问题。
随着你的榜样,这是肯定是可行的。 例如:
def target_fnc(arg, low, high):
result = []
for i in xrange(low, high):
result.append('dvsdbdfbngd') # <== would like to just use yield!
return result
def process_args(some_args):
pool = Pool(16)
pool_args = []
for low in in range(0, 1000000, 10000):
pool_args.extend(args + [low, low+10000] for args in some_args)
for result in pool.imap_unordered(target_fnc, pool_args):
for element in result:
yield element
(你当然可以取代嵌套理解的循环,或zip
和flatten
,如果你喜欢。)
所以,如果some_args
为[1, 2, 3]
,你会得到300任务- [[1, 0, 10000], [2, 0, 10000], [3, 0, 10000], [1, 10000, 20000], …]
其中的每一个仅返回10000个元素,而不是百万。
从你的描述,这听起来像你不是在处理数据这么多的兴趣,因为他们进来,为避免传递百万元list
回。
有这样做的一个简单的方法:只要把数据放到一个文件中。 例如:
def target_fnc(arg):
fd, path = tempfile.mkstemp(text=True)
with os.fdopen(fd) as f:
for i in xrange(1000000):
f.write('dvsdbdfbngd\n')
return path
def process_args(some_args):
pool = Pool(16)
for result in pool.imap_unordered(target_fnc, some_args):
with open(result) as f:
for element in f:
yield element
显然,如果您的结果可能包含换行符,或者不是字符串,等等,你需要使用一个csv
文件, numpy
,而不是等一个简单的文本文件,但这个想法是一样的。
话虽这么说,即使是这样简单,通常有以处理数据,以一定的时间一大块利益,所以在分手的任务,或使用Queue
(其他两个答案建议)可能分别会更好,如果存在缺陷( ,需要一种方式来打破任务了,或者必须能够以最快的速度为他们生产消耗的数据)都没有处理断路器。