在我的应用程序正在使用来自多模块管道蟒蛇进程间通信。 最近,我发现这取决于数据的,我通过他们发送大小怪异的行为。 根据Python文档这些管道是基于连接并以异步方式应该表现,但有时它们会停留在发送。 如果启用全双工在每个连接一切正常,即使我不使用用于发送和监听的连接。 任何人都可以解释这种现象?
- 100辆彩车,全双工禁用
代码工作,利用asynchronousness。 - 100辆彩车,全双工启用
这个例子如预期正常工作。 - 10000辆花车,全双工禁用
执行永远阻塞,即使是罚款较小的数据。 - 10000辆花车,全双工启用
精细一次。
代码(这不是我的生产代码,它只是说明我的意思):
from collections import deque
from multiprocessing import Process, Pipe
from numpy.random import randn
from os import getpid
PROC_NR = 4
DATA_POINTS = 100
# DATA_POINTS = 10000
def arg_passer(pipe_in, pipe_out, list_):
my_pid = getpid()
print "{}: Before send".format(my_pid)
pipe_out.send(list_)
print "{}: After send, before recv".format(my_pid)
buf = pipe_in.recv()
print "{}: After recv".format(my_pid)
if __name__ == "__main__":
pipes = [Pipe(False) for _ in range(PROC_NR)]
# pipes = [Pipe(True) for _ in range(PROC_NR)]
pipes_in = deque(p[0] for p in pipes)
pipes_out = deque(p[1] for p in pipes)
pipes_in.rotate(1)
pipes_out.rotate(-1)
data = [randn(DATA_POINTS) for foo in xrange(PROC_NR)]
processes = [Process(target=arg_passer, args=(pipes_in[foo], pipes_out[foo], data[foo]))
for foo in xrange(PROC_NR)]
for proc in processes:
proc.start()
for proc in processes:
proc.join()
首先,这是值得注意的执行multiprocessing.Pipe
类...
def Pipe(duplex=True):
'''
Returns pair of connection objects at either end of a pipe
'''
if duplex:
s1, s2 = socket.socketpair()
s1.setblocking(True)
s2.setblocking(True)
c1 = _multiprocessing.Connection(os.dup(s1.fileno()))
c2 = _multiprocessing.Connection(os.dup(s2.fileno()))
s1.close()
s2.close()
else:
fd1, fd2 = os.pipe()
c1 = _multiprocessing.Connection(fd1, writable=False)
c2 = _multiprocessing.Connection(fd2, readable=False)
return c1, c2
不同的是半双工“管道”使用匿名管道 ,但全双工“管道”实际上使用Unix域套接字 ,因为匿名管道是自然单向的。
我不知道你在这方面的“异步”一词的意思。 如果你的意思是“非阻塞I / O”,那么这是值得注意的是,这两种实现默认情况下使用阻塞I / O。
其次,值得注意你想发送的数据的大小腌制...
>>> from numpy.random import randn
>>> from cPickle import dumps
>>> len(dumps(randn(100)))
2479
>>> len(dumps(randn(10000)))
237154
第三,从pipe(7)
手册页...
管容量
的管具有有限的容量。 如果管道是满的,然后写(2)将被阻塞或失效,这取决于O_NONBLOCK标志是否被设置(见下文)。 不同的实施方式具有用于管容量不同的限制。 应用程序不应该依赖于特定的能力:一个应用程序应该被设计成一个阅读过程中,一旦消耗数据,因为它是可用的,从而使书写过程中不保持阻塞。
在Linux版本2.6.11之前,管道的容量是一样的系统页面大小(例如,4096个字节在i386)。 由于Linux 2.6.11,管道容量为65536个字节。
所以,实际上,你已经创建,所有的子进程都挡在了死锁pipe_out.send()
调用,其中没有一个能够从其他进程接收到任何数据,因为你发送的所有数据237154字节一重击,填补65536字节的缓冲区。
你也许会只使用Unix域套接字版本,但它的工作原理,目前的唯一原因是,它具有比管更大的缓冲区大小,你会发现如果增加的数量的解决方案也将失败DATA_POINTS
100000。
“快N”肮脏的黑客”的解决办法是把数据分成较小的块发送,但它不依赖于为特定大小缓冲区很好的做法。
一个更好的解决办法是在使用非阻塞I / O pipe_out.send()
调用,虽然我不是足够熟悉的multiprocessing
模块,以确定最佳的方式使用该模块来实现它。
伪代码将沿行...
while 1:
if we have sent all data and received all data:
break
send as much data as we can without blocking
receive as much data as we can without blocking
if we didn't send or receive anything in this iteration:
sleep for a bit so we don't waste CPU time
continue
...或者你可以使用Python select
模块,以避免睡眠时间超过是必要的,但同样,用它整合multiprocessing.Pipe
可能会非常棘手。
这有可能是multiprocessing.Queue
类做这一切给你,但我从来没有使用过它,所以你必须做一些实验。