多 - 生产者/消费者设计(Multiprocessing - producer/consumer

2019-08-04 10:17发布

我使用的是多处理模块分裂出一个非常大的任务。 它的工作原理在大多数情况下,但我必须失去我设计的东西很明显,因为这样一来它是非常难受,当所有的数据已被处理,有效地诉说。

我有一个运行两个独立的任务; 一个喂其他。 我想这是一个生产者/消费者问题。 我使用一个共享的队列中的所有进程,这里的生产填补了队列之间,而消费者从队列中读取并做处理。 问题是,有数据量有限,因此在某些时候每个人都需要知道所有的数据已被处理,以便系统能够正常关闭。

这似乎是有道理的使用map_async()函数,但由于生产商填补了队列,我不知道所有项目的前面,所以我必须进入一个while循环,并使用apply_async()并尝试当一切都与某种超时......丑做检测。

我觉得我失去了一些东西明显。 这又如何更好地设计?

PRODCUER

class ProducerProcess(multiprocessing.Process):
    def __init__(self, item, consumer_queue):
        self.item = item
        self.consumer_queue = consumer_queue
        multiprocessing.Process.__init__(self)

    def run(self):
        for record in get_records_for_item(self.item): # this takes time
            self.consumer_queue.put(record)

def start_producer_processes(producer_queue, consumer_queue, max_running):
    running = []

    while not producer_queue.empty():
        running = [r for r in running if r.is_alive()]
        if len(running) < max_running:
            producer_item = producer_queue.get()
            p = ProducerProcess(producer_item, consumer_queue)
            p.start()
            running.append(p)
        time.sleep(1)

消费者

def process_consumer_chunk(queue, chunksize=10000):
    for i in xrange(0, chunksize):
        try:
            # don't wait too long for an item
            # if new records don't arrive in 10 seconds, process what you have
            # and let the next process pick up more items.

            record = queue.get(True, 10)
        except Queue.Empty:                
            break

        do_stuff_with_record(record)

主要

if __name__ == "__main__":
    manager = multiprocessing.Manager()
    consumer_queue = manager.Queue(1024*1024)
    producer_queue = manager.Queue()

    producer_items = xrange(0,10)

    for item in producer_items:
        producer_queue.put(item)

    p = multiprocessing.Process(target=start_producer_processes, args=(producer_queue, consumer_queue, 8))
    p.start()

    consumer_pool = multiprocessing.Pool(processes=16, maxtasksperchild=1)

这里是它变得俗气。 我不能使用地图,因为在同一时间被填充到消费清单。 所以,我必须进入一个while循环,并尝试检测超时。 而生产商仍在努力填补它的consumer_queue可以成为空的,所以我不能只是检测一个空的队列中的退出上。

    timed_out = False
    timeout= 1800
    while 1:
        try:
            result = consumer_pool.apply_async(process_consumer_chunk, (consumer_queue, ), dict(chunksize=chunksize,))
            if timed_out:
                timed_out = False

        except Queue.Empty:
            if timed_out:
                break

            timed_out = True
            time.sleep(timeout)
        time.sleep(1)

    consumer_queue.join()
    consumer_pool.close()
    consumer_pool.join()

我想,也许我可以得到()在主线程中的记录,并通过这些进入消费而不是传递的队列中,但是我觉得我结束了同一个问题的方式。 我还是要运行一个while循环,并使用apply_async()预先感谢您的任何建议!

Answer 1:

你可以使用一个manager.Event信号的工作结束。 此事件可以将所有进程之间共享,然后当你从你的主要过程,其他工人则可以正常关机信号吧。

while not event.is_set():
 ...rest of code...

所以,你的消费者会等待要设置的事件,一旦设置处理清理。

为了确定何时设置这个标志,你可以做一个join的生产线,而这些都是所有完毕后,您就可以在消费者线程加入。



Answer 2:

我想强烈推荐的SimPy ,而不是多进程/线程做离散事件仿真。



文章来源: Multiprocessing - producer/consumer design