我有包含二进制编码的字符串,我用在单一的功能之前处理,像这样大的列表:
""" just included this to demonstrate the 'data' structure """
data=np.zeros(250,dtype='float32, (250000,2)float32')
def func numpy_array(data, peaks):
rt_counter=0
for x in peaks:
if rt_counter %(len(peaks)/20) == 0:
update_progress()
peak_counter=0
data_buff=base64.b64decode(x)
buff_size=len(data_buff)/4
unpack_format=">%dL" % buff_size
index=0
for y in struct.unpack(unpack_format,data_buff):
buff1=struct.pack("I",y)
buff2=struct.unpack("f",buff1)[0]
if (index % 2 == 0):
data[rt_counter][1][peak_counter][0]=float(buff2)
else:
data[rt_counter][1][peak_counter][1]=float(buff2)
peak_counter+=1
index+=1
rt_counter+=1
我已经在多阅读并计算过,我想尝试,看看我能得到性能上的大幅增加,我改写了我的函数为2(辅助和“主叫方”),如下所示:
def numpy_array(data, peaks):
processors=mp.cpu_count #Might as well throw this directly in the mp.Pool (just for clarity for now)
pool = mp.Pool(processes=processors)
chunk_size=len(peaks)/processors
for i in range(processors):
counter = i*chunk_size
chunk=peaks[i*chunk_size:(i+1)*chunk_size-1]
pool.map(decode(data,chunk,counter))
def decode(data,chunk,counter):
for x in chunk:
peak_counter=0
data_buff=base64.b64decode(x)
buff_size=len(data_buff)/4
unpack_format=">%dL" % buff_size
index=0
for y in struct.unpack(unpack_format,data_buff):
buff1=struct.pack("I",y)
buff2=struct.unpack("f",buff1)[0]
if (index % 2 == 0):
data[counter][1][peak_counter][0]=float(buff2)
else:
data[counter][1][peak_counter][1]=float(buff2)
peak_counter+=1
index+=1
print data[counter][1][10][0]
counter+=1
该程序运行,但只使用CPU的100-110%(根据上图),一旦它应该完成它抛出TypeError: map() takes at least 3 arguments (2 given)
我,能与多进程多一些经验的人给我一个提示,看出来的(可能引起类型错误)有什么事情? 什么可能导致我的CPU占用率低?
-合并后的答案代码-
def decode((data,chunk,counter)):
print len(chunk), counter
for x in chunk:
peak_counter=0
data_buff=base64.b64decode(x)
buff_size=len(data_buff)/4
unpack_format=">%dL" % buff_size
index=0
for y in struct.unpack(unpack_format,data_buff):
buff1=struct.pack("I",y)
buff2=struct.unpack("f",buff1)[0]
if (index % 2 == 0):
data[counter][1][peak_counter][0]=float(buff2)
else:
data[counter][1][peak_counter][1]=float(buff2)
peak_counter+=1
index+=1
counter+=1
def numpy_array(data, peaks):
"""Fills the NumPy array 'data' with m/z-intensity values acquired
from b64 decoding and unpacking the binary string read from the
mzXML file, which is stored in the list 'peaks'.
The m/z values are assumed to be ordered without validating this
assumption.
Note: This function uses multi-processing
"""
processors=mp.cpu_count()
pool = mp.Pool(processes=processors)
chunk_size=int(len(peaks)/processors)
map_parameters=[]
for i in range(processors):
counter = i*chunk_size
chunk=peaks[i*chunk_size:(i+1)*chunk_size-1]
map_parameters.append((data,chunk,counter))
pool.map(decode,map_parameters)
这个最新版本的“作品”到目前为止,它填补了过程中的数组(其中数组包含的值),但一旦所有工序均由内部完成访问数组产量仅仅是因为每个进程得到阵列的本地副本零值。