Multiprocessing.Pool使得numpy的矩阵乘法慢(Multiprocessing.

2019-07-21 14:56发布

所以,我玩弄multiprocessing.PoolNumpy ,但似乎我错过了一些重要的一点。 为什么pool版本慢很多? 我看着htop ,我可以看到创建多个进程,但他们的CPU中的所有共享一个加起来〜100%。

$ cat test_multi.py 
import numpy as np
from timeit import timeit
from multiprocessing import Pool


def mmul(matrix):
    for i in range(100):
        matrix = matrix * matrix
    return matrix

if __name__ == '__main__':
    matrices = []
    for i in range(4):
        matrices.append(np.random.random_integers(100, size=(1000, 1000)))

    pool = Pool(8)
    print timeit(lambda: map(mmul, matrices), number=20)
    print timeit(lambda: pool.map(mmul, matrices), number=20)

$ python test_multi.py 
16.0265390873
19.097837925

[更新]

  • 改为timeit进行基准测试过程
  • INIT池与我的一些核心
  • 变化的计算,以便有更多的计算和更少的内存传输(我希望)

仍然没有改变。 pool版本仍然是慢,我可以看到htop只有一个核心使用也有几个进程产生了。

[UPDATE2]

在我读有关@扬菲利普Gehrcke的建议,使用的那一刻multiprocessing.Process()Queue 。 不过在此之前,我想知道:

  1. 为什么我的工作,例如为蒂亚戈? 可能是什么,这不是我的机器上工作的原因1 ?
  2. 在我的示例代码的过程之间的复制? 我打算我的代码给每个线程的矩阵列表中的一个矩阵。
  3. 是我的代码了一个坏榜样,因为我使用Numpy

我了解到,经常得到一个更好的答案,当别人知道我的最终目标是这样:我有很多的文件,这些文件大气压加载和处理以串行方式的。 该处理是CPU激烈,所以我想多可以通过并行化获得。 我的目标是它调用分析并行文件蟒蛇功能。 此外,该功能仅仅是C代码的接口,我假定,有差别。

1 Ubuntu的12.04,Python的2.7.3,酷睿i7 860 @ 2.80 -请,如果你需要更多的信息发表评论。

[UPDATE3]

以下是斯蒂法诺的示例代码的结果。 出于某种原因,没有加快。 :/

testing with 16 matrices
base  4.27
   1  5.07
   2  4.76
   4  4.71
   8  4.78
  16  4.79
testing with 32 matrices
base  8.82
   1 10.39
   2 10.58
   4 10.73
   8  9.46
  16  9.54
testing with 64 matrices
base 17.38
   1 19.34
   2 19.62
   4 19.59
   8 19.39
  16 19.34

[更新4]答案扬-菲利普Gehrcke的评论

对不起,我说得不清晰。 正如我在写更新2我的主要目标是并行第三方Python库函数的许多串行电话。 这个功能是一些C代码的接口。 我建议使用Pool ,但没有工作,所以我想简单的东西,例如上面所示numpy 。 但也有我无法实现的性能提升,即使它看起来对我来说“emberassing parallelizable`。 所以,我想我一定是错过了一些重要的。 这个信息是我要寻找这个问题和赏金。

[更新5]

感谢您的所有巨大投入。 但通过你的答案阅读只会造成我更多的问题。 出于这个原因,我将读到的基础知识 ,创造新的做题时,我有什么,我不知道一个更清晰的认识。

Answer 1:

关于你的所有流程都是一样的CPU上运行的事实, 看到我的答案在这里 。

在导入过程中, numpy改变父进程的CPU亲和力,这样当您以后使用Pool的所有工作进程,它会生成最终会争夺了相同的核心,而不是用你所有的设备上可用的核心。

您可以拨打taskset导入后numpy重新设置CPU的亲和力,使所有内核使用:

import numpy as np
import os
from timeit import timeit
from multiprocessing import Pool


def mmul(matrix):
    for i in range(100):
        matrix = matrix * matrix
    return matrix

if __name__ == '__main__':

    matrices = []
    for i in range(4):
        matrices.append(np.random.random_integers(100, size=(1000, 1000)))

    print timeit(lambda: map(mmul, matrices), number=20)

    # after importing numpy, reset the CPU affinity of the parent process so
    # that it will use all cores
    os.system("taskset -p 0xff %d" % os.getpid())

    pool = Pool(8)
    print timeit(lambda: pool.map(mmul, matrices), number=20)

输出:

    $ python tmp.py                                     
    12.4765810966
    pid 29150's current affinity mask: 1
    pid 29150's new affinity mask: ff
    13.4136221409

如果您在使用观看CPU使用率top ,而你运行该脚本,您应该使用它所有的核心,当它执行“平行”部分看到。 正如其他人所指出的,在你原来的例子涉及酸洗数据的开销,进程创建等可能超过从并行化任何可能的益处。

编辑:我怀疑,为什么单过程似乎是一贯的快的部分原因是, numpy可能加快,当作业在多内核传播它不能使用逐元素矩阵乘法一些技巧。

例如,如果我只是使用普通的Python列表计算斐波纳契数列,我可以从并行化一个巨大的加速。 同样,如果我在那个需要量化的没有优势的方式做逐元素相乘,我得到了并行版本类似的加速:

import numpy as np
import os
from timeit import timeit
from multiprocessing import Pool

def fib(dummy):
    n = [1,1]
    for ii in xrange(100000):
        n.append(n[-1]+n[-2])

def silly_mult(matrix):
    for row in matrix:
        for val in row:
            val * val

if __name__ == '__main__':

    dt = timeit(lambda: map(fib, xrange(10)), number=10)
    print "Fibonacci, non-parallel: %.3f" %dt

    matrices = [np.random.randn(1000,1000) for ii in xrange(10)]
    dt = timeit(lambda: map(silly_mult, matrices), number=10)
    print "Silly matrix multiplication, non-parallel: %.3f" %dt

    # after importing numpy, reset the CPU affinity of the parent process so
    # that it will use all CPUS
    os.system("taskset -p 0xff %d" % os.getpid())

    pool = Pool(8)

    dt = timeit(lambda: pool.map(fib,xrange(10)), number=10)
    print "Fibonacci, parallel: %.3f" %dt

    dt = timeit(lambda: pool.map(silly_mult, matrices), number=10)
    print "Silly matrix multiplication, parallel: %.3f" %dt

输出:

$ python tmp.py
Fibonacci, non-parallel: 32.449
Silly matrix multiplication, non-parallel: 40.084
pid 29528's current affinity mask: 1
pid 29528's new affinity mask: ff
Fibonacci, parallel: 9.462
Silly matrix multiplication, parallel: 12.163


Answer 2:

通信开销和计算加速之间的不可预知的竞争肯定是这里的问题。 你所观察是完全没有问题。 无论你得到的净加速取决于多种因素,是一些必须正确量化(像你一样)。

那么,为什么是multiprocessing你的情况如此“意外慢”? multiprocessingmapmap_async功能实际上咸菜Python对象来回通过与子进程连接父管道。 这可能需要相当长的时间。 在那段时间里,子进程几乎无关,这就是在看htop 。 不同系统之间,有可能是一个相当大的管道输送性能差,这也是为什么有些人你的游泳池的代码比你的单CPU代码快,但对你来说是不是(其它因素可能出现在这里发挥作用,这仅仅是为了一个例子来解释的效果)。

你能做些什么,以使其更快?

  1. 不要咸菜的POSIX兼容的系统输入。

    如果你是在Unix上,你可以通过服用POSIX”进程fork行为优势得到各地父 - >子通信开销(复印件上写存储器):

    创建你的工作输入(如大型矩阵列表)上的父进程中一个全局可访问的变量 。 然后通过调用创建工作进程multiprocessing.Process()自己。 在孩子们了,从全局变量输入的作业。 简单地表示,这使孩子获得父母的记忆,没有任何通信开销(*,下面的解释)。 发送结果回父,通过如multiprocessing.Queue 。 这将节省大量的通信开销,尤其是如果相对于输入输出很小。 这种方法不会对如Windows工作,因为multiprocessing.Process()也创建了一个全新的Python进程不继承父的状态。

  2. 利用多线程numpy的的。 根据您的实际计算任务时,可能会发生涉及multiprocessing将于事无补。 如果您编译NumPy的自己,并启用OpenMP指令,则操作上拉尔矩阵可能会变得非常高效多线程(和分布在多个CPU内核; GIL的是在这里没有限制因素)本身。 基本上,这是多CPU内核可以在numpy的/ SciPy的背景下得到最有效的利用。

*孩子无法直接访问父进程的内存一般。 然而,后fork()父母和孩子都在同等状态。 这将是愚蠢的父的整个存储器复制到RAM中另一个地方。 这就是为什么写入时复制原理跳跃。只要孩子不改变其记忆状态,它实际上访问的父进程的内存。 仅在修改例中,对应的星星点点被复制到儿童的存储器空间。

主要修改:

让我加入一段代码,仰卧起坐大量与多个工作进程的输入数据和遵循的意见“1.不要咸菜的POSIX兼容的系统输入。” 此外,信息量调回工人管理器(父进程)是相当低的。 本实施例的重计算部分是一个单值分解。 它可以使大量使用的OpenMP的。 我已经执行的例子多次:

  • 一旦用1,2或4的工作进程和OMP_NUM_THREADS=1 ,所以每个工作进程创建的100%的最大负荷。 在那里,上述数 - 的 - 工人 - 计算时间缩放行为几乎是线性的,净加速系数高达相当于员工参与的数量。
  • 一旦用1,2或4的工作进程和OMP_NUM_THREADS=4 ,从而每个进程创建的400%的最大负荷(通过产卵4个OpenMP的线程)。 我的机器有16个真正的内核,因此4个过程,每个几乎得到最大的性能从机器中最大400%的负载。 缩放是不完美的线性再和加速比是不参与的工人数量,但绝对计算时间变显著相比减小OMP_NUM_THREADS=1和时间仍与工作进程的数量显著降低。
  • 一旦与较大的输入数据,4个内核,和OMP_NUM_THREADS=4 。 它导致的1253%的平均系统负载。
  • 一旦有相同的设置最后,但OMP_NUM_THREADS=5 。 它导致的1598%的平均系统负载,这表明我们得到了一切事情,从16芯机。 不过,相比于后者的情况下,实际计算挂钟时间没有改善。

代码:

import os
import time
import math
import numpy as np
from numpy.linalg import svd as svd
import multiprocessing


# If numpy is compiled for OpenMP, then make sure to control
# the number of OpenMP threads via the OMP_NUM_THREADS environment
# variable before running this benchmark.


MATRIX_SIZE = 1000
MATRIX_COUNT = 16


def rnd_matrix():
    offset = np.random.randint(1,10)
    stretch = 2*np.random.rand()+0.1
    return offset + stretch * np.random.rand(MATRIX_SIZE, MATRIX_SIZE)


print "Creating input matrices in parent process."
# Create input in memory. Children access this input.
INPUT = [rnd_matrix() for _ in xrange(MATRIX_COUNT)]


def worker_function(result_queue, worker_index, chunk_boundary):
    """Work on a certain chunk of the globally defined `INPUT` list.
    """
    result_chunk = []
    for m in INPUT[chunk_boundary[0]:chunk_boundary[1]]:
        # Perform single value decomposition (CPU intense).
        u, s, v = svd(m)
        # Build single numeric value as output.
        output =  int(np.sum(s))
        result_chunk.append(output)
    result_queue.put((worker_index, result_chunk))


def work(n_workers=1):
    def calc_chunksize(l, n):
        """Rudimentary function to calculate the size of chunks for equal 
        distribution of a list `l` among `n` workers.
        """
        return int(math.ceil(len(l)/float(n)))

    # Build boundaries (indices for slicing) for chunks of `INPUT` list.
    chunk_size = calc_chunksize(INPUT, n_workers)
    chunk_boundaries = [
        (i, i+chunk_size) for i in xrange(0, len(INPUT), chunk_size)]

    # When n_workers and input list size are of same order of magnitude,
    # the above method might have created less chunks than workers available. 
    if n_workers != len(chunk_boundaries):
        return None

    result_queue = multiprocessing.Queue()
    # Prepare child processes.
    children = []
    for worker_index in xrange(n_workers):
        children.append(
            multiprocessing.Process(
                target=worker_function,
                args=(
                    result_queue,
                    worker_index,
                    chunk_boundaries[worker_index],
                    )
                )
            )

    # Run child processes.
    for c in children:
        c.start()

    # Create result list of length of `INPUT`. Assign results upon arrival.
    results = [None] * len(INPUT)

    # Wait for all results to arrive.
    for _ in xrange(n_workers):
        worker_index, result_chunk = result_queue.get(block=True)
        chunk_boundary = chunk_boundaries[worker_index]
        # Store the chunk of results just received to the overall result list.
        results[chunk_boundary[0]:chunk_boundary[1]] = result_chunk

    # Join child processes (clean up zombies).
    for c in children:
        c.join()
    return results


def main():
    durations = []
    n_children = [1, 2, 4]
    for n in n_children:
        print "Crunching input with %s child(ren)." % n
        t0 = time.time()
        result = work(n)
        if result is None:
            continue
        duration = time.time() - t0
        print "Result computed by %s child process(es): %s" % (n, result)
        print "Duration: %.2f s" % duration
        durations.append(duration)
    normalized_durations = [durations[0]/d for d in durations]
    for n, normdur in zip(n_children, normalized_durations):
        print "%s-children speedup: %.2f" % (n, normdur)


if __name__ == '__main__':
    main()

输出:

$ export OMP_NUM_THREADS=1
$ /usr/bin/time python test2.py 
Creating input matrices in parent process.
Crunching input with 1 child(ren).
Result computed by 1 child process(es): [5587, 8576, 11566, 12315, 7453, 23245, 6136, 12387, 20634, 10661, 15091, 14090, 11997, 20597, 21991, 7972]
Duration: 16.66 s
Crunching input with 2 child(ren).
Result computed by 2 child process(es): [5587, 8576, 11566, 12315, 7453, 23245, 6136, 12387, 20634, 10661, 15091, 14090, 11997, 20597, 21991, 7972]
Duration: 8.27 s
Crunching input with 4 child(ren).
Result computed by 4 child process(es): [5587, 8576, 11566, 12315, 7453, 23245, 6136, 12387, 20634, 10661, 15091, 14090, 11997, 20597, 21991, 7972]
Duration: 4.37 s
1-children speedup: 1.00
2-children speedup: 2.02
4-children speedup: 3.81
48.75user 1.75system 0:30.00elapsed 168%CPU (0avgtext+0avgdata 1007936maxresident)k
0inputs+8outputs (1major+809308minor)pagefaults 0swaps

$ export OMP_NUM_THREADS=4
$ /usr/bin/time python test2.py 
Creating input matrices in parent process.
Crunching input with 1 child(ren).
Result computed by 1 child process(es): [22735, 5932, 15692, 14129, 6953, 12383, 17178, 14896, 16270, 5591, 4174, 5843, 11740, 17430, 15861, 12137]
Duration: 8.62 s
Crunching input with 2 child(ren).
Result computed by 2 child process(es): [22735, 5932, 15692, 14129, 6953, 12383, 17178, 14896, 16270, 5591, 4174, 5843, 11740, 17430, 15861, 12137]
Duration: 4.92 s
Crunching input with 4 child(ren).
Result computed by 4 child process(es): [22735, 5932, 15692, 14129, 6953, 12383, 17178, 14896, 16270, 5591, 4174, 5843, 11740, 17430, 15861, 12137]
Duration: 2.95 s
1-children speedup: 1.00
2-children speedup: 1.75
4-children speedup: 2.92
106.72user 3.07system 0:17.19elapsed 638%CPU (0avgtext+0avgdata 1022240maxresident)k
0inputs+8outputs (1major+841915minor)pagefaults 0swaps

$ /usr/bin/time python test2.py 
Creating input matrices in parent process.
Crunching input with 4 child(ren).
Result computed by 4 child process(es): [21762, 26806, 10148, 22947, 20900, 8161, 20168, 17439, 23497, 26360, 6789, 11216, 12769, 23022, 26221, 20480, 19140, 13757, 23692, 19541, 24644, 21251, 21000, 21687, 32187, 5639, 23314, 14678, 18289, 12493, 29766, 14987, 12580, 17988, 20853, 4572, 16538, 13284, 18612, 28617, 19017, 23145, 11183, 21018, 10922, 11709, 27895, 8981]
Duration: 12.69 s
4-children speedup: 1.00
174.03user 4.40system 0:14.23elapsed 1253%CPU (0avgtext+0avgdata 2887456maxresident)k
0inputs+8outputs (1major+1211632minor)pagefaults 0swaps

$ export OMP_NUM_THREADS=5
$ /usr/bin/time python test2.py 
Creating input matrices in parent process.
Crunching input with 4 child(ren).
Result computed by 4 child process(es): [19528, 17575, 21792, 24303, 6352, 22422, 25338, 18183, 15895, 19644, 20161, 22556, 24657, 30571, 13940, 18891, 10866, 21363, 20585, 15289, 6732, 10851, 11492, 29146, 12611, 15022, 18967, 25171, 10759, 27283, 30413, 14519, 25456, 18934, 28445, 12768, 28152, 24055, 9285, 26834, 27731, 33398, 10172, 22364, 12117, 14967, 18498, 8111]
Duration: 13.08 s
4-children speedup: 1.00
230.16user 5.98system 0:14.77elapsed 1598%CPU (0avgtext+0avgdata 2898640maxresident)k
0inputs+8outputs (1major+1219611minor)pagefaults 0swaps


Answer 3:

你的代码是正确的。 我只是跑了我的系统(含2个核,超线程),并得到以下结果:

$ python test_multi.py 
30.8623809814
19.3914041519

我看了看过程,正如预期的那样,平行部分显示多个进程接近100%的工作。 这必须是在东西你的系统或Python安装。



Answer 4:

测量运算吞吐量是一个非常艰巨的任务:基本上你的测试用例是太简单了,我看到了很多问题。

首先,你要测试整数运算:有没有特别的原因吗? 浮点你得到的结果是在许多不同的架构相媲美。

第二matrix = matrix*matrix覆盖输入参数(矩阵用ref和未通过由值),并且每个样本具有在不同的数据工作...

最后的测试应在更广泛的问题,规模和职工人数的情况下进行,以把握总体趋势。

因此,这里是我的修改测试脚本

import numpy as np
from timeit import timeit
from multiprocessing import Pool

def mmul(matrix):
    mymatrix = matrix.copy()
    for i in range(100):
        mymatrix *= mymatrix
    return mymatrix

if __name__ == '__main__':

    for n in (16, 32, 64):
        matrices = []
        for i in range(n):
            matrices.append(np.random.random_sample(size=(1000, 1000)))

        stmt = 'from __main__ import mmul, matrices'
        print 'testing with', n, 'matrices'
        print 'base',
        print '%5.2f' % timeit('r = map(mmul, matrices)', setup=stmt, number=1)

        stmt = 'from __main__ import mmul, matrices, pool'
        for i in (1, 2, 4, 8, 16):
            pool = Pool(i)
            print "%4d" % i, 
            print '%5.2f' % timeit('r = pool.map(mmul, matrices)', setup=stmt, number=1)
            pool.close()
            pool.join()

和我的结果:

$ python test_multi.py 
testing with 16 matrices
base  5.77
   1  6.72
   2  3.64
   4  3.41
   8  2.58
  16  2.47
testing with 32 matrices
base 11.69
   1 11.87
   2  9.15
   4  5.48
   8  4.68
  16  3.81
testing with 64 matrices
base 22.36
   1 25.65
   2 15.60
   4 12.20
   8  9.28
  16  9.04

[更新]我在不同的电脑上在家中运行这个例子,获得一致的放缓:

testing with 16 matrices
base  2.42
   1  2.99
   2  2.64
   4  2.80
   8  2.90
  16  2.93
testing with 32 matrices
base  4.77
   1  6.01
   2  5.38
   4  5.76
   8  6.02
  16  6.03
testing with 64 matrices
base  9.92
   1 12.41
   2 10.64
   4 11.03
   8 11.55
  16 11.59

我不得不承认,我不知道是谁的错(numpy的,蟒蛇,编译器,内核)...



Answer 5:

默认情况下, Pool只用电量的过程,其中n为您的机器上的CPU数量。 你需要指定你想要多少进程其使用,如Pool(5)

在这里看到更多信息



Answer 6:

既然你提到,你有很多的文件,我建议以下解决方案;

  • 做文件名列表。
  • 编写加载和处理命名为输入参数的单个文件的功能。
  • 使用Pool.map()的函数应用到文件列表。

由于每个实例现在加载自己的文件,各地传递的唯一数据是文件名,而不是(潜在的巨大)numpy的阵列。



Answer 7:

我还注意到,当我跑了Pool.map()函数中numpy的矩阵乘法,它运行在特定机器上要慢得多。 我的目标是使用Pool.map()并行我的工作,我的机器上的每个内核运行一个进程。 当事情快速运行时,numpy的矩阵乘法只是在并行执行的总体工作的一小部分。 当我看到的处理的CPU使用率,我可以看到,每个过程可以在那里它跑慢的机器使用例如400 +%的CPU,但总是<= 100%,其中它跑得快的机器。 对我来说,解决办法是从多线程停止numpy的 。 事实证明,numpy的成立了多线程的确切位置我Pool.map()的运行速度慢的机器。 显然,如果你已经使用Pool.map()并行,然后有numpy的也只是并行创建干扰。 我只是叫export MKL_NUM_THREADS=1运行我的Python代码之前,它无处不在的工作快。



文章来源: Multiprocessing.Pool makes Numpy matrix multiplication slower