在多个Python的多队列“选择”?(“select” on multiple Python mul

2019-06-25 20:40发布

什么是等待(无旋转),直到事情最好的办法是在提供任一两(多) 队列 ,其中都驻留在同一系统上?

Answer 1:

它看起来并不像有又处理这个正式的方式。 或者至少,不是基于这样的:

  • http://bugs.python.org/issue3831

你可以尝试像什么这个职位是做 - 访问底层管道文件句柄:

  • http://haltcondition.net/?p=2319

然后用选择。



Answer 2:

其实你可以在select.select使用multiprocessing.Queue对象。 即

que = multiprocessing.Queue()
(input,[],[]) = select.select([que._reader],[],[])

会选择阙,只有当它已准备好被读取。

它虽然没有文档。 我正在读multiprocessing.queue库(Linux的,在它的通常是某事像/usr/lib/python2.6/multiprocessing/queue.py)的源代码,以找到它。

随着Queue.Queue我没有发现任何智能的方式来做到这一点(我真的很喜欢)。



Answer 3:

好像使用线程这将外来项目单个队列,你再等上独立于平台的方式,使用多的时候是一个现实的选择。

避免线程要求要么处理低级别的管道/函数依赖是具体的,不容易与更高级别的API一致地处理这两个平台。

或者你需要的队列与设置,我认为是正确的高级接口去回调的能力。 即你会写是这样的:

  singlequeue = Queue()
  incoming_queue1.setcallback(singlequeue.put)
  incoming_queue2.setcallback(singlequeue.put)
  ...
  singlequeue.get()

也许多包可能增长这个API,但它现在还没有。 概念行之有效与使用术语“通道”,而不是“队列” py.execnet,看到这里http://tinyurl.com/nmtr4w



Answer 4:

你可以使用类似的观察模式,其中队列的用户被通知的状态变化。

在这种情况下,你可以让你的工作线程指定为每个队列监听器,只要收到就绪信号,它可以在新项目中工作,否则睡觉。



Answer 5:

不知道一个多队列如何选择上适用于Windows。 至于选择在Windows监听套接字,而不是文件句柄,我怀疑可能有问题。

我的回答是让一个线程来听以阻塞方式每个队列,并把所有到一个单一的队列听取了主线程,基本复个人排队到一个单一的一个结果。

我这样做的代码是:

"""
Allow multiple queues to be waited upon.

queue,value = multiq.select(list_of_queues)
"""
import queue
import threading

class queue_reader(threading.Thread):
    def __init__(self,inq,sharedq):
        threading.Thread.__init__(self)
        self.inq = inq
        self.sharedq = sharedq
    def run(self):
        while True:
            data = self.inq.get()
            print ("thread reads data=",data)
            result = (self.inq,data)
            self.sharedq.put(result)

class multi_queue(queue.Queue):
    def __init__(self,list_of_queues):
        queue.Queue.__init__(self)
        for q in list_of_queues:
            qr = queue_reader(q,self)
            qr.start()

def select(list_of_queues):
    outq = queue.Queue()
    for q in list_of_queues:
        qr = queue_reader(q,outq)
        qr.start()
    return outq.get()

下面的测试程序演示了如何使用它:

import multiq
import queue

q1 = queue.Queue()
q2 = queue.Queue()

q3 = multiq.multi_queue([q1,q2])

q1.put(1)
q2.put(2)
q1.put(3)
q1.put(4)

res=0
while not res==4:
    while not q3.empty():
        res = q3.get()[1]
        print ("returning result =",res)

希望这可以帮助。

托尼·华莱士



Answer 6:

上面代码的新版本...

不知道一个多队列如何选择上适用于Windows。 至于选择在Windows监听套接字,而不是文件句柄,我怀疑可能有问题。

我的回答是让一个线程来听以阻塞方式每个队列,并把所有到一个单一的队列听取了主线程,基本复个人排队到一个单一的一个结果。

我这样做的代码是:

"""
Allow multiple queues to be waited upon.

An EndOfQueueMarker marks a queue as
    "all data sent on this queue".
When this marker has been accessed on
all input threads, this marker is returned
by the multi_queue.

"""
import queue
import threading

class EndOfQueueMarker:
    def __str___(self):
        return "End of data marker"
    pass

class queue_reader(threading.Thread):
    def __init__(self,inq,sharedq):
        threading.Thread.__init__(self)
        self.inq = inq
        self.sharedq = sharedq
    def run(self):
        q_run = True
        while q_run:
            data = self.inq.get()
            result = (self.inq,data)
            self.sharedq.put(result)
            if data is EndOfQueueMarker:
                q_run = False

class multi_queue(queue.Queue):
    def __init__(self,list_of_queues):
        queue.Queue.__init__(self)
        self.qList = list_of_queues
        self.qrList = []
        for q in list_of_queues:
            qr = queue_reader(q,self)
            qr.start()
            self.qrList.append(qr)
    def get(self,blocking=True,timeout=None):
        res = []
        while len(res)==0:
            if len(self.qList)==0:
                res = (self,EndOfQueueMarker)
            else:
                res = queue.Queue.get(self,blocking,timeout)
                if res[1] is EndOfQueueMarker:
                    self.qList.remove(res[0])
                    res = []
        return res

    def join(self):
        for qr in self.qrList:
            qr.join()

def select(list_of_queues):
    outq = queue.Queue()
    for q in list_of_queues:
        qr = queue_reader(q,outq)
        qr.start()
    return outq.get()

后续的代码是我的测试程序,以显示它是如何工作:

import multiq
import queue

q1 = queue.Queue()
q2 = queue.Queue()

q3 = multiq.multi_queue([q1,q2])

q1.put(1)
q2.put(2)
q1.put(3)
q1.put(4)
q1.put(multiq.EndOfQueueMarker)
q2.put(multiq.EndOfQueueMarker)
res=0
have_data = True
while have_data:
    res = q3.get()[1]
    print ("returning result =",res)
    have_data = not(res==multiq.EndOfQueueMarker)


Answer 7:

对于Python 3.3,你可以用multiprocessing.connection.wait多个等待Queue._reader一次的对象。



Answer 8:

不这样做。

戴上消息的标题,并将它们发送到公共队列。 这简化了代码,将是更清洁的整体。



文章来源: “select” on multiple Python multiprocessing Queues?