Script using multiprocessing module does not termi

2019-01-03 07:18发布

The following code, does not print "here". What is the problem? I tested it on both my machines (windows 7, Ubuntu 12.10), and http://www.compileonline.com/execute_python_online.php It does not print "here" in all cases.

from multiprocessing import Queue, Process


def runLang(que):
    print "start"
    myDict=dict()
    for i in xrange(10000):
        myDict[i]=i
    que.put(myDict)
    print "finish"


def run(fileToAnalyze):
    que=Queue()
    processList=[]
    dicList=[]
    langs= ["chi","eng"]
    for lang in langs:
        p=Process(target=runLang,args=(que,))
        processList.append(p)
        p.start()

    for p1 in processList:
        p1.join()

    print "here"

    for _ in xrange(len(langs)):
        item=que.get()
        print item
        dicList.append(item)

if __name__=="__main__":
    processList = []
    for fileToAnalyse in ["abc.txt","def.txt"]:
        p=Process(target=run,args=(fileToAnalyse,))
        processList.append(p)
        p.start()
    for p1 in processList:
        p1.join()

1条回答
Viruses.
2楼-- · 2019-01-03 07:47

This is because when you put lots of items into a multiprocessing.Queue, they eventually get buffered in memory, once the underlying Pipe is full. The buffer won't get flushed until something starts reading from the other end of the Queue, which will allow the Pipe to accept more data. A Process cannot terminate until the buffer for all its Queue instances have been entirely flushed to their underlying Pipe. The implication of this is that if you try to join a process without having another process/thread calling get on its Queue, you could deadlock. This is mentioned in the docs:

Warning

As mentioned above, if a child process has put items on a queue (and it has not used JoinableQueue.cancel_join_thread), then that process will not terminate until all buffered items have been flushed to the pipe.

This means that if you try joining that process you may get a deadlock unless you are sure that all items which have been put on the queue have been consumed. Similarly, if the child process is non-daemonic then the parent process may hang on exit when it tries to join all its non-daemonic children.

Note that a queue created using a manager does not have this issue.

You can fix the issue by not calling join until after you empty the Queue in the parent:

for _ in xrange(len(langs)):
    item = que.get()
    print(item)
    dicList.append(item)

# join after emptying the queue.
for p in processList:
    p.join()

print("here")
查看更多
登录 后发表回答