游泳池工人没有完成所有任务(Pool workers do not complete all tas

2019-09-21 06:25发布

我有一个相对简单的Python脚本多,设置了该附加输出到大熊猫工人池dataframe由一个自定义的管理方式。 什么我发现是,当我调用close()/ join()方法的游泳池,不是所有提交的apply_async的任务被完成。

下面是提交1000个就业机会,但只完成了一半造成断言错误一个简单的例子。 难道我忽略了一些很简单的或者是这也许一个错误?

from pandas import DataFrame
from multiprocessing.managers import BaseManager, Pool

class DataFrameResults:
    def __init__(self):
        self.results = DataFrame(columns=("A", "B")) 

    def get_count(self):
        return self.results["A"].count()

    def register_result(self, a, b):
        self.results = self.results.append([{"A": a, "B": b}], ignore_index=True)

class MyManager(BaseManager): pass

MyManager.register('DataFrameResults', DataFrameResults)

def f1(results, a, b):
    results.register_result(a, b)

def main():
    manager = MyManager()
    manager.start()
    results = manager.DataFrameResults()

    pool = Pool(processes=4)

    for (i) in range(0, 1000):
        pool.apply_async(f1, [results, i, i*i])
    pool.close()
    pool.join()

    print results.get_count()
    assert results.get_count() == 1000

if __name__ == "__main__":
    main()

Answer 1:

[编辑],使用遇到的问题是,因为这样的代码:

self.results = self.results.append(...)

这是不是原子。 因此,在某些情况下,该线程将阅读后中断self.results (或同时追加),但才可以分配新框架self.results - >这种情况下将丢失。

正确的解决办法是等待使用结果对象,以得到结果,然后添加他们都在主线程。



文章来源: Pool workers do not complete all tasks