蟒蛇管道的同步/异步行为(Synchronous/Asynchronous behaviour of

2019-08-31 17:03发布

在我的应用程序正在使用来自多模块管道蟒蛇进程间通信。 最近,我发现这取决于数据的,我通过他们发送大小怪异的行为。 根据Python文档这些管道是基于连接并以异步方式应该表现,但有时它们会停留在发送。 如果启用全双工在每个连接一切正常,即使我不使用用于发送和监听的连接。 任何人都可以解释这种现象?

  1. 100辆彩车,全双工禁用
    代码工作,利用asynchronousness。
  2. 100辆彩车,全双工启用
    这个例子如预期正常工作。
  3. 10000辆花车,全双工禁用
    执行永远阻塞,即使是罚款较小的数据。
  4. 10000辆花车,全双工启用
    精细一次。

代码(这不是我的生产代码,它只是说明我的意思):

from collections import deque
from multiprocessing import Process, Pipe
from numpy.random import randn
from os import getpid

PROC_NR = 4
DATA_POINTS = 100
# DATA_POINTS = 10000


def arg_passer(pipe_in, pipe_out, list_):
    my_pid = getpid()
    print "{}: Before send".format(my_pid)
    pipe_out.send(list_)
    print "{}: After send, before recv".format(my_pid)
    buf = pipe_in.recv()
    print "{}: After recv".format(my_pid)


if __name__ == "__main__":
    pipes = [Pipe(False) for _ in range(PROC_NR)]
    # pipes = [Pipe(True) for _ in range(PROC_NR)]
    pipes_in = deque(p[0] for p in pipes)
    pipes_out = deque(p[1] for p in pipes)
    pipes_in.rotate(1)
    pipes_out.rotate(-1)

    data = [randn(DATA_POINTS) for foo in xrange(PROC_NR)]
    processes = [Process(target=arg_passer, args=(pipes_in[foo], pipes_out[foo], data[foo]))
                 for foo in xrange(PROC_NR)]

    for proc in processes:
        proc.start()

    for proc in processes:
        proc.join()

Answer 1:

首先,这是值得注意的执行multiprocessing.Pipe类...

def Pipe(duplex=True):
    '''
    Returns pair of connection objects at either end of a pipe
    '''
    if duplex:
        s1, s2 = socket.socketpair()
        s1.setblocking(True)
        s2.setblocking(True)
        c1 = _multiprocessing.Connection(os.dup(s1.fileno()))
        c2 = _multiprocessing.Connection(os.dup(s2.fileno()))
        s1.close()
        s2.close()
    else:
        fd1, fd2 = os.pipe()
        c1 = _multiprocessing.Connection(fd1, writable=False)
        c2 = _multiprocessing.Connection(fd2, readable=False)

    return c1, c2

不同的是半双工“管道”使用匿名管道 ,但全双工“管道”实际上使用Unix域套接字 ,因为匿名管道是自然单向的。

我不知道你在这方面的“异步”一词的意思。 如果你的意思是“非阻塞I / O”,那么这是值得注意的是,这两种实现默认情况下使用阻塞I / O。


其次,值得注意你想发送的数据的大小腌制...

>>> from numpy.random import randn
>>> from cPickle import dumps
>>> len(dumps(randn(100)))
2479
>>> len(dumps(randn(10000)))
237154

第三,从pipe(7)手册页...

管容量

的管具有有限的容量。 如果管道是满的,然后写(2)将被阻塞或失效,这取决于O_NONBLOCK标志是否被设置(见下文)。 不同的实施方式具有用于管容量不同的限制。 应用程序不应该依赖于特定的能力:一个应用程序应该被设计成一个阅读过程中,一旦消耗数据,因为它是可用的,从而使书写过程中不保持阻塞。

在Linux版本2.6.11之前,管道的容量是一样的系统页面大小(例如,4096个字节在i386)。 由于Linux 2.6.11,管道容量为65536个字节。


所以,实际上,你已经创建,所有的子进程都挡在了死锁pipe_out.send()调用,其中没有一个能够从其他进程接收到任何数据,因为你发送的所有数据237154字节一重击,填补65536字节的缓冲区。

你也许会只使用Unix域套接字版本,但它的工作原理,目前的唯一原因是,它具有比管更大的缓冲区大小,你会发现如果增加的数量的解决方案也将失败DATA_POINTS 100000。

“快N”肮脏的黑客”的解决办法是把数据分成较小的块发送,但它不依赖于为特定大小缓冲区很好的做法。

一个更好的解决办法是在使用非阻塞I / O pipe_out.send()调用,虽然我不是足够熟悉的multiprocessing模块,以确定最佳的方式使用该模块来实现它。

伪代码将沿行...

while 1:
    if we have sent all data and received all data:
        break
    send as much data as we can without blocking
    receive as much data as we can without blocking
    if we didn't send or receive anything in this iteration:
        sleep for a bit so we don't waste CPU time
        continue

...或者你可以使用Python select模块,以避免睡眠时间超过是必要的,但同样,用它整合multiprocessing.Pipe可能会非常棘手。

这有可能是multiprocessing.Queue类做这一切给你,但我从来没有使用过它,所以你必须做一些实验。



文章来源: Synchronous/Asynchronous behaviour of python Pipes