I need to know when a Queue is closed and won't receive any more items, so that I can end the iteration.
I did it by putting a sentinel in the queue:
from Queue import Queue

class IterableQueue(Queue):
    _sentinel = object()

    def __iter__(self):
        return self

    def close(self):
        self.put(self._sentinel)

    def next(self):
        item = self.get()
        if item is self._sentinel:
            raise StopIteration
        else:
            return item
Given that this is a very common use for a queue, isn't there a built-in implementation?
A sentinel is a reasonable way for a producer to send a message that no more queue tasks are forthcoming.
FWIW, your code can be simplified quite a bit with the two-argument form of iter():
from Queue import Queue

class IterableQueue(Queue):
    _sentinel = object()

    def __iter__(self):
        return iter(self.get, self._sentinel)

    def close(self):
        self.put(self._sentinel)
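For reference, here is a minimal sketch of how that class might be used with one producer thread and one consumer. It assumes Python 3, where the module is named `queue` rather than `Queue`; the `producer` function and the `received` list are hypothetical names chosen only for illustration.

```python
import queue       # Python 3 name of the Queue module
import threading


class IterableQueue(queue.Queue):
    """Queue that can be iterated over until close() is called."""
    _sentinel = object()

    def __iter__(self):
        # iter(callable, sentinel) keeps calling self.get() and stops
        # as soon as it returns the sentinel.
        return iter(self.get, self._sentinel)

    def close(self):
        self.put(self._sentinel)


def producer(q, n):
    # Hypothetical producer: puts n items, then signals the end.
    for i in range(n):
        q.put(i)
    q.close()


q = IterableQueue()
t = threading.Thread(target=producer, args=(q, 3))
t.start()
received = [item for item in q]   # blocks in get() until the sentinel arrives
t.join()
print(received)   # [0, 1, 2]
```

Note that get() removes the sentinel from the queue, so a single close() ends only one iterating consumer. With several consumers, each one would need its own sentinel.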
The multiprocessing module has its own version of Queue that does include a close method. I am not sure how it works in threading, but it's worth a try. I don't see why it shouldn't work the same:
from multiprocessing import Queue
q = Queue()
q.put(1)
q.get_nowait()
# 1
q.close()
q.get_nowait()
# ...
# IOError: handle out of range in select()
You could just catch the IOError as the close signal.
TEST
from multiprocessing import Queue
from threading import Thread

def worker(q):
    while True:
        try:
            item = q.get(timeout=.5)
        except IOError:
            print "Queue closed. Exiting thread."
            return
        except:
            continue
        print "Got item:", item

q = Queue()
for i in xrange(3):
    q.put(i)
t = Thread(target=worker, args=(q,))
t.start()
# Got item: 0
# Got item: 1
# Got item: 2
q.close()
# Queue closed. Exiting thread.
Though to be honest, it's not much different from setting a flag on the Queue.Queue. The multiprocessing.Queue is just using a closed file descriptor as a flag:
from Queue import Queue
from threading import Thread

def worker2(q):
    while True:
        # The flag is checked before every get()
        if q.closed:
            print "Queue closed. Exiting thread."
            return
        try:
            item = q.get(timeout=.5)
        except:
            # Nothing arrived within the timeout; loop and re-check the flag
            continue
        print "Got item:", item

q = Queue()
q.closed = False
for i in xrange(3):
    q.put(i)
t = Thread(target=worker2, args=(q,))
t.start()
# Got item: 0
# Got item: 1
# Got item: 2
q.closed = True
# Queue closed. Exiting thread.
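One thing to watch with the flag approach: a worker that checks the flag before calling get() can exit while items are still queued. Below is a rough sketch of a variant that only exits once a get() has timed out with the flag set. It assumes Python 3 (`queue` module), and it swaps the ad hoc `q.closed` attribute for a `threading.Event`; the `results` list is a hypothetical addition used just to collect the items.

```python
import queue
import threading


def worker(q, closed, results):
    while True:
        try:
            item = q.get(timeout=0.1)
        except queue.Empty:
            # Nothing arrived within the timeout. Only now consult the flag,
            # so items that were already queued are not dropped.
            if closed.is_set():
                return
            continue
        results.append(item)


q = queue.Queue()
closed = threading.Event()   # stand-in for the q.closed attribute (assumption)
results = []
for i in range(3):
    q.put(i)

t = threading.Thread(target=worker, args=(q, closed, results))
t.start()
closed.set()   # flag set right away; the worker still drains the queue first
t.join()
print(results)   # [0, 1, 2]
```

The trade-off compared with a sentinel is the polling timeout: the worker notices the close at most one timeout interval after the queue runs dry, rather than immediately.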