I need to send a dictionary as the message from a publisher to subscribers. With the REQ/REP pattern send_json and recv_json work nicely, but I can't seem to find an incantation that works for PUB/SUB. Hope it's not the case that PUB/SUB can only work with send() and recv().
Here's the listing for the experiment I put together:
"""
Experiments with 0MQ PUB/SUB pattern
"""
import os
import sys
import time
import zmq
from multiprocessing import Process
from random import sample, choice
import signal

def handler(signum, frame):
    """ Handler for SIGTERM """
    # kill the processes we've launched
    try:
        for name, proc in _procd.iteritems():
            if proc and proc.is_alive():
                proc.terminate()
    finally:
        os._exit(0)

signal.signal(signal.SIGTERM, handler)

PORT = 5566
TOPICS = "ABCDEFGHIJKLMNOPQRSTUVWXYZ"

def publisher():
    """ Randomly update and publish topics """
    context = zmq.Context()
    sock = context.socket(zmq.PUB)
    sock.bind("tcp://*:{}".format(PORT))
    ## Init a dict of topic:value pairs
    alltopics = dict()
    for char in TOPICS:
        alltopics[char] = time.time()
    while True:
        topic = choice(TOPICS)
        alltopics[topic] = time.time()
        ## THIS IS SENDING
        sock.send_json((topic, alltopics))
        print "Sent topic {}".format(topic)
        time.sleep(1)

def client(number, topics):
    """
    Subscribe to list of topics and wait for messages.
    """
    context = zmq.Context()
    sock = context.socket(zmq.SUB)
    sock.connect("tcp://localhost:{}".format(PORT))
    for topic in topics:
        sock.setsockopt(zmq.SUBSCRIBE, topic)
    print "subscribed to topics {}".format(topics)
    while True:
        ## THIS NEVER RETURNS
        print sock.recv_json()
        ## ALREADY TRIED THIS. DOES NOT WORK
        #topic, msg = sock.recv_json()
        #print "Client{} {}:{}".format(number, topic, msg[topic])
        sys.stdout.flush()

if __name__ == '__main__':
    _procd = dict()
    ## Launch publisher
    name = 'publisher'
    _procd[name] = Process(target=publisher, name=name)
    _procd[name].start()
    ## Launch the subscribers
    for n in range(10):
        name = 'client{}'.format(n)
        _procd[name] = Process(target=client,
                               name=name,
                               args=(n, sample(TOPICS,3)))
        _procd[name].start()
    ## Sleep until killed
    while True:
        time.sleep(1)
And here is the output up to the point where I kill the parent process:
$ python pubsub.py
Sent topic Y
subscribed to topics ['B', 'Q', 'F']
subscribed to topics ['N', 'E', 'O']
subscribed to topics ['Y', 'G', 'M']
subscribed to topics ['G', 'D', 'I']
subscribed to topics ['D', 'Y', 'W']
subscribed to topics ['A', 'N', 'W']
subscribed to topics ['F', 'K', 'V']
subscribed to topics ['A', 'Q', 'X']
subscribed to topics ['S', 'Y', 'V']
subscribed to topics ['E', 'S', 'D']
Sent topic I
Sent topic N
Sent topic J
Sent topic I
Sent topic A
Sent topic T
Sent topic A
Sent topic K
Sent topic V
Sent topic E
The subscriptions and sending seem OK, but the clients never print anything. The tracebacks for the client processes show them hanging on the sock.recv_json() call. My first attempt is commented out. It also hangs.
I'd still like to see it work with send_json() and recv_json() but, per Jason's suggestion, the following is working:
with this in publisher()
and this in client()
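As a rough sketch of what such a publisher/client pair can look like (not necessarily the exact code used here), the topic can go in the first frame and the JSON-encoded dict in the second, using pyzmq's send_multipart() and recv_multipart(). The helper names encode, decode, publish and receive are made up for illustration, and sock is assumed to be an already configured PUB or SUB socket:

```python
import json


def encode(topic, payload):
    """Build the two frames: topic first, JSON body second."""
    return [topic.encode("ascii"), json.dumps(payload).encode("utf-8")]


def decode(frames):
    """Inverse of encode(): return (topic, payload)."""
    topic_frame, body = frames
    return topic_frame.decode("ascii"), json.loads(body.decode("utf-8"))


def publish(sock, topic, payload):
    """For publisher(): sock is assumed to be a bound zmq.PUB socket."""
    sock.send_multipart(encode(topic, payload))


def receive(sock):
    """For client(): sock is assumed to be a connected, subscribed zmq.SUB socket."""
    return decode(sock.recv_multipart())
```

With helpers like these, publisher() would call publish(sock, topic, alltopics) instead of send_json(), and client() would call topic, msg = receive(sock) instead of recv_json(). Under Python 3 the subscription itself also has to be bytes, for example sock.setsockopt(zmq.SUBSCRIBE, topic.encode("ascii")).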
Here's the complete listing followed by some sample output:
Sample Output
I believe what you're after is send_multipart(), rather than send_json(). In ZeroMQ parlance, a message with multiple frames is a multipart message, and the first frame is what determines the "subscriber topic" for pub/sub. From the examples that I've seen, it will also divine a topic from the beginning of a string if you only send a single frame.
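The prefix behaviour also explains why the send_json() version never delivers anything: send_json((topic, alltopics)) serializes the tuple to a JSON array, so the single frame begins with "[" rather than with the topic letter, and none of the one-letter subscriptions match. Here is a small pure-Python sketch of the idea. matches() is a hypothetical stand-in that only imitates the SUB-side prefix filter (the real filtering happens inside ZeroMQ), and pack()/unpack() show one assumed way to carry a topic prefix and a JSON body in a single frame:

```python
import json


def matches(subscription, first_frame):
    """Imitate the SUB filter: a subscription matches if it is a byte prefix."""
    return first_frame.startswith(subscription)


def pack(topic, payload):
    """Single frame: topic, a space, then the JSON body."""
    return topic.encode("ascii") + b" " + json.dumps(payload).encode("utf-8")


def unpack(frame):
    """Split a pack()-style frame back into (topic, payload)."""
    topic, _, body = frame.partition(b" ")
    return topic.decode("ascii"), json.loads(body.decode("utf-8"))


# Roughly what send_json((topic, alltopics)) puts on the wire: a JSON array.
json_frame = json.dumps(("Y", {"Y": 1.0})).encode("utf-8")
print(matches(b"Y", json_frame))   # False: the frame starts with b"["

# A topic-prefixed frame passes the same filter.
prefixed = pack("Y", {"Y": 1.0})
print(matches(b"Y", prefixed))     # True
print(unpack(prefixed))
```

With pack() and unpack(), plain sock.send() and sock.recv() are enough. The space delimiter is an assumption of this sketch and must not appear in topic names.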