How to have a limited ZMQ (ZeroMQ - PyZMQ) queue buffer

Posted 2019-05-30 06:41

I use the pyzmq library with pub/sub in Python.

I have several fast ZMQ publisher scripts that use the .connect() method and a slower ZMQ subscriber script that uses the .bind() method.

After a few minutes, my subscriber is receiving old data from the publishers (because of the ZMQ buffer).

My question: is there a way to manage the ZMQ queue buffer size (that is, to set a limited buffer)?

Note: I don't want to use ZMQ PUSH/PULL.
Note: I've read this post, but that approach only clears the buffer: clear ZMQ buffer

Note: I tried the high water mark options too, but they didn't work:

socket.setsockopt(zmq.RCVHWM, 10)  # not working
socket.setsockopt(zmq.SNDHWM, 10)  # not working

Publisher:

import zmq
import time

port = "5556"

context = zmq.Context()
socket = context.socket(zmq.PUB)
socket.bind("tcp://*:%s" % port)
socket.setsockopt(zmq.SNDHWM, 10)  # not working

while True:
    data = time.time()
    print("%d" % data)
    socket.send_string("%d" % data)  # send() requires bytes on Python 3; send_string() encodes the str
    time.sleep(1)

Subscriber:

import zmq
import time

port = "5556"

context = zmq.Context()
socket = context.socket(zmq.SUB)

socket.connect("tcp://localhost:%s" % port)
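socket.setsockopt(zmq.SUBSCRIBE, b'')  # empty prefix = subscribe to everything (bytes on Python 3)
socket.setsockopt(zmq.RCVHWM, 10)  # not working

while True:
    time.sleep(2)  # deliberately slower than the publisher
    data = socket.recv()
    print(data)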

Even with these options set, the queue still grows beyond 10 messages (despite the configured send/receive high water marks).

2 Answers
一夜七次
#2 · 2019-05-30 07:03

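To set the queue/buffer size, you need to set the high water marks via the socket options. The value is a message count per connected peer, and it only takes effect for bind()/connect() calls made after the option is set:

socket.setsockopt(zmq.SNDHWM, 10)
socket.setsockopt(zmq.RCVHWM, 10)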
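
A minimal sketch of that ordering, assuming pyzmq on Python 3. The helper names make_publisher and make_subscriber are made up for illustration and are not part of pyzmq. Each helper applies the high water mark first and only then calls bind() or connect(), which is the opposite order from the scripts in the question.

```python
import zmq


def make_publisher(context, endpoint, hwm=10):
    """Create a PUB socket whose send queue is capped at `hwm` messages per peer."""
    sock = context.socket(zmq.PUB)
    sock.setsockopt(zmq.SNDHWM, hwm)  # set BEFORE bind() so it applies to new connections
    sock.setsockopt(zmq.LINGER, 0)    # do not block on close with unsent messages
    sock.bind(endpoint)
    return sock


def make_subscriber(context, endpoint, hwm=10):
    """Create a SUB socket whose receive queue is capped at `hwm` messages per peer."""
    sock = context.socket(zmq.SUB)
    sock.setsockopt(zmq.RCVHWM, hwm)      # set BEFORE connect()
    sock.setsockopt(zmq.SUBSCRIBE, b"")   # empty prefix: receive every message
    sock.setsockopt(zmq.LINGER, 0)
    sock.connect(endpoint)
    return sock
```

Calling sock.getsockopt(zmq.SNDHWM) or sock.getsockopt(zmq.RCVHWM) afterwards is a quick way to confirm the value was accepted. Even with both marks set, a slow subscriber may still see somewhat more than `hwm` stale messages over TCP, since the operating system's socket buffers hold data as well and the high water mark is not an exact limit.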
Fickle 薄情
#3 · 2019-05-30 07:06

I found a way to get "last message only" behaviour on a ZMQ SUB socket (using the CONFLATE option).

Note that you must set the CONFLATE option before you connect:

import zmq
import time

port = "5556"

context = zmq.Context()
socket = context.socket(zmq.SUB)

socket.setsockopt(zmq.SUBSCRIBE, b'')  # bytes are required on Python 3
socket.setsockopt(zmq.CONFLATE, 1)  # last msg only.
socket.connect("tcp://localhost:%s" % port)  # must be placed after above options.

while True:
    time.sleep(2)  # slow consumer: only the newest message is kept meanwhile
    data = socket.recv()
    print(data)

In other words, this removes any buffered queue on the subscriber side: the socket keeps only the most recent message.
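
To see the effect without running two separate scripts, here is a small single-process sketch, assuming pyzmq on Python 3 and a usable loopback interface. The function name latest_only_demo, the random port, the sleep durations and the 2-second receive timeout are all illustrative choices, not anything prescribed by ZMQ.

```python
import time

import zmq


def latest_only_demo(n_messages=5):
    """Publish n_messages quickly, then read once from a CONFLATE subscriber."""
    ctx = zmq.Context()

    pub = ctx.socket(zmq.PUB)
    pub.setsockopt(zmq.LINGER, 0)
    port = pub.bind_to_random_port("tcp://127.0.0.1")

    sub = ctx.socket(zmq.SUB)
    sub.setsockopt(zmq.SUBSCRIBE, b"")
    sub.setsockopt(zmq.CONFLATE, 1)      # keep only the newest message
    sub.setsockopt(zmq.RCVTIMEO, 2000)   # raise zmq.Again instead of blocking forever
    sub.setsockopt(zmq.LINGER, 0)
    sub.connect("tcp://127.0.0.1:%d" % port)  # connect AFTER the options above

    try:
        time.sleep(0.5)  # give the subscription time to reach the publisher
        for i in range(n_messages):
            pub.send_string(str(i))
        time.sleep(0.5)  # play the slow consumer while messages arrive
        return sub.recv_string()
    finally:
        sub.close()
        pub.close()
        ctx.term()
```

With the default arguments the call should return the last value published rather than the first. Two caveats from the libzmq documentation: CONFLATE does not support multipart messages, and it overrides the RCVHWM/SNDHWM settings on that socket.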


[In addition]:

With the zmq.SNDBUF and zmq.RCVBUF options we can also limit the size, in bytes, of the underlying OS socket buffers that ZMQ uses. (More complete and an example)
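
As a hedged sketch of combining the two kinds of limit, the hypothetical helper below (apply_buffer_limits is not a pyzmq function) sets the message-count high water marks together with the byte-sized kernel buffers. The values 10 and 4096 are arbitrary examples, and the operating system may round or clamp the byte sizes it actually uses.

```python
import zmq


def apply_buffer_limits(sock, hwm=10, kernel_bytes=4096):
    """Apply queue limits to `sock`; call this before bind() or connect()."""
    sock.setsockopt(zmq.SNDHWM, hwm)           # ZMQ send queue, in messages
    sock.setsockopt(zmq.RCVHWM, hwm)           # ZMQ receive queue, in messages
    sock.setsockopt(zmq.SNDBUF, kernel_bytes)  # OS transmit buffer, in bytes
    sock.setsockopt(zmq.RCVBUF, kernel_bytes)  # OS receive buffer, in bytes
    return sock
```

A subscriber would then be built as apply_buffer_limits(context.socket(zmq.SUB)), followed by the SUBSCRIBE option and finally connect().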

