大数据多重处理(multiprocessing with large data)

2019-07-19 05:04发布

我使用multiprocessing.Pool()并行有些沉重的计算。

目标函数返回大量的数据(一个巨大的名单)。 我跑出来的RAM。

multiprocessing ,我只是改变目标函数为发电机,通过yield荷兰国际集团接连产生的元素之一,因为他们计算。

我明白多不支持发生器 - 等待整个输出,并返回它一次,对吗? 没有屈服。 有没有一种方法,使Pool工人只要他们成为可用的产量数据,无需构建在RAM中的整个结果阵列?

简单的例子:

def target_fnc(arg):
   result = []
   for i in xrange(1000000):
       result.append('dvsdbdfbngd') # <== would like to just use yield!
   return result

def process_args(some_args):
    pool = Pool(16)
    for result in pool.imap_unordered(target_fnc, some_args):
        for element in result:
            yield element

这是Python 2.7版。

Answer 1:

这听起来像一个理想的使用情况下的队列: http://docs.python.org/2/library/multiprocessing.html#exchanging-objects-between-processes

只需从汇集的工人养活你的结果到队列中,在主咽下。

请注意,您仍可能会遇到内存压力问题,除非你排排队几乎一样快,工人们正在填充它。 你可以限制队列大小在这种情况下,汇集工人将在阻止(,将适合在队列中的最大对象数) queue.put语句,直到空间在队列中可用。 这将使在天花板上的内存使用情况。 但是 ,如果你这样做,可能是时候重新考虑是否需要在所有的汇集和/或是否可能是有意义的使用更少的工人。



Answer 2:

如果你的任务可以返回成块的数据......它们可以被分解成更小的任务,每个返回单个块? 显然,这是不可能的。 如果实在不行,你必须使用一些其他的机制(如Queue ,如罗兰·艾布拉姆斯建议)。 但是,当它 ,它可能是其他原因更好的解决方案,以及解决这一问题。

随着你的榜样,这是肯定是可行的。 例如:

def target_fnc(arg, low, high):
   result = []
   for i in xrange(low, high):
       result.append('dvsdbdfbngd') # <== would like to just use yield!
   return result

def process_args(some_args):
    pool = Pool(16)
    pool_args = []
    for low in in range(0, 1000000, 10000):
        pool_args.extend(args + [low, low+10000] for args in some_args)
    for result in pool.imap_unordered(target_fnc, pool_args):
        for element in result:
            yield element

(你当然可以取代嵌套理解的循环,或zipflatten ,如果你喜欢。)

所以,如果some_args[1, 2, 3] ,你会得到300任务- [[1, 0, 10000], [2, 0, 10000], [3, 0, 10000], [1, 10000, 20000], …]其中的每一个仅返回10000个元素,而不是百万。



Answer 3:

从你的描述,这听起来像你不是在处理数据这么多的兴趣,因为他们进来,为避免传递百万元list回。

有这样做的一个简单的方法:只要把数据放到一个文件中。 例如:

def target_fnc(arg):
    fd, path = tempfile.mkstemp(text=True)
    with os.fdopen(fd) as f:
        for i in xrange(1000000):
            f.write('dvsdbdfbngd\n')
    return path

def process_args(some_args):
    pool = Pool(16)
    for result in pool.imap_unordered(target_fnc, some_args):
        with open(result) as f:
            for element in f:
                yield element

显然,如果您的结果可能包含换行符,或者不是字符串,等等,你需要使用一个csv文件, numpy ,而不是等一个简单的文本文件,但这个想法是一样的。

话虽这么说,即使是这样简单,通常有以处理数据,以一定的时间一大块利益,所以在分手的任务,或使用Queue (其他两个答案建议)可能分别会更好,如果存在缺陷( ,需要一种方式来打破任务了,或者必须能够以最快的速度为他们生产消耗的数据)都没有处理断路器。



文章来源: multiprocessing with large data