Python 3.6 ZeroMQ (PyZMQ) asyncio pub sub Hello Wo

Posted 2019-04-13 23:16

Question:

I've just started with ZeroMQ and I'm trying to get a Hello World working with PyZMQ and asyncio in Python 3.6. I want to decouple a module's functionality from the pub/sub code, hence the following class setup:

Edit 1: Minimized example

Edit 2: Included the solution; see my answer below for details.

import asyncio
import zmq.asyncio
from zmq.asyncio import Context

# manages message flow between publishers and subscribers
class HelloWorldMessage:
    def __init__(self, url='127.0.0.1', port='5555'):
        self.url = "tcp://{}:{}".format(url, port)
        self.ctx = Context.instance()

        # activate publishers / subscribers
        asyncio.get_event_loop().run_until_complete(asyncio.wait([
            self.pub_hello_world(),
            self.sub_hello_world(),
        ]))

    # generates message "Hello World" and publish to topic 'world'
    async def pub_hello_world(self):
        pub = self.ctx.socket(zmq.PUB)
        pub.connect(self.url)

        # message contents
        msg = "Hello World"
        print(msg)

        # keep sending messages
        while True:
            # --MOVED-- slow down message publication
            await asyncio.sleep(1) 

            # publish message to topic 'world'
            # topic and payload go out as two frames, hence `send_multipart()`
            await pub.send_multipart([b'world', msg.encode('ascii')])  # WRONG: bytes(msg)

    # processes message "Hello World" from topic 'world'
    async def sub_hello_world(self):
        sub = self.ctx.socket(zmq.SUB)
        sub.bind(self.url)
        sub.setsockopt(zmq.SUBSCRIBE, b'world')

        # keep listening to all published message on topic 'world'
        while True:
            msg = await sub.recv_multipart()
            # ERROR: WAITS FOREVER
            print('received: ', msg)

if __name__ == '__main__':
    HelloWorldMessage()

Problem

With the original code (before the fixes marked in Edit 2), only one Hello World is printed and then the program waits forever. If I press Ctrl+C, I get the following traceback:

python helloworld_pubsub.py

Hello World
^CTraceback (most recent call last):
  File "helloworld_pubsub_stackoverflow.py", line 64, in <module>
    HelloWorldMessage()
  File "helloworld_pubsub_stackoverflow.py", line 27, in __init__
    self.sub_hello_world(),
  File "/*path*/zeromq/lib/python3.6/asyncio/base_events.py", line 454, in run_until_complete
    self.run_forever()
  File "/*path*/zeromq/lib/python3.6/asyncio/base_events.py", line 421, in run_forever
    self._run_once()
  File "/*path*/zeromq/lib/python3.6/asyncio/base_events.py", line 1395, in _run_once
    event_list = self._selector.select(timeout)
  File "/*path*/zeromq/lib/python3.6/selectors.py", line 445, in select
    fd_event_list = self._epoll.poll(timeout, max_ev)
KeyboardInterrupt

Versions: libzmq: 4.2.3, pyzmq: 17.0.0, Ubuntu 16.04

Any insights are appreciated.

Answer 1:

Decoupling for an OOP separation of concerns is fine, yet let's also spend some care on debugging the code:

1) The ZeroMQ PUB/SUB Scalable Formal Communication Archetype has long been known to need some time before the PUB and SUB sides are actually ready to broadcast / accept messages. It is therefore better to set up the infrastructure inside .__init__(), and not right before the SUB side is already expected to receive payloads.
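
The start-up delay can be observed with plain (synchronous) pyzmq sockets. The sketch below is only an illustration under assumptions: first_delivery, max_tries and poll_ms are made-up names, and a random free port is used instead of 5555. It keeps publishing numbered messages until the subscriber sees one; anything sent before the connection and the subscription are in place is silently dropped.

```python
import zmq


def first_delivery(max_tries=50, poll_ms=100):
    """Publish numbered messages until the SUB side receives one.

    Returns (attempt, payload_number), or None if nothing arrived.
    """
    ctx = zmq.Context()
    sub = ctx.socket(zmq.SUB)
    pub = ctx.socket(zmq.PUB)
    try:
        # SUB binds and PUB connects, as in the question
        port = sub.bind_to_random_port("tcp://127.0.0.1")
        sub.setsockopt(zmq.SUBSCRIBE, b"world")
        pub.connect("tcp://127.0.0.1:{}".format(port))

        for attempt in range(1, max_tries + 1):
            pub.send_multipart([b"world", str(attempt).encode("ascii")])
            # wait up to poll_ms milliseconds for something to arrive
            if sub.poll(poll_ms):
                topic, payload = sub.recv_multipart()
                return attempt, int(payload)
        return None
    finally:
        pub.close(linger=0)
        sub.close(linger=0)
        ctx.term()


result = first_delivery()
print(result)
```

The first number that arrives is often not 1, because the earliest send happens before the subscriber is known to the publisher. Waiting briefly before the first send, as the asker ended up doing, avoids losing that message.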

In my view, this would be a safer design approach:

class HelloWorldMessage:
    """                                                       __doc__
    [DEF-ME]
    [DOC-ME]

    USAGE:     with HelloWorldMessage() as aContextManagerFUSEd_class_INSTANCE:
                    # may              use aContextManagerFUSEd_class_INSTANCE
                    # and shall safely
                    #     gracefully terminate locally spawned ZeroMQ resources
    PARAMETERS:
    RETURNS:   
    THROWS:    
    EXAMPLE:   
    REF.s:

    [TEST-ME]
    [PERF-ME]
    [PUB-ME]
    """
    def __init__( self, url  = '127.0.0.1',
                        port = '5555'
                        ):

        self._url = "tcp://{}:{}".format( url, port )
        # pyzmq raises zmq.ZMQError when a call fails, so reaching
        # each print() means the call before it succeeded
        #---------------------------------------------------- CONTEXT:
        self._ctx = Context.instance();                       print( "INF: zmq.asyncio.Context() set" )
        #---------------------------------------------------- SUB:
        self._sub = self._ctx.socket(zmq.SUB );               print( "INF: zmq.SUB set"               )
        self._sub.bind(                  self._url );         print( "INF: zmq.SUB.bind() done"       )
        self._sub.setsockopt(        zmq.LINGER, 1 );         print( "INF: zmq.SUB LINGER set"        )
        self._sub.setsockopt(        zmq.SUBSCRIBE, b'world');print( "INF: zmq.SUB subscribed"        )
        #---------------------------------------------------- PUB:
        self._pub = self._ctx.socket(zmq.PUB );               print( "INF: zmq.PUB set"               )
        self._pub.setsockopt(        zmq.LINGER, 1 );         print( "INF: zmq.PUB LINGER set"        )
        self._pub.connect(               self._url );         print( "INF: zmq.PUB.connect() done"    )
        #----------------------------------------------------
        ...
    def __enter__( self ):
        #---------------------------------------------------- with <class> as <symbol>: CONTEXT MANAGER __enter__()-auto-METHOD
        return self

    def __exit__( self, exc_type, exc_value, traceback ):
        #---------------------------------------------------- with <class> as <symbol>: CONTEXT MANAGER __exit__()-auto-METHOD
        self.try_to_close( self._pub );
        self.try_to_close( self._sub );
        pass;         self._ctx.term()
        return

    ################################################################
    #
    # A       PUB-SENDER ------------------------------------
    async def pub_hello_world( self ):

          # PubHelloWorld is not defined in this listing; it appears to come from the
          # asker's original (pre-minimized) code and must provide .msg_pub() -> str
          self._pObj = PubHelloWorld();                       print( "INF: pObj set on PUB-side"      if ( self._pObj.msg_pub()  # instance-fuse(d)
                                                                                                         ==   "Hello World"    ) else "ERR[9]: {0:}".format( "Hello World" ) )
          try:
               while True:                                    # keep sending messages
                   self._sMsg = self._pObj.msg_pub();         print( "INF: pObj.msg_pub() called"     if ( self._sMsg is not None ) else "ERR[A]: {0:}".format( "msg == ?"    ) )
                   pass;                                      print( self._sMsg )
                   # publish message to topic 'world'
                   # topic and payload are sent as two frames; a str must be encoded to bytes first
                   await self._pub.send_multipart( [ b'world',
                                                       self._sMsg.encode( 'ascii' )
                                                       ]
                                                  );          print( "INF: await .send_multipart()" )
                   # slow down message publication
                   await asyncio.sleep( 1 );                  print( "NOP: await .sleep( 1 )" )
          except Exception as exc:                            # e.g. zmq.ZMQError from a failed call
              pass;                                           print( "EXC: thrown on PUB side: {0!r}".format( exc ) )

          finally:
              self._pub.close();                              print( "FIN: PUB.close()-d" )

    ################################################################
    #
    # A       SUB-RECEIVER ---------------------------------
    async def sub_hello_world( self ):

          # SubHelloWorld is not defined in this listing either; it appears to come from the
          # asker's original (pre-minimized) code and must provide .msg_receive()
          self._sObj = SubHelloWorld();                       print( "INF: sObj set on SUB-side"      if (  None                 # instance-fuse(d)
                                                                                                         == self._sObj.msg_receive("?")
                                                                                                            )                    else "ERR[F]: {0:}".format( "?"            ) )
          try:
               while True:                                   # keep listening to all published message on topic 'world'
                     pass;                                    print( "INF: await .recv_multipart() about to be called now:" )
                     self._rMsg = await self._sub.recv_multipart()
                     pass;                                    print( "INF: await .recv_multipart() returned" )
                     pass;                                    print( 'ACK: received: ', self._rMsg )
                     self._sObj.msg_receive( self._rMsg );    print( 'ACK: .msg_receive()-printed.' )
          except Exception as exc:                            # e.g. zmq.ZMQError from a failed call
              pass;                                           print( "EXC: thrown on SUB side: {0!r}".format( exc ) )

          finally:
              self._sub.close();                              print( "FIN: SUB.close()-d" )

    # ---------close()---------------------------------------
    def try_to_close( self, aSocketINSTANCE ):

        try:
            aSocketINSTANCE.close();

        except:
            pass;

        return

2) The class is best used through a with HelloWorldMessage() as ...: context manager, so that the ZeroMQ resources are released on exit.
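
As a rough illustration of point 2, the sketch below uses a stand-in class with fake sockets (FakeSocket and ManagedSockets are hypothetical names, no ZeroMQ involved). It only shows that __exit__() runs and closes everything even when the body of the with block raises.

```python
class FakeSocket:
    """Hypothetical stand-in for a ZeroMQ socket; it only records close()."""

    def __init__(self, name):
        self.name = name
        self.closed = False

    def close(self):
        self.closed = True


class ManagedSockets:
    """Hypothetical stand-in for HelloWorldMessage: owns two sockets."""

    def __init__(self):
        self.pub = FakeSocket("pub")
        self.sub = FakeSocket("sub")

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        # always release the resources, whether or not the body raised
        self.pub.close()
        self.sub.close()
        return False  # do not swallow the exception


def run_and_fail():
    try:
        with ManagedSockets() as hub:
            raise RuntimeError("simulated failure inside the with-body")
    except RuntimeError:
        pass
    return hub


hub = run_and_fail()
print(hub.pub.closed, hub.sub.closed)
```

Both flags end up True: the cleanup in __exit__() is not skipped by the exception, which is the property the context-manager design relies on.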



Answer 2:

There were two errors in my code:

  1. As mentioned by @user3666197, the PUB/SUB communication archetype needs some time to initialize (see their answer). I had to move await asyncio.sleep(1) above the publishing call (await pub.send_multipart([b'world', msg.encode('ascii')])).
  2. I encoded the message incorrectly: bytes(msg) --> msg.encode('ascii')
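
The second error can be reproduced without ZeroMQ. This small sketch (variable names are illustrative) shows that bytes() on a str without an encoding raises TypeError in Python 3, while str.encode() gives the payload the fixed code sends:

```python
msg = "Hello World"

# bytes() on a str needs an explicit encoding; without one it raises TypeError
try:
    bytes(msg)
    raised = False
except TypeError:
    raised = True

payload = msg.encode("ascii")          # what the fixed code sends
same = bytes(msg, "ascii") == payload  # equivalent spelling with an encoding

print(raised, payload, same)
```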

This answer addresses my question most directly, but please see @user3666197's answer for design choices when implementing with PyZMQ.

Advice

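PyZMQ code run through asyncio.get_event_loop() as in my question gives no error traceback: asyncio.wait() stores an exception raised inside a coroutine on its task instead of re-raising it, and because the other coroutine never finishes, run_until_complete() never returns. Therefore, wrap your code in a try/except block, e.g.: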

import traceback
import logging

# this part goes inside the coroutine (await is only valid in an async def)
try:
    while True:
        msg_received = await sub.recv_multipart()
        # do other stuff

except Exception as e:
    print("Error with sub world")
    logging.error(traceback.format_exc())
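
To illustrate why the failure stayed silent, here is a small stdlib-only sketch (failing, ticking and run_demo are illustrative names, not part of my original code). asyncio.wait() does not re-raise exceptions from the tasks it waits on; they stay on the task objects and have to be queried explicitly:

```python
import asyncio


async def failing():
    # same mistake as in the question: bytes() on a str without an encoding
    return bytes("Hello World")


async def ticking(n=3):
    # stands in for the other coroutine; here it finishes so the demo ends
    for _ in range(n):
        await asyncio.sleep(0.01)
    return "done"


async def main():
    tasks = [asyncio.ensure_future(failing()),
             asyncio.ensure_future(ticking())]
    await asyncio.wait(tasks)  # returns normally, nothing is raised here
    # the exception is only visible by asking each task for it
    return [task.exception() for task in tasks]


def run_demo():
    loop = asyncio.new_event_loop()
    try:
        return loop.run_until_complete(main())
    finally:
        loop.close()


errors = run_demo()
print(errors)
```

In this demo the second coroutine finishes, so the stored TypeError can be inspected afterwards. In my question the subscriber loops forever, so asyncio.wait() never returned and the error was never shown.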