AttributeError 'DupFd' in 'multiproces

Published 2020-04-21 07:03

Question:

I'm trying to communicate between multiple threading.Thread(s) doing I/O-bound tasks and multiple multiprocessing.Process(es) doing CPU-bound tasks. Whenever a thread finds work for a process, it puts the work on a multiprocessing.Queue, together with the sending end of a multiprocessing.Pipe(duplex=False). The processes then do their part and send the results back to the threads via the Pipe. This procedure seems to work in roughly 70% of cases; in the other 30% I receive:

AttributeError: Can't get attribute 'DupFd' on <module 'multiprocessing.resource_sharer' from '/usr/lib/python3.5/multiprocessing/resource_sharer.py'>

To reproduce:

import multiprocessing
import threading
import time

def thread_work(work_queue, pipe):
    while True:
        work_queue.put((threading.current_thread().name,  pipe[1]))
        received = pipe[0].recv()
        print("{}: {}".format(threading.current_thread().name, threading.current_thread().name == received))
        time.sleep(0.3)

def process_work(work_queue):
    while True:
        thread, pipe = work_queue.get()
        pipe.send(thread)

work_queue = multiprocessing.Queue()
for i in range(0,3):
    receive, send = multiprocessing.Pipe(duplex=False)
    t = threading.Thread(target=thread_work, args=[work_queue, (receive, send)])
    t.daemon = True
    t.start()

for i in range(0,2):
    p = multiprocessing.Process(target=process_work, args=[work_queue])
    p.daemon = True
    p.start()

time.sleep(5)

I had a look at the multiprocessing source code but couldn't understand why this error occurs. I tried using queue.Queue instead, and also a Pipe with duplex=True (the default), but couldn't find a pattern in the error. Does anyone have a clue how to debug this?

Answer 1:

You are forking an already multi-threaded main process here. That is known to be problematic in general.

It is in fact prone to problems (and not just in Python). As Raymond Hettinger put it: "The rule is 'thread after you fork, not before'. Otherwise, the locks used by the thread executor will get duplicated across processes. If one of those processes dies while it has the lock, all of the other processes using that lock will deadlock."

The trigger for the error you get is apparently that duplicating the pipe's file descriptor fails in the child process.

To resolve this issue, either create your child processes while your main process is still single-threaded, or use another start_method for creating new processes, such as 'spawn' (the default on Windows, and on macOS since Python 3.8) or 'forkserver', if available.

forkserver

When the program starts and selects the forkserver start method, a server process is started. From then on, whenever a new process is needed, the parent process connects to the server and requests that it fork a new process. The fork server process is single threaded so it is safe for it to use os.fork(). No unnecessary resources are inherited.

Available on Unix platforms which support passing file descriptors over Unix pipes. (docs)

You can specify another start_method with:

multiprocessing.set_start_method(method): "Set the method which should be used to start child processes. method can be 'fork', 'spawn' or 'forkserver'."

Note that this should be called at most once, and it should be protected inside the if __name__ == '__main__' clause of the main module. (docs)
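A minimal sketch of selecting the start method once, under the main-module guard. preferred_start_method and run_pool are hypothetical helpers: the first prefers 'forkserver' when multiprocessing.get_all_start_methods() reports it and otherwise falls back to 'spawn'. The pool maps the built-in abs, so the worker function does not have to be importable from the main module.

```python
import multiprocessing


def preferred_start_method():
    # Hypothetical helper: 'forkserver' is only offered on Unix platforms
    # that can pass file descriptors over Unix pipes; 'spawn' exists everywhere.
    if "forkserver" in multiprocessing.get_all_start_methods():
        return "forkserver"
    return "spawn"


def run_pool():
    # The built-in abs is used as the task so children need nothing from __main__.
    with multiprocessing.Pool(processes=2) as pool:
        return pool.map(abs, [-1, -2, 3])


if __name__ == "__main__":
    # Called at most once, inside the guard, before any child process exists.
    multiprocessing.set_start_method(preferred_start_method())
    print(multiprocessing.get_start_method(), run_pool())
```

If changing the global default is undesirable, multiprocessing.get_context(method) returns a context object with the same Process, Queue, Pipe and Pool API that uses the given method only for objects created through it.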

For a benchmark of the specific start_methods (on Ubuntu 18.04) look here.