Multiprocessing in Python uses only one process

Published 2019-06-07 23:24

Question:

I am trying to learn multiprocessing with python. I wrote a simple code that should feed each process with 1000 lines from a txt input file. My main function reads a line, splits it and then performs some very simple operations with the elements in the string. Eventually the results should be written in an output file.

When I run it, 4 processes are correctly spawned, but only one of them is actually running, with minimal CPU usage. As a result the code is very slow and defeats the purpose of using multiprocessing in the first place. I don't think I have a global list problem as in this question (python multiprocessing apply_async only uses one process), and I don't think my function is too trivial as in this case (Python multiprocessing.Pool() doesn't use 100% of each CPU).

I can't understand what I'm doing wrong, any help/suggestion is appreciated. Here's the basic code:

import multiprocessing
import itertools

def myfunction(line):
    returnlist = []
    list_of_elem = line.split(",")
    elem_id = list_of_elem[1]
    elem_to_check = list_of_elem[5]

    ids = list_of_elem[2].split("|")

    for x in itertools.permutations(ids, 2):
        if x[1] == elem_to_check:
            returnlist.append(",".join([elem_id, x[0], x[1], "1\n"]))
        else:
            returnlist.append(",".join([elem_id, x[0], x[1], "0\n"]))

    return returnlist

def grouper(n, iterable, padvalue=None):
    return itertools.izip_longest(*[iter(iterable)]*n, fillvalue=padvalue)

if __name__ == '__main__':
    with open(r"my_input_file_to_be_processed.txt", "r") as f:
        my_data = f.read().split("\n")

    p = multiprocessing.Pool(4)

    for chunk in grouper(1000, my_data):
        results = p.map(myfunction, chunk)
        with open(r"my_output_file", "ab") as outfile:
            for r in results:
                outfile.writelines(r)
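One subtlety worth noting: `grouper` pads the final chunk with `padvalue` (`None` by default), so the last `p.map` call hands `None` to `myfunction`, which then fails on `None.split(",")`. A quick check of the padding behavior (written here with the Python 3 name `zip_longest`; the question's `izip_longest` is the Python 2 spelling):

```python
import itertools

def grouper(n, iterable, padvalue=None):
    # Same grouper as in the question, using the Python 3 zip_longest
    return itertools.zip_longest(*[iter(iterable)] * n, fillvalue=padvalue)

chunks = list(grouper(3, ["a", "b", "c", "d"]))
print(chunks)  # [('a', 'b', 'c'), ('d', None, None)]
```

Filtering out the padding (e.g. keeping only lines where `line is not None`) before calling `myfunction` avoids the crash on the last chunk.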

EDIT: I modified my code following the suggestions (deleting the redundant data pre-processing). However, the problem still seems to be there.

import multiprocessing
import itertools

def myfunction(line):
    returnlist = []
    list_of_elem = line.split(",")
    elem_id = list_of_elem[1]
    elem_to_check = list_of_elem[5]

    ids = list_of_elem[2].split("|")

    for x in itertools.permutations(ids, 2):
        if x[1] == elem_to_check:
            returnlist.append(",".join([elem_id, x[0], x[1], "1\n"]))
        else:
            returnlist.append(",".join([elem_id, x[0], x[1], "0\n"]))

    return returnlist

if __name__ == '__main__':
    my_data = open(r"my_input_file_to_be_processed.txt", "r")

    p = multiprocessing.Pool(4)

    results = p.map(myfunction, my_data, chunksize=1000)
    with open(r"my_output_file", "ab") as outfile:
        for r in results:
            outfile.writelines(r)
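As an aside, a quick way to check whether the work is really being spread over several processes is to tag each result with the PID of the worker that produced it. This small self-contained sketch (unrelated to the question's data) counts the distinct workers used:

```python
import multiprocessing
import os

def tag_with_pid(x):
    # Return the input together with the id of the process that handled it
    return x, os.getpid()

if __name__ == "__main__":
    with multiprocessing.Pool(4) as pool:
        # A small chunksize lets the items spread across the workers
        tagged = pool.map(tag_with_pid, range(100), chunksize=5)
    print(len({pid for _, pid in tagged}), "distinct worker(s) used")
```

If this consistently reports a single worker for a workload that should keep four busy, the problem is in how the work is distributed rather than in the per-line function.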

Answer 1:

Based on your code snippet, I would do something like the following: split the file into 8 chunks and hand the computation to 4 workers (why 8 chunks and 4 workers? Just an arbitrary choice for the example):

from multiprocessing import Pool
import itertools

def myfunction(lines):
    returnlist = []
    for line in lines:
        list_of_elem = line.split(",")
        elem_id = list_of_elem[1]
        elem_to_check = list_of_elem[5]
        ids = list_of_elem[2].split("|")

        for x in itertools.permutations(ids, 2):
            returnlist.append(",".join(
                [elem_id, x[0], x[1], "1\n" if x[1] == elem_to_check else "0\n"]))

    return returnlist

def chunk(it, size):
    it = iter(it)
    return iter(lambda: tuple(itertools.islice(it, size)), ())

if __name__ == "__main__":
    with open(r"my_input_file_to_be_processed.txt", "r") as f:
        my_data = f.read().split("\n")

    prep = list(chunk(my_data, round(len(my_data) / 8)))
    with Pool(4) as p:
        res = p.map(myfunction, prep)

    result = [line for chunk_result in res for line in chunk_result]
    print(result)  # ... or do something with the result
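The `chunk` helper above relies on the two-argument form of `iter(callable, sentinel)`: the lambda is called repeatedly, and iteration stops as soon as it returns the empty-tuple sentinel. A quick illustration:

```python
import itertools

def chunk(it, size):
    it = iter(it)
    # iter(callable, sentinel): call the lambda until it returns ()
    return iter(lambda: tuple(itertools.islice(it, size)), ())

print(list(chunk(range(7), 3)))  # [(0, 1, 2), (3, 4, 5), (6,)]
```

Unlike `grouper` in the question, this never pads the last chunk, so no `None` values reach the worker function.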

Edit: This assumes you are confident that all lines are formatted the same way and will not raise errors.

Judging by your comments, it might be useful to find out what the problem is in your function or in the file's content, either by testing it without multiprocessing, or by using try/except in a pretty broad/ugly way, to be almost sure that an output is produced for every line (either the exception or the result):

def myfunction(lines):
    returnlist = []
    for line in lines:
        try:
            list_of_elem = line.split(",")
            elem_id = list_of_elem[1]
            elem_to_check = list_of_elem[5]
            ids = list_of_elem[2].split("|")

            for x in itertools.permutations(ids, 2):
                returnlist.append(",".join(
                    [elem_id, x[0], x[1], "1\n" if x[1] == elem_to_check else "0\n"]))
        except Exception as err:
            returnlist.append('I encountered error {} on line {}'.format(err, line))

    return returnlist
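Calling such a guarded function directly, without the pool, makes it easy to spot bad lines. As a minimal sketch (the function name and the sample lines are made up for illustration; the second line deliberately has too few fields):

```python
import itertools

def safe_process(lines):
    # Per-line try/except, so one malformed line cannot kill the whole chunk
    out = []
    for line in lines:
        try:
            fields = line.split(",")
            elem_id, elem_to_check = fields[1], fields[5]
            ids = fields[2].split("|")
            for a, b in itertools.permutations(ids, 2):
                out.append(",".join(
                    [elem_id, a, b, "1" if b == elem_to_check else "0"]))
        except Exception as err:
            out.append("error {!r} on line {!r}".format(err, line))
    return out

print(safe_process(["x,id1,a|b,c,d,b", "too,short"]))
```

The good line produces one row per ordered pair of ids, while the short line yields an error row naming the exception (here an `IndexError`), so nothing is silently dropped.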