I want to limit the amount of memory consumed by my ZeroMQ message queues in a Python application. I know that setting the high-water mark will limit the amount that will be queued on the sender side, but is there a way to control how much will be queued on the receiver side? The Python ZeroMQ binding seems to have it set at unlimited.
My test scenario: I have two Python terminals open for testing. One is the receiver:
Python 2.5.1 (r251:54863, Aug 25 2008, 20:50:04)
[GCC 4.1.2 20071124 (Red Hat 4.1.2-42)] on linux2
Type "help", "copyright", "credits" or "license" for more information.
>>> import zmq
>>> context = zmq.Context()
>>> socket = context.socket(zmq.PULL)
>>> socket.setsockopt(zmq.RCVBUF, 256)
>>> socket.bind("tcp://127.0.0.1:12345")
The other is the sender:
Python 2.5.1 (r251:54863, Aug 25 2008, 20:50:04)
[GCC 4.1.2 20071124 (Red Hat 4.1.2-42)] on linux2
Type "help", "copyright", "credits" or "license" for more information.
>>> import zmq
>>> context=zmq.Context()
>>> socket = context.socket(zmq.PUSH)
>>> socket.setsockopt(zmq.SNDBUF, 2048)
>>> socket.setsockopt(zmq.HWM, 1)
>>> socket.connect("tcp://127.0.0.1:12345")
>>> num = 0
>>> while True:
...     print num
...     socket.send(str(num))
...     num = num + 1
...
I run socket.recv() on the receiver side a couple of times to make sure that the queue works, but other than that I just let the two terminals sit there. The send loop never seems to block, and the memory footprint of the receiving interpreter keeps growing.
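The behaviour described above can be reproduced in a single process. This is a minimal sketch, assuming a current pyzmq on top of libzmq 3.x or later, where zmq.HWM was split into zmq.SNDHWM and zmq.RCVHWM and the default became 1000 messages; RCVHWM is set to 0 (no limit) here to mimic the ZeroMQ 2.x default. The message count N and the random port are arbitrary choices for the example. Every blocking send succeeds even though the sender's HWM is 1, because the messages drain over TCP into the receiver's unbounded queue.

```python
import zmq

N = 1000  # number of messages to push; arbitrary, kept small for a quick run

ctx = zmq.Context()

# Receiver: RCVHWM=0 means "no limit", mimicking the ZeroMQ 2.x default.
pull = ctx.socket(zmq.PULL)
pull.setsockopt(zmq.RCVHWM, 0)
port = pull.bind_to_random_port("tcp://127.0.0.1")

# Sender: a high-water mark of 1 on the sending side only.
push = ctx.socket(zmq.PUSH)
push.setsockopt(zmq.SNDHWM, 1)
push.setsockopt(zmq.SNDTIMEO, 2000)  # ms; raise zmq.Again instead of blocking forever
push.connect("tcp://127.0.0.1:%d" % port)

sent = 0
try:
    for num in range(N):
        push.send_string(str(num))  # blocks only while the sender's own queue is full
        sent += 1
except zmq.Again:
    pass  # would mean the sender really was held back

# Nothing has been read yet: the messages are queued on the receiving side
# (or still in the OS socket buffers), not on the sender.
pull.setsockopt(zmq.RCVTIMEO, 2000)
received = 0
while received < sent:
    pull.recv()
    received += 1

print("sent %d, received %d" % (sent, received))
push.close(linger=0)
pull.close(linger=0)
ctx.term()
```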
Contrary to what the ZeroMQ documentation suggests, the high-water mark needs to be set on both the PUSH side and the PULL side. Once I changed the PULL side, it worked better. The new PULL code is:
Python 2.5.1 (r251:54863, Aug 25 2008, 20:50:04)
[GCC 4.1.2 20071124 (Red Hat 4.1.2-42)] on linux2
Type "help", "copyright", "credits" or "license" for more information.
>>> import zmq
>>> context=zmq.Context()
>>> socket = context.socket(zmq.PULL)
>>> socket.setsockopt(zmq.RCVBUF, 256)
>>> socket.setsockopt(zmq.HWM, 1)
>>> socket.bind("tcp://127.0.0.1:12345")
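A minimal sketch of the same fix with the current pyzmq option names, assuming libzmq 3.x or later (where zmq.HWM was replaced by zmq.SNDHWM and zmq.RCVHWM). It uses the inproc transport so that no OS socket buffers are involved, and non-blocking sends so that reaching the limit shows up as zmq.Again instead of a hang. The address "inproc://hwm-demo" is an arbitrary name chosen for the example. According to the ZeroMQ guide, with inproc the sender and receiver share one queue, so the effective limit is the sum of both sides' HWMs; with TCP, more messages can sit in the kernel buffers.

```python
import zmq

ctx = zmq.Context()

# Receiver with a high-water mark of one message.
pull = ctx.socket(zmq.PULL)
pull.setsockopt(zmq.RCVHWM, 1)
pull.bind("inproc://hwm-demo")

# Sender with a high-water mark of one message.
push = ctx.socket(zmq.PUSH)
push.setsockopt(zmq.SNDHWM, 1)
push.connect("inproc://hwm-demo")

sent = 0
while sent < 10000:  # safety cap so a missing limit cannot loop forever
    try:
        # NOBLOCK: raise zmq.Again instead of waiting when the queue is full.
        push.send_string(str(sent), zmq.NOBLOCK)
    except zmq.Again:
        break
    sent += 1

print("queued %d messages before the sender was held back" % sent)
push.close(linger=0)
pull.close(linger=0)
ctx.term()
```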
Actually, the documentation says this:
"When a ZMQ_PUSH socket enters an exceptional state due to having
reached the high water mark for all downstream nodes, or if there are
no downstream nodes at all, then any zmq_send(3) operations on the
socket shall block until the exceptional state ends or at least one
downstream node becomes available for sending; messages are not
discarded."
http://api.zeromq.org/2-1:zmq-socket
That outright states that you can (and should) set the high-water mark for downstream nodes (a.k.a. PULL), and perhaps implies that setting it on the PUSH side has no effect. I suspect that's not true, though, because there is still the case where downstream nodes are available but messages are coming in faster than they can be sent.
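The "messages are not discarded" part of the quoted documentation can be checked with a small sketch, again assuming a current pyzmq/libzmq and an arbitrary inproc address. Instead of blocking forever, ZMQ_SNDTIMEO (200 ms, an arbitrary value) makes the blocked send give up with zmq.Again. The receiver then drains exactly the messages the sender accepted, in order, and once it has made room a new send goes through.

```python
import zmq

ctx = zmq.Context()

pull = ctx.socket(zmq.PULL)
pull.setsockopt(zmq.RCVHWM, 1)
pull.setsockopt(zmq.RCVTIMEO, 1000)  # ms; guard against hanging in the blocking recv below
pull.bind("inproc://no-discard-demo")

push = ctx.socket(zmq.PUSH)
push.setsockopt(zmq.SNDHWM, 1)
push.setsockopt(zmq.SNDTIMEO, 200)  # ms; a blocked send raises zmq.Again after this
push.connect("inproc://no-discard-demo")

accepted = []
for num in range(100):
    try:
        push.send_string(str(num))  # blocks while the high-water mark is reached
    except zmq.Again:
        break  # timed out: the queue is full, and this message was not queued
    accepted.append(str(num))

# Drain the receiver without blocking and collect what actually arrived.
received = []
while True:
    try:
        received.append(pull.recv_string(zmq.NOBLOCK))
    except zmq.Again:
        break

# The "exceptional state" has ended, so sending works again.
push.send_string("after drain")
late = pull.recv_string()

print(accepted == received, late)
push.close(linger=0)
pull.close(linger=0)
ctx.term()
```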
With the zmq.SNDBUF and zmq.RCVBUF options you can limit the size of the underlying kernel socket buffers (in bytes).
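A short sketch of how the two kinds of limits differ, assuming a current pyzmq: zmq.RCVBUF sets the kernel receive buffer (SO_RCVBUF) in bytes, applied when a connection is made, while zmq.RCVHWM caps how many whole messages ZeroMQ itself queues. The values are the ones used in the question; the kernel may round or clamp the buffer size it actually applies, and getsockopt only reports the value stored on the ZeroMQ socket.

```python
import zmq

ctx = zmq.Context()
sock = ctx.socket(zmq.PULL)

# Kernel TCP receive buffer size, in bytes.
sock.setsockopt(zmq.RCVBUF, 256)
# ZeroMQ's own receive queue limit, counted in messages.
sock.setsockopt(zmq.RCVHWM, 1)

rcvbuf = sock.getsockopt(zmq.RCVBUF)
rcvhwm = sock.getsockopt(zmq.RCVHWM)
print(rcvbuf, rcvhwm)

sock.close()
ctx.term()
```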
Also, I use the zmq.CONFLATE option on the receiver side to limit the ZeroMQ queue size to one. Here's an example with ZMQ PUSH/PULL:
Sender (zmq.PUSH):
import zmq


def create_push_socket(ip, port):
    try:
        context = zmq.Context()
        socket = context.socket(zmq.PUSH)
        socket.setsockopt(zmq.SNDHWM, 1)  # queue at most one outgoing message
        zmq_address = "tcp://{}:{}".format(ip, port)
        socket.connect(zmq_address)
        return socket
    except zmq.ZMQError as exp:
        print(exp)
        return False


sock = create_push_socket('127.0.0.1', 5558)
if sock:
    sock.send_json({'a': 1})
Receiver (zmq.PULL):
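import logging
import time

import zmq

logger = logging.getLogger(__name__)  # stdlib logging stands in for the original logger


def listen():
    sock = None
    try:
        context = zmq.Context()
        sock = context.socket(zmq.PULL)
        sock.setsockopt(zmq.RCVHWM, 1)
        sock.setsockopt(zmq.CONFLATE, 1)  # last msg only; must be set before bind()
        sock.bind("tcp://*:5558")
    except zmq.ZMQError:
        logger.exception("Could not set up the PULL socket")
        sock = None

    configs = None
    while configs is None:
        if sock is not None:
            configs = sock.recv_json()  # blocks until a message arrives
            time.sleep(1e-1)
        else:
            time.sleep(5)
            return listen()  # retry the socket setup (recursive)
    return configs


listen()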
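The sender and receiver above can be tried together in one process. This is a minimal sketch, assuming a current pyzmq on libzmq 4.x (where ZMQ_CONFLATE is available); the random port and the half-second sleep that lets the messages cross the TCP connection are assumptions of the example, not part of the original code. The sender pushes five messages, and because the receiver set CONFLATE before binding, only the newest one is left to read. CONFLATE does not support multi-part messages.

```python
import time

import zmq

ctx = zmq.Context()

pull = ctx.socket(zmq.PULL)
pull.setsockopt(zmq.CONFLATE, 1)  # must be set before bind(); keep only the last message
pull.setsockopt(zmq.RCVTIMEO, 2000)  # ms; guard against hanging if nothing arrives
port = pull.bind_to_random_port("tcp://127.0.0.1")

push = ctx.socket(zmq.PUSH)
push.connect("tcp://127.0.0.1:%d" % port)

for num in range(5):
    push.send_json({"a": num})

time.sleep(0.5)  # give the messages time to reach the receiver

latest = pull.recv_json()  # only the newest message survives
try:
    pull.recv_json(zmq.NOBLOCK)
    leftover = True
except zmq.Again:
    leftover = False  # the older messages were overwritten, not queued

print(latest, leftover)
push.close(linger=0)
pull.close(linger=0)
ctx.term()
```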