Is there a way to resubmit a piece of data for processing if the original computation fails, using a simple pool?
```python
import random
from multiprocessing import Pool

def f(x):
    if random.getrandbits(1):
        raise ValueError("Retry this computation")
    return x*x

if __name__ == '__main__':
    p = Pool(5)
    # If one of these f(x) calls fails, retry it with another (or same) process
    p.map(f, [1,2,3])
```
If you can retry immediately (or don't mind doing so), wrap the function in a decorator:
```python
import random
from multiprocessing import Pool
from functools import wraps

def retry(f):
    @wraps(f)
    def wrapped(*args, **kwargs):
        # Keep calling f until it completes without raising ValueError
        while True:
            try:
                return f(*args, **kwargs)
            except ValueError:
                pass
    return wrapped

@retry
def f(x):
    if random.getrandbits(1):
        raise ValueError("Retry this computation")
    return x*x

if __name__ == '__main__':
    p = Pool(5)
    # With @retry, a failing f(x) call is retried inside the same worker process
    print(p.map(f, [1,2,3]))
```
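The decorator above retries forever, so an item that keeps failing will tie up its worker indefinitely. A minimal sketch of a bounded variant, assuming you would rather give up and re-raise after a fixed number of attempts (`retry_at_most` and `max_tries` are hypothetical names, not part of the answer):

```python
import random
from functools import wraps
from multiprocessing import Pool

def retry_at_most(max_tries):
    """Hypothetical decorator factory: retry on ValueError, at most max_tries attempts."""
    def decorator(func):
        @wraps(func)  # keeps the name "f" so the pool can still pickle it by reference
        def wrapped(*args, **kwargs):
            for attempt in range(1, max_tries + 1):
                try:
                    return func(*args, **kwargs)
                except ValueError:
                    if attempt == max_tries:
                        raise  # out of attempts: let the error reach Pool.map
        return wrapped
    return decorator

@retry_at_most(max_tries=20)
def f(x):
    if random.getrandbits(1):
        raise ValueError("Retry this computation")
    return x * x

if __name__ == "__main__":
    with Pool(5) as p:
        print(p.map(f, [1, 2, 3]))
```

Once the limit is hit, `Pool.map` re-raises the `ValueError` in the parent, so a single persistently failing item aborts the whole `map` call.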
You can use a `Queue` to feed failures back into the `Pool` through a loop in the initiating `Process`:
```python
import multiprocessing as mp
import random

def f(x):
    if random.getrandbits(1):
        # on failure / exception catch: hand x back to the parent
        f.q.put(x)
        return None
    return x*x

def f_init(q):
    # Runs once in each worker, giving f access to the shared failure queue
    f.q = q

def main(pending):
    total_items = len(pending)
    successful = []
    failure_tracker = []

    q = mp.Queue()
    p = mp.Pool(None, f_init, [q])
    results = p.imap(f, pending)
    retry_results = []
    while len(successful) < total_items:
        # An imap iterator is exhausted after one pass, so old batches add nothing later
        successful.extend([r for r in results if r is not None])
        successful.extend([r for r in retry_results if r is not None])
        failed_items = []
        # q.empty() is only approximate; a failure not visible yet is caught on a later pass
        while not q.empty():
            failed_items.append(q.get())
        if failed_items:
            failure_tracker.append(failed_items)
            retry_results = p.imap(f, failed_items)
    p.close()
    p.join()
    print("Results: %s" % successful)
    print("Failures: %s" % failure_tracker)

if __name__ == '__main__':
    main(range(1, 10))
```
The output looks like this:

```
Results: [1, 4, 36, 49, 25, 81, 16, 64, 9]
Failures: [[3, 4, 5, 8, 9], [3, 8, 4], [8, 3], []]
```
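A `Pool` cannot be shared between processes, hence this `Queue`-based approach. If you try to pass a pool as an argument to the pool's own processes, you will get this error:

```
NotImplementedError: pool objects cannot be passed between processes or pickled
```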
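Because workers cannot hold a reference to the pool, any resubmission has to be driven from the parent. As an alternative to the `Queue` (a different technique from the answer's, sketched under the assumption that `f` simply raises on failure), the parent can submit each item with `apply_async` and resubmit it whenever `AsyncResult.get()` re-raises the worker's exception. `map_with_retries` and `max_tries` are hypothetical names:

```python
import random
from multiprocessing import Pool

def f(x):
    # The flaky task from the question: it raises on failure, no queue involved
    if random.getrandbits(1):
        raise ValueError("Retry this computation")
    return x * x

def map_with_retries(pool, func, items, max_tries=20):
    """Hypothetical helper: like pool.map, but resubmits items whose call raised."""
    items = list(items)
    # Keyed by position, so duplicate or unhashable items are fine
    pending = {i: pool.apply_async(func, (x,)) for i, x in enumerate(items)}
    tries = dict.fromkeys(pending, 1)
    results = {}
    while pending:
        i, async_result = pending.popitem()
        try:
            results[i] = async_result.get()  # re-raises the worker's exception here
        except ValueError:
            if tries[i] >= max_tries:
                raise
            tries[i] += 1
            pending[i] = pool.apply_async(func, (items[i],))
    return [results[i] for i in range(len(items))]

if __name__ == "__main__":
    with Pool(5) as p:
        print(map_with_retries(p, f, range(1, 10)))
```

Compared with the `Queue` version, results come back in input order and no `None` sentinel is needed, at the cost of one `apply_async` round trip per item.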
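Alternatively, you could try a few immediate retries inside your function `f` to avoid the synchronization overhead. It really comes down to how long your function should wait before retrying, and how likely it is to succeed if retried immediately.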
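A minimal sketch of that hybrid, assuming you keep the answer's `f_init`/`Queue` plumbing and `main` unchanged: `f` makes a few quick local attempts, pausing briefly between them, and only reports the item through `f.q` if all of them fail. `compute`, `LOCAL_TRIES` and `RETRY_DELAY` are hypothetical names, and the values are placeholders to tune for your workload:

```python
import random
import time

LOCAL_TRIES = 3     # hypothetical: attempts inside the worker before giving up
RETRY_DELAY = 0.01  # hypothetical: seconds to wait between local attempts

def compute(x):
    # Stand-in for the flaky computation from the question
    if random.getrandbits(1):
        raise ValueError("Retry this computation")
    return x * x

def f(x):
    for attempt in range(LOCAL_TRIES):
        try:
            return compute(x)
        except ValueError:
            if attempt < LOCAL_TRIES - 1:
                time.sleep(RETRY_DELAY)
    # Every local attempt failed: hand x back to the parent, as in the answer
    f.q.put(x)
    return None

def f_init(q):
    f.q = q
```

With three local attempts and a 50% failure rate per call, only about one item in eight (0.5³) should end up in the queue, so the parent's retry loop has far less to do.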
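Old answer: for completeness, here is my old answer. It is not as optimal as resubmitting directly into the pool, but it may still be relevant depending on the use case, because it provides a natural way to handle and limit n-level retries.

You can use a `Queue` to aggregate failures and resubmit them at the end of each run, over multiple runs: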
```python
import multiprocessing as mp
import random

def f(x):
    if random.getrandbits(1):
        # on failure / exception catch: report x through the queue
        f.q.put(x)
        return None
    return x*x

def f_init(q):
    f.q = q

def main(pending):
    run_number = 1
    while pending:
        jobs = pending
        pending = []

        # A fresh queue and pool for every run
        q = mp.Queue()
        p = mp.Pool(None, f_init, [q])
        results = p.imap(f, jobs)
        p.close()
        p.join()

        # The pool has been joined, so the workers' reported failures should be readable now
        failed_items = []
        while not q.empty():
            failed_items.append(q.get())

        successful = [r for r in results if r is not None]
        print("(%d) Succeeded: %s" % (run_number, successful))
        print("(%d) Failed: %s" % (run_number, failed_items))
        print()
        pending = failed_items
        run_number += 1

if __name__ == '__main__':
    main(range(1, 10))
```
The output looks like this:

```
(1) Succeeded: [9, 16, 36, 81]
(1) Failed: [2, 1, 5, 7, 8]

(2) Succeeded: [64]
(2) Failed: [2, 1, 5, 7]

(3) Succeeded: [1, 25]
(3) Failed: [2, 7]

(4) Succeeded: [49]
(4) Failed: [2]

(5) Succeeded: [4]
(5) Failed: []
```
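The run loop makes it straightforward to cap the number of retry levels. A sketch of that, with one swapped-in simplification: instead of a `Queue`, the hypothetical `attempt` function reports success or failure in its return value, so a single pool can be reused across rounds with `pool.map`. `attempt`, `run_in_rounds` and `max_runs` are illustrative names, not from the answer:

```python
import random
from multiprocessing import Pool

def attempt(x):
    # Returns (item, succeeded, value) instead of signalling failure via a queue
    try:
        if random.getrandbits(1):
            raise ValueError("Retry this computation")
        return x, True, x * x
    except ValueError:
        return x, False, None

def run_in_rounds(pool, items, max_runs=5):
    """Hypothetical helper: retry failed items for at most max_runs rounds."""
    successful, pending = [], list(items)
    for run_number in range(1, max_runs + 1):
        if not pending:
            break
        outcomes = pool.map(attempt, pending)
        successful.extend(value for _, ok, value in outcomes if ok)
        pending = [x for x, ok, _ in outcomes if not ok]
        print("(%d) Failed: %s" % (run_number, pending))
    return successful, pending  # pending: items still failing after max_runs rounds

if __name__ == "__main__":
    with Pool() as p:
        done, gave_up = run_in_rounds(p, range(1, 10), max_runs=3)
    print("Results:", done)
    print("Gave up on:", gave_up)
```

Items still in `pending` after the last round are the ones that failed `max_runs` times in a row; whether to log them, raise, or requeue them later is left to the caller.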