避免竞争条件在Python 3的多队列(Avoiding race conditions in Py

2019-07-29 10:23发布

我试图找到约6.1十亿(自定义)项的最大重量,我想与并行处理来做到这一点。 对于我的具体应用也有不需要我的迭代超过61名十亿的项目更好的算法,但解释他们的课本是在我的头上,我的老板希望这个在4天内完成。 我想我和我的公司看中服务器和并行处理一个更好的机会。 但是,我知道并行处理一切都从阅读的Python 文档 。 这是说我敢输了...

我的当前理论是设置一个进料器过程中,输入队列,工作进程的一大堆(比如,30),和一个输出队列(发现在输出队列中的最大元素将是微不足道的)。 我不明白的是饲养过程中是如何告诉工作进程何时停止等待的项目才能通过输入队列。

我曾想过使用multiprocessing.Pool.map_async我的6.1E9项目迭代,但需要近10分钟刚刚通过的项目没有做任何事情对他们进行迭代。 除非我误解的东西...,map_async遍历它们将它们分配给进程可以做,而流程开始工作。 ( Pool还提供imap但文件说,这是类似于map ,这似乎并没有异步工作, 我想异步的,对吧?)

相关的问题 :我想用concurrent.futures而不是multiprocessing ? 我无法在第一人实行双队列系统(这是行究竟是如何在美国工作的每一个熟食店...),所以有一个更Python /内置的方式做到这一点?

下面是我想要做的骨架。 见中间的注释块。

import multiprocessing as mp
import queue

def faucet(items, bathtub):
    """Fill bathtub, a process-safe queue, with 6.1e9 items"""
    for item in items:
        bathtub.put(item)
    bathtub.close()

def drain_filter(bathtub, drain):
    """Put maximal item from bathtub into drain.
    Bathtub and drain are process-safe queues.
    """
    max_weight = 0
    max_item = None
    while True:
        try:
            current_item = bathtub.get()
        # The following line three lines are the ones that I can't
        # quite figure out how to trigger without a race condition.
        # What I would love is to trigger them AFTER faucet calls
        # bathtub.close and the bathtub queue is empty.
        except queue.Empty:
            drain.put((max_weight, max_item))
            return
        else:
            bathtub.task_done()
        if not item.is_relevant():
            continue
        current_weight = item.weight
        if current_weight > max_weight:
            max_weight = current_weight
            max_item = current_item

def parallel_max(items, nprocs=30):
    """The elements of items should have a method `is_relevant`
    and an attribute `weight`. `items` itself is an immutable
    iterator object.
    """
    bathtub_q = mp.JoinableQueue()
    drain_q = mp.Queue()

    faucet_proc = mp.Process(target=faucet, args=(items, bathtub_q))
    worker_procs = mp.Pool(processes=nprocs)

    faucet_proc.start()
    worker_procs.apply_async(drain_filter, bathtub_q, drain_q)

    finalists = []
    for i in range(nprocs):
        finalists.append(drain_q.get())

    return max(finalists)


这里的答案

我发现了一个非常彻底的回答我的问题,和一个温柔的介绍在Python基金会联络部主任道格·赫尔曼多任务处理。 我想要的是“毒丸”的格局。 看看这里: http://www.doughellmann.com/PyMOTW/multiprocessing/communication.html

道具@MRAB张贴这一概念的核心。

Answer 1:

你可以把一个特殊的终止项目,如无,到队列中。 当工人看到它,就可以把它放回去了其他工人看到,然后终止。 或者,你可以把每个工人的一个特殊终端项目进入队列。



文章来源: Avoiding race conditions in Python 3's multiprocessing Queues