Python: Check if named pipe has data

Posted 2019-06-24 14:13

Question:

I have a Python 3 process that is always running on my Unix system, and I want to be able to send data to it at arbitrary times through a named pipe from other processes that only run occasionally. If the named pipe doesn't have data, I want my process to continue doing other things, so I need to check whether it has data without blocking.

I can't figure out how to check without opening it, but opening blocks unless I set the non-blocking flag. And if I set the flag, it crashes if I happen to write to the pipe before or during the read.

This is the best I've managed to do:

import os

fifo = "pipe_test.fifo"
done = False
fd = os.open(fifo, os.O_RDONLY | os.O_NONBLOCK)
while not done:
    try:
        s = os.read(fd, 1024) # buffer size may need tweaking
        print(s)
        done = True
    except BlockingIOError as e:
        pass
os.close(fd)

If there's no data in the pipe, I get b"", and it exits. If there's data in the pipe, it gets an exception once, retries, then gets the data. Seems like I'm doing something wrong and might run into weird race conditions. Is there a nicer way to do this?
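The two outcomes described above appear to match two different pipe states: an empty read means no process has the FIFO open for writing, while BlockingIOError means a writer is connected but has not written yet. Here is a minimal sketch that reproduces each state in one process, assuming Linux FIFO semantics. The helper name poll_fifo and the temporary path are made up for illustration.

```python
import os
import tempfile

def poll_fifo(fd, size=1024):
    """Read from a non-blocking FIFO and report which state it was in."""
    try:
        chunk = os.read(fd, size)
    except BlockingIOError:
        # A writer has the FIFO open but nothing is buffered yet.
        return ("empty", b"")
    if chunk == b"":
        # End-of-file: no process currently has the FIFO open for writing.
        return ("no_writer", b"")
    return ("data", chunk)

# Work in a temporary directory so no existing file is touched.
tmpdir = tempfile.mkdtemp()
path = os.path.join(tmpdir, "pipe_test.fifo")
os.mkfifo(path)

reader = os.open(path, os.O_RDONLY | os.O_NONBLOCK)
state_before = poll_fifo(reader)       # nobody is writing yet

writer = os.open(path, os.O_WRONLY)    # returns at once because a reader exists
state_connected = poll_fifo(reader)    # writer connected, nothing written

os.write(writer, b"hello\n")
state_data = poll_fifo(reader)         # the written bytes are available

os.close(writer)
state_after = poll_fifo(reader)        # writer has gone away again

os.close(reader)
os.remove(path)
os.rmdir(tmpdir)
```

Seen this way, the exception is not a crash but the "try again later" signal, and b"" only says that no writer is attached at the moment.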

Answer 1:

If you can change the clients' code, I would use UNIX domain sockets instead of a named pipe, because they support datagrams, so each message arrives whole:

import errno, socket

Server:

# bind the socket; bind() creates the path and fails if it already exists
sock = socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM)
sock.bind('pipe_test.fifo')
# make recv() return immediately instead of waiting for a datagram
sock.setblocking(False)

# try to get a datagram
try:
    datagram = sock.recv(1024)
except OSError as ex:
    # EAGAIN: nothing queued yet; EINTR: interrupted by a signal
    if ex.errno not in (errno.EINTR, errno.EAGAIN):
        raise
else:
    print('Datagram: %r' % datagram)

Client:

sock = socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM)
# Python 3 sockets send bytes, not str
sock.sendto(b'Hello!', 'pipe_test.fifo')

But you might want to look into multithreading instead of using non-blocking sockets.
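To try the datagram approach without starting two programs, here is a minimal single-process sketch. The helper try_recv and the socket path demo.sock are names made up for this example, and it assumes a Unix system where AF_UNIX datagram sockets are available.

```python
import os
import socket
import tempfile

# Use a fresh temporary directory so bind() never hits an existing path.
tmpdir = tempfile.mkdtemp()
addr = os.path.join(tmpdir, "demo.sock")

server = socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM)
server.bind(addr)
server.setblocking(False)

def try_recv(sock, size=1024):
    """Return one queued datagram, or None if nothing is waiting."""
    try:
        return sock.recv(size)
    except BlockingIOError:
        return None

first = try_recv(server)            # nothing has been sent yet

client = socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM)
client.sendto(b"Hello!", addr)
client.sendto(b"Again", addr)
client.close()

second = try_recv(server)           # one whole message per recv()
third = try_recv(server)
fourth = try_recv(server)           # queue is drained again

server.close()
os.unlink(addr)                     # the socket file is not removed automatically
os.rmdir(tmpdir)
```

Each recv() hands back exactly one message, so the reader does not have to split a byte stream on newlines the way it would with a pipe.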



Answer 2:

This isn't really the answer, but in case it's useful to anyone, here's how I'm doing it with another thread.

import os
import threading

class QueryThread(threading.Thread):

    def __init__(self, args=(), kwargs=None):
        threading.Thread.__init__(self, args=args, kwargs=kwargs)
        self.daemon = True
        self.buf = []
        # os.path.exists stands in for the general.f_exists helper from my
        # own code (assumed here to be a plain existence check)
        if not os.path.exists("pipe"):
            os.mkfifo("pipe")

    def run(self):
        f = open("pipe")  # blocks until a writer opens the pipe
        while True:
            try:
                query = next(f).replace("\n", "")
                if query != "":
                    self.buf.append(query)
                    print("Read in new query from pipe: {}, buf = {}".format(query, self.buf))
            except StopIteration: # not a pipe error, just means no data is left, so time to re-open
                f.close()
                f = open("pipe")

    def get_query(self):
        if len(self.buf) == 0: return ""
        # pop(0) removes and returns the oldest buffered message
        return self.buf.pop(0)

It keeps newline-delimited messages in a buffer. You can call the get_query method from another thread to get the oldest message that has not been consumed yet, or an empty string if the buffer is empty.
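A variation on the same idea, sketched with the standard queue.Queue in place of a plain list so the handoff between threads is explicitly thread-safe. The names fifo_reader and get_query and the temporary FIFO path are chosen for this example only. Treat it as a sketch under those assumptions, not a drop-in replacement.

```python
import os
import queue
import tempfile
import threading

def fifo_reader(path, out_queue):
    """Put every non-empty line written to the FIFO onto out_queue."""
    while True:
        try:
            f = open(path)            # blocks until a writer opens the FIFO
        except FileNotFoundError:
            return                    # FIFO was removed, so stop the thread
        with f:
            for line in f:            # ends when every writer has closed
                line = line.rstrip("\n")
                if line:
                    out_queue.put(line)

def get_query(q):
    """Return the oldest pending message, or '' if none is waiting."""
    try:
        return q.get_nowait()
    except queue.Empty:
        return ""

tmpdir = tempfile.mkdtemp()
path = os.path.join(tmpdir, "pipe")
os.mkfifo(path)

queries = queue.Queue()
threading.Thread(target=fifo_reader, args=(path, queries), daemon=True).start()

# Stand-in for an occasional client process writing two messages.
with open(path, "w") as w:
    w.write("first\nsecond\n")

# Wait (with a timeout) for the reader thread to hand both messages over.
received = [queries.get(timeout=5), queries.get(timeout=5)]
nothing = get_query(queries)          # queue is empty again

os.remove(path)
os.rmdir(tmpdir)
```

The main loop can call get_query(queries) whenever it likes and carry on with other work when it gets an empty string back.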