Python multi-processing

Posted 2019-08-22 05:24

I have a large list containing binary encoded strings that I used to process in a single function before, like so:

""" just included this to demonstrate the 'data' structure """
data=np.zeros(250,dtype='float32, (250000,2)float32')

def numpy_array(data, peaks):
    rt_counter=0
    for x in peaks:
        if rt_counter %(len(peaks)/20) == 0:
            update_progress()
        peak_counter=0
        data_buff=base64.b64decode(x)
        buff_size=len(data_buff)/4
        unpack_format=">%dL" % buff_size
        index=0
        for y in struct.unpack(unpack_format,data_buff):
            buff1=struct.pack("I",y)
            buff2=struct.unpack("f",buff1)[0]
            if (index % 2 == 0):
                data[rt_counter][1][peak_counter][0]=float(buff2)
            else:
                data[rt_counter][1][peak_counter][1]=float(buff2)
                peak_counter+=1
            index+=1
        rt_counter+=1
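
To make the decoding in the inner loop concrete, here is a minimal self-contained sketch (with made-up values, not data from the post) of the round trip: a peak string of big-endian 32-bit words is base64-decoded, unpacked as unsigned ints, and each word's bit pattern is reinterpreted as a float, alternating m/z and intensity values:

import base64
import struct

# Hypothetical peak string: two (m/z, intensity) pairs packed as big-endian
# 32-bit floats and then base64-encoded, mimicking the mzXML peak encoding
# described in the docstring further down.
pairs = [(400.25, 1200.0), (401.75, 950.5)]
raw = b"".join(struct.pack(">f", v) for pair in pairs for v in pair)
encoded = base64.b64encode(raw)

# Decode the same way the loop above does: unpack big-endian unsigned ints,
# then repack/unpack each word to reinterpret its bit pattern as a float.
data_buff = base64.b64decode(encoded)
unpack_format = ">%dL" % (len(data_buff) // 4)
values = [struct.unpack("f", struct.pack("I", y))[0] for y in struct.unpack(unpack_format, data_buff)]
print(values)  # [400.25, 1200.0, 401.75, 950.5] -> (m/z, intensity, m/z, intensity)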

I have been reading up on multiprocessing and figured that I'd like to try it, to see whether I can get a large increase in performance. I rewrote my function into two (a helper and a 'caller'), as follows:

def numpy_array(data, peaks):
    processors=mp.cpu_count() #Might as well throw this directly in the mp.Pool (just for clarity for now)
    pool = mp.Pool(processes=processors)
    chunk_size=len(peaks)/processors
    for i in range(processors):
        counter = i*chunk_size
        chunk=peaks[i*chunk_size:(i+1)*chunk_size-1]
        pool.map(decode(data,chunk,counter))

def decode(data,chunk,counter):
    for x in chunk:
        peak_counter=0
        data_buff=base64.b64decode(x)
        buff_size=len(data_buff)/4
        unpack_format=">%dL" % buff_size
        index=0
        for y in struct.unpack(unpack_format,data_buff):
            buff1=struct.pack("I",y)
            buff2=struct.unpack("f",buff1)[0]
            if (index % 2 == 0):
                data[counter][1][peak_counter][0]=float(buff2)
            else:
                data[counter][1][peak_counter][1]=float(buff2)
                peak_counter+=1
            index+=1
        print data[counter][1][10][0]
        counter+=1      

The program runs, but it only uses 100-110% of CPU (according to top), and once it should be finished it throws TypeError: map() takes at least 3 arguments (2 given). Could anyone with a bit more multiprocessing experience give me a hint as to what things to look out for (that could be causing the TypeError)? And what might be causing my low CPU usage?

-- Code after incorporating the answer --

def decode((data,chunk,counter)):
    print len(chunk), counter
    for x in chunk:
        peak_counter=0
        data_buff=base64.b64decode(x)
        buff_size=len(data_buff)/4
        unpack_format=">%dL" % buff_size
        index=0
        for y in struct.unpack(unpack_format,data_buff):
            buff1=struct.pack("I",y)
            buff2=struct.unpack("f",buff1)[0]
            if (index % 2 == 0):
                data[counter][1][peak_counter][0]=float(buff2)
            else:
                data[counter][1][peak_counter][1]=float(buff2)
                peak_counter+=1
            index+=1
        counter+=1

def numpy_array(data, peaks):
    """Fills the NumPy array 'data' with m/z-intensity values acquired
    from b64 decoding and unpacking the binary string read from the 
    mzXML file, which is stored in the list 'peaks'.

    The m/z values are assumed to be ordered without validating this
    assumption.

    Note: This function uses multi-processing
    """
    processors=mp.cpu_count()
    pool = mp.Pool(processes=processors)
    chunk_size=int(len(peaks)/processors)
    map_parameters=[]
    for i in range(processors):
        counter = i*chunk_size
        chunk=peaks[i*chunk_size:(i+1)*chunk_size-1]
        map_parameters.append((data,chunk,counter))
    pool.map(decode,map_parameters) 

This latest version 'works' in the sense that it fills the array inside the processes (the array does contain values there), but once all processes are done, accessing the array yields only zero values, because each process only gets a local copy of the array.
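
For what it's worth, here is a minimal sketch of one common way around this (an addition for clarity, not taken from the answers below): have each worker return its decoded values instead of writing into its copy of the array, and let the parent process, which owns data, fill the array from the results that pool.map sends back. The name decode_chunk is made up for this sketch:

import base64
import struct
import multiprocessing as mp

def decode_chunk(chunk):
    """Decode one chunk of base64 peak strings and return the values,
    instead of writing into a per-process copy of the array."""
    out = []
    for x in chunk:
        data_buff = base64.b64decode(x)
        unpack_format = ">%dL" % (len(data_buff) // 4)
        words = struct.unpack(unpack_format, data_buff)
        floats = [struct.unpack("f", struct.pack("I", w))[0] for w in words]
        # group the flat list into (m/z, intensity) pairs
        out.append(list(zip(floats[0::2], floats[1::2])))
    return out

def numpy_array(data, peaks):
    processors = mp.cpu_count()
    pool = mp.Pool(processes=processors)
    chunk_size = int(len(peaks) / processors)
    # remainder handling omitted, as in the original chunking
    chunks = [peaks[i * chunk_size:(i + 1) * chunk_size] for i in range(processors)]
    # pool.map returns the workers' results to the parent process, which owns
    # 'data', so the decoded values survive after the worker processes exit.
    rt_counter = 0
    for chunk_result in pool.map(decode_chunk, chunks):
        for pairs in chunk_result:
            for peak_counter, (mz, intensity) in enumerate(pairs):
                data[rt_counter][1][peak_counter][0] = mz
                data[rt_counter][1][peak_counter][1] = intensity
            rt_counter += 1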

Answer 1:

Something like this should work.

Note that pool.map takes a function and a list of the parameters for that function, one entry per call. In the original example you are just calling it inside the numpy_array function.

The function can only have one argument, hence the packing of the arguments into a tuple and the rather odd looking double parentheses in decode (this is called tuple unpacking).

def numpy_array(data, peaks):
    processors=4
    pool = mp.Pool(processes=processors)
    chunk_size=len(data)/processors
    print range(processors)
    map_parameters = [] # new
    for i in range(processors):
        counter = i*chunk_size
        chunk=peaks[i*chunk_size:(i+1)*chunk_size-1]
        map_parameters.append((data,chunk,counter)) # new
    pool.map(decode, map_parameters) # new

def decode((data,chunk,counter)): # changed
    for x in chunk:
        peak_counter=0
        data_buff=base64.b64decode(x)
        buff_size=len(data_buff)/4
        unpack_format=">%dL" % buff_size
        index=0
        for y in struct.unpack(unpack_format,data_buff):
            buff1=struct.pack("I",y)
            buff2=struct.unpack("f",buff1)[0]
            if (index % 2 == 0):
                data[counter][1][peak_counter][0]=float(buff2)
            else:
                data[counter][1][peak_counter][1]=float(buff2)
                peak_counter+=1
            index+=1
        print data[counter][1][10][0]
        counter+=1


Answer 2:

The flaw is in your numpy_array function:

for i in range(processors):
    counter = i*chunk_size
    chunk=peaks[i*chunk_size:(i+1)*chunk_size-1]
    pool.map(decode(data,chunk,counter))

The problem is that you are calling map sequentially, so you are only running one process at a time. In addition, I don't think you are calling map correctly, since you are doing pool.map(f(*args)) while the signature is map(f, ['list', 'of', 'data']): decode(data, chunk, counter) is evaluated immediately in the parent process, and its return value is what gets passed to map without an iterable, which is what raises the TypeError.

I would use a partial so that you don't create copies of data, as I assume the array is quite large or might grow even larger in the future.

This should be:

import functools
decode_with_data = functools.partial(decode, data)
args = []
for i in range(processors):
    counter = i * chunk_size
    chunk = peaks[i*chunk_size:(i+1)*chunk_size-1]
    args.append((chunk, counter))
pool.map(decode_with_data, args)
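
One detail to note about this variant (added here for clarity, not part of the original answer): with data bound through functools.partial, pool.map hands each (chunk, counter) tuple to decode as a single remaining argument, so decode's signature has to unpack that tuple itself, roughly like this (the decoding body is elided):

def decode(data, chunk_and_counter):
    # pool.map passes one (chunk, counter) tuple per call as the second
    # positional argument; unpack it explicitly (this also stays valid on
    # Python 3, where tuple parameters in 'def' are no longer allowed).
    chunk, counter = chunk_and_counter
    for x in chunk:
        pass  # same base64/struct decoding as in the versions above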

