我需要当队列被关闭,不会有更多的项目,所以我可以结束迭代就知道了。
我做到了,通过把一个哨兵在队列中:
from Queue import Queue
class IterableQueue(Queue):
_sentinel = object()
def __iter__(self):
return self
def close(self):
self.put(self._sentinel)
def next(self):
item = self.get()
if item is self._sentinel:
raise StopIteration
else:
return item
考虑到这是一个队列中的使用非常普遍,是不是有什么内置执行?
前哨是一个生产者发送消息没有更多的队列中的任务即将到来的合理方法。
FWIW,你的代码可以简化不少与两个参数形式的国际热核实验堆() :
from Queue import Queue
class IterableQueue(Queue):
_sentinel = object()
def __iter__(self):
return iter(self.get, self._sentinel)
def close(self):
self.put(self._sentinel)
多处理模块都有自己的版本的队列 ,它包括一个close
的方法。 我不知道它是如何工作的线程,但它值得一试。 我不明白为什么它不应该工作一样:
from multiprocessing import Queue
q = Queue()
q.put(1)
q.get_nowait()
# 1
q.close()
q.get_nowait()
# ...
# IOError: handle out of range in select()
你可以只赶上IO错误的关闭信号。
测试
from multiprocessing import Queue
from threading import Thread
def worker(q):
while True:
try:
item = q.get(timeout=.5)
except IOError:
print "Queue closed. Exiting thread."
return
except:
continue
print "Got item:", item
q = Queue()
for i in xrange(3):
q.put(i)
t = Thread(target=worker, args=(q,))
t.start()
# Got item: 0
# Got item: 1
# Got item: 2
q.close()
# Queue closed. Exiting thread.
虽然说实话,它没有太多的比对Queue.Queue设置一个标志不同。 该multiprocessing.Queue只是使用一个封闭的文件描述符作为一个标志:
from Queue import Queue
def worker2(q):
while True:
if q.closed:
print "Queue closed. Exiting thread."
return
try:
item = q.get(timeout=.5)
except:
continue
print "Got item:", item
q = Queue()
q.closed = False
for i in xrange(3):
q.put(i)
t = Thread(target=worker2, args=(q,))
t.start()
# Got item: 0
# Got item: 1
# Got item: 2
q.closed = True
# Queue closed. Exiting thread.