zeromq zmq.Poller & stdin

2019-06-22 07:59发布

问题:

Is it possible to use zmq.Poller to also poll for data availability on stdin? If not, what would be the most efficient wait to poll, at the some time (ideally), for data availability on zeromq sockets & stdin?

回答1:

yes, zmq pollers do support native FDs, including stdin, etc., so you just need to check sys.stdin.fileno():

poller = zmq.Poller()
poller.register(sys.stdin, zmq.POLLIN)
poller.register(mysocket, zmq.POLLIN)
evts = dict(poller.poll(1000))
stdin_ready = evts.get(sys.stdin.fileno(), False)
socket_ready = evts.get(mysocket, False)


回答2:

If you are sure you will never run on Windows, you can simply register sys.stdin with a zmq.Poller (as described by minrk, above).

However, the select() implementation in Winsock only supports sockets and cannot be used to poll "regular" file descriptors like the standard input. Therefore, when running on Windows, you need to bridge the standard input with a 0MQ socket on the inproc transport.

Suggested implementation using an exclusive pair socket:

def forward_lines(stream, socket):
    """Read lines from `stream` and send them over `socket`."""
    try:
        line = stream.readline()
        while line:
            socket.send(line[:-1])
            line = stream.readline()
        socket.send('')  # send "eof message".
    finally:
        # NOTE: `zmq.Context.term()` in the main thread will block until this
        #       socket is closed, so we can't run this function in daemon
        #       thread hoping that it will just close itself.
        socket.close()


def forward_standard_input(context):
    """Start a thread that will bridge the standard input to a 0MQ socket and
    return an exclusive pair socket from which you can read lines retrieved
    from the standard input.  You will receive a final empty line when the EOF
    character is input to the keyboard."""
    reader = context.socket(zmq.PAIR)
    reader.connect('inproc://standard-input')
    writer = context.socket(zmq.PAIR)
    writer.bind('inproc://standard-input')
    thread = threading.Thread(target=forward_lines,
                              args=(sys.stdin, writer))
    thread.start()
    return reader


if __name__ == '__main__':
    context = zmq.Context()
    reader = forward_standard_input(context)
    poller = zmq.Poller()
    poller.register(reader, zmq.POLLIN)
    poller.register(...)

    events = dict(poller.poll())
    if events.get(reader, 0) & zmq.POLLIN:
        line = reader.recv()
        # process line.
    if events.get(..., 0) & zmq.POLLIN:
        # ...