Python - What is queue.task_done() used for?

Posted 2019-02-25 18:00

Question:

I wrote a script that has multiple threads (created with threading.Thread) fetching URLs from a Queue using queue.get_nowait(), and then processing the HTML. I am new to multi-threaded programming, and am having trouble understanding the purpose of the queue.task_done() function.

When the Queue is empty, get_nowait() raises the queue.Empty exception. So I don't understand the need for each thread to call the task_done() function. We know that we're done with the queue when it's empty, so why do we need to notify it that the worker threads have finished their work (which has nothing to do with the queue, after they've gotten the URL from it)?

Could someone provide me with a code example (ideally using urllib, file I/O, or something other than fibonacci numbers and printing "Hello") that shows me how this function would be used in practical applications?

Answer 1:

Queue.task_done is not there for the workers' benefit. It is there to support Queue.join.


If I give you a box of work assignments, do I care about when you've taken everything out of the box?

No. I care about when the work is done. Looking at an empty box doesn't tell me that. You and 5 other guys might still be working on stuff you took out of the box.

Queue.task_done lets workers say when a task is done. Someone waiting for all the work to be done with Queue.join will wait until task_done has been called once for every item that was put on the queue, not merely until the queue is empty.
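To make the box analogy concrete, here is a minimal sketch of the difference between "the queue is empty" and "the work is done". The names demo, taken and release are made up for this illustration; the two Event objects only exist to freeze the worker in the middle of its task so the state can be inspected reliably.

```python
import queue
import threading

def demo():
    q = queue.Queue()
    taken = threading.Event()    # set once the worker has dequeued the item
    release = threading.Event()  # set by the main thread to let the worker finish
    results = []

    def worker():
        item = q.get()
        taken.set()
        release.wait()           # stand-in for slow processing
        results.append(item * 2)
        q.task_done()            # only now is the task counted as finished

    t = threading.Thread(target=worker)
    t.start()
    q.put(21)

    taken.wait()
    empty_while_working = q.empty()     # the box is empty...
    done_while_working = bool(results)  # ...but the work is not done yet

    release.set()
    q.join()                     # blocks until task_done() has been called
    t.join()
    return empty_while_working, done_while_working, results

print(demo())
```

While the worker is still busy, q.empty() already reports True, so an empty queue is not a usable "all done" signal. q.join() returns only after the matching task_done() call.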



Answer 2:

From the question: "Could someone provide me with a code example (ideally using urllib, file I/O, or something other than fibonacci numbers and printing 'Hello') that shows me how this function would be used in practical applications?"

@user2357112's answer nicely explains the purpose of task_done, but lacks the requested example. Here is a function that calculates checksums of an arbitrary number of files and returns a dict mapping each file name to the corresponding checksum. Internal to the function, the work is divided among several threads.

The function uses Queue.join to wait until the workers have finished their assigned tasks, so it is safe to return the dictionary to the caller. It is a convenient way to wait for all files to be processed, as opposed to merely dequeued.

import threading, queue, hashlib

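def _work(q, checksums):
    while True:
        filename = q.get()
        if filename is None:
            # Sentinel: put it back so the other workers see it too, then exit.
            q.put(None)
            break
        try:
            sha = hashlib.sha256()
            with open(filename, 'rb') as f:
                # Read in 64 KiB chunks so large files are not loaded whole.
                for chunk in iter(lambda: f.read(65536), b''):
                    sha.update(chunk)
            checksums[filename] = sha.digest()
        finally:
            # Always mark the file as handled, even on error,
            # so that q.join() in the caller can return.
            q.task_done()

def calc_checksums(files):
    q = queue.Queue()
    checksums = {}
    for _ in range(4):  # start several worker threads (4 is an arbitrary choice)
        threading.Thread(target=_work, args=(q, checksums)).start()
    for f in files:
        q.put(f)
    q.join()     # wait until task_done() has been called for every file
    q.put(None)  # tell workers to exit
    return checksums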

A note on the GIL: since the code in hashlib internally releases the GIL while calculating the checksum, using multiple threads yields a measurable (1.75x-2x depending on Python version) speedup compared to the single-threaded variant.
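For the setup described in the question (a pre-filled queue, workers calling get_nowait()), the same idea could look roughly like the sketch below. This is not code from either answer: process_all, fetch_length, num_workers and the results/errors dicts are hypothetical names chosen for the illustration, and the task is passed in as a parameter so the fetching step can be swapped out.

```python
import queue
import threading
import urllib.request

def fetch_length(url):
    # Hypothetical task: download the page and return its size in bytes.
    with urllib.request.urlopen(url, timeout=10) as resp:
        return len(resp.read())

def process_all(urls, task=fetch_length, num_workers=4):
    q = queue.Queue()
    for url in urls:            # fill the queue before any worker starts
        q.put(url)
    results, errors = {}, {}

    def worker():
        while True:
            try:
                url = q.get_nowait()
            except queue.Empty:
                return          # nothing left to take; other workers may still be busy
            try:
                results[url] = task(url)
            except Exception as exc:
                errors[url] = exc
            finally:
                q.task_done()   # exactly one call per successful get

    for _ in range(num_workers):
        threading.Thread(target=worker, daemon=True).start()
    q.join()                    # returns once every URL has been processed
    return results, errors
```

Here queue.Empty only tells an individual worker that there is nothing more to pick up. The caller still relies on q.join(), and therefore on task_done(), to know that the last download has actually finished before it reads results.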