I've just started with ZeroMQ and I'm trying to get a Hello World to work with PyZMQ and asyncio in Python 3.6. I'm trying to decouple a module's functionality from the pub/sub code, hence the following class setup:
Edit 1: Minimized example
Edit 2: Included solution, see answer down for how.
import asyncio
import zmq.asyncio
from zmq.asyncio import Context
# manages message flow between publishers and subscribers
class HelloWorldMessage:
    """Minimal PyZMQ + asyncio PUB/SUB "Hello World".

    A SUB socket binds to ``tcp://<url>:<port>`` and subscribes to the topic
    ``b'world'``; a PUB socket connects to the same endpoint and publishes
    "Hello World" on that topic once per second.

    NOTE: constructing an instance starts the asyncio event loop and blocks
    until one of the coroutines fails or the process is interrupted
    (e.g. Ctrl+C).

    Parameters:
        url:  host/IP part of the TCP endpoint (default '127.0.0.1').
        port: port part of the TCP endpoint (default '5555').
    """

    def __init__(self, url='127.0.0.1', port='5555'):
        self.url = "tcp://{}:{}".format(url, port)
        self.ctx = Context.instance()

        # activate publishers / subscribers
        # gather() wraps the coroutines into tasks itself (asyncio.wait()
        # rejects bare coroutines on Python 3.11+) and re-raises the first
        # exception, so a failing coroutine produces a traceback instead of
        # being parked silently inside a finished task.
        asyncio.get_event_loop().run_until_complete(asyncio.gather(
            self.pub_hello_world(),
            self.sub_hello_world(),
        ))

    async def pub_hello_world(self):
        """Publish "Hello World" on topic ``b'world'`` once per second, forever."""
        pub = self.ctx.socket(zmq.PUB)
        try:
            pub.connect(self.url)

            # message contents
            msg = "Hello World"
            print(msg)

            # keep sending messages
            while True:
                # Sleep BEFORE sending: the PUB/SUB link needs some time to
                # get established, and anything published earlier is dropped.
                await asyncio.sleep(1)

                # First frame is the topic the subscriber filters on, second
                # frame is the payload. The payload must be encoded
                # explicitly: bytes(msg) on a str raises TypeError.
                await pub.send_multipart([b'world', msg.encode('ascii')])
        finally:
            # release the socket even when the loop is left by an exception
            pub.close()

    async def sub_hello_world(self):
        """Receive and print every message published on topic ``b'world'``."""
        sub = self.ctx.socket(zmq.SUB)
        try:
            sub.bind(self.url)
            sub.setsockopt(zmq.SUBSCRIBE, b'world')

            # keep listening to all published messages on topic 'world'
            while True:
                # suspends this coroutine until a complete message arrives
                msg = await sub.recv_multipart()
                print('received: ', msg)
        finally:
            # release the socket even when the loop is left by an exception
            sub.close()
if __name__ == "__main__":
    # Script entry point: constructing the object is all that is needed,
    # because HelloWorldMessage.__init__() itself drives the event loop.
    HelloWorldMessage()
Problem
With the above code, only one `Hello World` is printed, and then the program waits forever. If I press Ctrl+C, I get the following traceback:
python helloworld_pubsub.py
Hello World
^CTraceback (most recent call last):
File "helloworld_pubsub_stackoverflow.py", line 64, in <module>
HelloWorldMessage()
File "helloworld_pubsub_stackoverflow.py", line 27, in __init__
self.sub_hello_world(),
File "/*path*/zeromq/lib/python3.6/asyncio/base_events.py", line 454, in run_until_complete
self.run_forever()
File "/*path*/zeromq/lib/python3.6/asyncio/base_events.py", line 421, in run_forever
self._run_once()
File "/*path*/zeromq/lib/python3.6/asyncio/base_events.py", line 1395, in _run_once
event_list = self._selector.select(timeout)
File "/*path*/zeromq/lib/python3.6/selectors.py", line 445, in select
fd_event_list = self._epoll.poll(timeout, max_ev)
KeyboardInterrupt
Versions: libzmq 4.2.3, pyzmq 17.0.0, Ubuntu 16.04
Any insights are appreciated.
Decoupling for an OOP separation of concerns is fine, yet let's also spend some care on debugging the code:
1) ZeroMQ PUB/SUB
The PUB/SUB Scalable Formal Communication Archetype has been known for years to require some time before the PUB and SUB sockets are actually ready to broadcast / accept messages. One ought therefore to set up the infrastructure inside `.__init__()`, and not right before the SUB sockets are supposed to be receiving payloads already.
In my view, this would be a safer design approach:
class HelloWorldMessage:
    """PUB/SUB "Hello World" socket pair, usable as a context manager.

    Both sockets are created and wired up in ``__init__()`` rather than
    inside the coroutines, so connection set-up starts as early as possible.

    USAGE:
        with HelloWorldMessage() as hello:
            # schedule hello.pub_hello_world() and hello.sub_hello_world()
            # on an asyncio event loop here
            ...
        # leaving the with-block closes both sockets and terminates the
        # ZeroMQ context

    PARAMETERS:
        url:  host/IP part of the TCP endpoint (default '127.0.0.1')
        port: port part of the TCP endpoint (default '5555')

    NOTES:
        pyzmq reports failures by raising zmq.ZMQError, so reaching an
        "INF:" print means the preceding call succeeded; no separate
        errno check is needed.

        NOTE(review): PubHelloWorld and SubHelloWorld are not defined in
        this snippet; they are assumed to be provided by the surrounding
        module - confirm before use.
    """

    def __init__(self, url='127.0.0.1', port='5555'):
        self._url = "tcp://{}:{}".format(url, port)
        # ---------------------------------------------------- CONTEXT:
        self._ctx = Context.instance()
        print("INF: zmq.asyncio.Context() set")
        # ---------------------------------------------------- SUB:
        self._sub = self._ctx.socket(zmq.SUB)
        print("INF: zmq.SUB set")
        self._sub.bind(self._url)
        print("INF: zmq.SUB.bind() done")
        self._sub.setsockopt(zmq.LINGER, 1)
        print("INF: zmq.SUB LINGER set")
        self._sub.setsockopt(zmq.SUBSCRIBE, b'world')
        print("INF: zmq.SUB subscribed")
        # ---------------------------------------------------- PUB:
        self._pub = self._ctx.socket(zmq.PUB)
        print("INF: zmq.PUB set")
        self._pub.setsockopt(zmq.LINGER, 1)
        print("INF: zmq.PUB LINGER set")
        self._pub.connect(self._url)
        print("INF: zmq.PUB.connect() done")
        # ----------------------------------------------------
        # ... (any further set-up was elided in the original snippet)

    def __enter__(self):
        """Enter the with-block; the instance itself is the managed object."""
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        """Close both sockets and terminate the context on leaving the with-block.

        Returns None, so an exception raised inside the with-block is not
        suppressed.
        """
        self.try_to_close(self._pub)
        self.try_to_close(self._sub)
        # NOTE(review): Context.instance() is a shared singleton, so term()
        # also affects any other user of it - confirm this is intended.
        self._ctx.term()

    @staticmethod
    def _as_bytes(payload):
        """Return payload as bytes; str is encoded as UTF-8.

        bytes(some_str) without an encoding raises TypeError on Python 3,
        so text has to be encoded explicitly before it is sent.
        """
        if isinstance(payload, str):
            return payload.encode("utf-8")
        return bytes(payload)

    ################################################################
    #
    # A PUB-SENDER ------------------------------------
    async def pub_hello_world(self):
        """Publish PubHelloWorld().msg_pub() on topic b'world' once per second.

        Runs until an exception occurs; the exception is reported on stdout
        and the PUB socket is closed in every case.
        """
        self._pObj = PubHelloWorld()
        # instance-fuse: check the helper yields the expected greeting
        print("INF: pObj set on PUB-side"
              if self._pObj.msg_pub() == "Hello World"
              else "ERR[9]: {0:}".format("Hello World"))
        try:
            while True:  # keep sending messages
                self._sMsg = self._pObj.msg_pub()
                print("INF: pObj.msg_pub() called"
                      if self._sMsg is not None
                      else "ERR[A]: {0:}".format("msg == ?"))
                print(self._sMsg)
                # publish message to topic 'world':
                # first frame is the topic, second frame is the payload
                await self._pub.send_multipart([b'world',
                                                self._as_bytes(self._sMsg)])
                print("INF: await .send_multipart()")
                # slow down message publication
                await asyncio.sleep(1)
                print("NOP: await .sleep( 1 )")
        except Exception as exc:
            # Report the cause instead of hiding it. Cancellation and
            # KeyboardInterrupt are no longer swallowed where they derive
            # from BaseException.
            print("EXC: thrown on PUB side: {0!r}".format(exc))
        finally:
            self._pub.close()
            print("FIN: PUB.close()-d")

    ################################################################
    #
    # A SUB-RECEIVER ---------------------------------
    async def sub_hello_world(self):
        """Receive messages on topic b'world' and pass them to SubHelloWorld.

        Runs until an exception occurs; the exception is reported on stdout
        and the SUB socket is closed in every case.
        """
        self._sObj = SubHelloWorld()
        # instance-fuse: msg_receive() is expected to return None
        print("INF: sObj set on SUB-side"
              if self._sObj.msg_receive("?") is None
              else "ERR[F]: {0:}".format("?"))
        try:
            while True:  # keep listening to all published messages on topic 'world'
                print("INF: await .recv_multipart() about to be called now:")
                self._rMsg = await self._sub.recv_multipart()
                print("INF: await .recv_multipart()")
                print('ACK: received: ', self._rMsg)
                self._sObj.msg_receive(self._rMsg)
                print('ACK: .msg_receive()-printed.')
        except Exception as exc:
            # Report the cause instead of hiding it (see pub_hello_world()).
            print("EXC: thrown on SUB side: {0!r}".format(exc))
        finally:
            self._sub.close()
            print("FIN: SUB.close()-d")

    # ---------close()---------------------------------------
    def try_to_close(self, aSocketINSTANCE):
        """Best-effort close of a socket; never raises.

        A failure while closing is reported on stdout rather than ignored
        silently, so clean-up in __exit__() can still proceed.
        """
        try:
            aSocketINSTANCE.close()
        except Exception as exc:
            print("WRN: close() failed: {0!r}".format(exc))
        return
2) The class is best used via a `with HelloWorldMessage() as ...:` context manager.
There were 2 errors in my code:
- As mentioned by @user3666197, the PUB/SUB communication archetype needs some time for initialization (see his/her answer). I had to move `await asyncio.sleep(1)` above the publishing call (`await pub.send_multipart([b'world', msg.encode('ascii')])`).
- I encoded the message incorrectly: `bytes(msg)` --> `msg.encode('ascii')`
This answer is the one most closely related to my question, but please see @user3666197's answer for certain design choices when implementing PyZMQ.
Advice
It seems that PyZMQ running in an `asyncio.get_event_loop()` doesn't give an error traceback. Therefore, wrap your code in a `try` / `except` block, e.g.:
import traceback
import logging
try:
while True:
msg_received = await sub.recv_multipart()
# do other stuff
except Exception as e:
print("Error with sub world")
logging.error(traceback.format_exc())