我有一个大阵列我需要执行独立(并行)的任务,包括修改的对象参数的定制对象。 我同时使用管理器()。快译通,和“sharedmem'ory试过了,但也不是工作。 例如:
import numpy as np
import multiprocessing as mp
import sharedmem as shm
class Tester:
num = 0.0
name = 'none'
def __init__(self,tnum=num, tname=name):
self.num = tnum
self.name = tname
def __str__(self):
return '%f %s' % (self.num, self.name)
def mod(test, nn):
test.num = np.random.randn()
test.name = nn
if __name__ == '__main__':
num = 10
tests = np.empty(num, dtype=object)
for it in range(num):
tests[it] = Tester(tnum=it*1.0)
sh_tests = shm.empty(num, dtype=object)
for it in range(num):
sh_tests[it] = tests[it]
print sh_tests[it]
print '\n'
workers = [ mp.Process(target=mod, args=(test, 'some') ) for test in sh_tests ]
for work in workers: work.start()
for work in workers: work.join()
for test in sh_tests: print test
打印出:
0.000000 none
1.000000 none
2.000000 none
3.000000 none
4.000000 none
5.000000 none
6.000000 none
7.000000 none
8.000000 none
9.000000 none
0.000000 none
1.000000 none
2.000000 none
3.000000 none
4.000000 none
5.000000 none
6.000000 none
7.000000 none
8.000000 none
9.000000 none
也就是说,对象不被修改。
我怎样才能达到预期的行为?
问题是,当对象被传递给工作进程,他们都挤满了咸菜,运到其他进程,他们在那里进行解压缩和制作。 你的对象与其说传递到其他进程,如克隆。 你不回的对象,因此克隆的对象兴高采烈地修改,然后扔掉。
它看起来像这样不能做( 的Python:可能共享内存2个单独的进程之间的数据 )直接。
你可以做的是返回修改的对象。
import numpy as np
import multiprocessing as mp
class Tester:
num = 0.0
name = 'none'
def __init__(self,tnum=num, tname=name):
self.num = tnum
self.name = tname
def __str__(self):
return '%f %s' % (self.num, self.name)
def mod(test, nn, out_queue):
print test.num
test.num = np.random.randn()
print test.num
test.name = nn
out_queue.put(test)
if __name__ == '__main__':
num = 10
out_queue = mp.Queue()
tests = np.empty(num, dtype=object)
for it in range(num):
tests[it] = Tester(tnum=it*1.0)
print '\n'
workers = [ mp.Process(target=mod, args=(test, 'some', out_queue) ) for test in tests ]
for work in workers: work.start()
for work in workers: work.join()
res_lst = []
for j in range(len(workers)):
res_lst.append(out_queue.get())
for test in res_lst: print test
这也导致有趣的观察,由于产生了过程是相同的,他们都用随机数相同的种子开始,所以他们都产生相同的“随机”数。
我没有看到你传递SHM引用外面的子进程,所以我看不出他们所做的工作可以写回共享内存做。 也许我失去了一些东西。
或者,你有没有考虑numpy.memmap ? (顺便说一句:tcaswell,这里所说的模块似乎是: numpy的-sharedmem )。
你可能也想读Sturla莫尔登的使用并行数值计算的Python,多和NumPy的/ SciPy的 (PDF)在unutbu的回答建议[StackOverflow的:我如何通过Python的子进程之间的大numpy的阵列,而不保存到磁盘?]和( 我如何通过蟒子流程之间的大numpy的阵列,而不保存到磁盘? )。 和乔金顿的StackOverflow的:与NumPy与多和MMAP 。
这可能比直接相关的更多灵感。
您的代码不会尝试修改共享内存。 它只是克隆的各个对象。
dtype=object
意味着sharedmem
将不起作用由于所概述的原因在由@tcaswell提供的链接 :
对象图,其包括引用的共享/指向其他对象基本上是不可行
对于纯(值)类型,可以使用共享存储器,请参阅使用numpy的阵列中的共享存储器多处理为 。
该manager
的做法也应努力(它只是复制周围的对象):
import random
from multiprocessing import Pool, Manager
class Tester(object):
def __init__(self, num=0.0, name='none'):
self.num = num
self.name = name
def __repr__(self):
return '%s(%r, %r)' % (self.__class__.__name__, self.num, self.name)
def init(L):
global tests
tests = L
def modify(i_t_nn):
i, t, nn = i_t_nn
t.num += random.normalvariate(mu=0, sigma=1) # modify private copy
t.name = nn
tests[i] = t # copy back
return i
def main():
num_processes = num = 10 #note: num_processes and num may differ
manager = Manager()
tests = manager.list([Tester(num=i) for i in range(num)])
print(tests[:2])
args = ((i, t, 'some') for i, t in enumerate(tests))
pool = Pool(processes=num_processes, initializer=init, initargs=(tests,))
for i in pool.imap_unordered(modify, args):
print("done %d" % i)
pool.close()
pool.join()
print(tests[:2])
if __name__ == '__main__':
main()
因为你不能在进程间共享Python对象,使用任何实现multiprocessing
将是低效的,如果你有显著的对象,因为你必须将物体以共享数据复制。
如果你愿意尝试不同的方法,你可以尝试雷( 文档 )! 这是一个框架,可以很容易地编写并行和分布式的Python。 简而言之,它可以让你启动并行Python函数,相似的功能multiprocessing
,但它也更灵活,因为雷进程可以共享内存。 这里是你的脚本编写的光芒,用“演员”(共享对象)的概念:
# You can install Ray with pip.
import ray
import numpy as np
# Add this line to signify that you want to share Tester objects
# (called "actors" in Ray) between processes.
@ray.remote
class Tester(object):
num = 0.0
name = 'none'
def __init__(self,tnum=num, tname=name):
self.num = tnum
self.name = tname
def __str__(self):
return '%f %s' % (self.num, self.name)
# Convert mod to be a method of the Tester object.
def mod(self, nn):
self.num = np.random.randn()
self.name = nn
if __name__ == '__main__':
# Start Ray. This allows you to create shared Testers (called "actors").
ray.init()
num = 10
tests = np.empty(num, dtype=object)
for it in range(num):
# Create a shared Tester object (an "actor").
tests[it] = Tester.remote(tnum=it*1.0)
# Do some parallel work.
for test in tests:
test.mod.remote('some')
# Compute the __str__ representations of each Tester in parallel.
test_str_futures = [test.__str__.remote() for test in tests]
# Get and print the __str__ return values. `ray.get` will block
# until the return values are ready.
test_strs = ray.get(test_str_futures)
for test_str in test_strs:
print(test_str)