How to use multiprocessing.Queue.get method?

2020-02-01 17:43发布

The code below places three numbers in a queue. Then it attempts to get the numbers back from the queue. But it never does. How to get the data from the queue?

import multiprocessing

queue = multiprocessing.Queue()

for i in range(3):
    queue.put(i)

while not queue.empty():
    print queue.get()

3条回答
在下西门庆
2楼-- · 2020-02-01 17:50

I originally deleted this answer after I read @Martijn Pieters', since he decribed the "why this doesn't work" in more detail and earlier. Then I realized, that the use case in OP's example doesn't quite fit to the canonical sounding title of

"How to use multiprocessing.Queue.get method".

That's not because there's no child process involved for demonstration, but because in real applications hardly ever a queue is pre-filled and only read out after, but reading and writing happens interleaved with waiting times in between. The extended demonstration code Martijn showed, wouldn't work in the usual scenarios, because the while loop would break too soon when enqueuing doesn't keep up with reading. So here is the answer reloaded, which is able to deal with the usual interleaved feeds & reads scenarios:


Don't rely on queue.empty checks for synchronization.

After putting an object on an empty queue there may be an infinitesimal delay before the queue’s empty() method returns False and get_nowait() can return without raising queue.Empty. ...

empty()

Return True if the queue is empty, False otherwise. Because of multithreading/multiprocessing semantics, this is not reliable. docs

Either use for msg in iter(queue.get, sentinel): to .get() from the queue, where you break out of the loop by passing a sentinel value...iter(callable, sentinel)?

from multiprocessing import Queue

SENTINEL = None

if __name__ == '__main__':

    queue = Queue()

    for i in [*range(3), SENTINEL]:
        queue.put(i)

    for msg in iter(queue.get, SENTINEL):
        print(msg)

...or use get_nowait() and handle a possible queue.Empty exception if you need a non-blocking solution.

from multiprocessing import Queue
from queue import Empty
import time

SENTINEL = None

if __name__ == '__main__':

    queue = Queue()

    for i in [*range(3), SENTINEL]:
        queue.put(i)

    while True:
        try:
            msg = queue.get_nowait()
            if msg == SENTINEL:
                break
            print(msg)
        except Empty:
            # do other stuff
            time.sleep(0.1)

In case only one process and only one thread within this process is reading the queue, it would be also possible to exchange the last code snippet with:

while True:
    if not queue.empty():  # this is not an atomic operation ...
        msg = queue.get()  # ... thread could be interrupted in between
        if msg == SENTINEL:
            break
        print(msg)
    else:
        # do other stuff
        time.sleep(0.1)

Since a thread could drop the GIL in between checking if not queue.empty() and queue.get(), this wouldn't be suitable for multi-threaded queue-reads in a process. The same applies if multiple processes are reading from the queue.

For single-producer / single-consumer scenarios, using a multiprocessing.Pipe instead of multiprocessing.Queue would be sufficient and more performant, though.

查看更多
Lonely孤独者°
3楼-- · 2020-02-01 18:00

Your code actually works, some of the time.

That's because the queue is not instantly not empty. The implementation is a bit more involved to support communication between multiple processes, so threads and pipes are involved that cause the empty state to last a little longer than your code allows for.

See the note in the Pipes and Queues section:

When an object is put on a queue, the object is pickled and a background thread later flushes the pickled data to an underlying pipe. This has some consequences which are a little surprising, but should not cause any practical difficulties – if they really bother you then you can instead use a queue created with a manager.

  1. After putting an object on an empty queue there may be an infinitesimal delay before the queue’s empty() method returns False [...]

(bold emphasis mine)

If you add a loop to check for emptyness first then your code works:

queue = multiprocessing.Queue()

for i in range(3):
    queue.put(i)

while queue.empty():
    print 'queue is still empty'

while not queue.empty():
    print queue.get()

When you run the above, most of the time the 'queue is still empty' appears once. Sometimes it doesn't appear at all, and sometimes it'll be printed twice.

查看更多
神经病院院长
4楼-- · 2020-02-01 18:11

Check queue before using get:

import multiprocessing

queue = multiprocessing.Queue()

for i in range(3):
    queue.put(i)

while not queue.empty():
    if not queue.empty():
        print queue.get()
查看更多
登录 后发表回答