I am trying to learn multiprocessing with Python. I wrote a simple script that should feed each process 1000 lines from a txt input file. My main function reads a line, splits it, and then performs some very simple operations on the elements in the string. Eventually the results should be written to an output file.
When I run it, 4 processes are correctly spawned, but only one process is actually running, with minimal CPU. As a result the code is very slow and defeats the purpose of using multiprocessing in the first place. I don't think I have a global list problem like in this question (python multiprocessing apply_async only uses one process), and I don't think my function is too trivial, as in this case (Python multiprocessing.Pool() doesn't use 100% of each CPU).
I can't understand what I'm doing wrong; any help or suggestion is appreciated. Here's the basic code:
import multiprocessing
import itertools

def myfunction(line):
    returnlist = []
    list_of_elem = line.split(",")
    elem_id = list_of_elem[1]
    elem_to_check = list_of_elem[5]
    ids = list_of_elem[2].split("|")
    for x in itertools.permutations(ids, 2):
        # x is a tuple of two ids, so unpack it before joining
        if x[1] == elem_to_check:
            returnlist.append(",".join([elem_id, x[0], x[1], "1\n"]))
        else:
            returnlist.append(",".join([elem_id, x[0], x[1], "0\n"]))
    return returnlist

def grouper(n, iterable, padvalue=None):
    # collect the iterable into fixed-size chunks of n lines, padding the last chunk with padvalue
    return itertools.izip_longest(*[iter(iterable)] * n, fillvalue=padvalue)

if __name__ == '__main__':
    my_data = open(r"my_input_file_to_be_processed.txt", "r")
    my_data = my_data.read().split("\n")
    p = multiprocessing.Pool(4)
    for chunk in grouper(1000, my_data):
        # drop the None padding grouper adds to the last chunk, and any empty lines
        results = p.map(myfunction, [line for line in chunk if line])
        for r in results:
            with open(r"my_output_file", "ab") as outfile:
                # myfunction returns a list of strings, so write them all
                outfile.writelines(r)
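To check whether all four workers actually receive tasks, a minimal diagnostic along these lines (not my real code, just a sketch using multiprocessing.current_process().name on dummy data) should report four distinct worker names if the pool is being used:

import multiprocessing

def tagged(line):
    # report which worker process handled this line
    return multiprocessing.current_process().name

if __name__ == '__main__':
    p = multiprocessing.Pool(4)
    dummy_lines = ["a,b,c|d|e,f,g,h"] * 100000
    workers = set(p.map(tagged, dummy_lines))
    # expect something like set(['PoolWorker-1', ..., 'PoolWorker-4'])
    print(workers)
    p.close()
    p.join()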
EDIT: I modified my code following the suggestions (removing the redundant data pre-processing). However, the problem still seems to be there.
import multiprocessing
import itertools

def myfunction(line):
    returnlist = []
    list_of_elem = line.split(",")
    elem_id = list_of_elem[1]
    elem_to_check = list_of_elem[5]
    ids = list_of_elem[2].split("|")
    for x in itertools.permutations(ids, 2):
        # x is a tuple of two ids, so unpack it before joining
        if x[1] == elem_to_check:
            returnlist.append(",".join([elem_id, x[0], x[1], "1\n"]))
        else:
            returnlist.append(",".join([elem_id, x[0], x[1], "0\n"]))
    return returnlist

if __name__ == '__main__':
    my_data = open(r"my_input_file_to_be_processed.txt", "r")
    p = multiprocessing.Pool(4)
    # map over the open file directly; chunksize controls how many lines each worker gets per task
    results = p.map(myfunction, my_data, chunksize=1000)
    for r in results:
        with open(r"my_output_file", "ab") as outfile:
            # myfunction returns a list of strings, so write them all
            outfile.writelines(r)
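For anyone who wants to reproduce this without my data file, here is a self-contained sketch of the same structure; the generated dummy lines only mimic the shape of the real input (at least six comma-separated fields, with pipe-separated ids in the third one), so the actual values are made up:

import multiprocessing
import itertools

def myfunction(line):
    # same logic as above, shortened: compare every ordered pair of ids against field 5
    fields = line.split(",")
    elem_id, elem_to_check, ids = fields[1], fields[5], fields[2].split("|")
    return [",".join([elem_id, a, b, "1\n" if b == elem_to_check else "0\n"])
            for a, b in itertools.permutations(ids, 2)]

if __name__ == '__main__':
    # dummy input with the same shape as the real file: 100000 lines, 6 fields, ids in field 2
    dummy = ["x,id%d,a|b|c|d,x,x,c" % i for i in range(100000)]
    p = multiprocessing.Pool(4)
    results = p.map(myfunction, dummy, chunksize=1000)
    with open("my_output_file", "w") as outfile:
        for r in results:
            outfile.writelines(r)
    p.close()
    p.join()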