Pool workers do not complete all tasks

Posted 2019-05-17 00:13

I have a relatively simple Python multiprocessing script that sets up a pool of workers that append output to a pandas DataFrame by way of a custom manager. What I am finding is that when I call close()/join() on the pool, not all of the tasks submitted with apply_async are completed.

Here's a simplified example that submits 1000 jobs, but only about half of them complete, causing an assertion error. Have I overlooked something very simple, or is this perhaps a bug?

from pandas import DataFrame
from multiprocessing import Pool
from multiprocessing.managers import BaseManager

class DataFrameResults:
    def __init__(self):
        self.results = DataFrame(columns=("A", "B")) 

    def get_count(self):
        return self.results["A"].count()

    def register_result(self, a, b):
        self.results = self.results.append([{"A": a, "B": b}], ignore_index=True)

class MyManager(BaseManager): pass

MyManager.register('DataFrameResults', DataFrameResults)

def f1(results, a, b):
    results.register_result(a, b)

def main():
    manager = MyManager()
    manager.start()
    results = manager.DataFrameResults()

    pool = Pool(processes=4)

    for i in range(1000):
        pool.apply_async(f1, [results, i, i*i])
    pool.close()
    pool.join()

    print(results.get_count())
    assert results.get_count() == 1000

if __name__ == "__main__":
    main()

1 Answer
仙女界的扛把子
Answered 2019-05-17 00:40

[EDIT] The issue you're seeing is caused by this code:

self.results = self.results.append(...)

This read-modify-write isn't atomic: the manager server handles concurrent proxy calls in separate threads, so a thread can be interrupted after reading self.results (or while building the appended frame) but before it assigns the new frame back to self.results. That update is then lost.
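Spelled out as two steps, the lost update looks like this (a minimal sketch of the interleaving, not the actual manager internals; the thread names are illustrative):

# register_result effectively does two separable steps:
new_frame = self.results.append([{"A": a, "B": b}], ignore_index=True)  # step 1: read old frame, build new one
self.results = new_frame                                                # step 2: write the new frame back

# Lost update: if threads T1 and T2 both run step 1 against the same old
# self.results, whichever finishes step 2 last overwrites the other's row.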

The correct solution is to use the result objects returned by apply_async to collect the workers' return values, and then append all of them in the main process.
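For example, here is a minimal sketch of that approach (the worker function and names are illustrative, not the asker's exact code): workers just return their values, and the DataFrame is built once in the main process after the pool has been joined, so no shared state is mutated concurrently.

from multiprocessing import Pool
from pandas import DataFrame

def f1(a, b):
    # Workers only compute and return; no shared state is touched.
    return {"A": a, "B": b}

def main():
    pool = Pool(processes=4)
    # Keep the AsyncResult handles so every return value can be fetched.
    async_results = [pool.apply_async(f1, (i, i * i)) for i in range(1000)]
    pool.close()
    pool.join()

    # Collect all results in the main process and build the frame once.
    rows = [r.get() for r in async_results]
    results = DataFrame(rows, columns=("A", "B"))

    print(results["A"].count())
    assert results["A"].count() == 1000

if __name__ == "__main__":
    main()

Because only the main process ever builds the frame, there is no race, and as a bonus the manager round-trip per task disappears.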
