I noticed that a ZeroMQ PUB socket buffers all outgoing data while it is connecting. For example:
import zmq
import time
context = zmq.Context()
# create a PUB socket
pub = context.socket (zmq.PUB)
pub.connect("tcp://127.0.0.1:5566")
# push some message before connected
# they should be dropped
for i in range(5):
    pub.send('a message should not be dropped')
time.sleep(1)
# create a SUB socket
sub = context.socket (zmq.SUB)
sub.bind("tcp://127.0.0.1:5566")
sub.setsockopt(zmq.SUBSCRIBE, "")
time.sleep(1)
# this is the only message we should see in SUB
pub.send('hi')
while True:
    print sub.recv()
The SUB socket binds only after those messages have been sent, so they should be dropped, because a PUB socket is supposed to drop messages when no one is connected to it. Instead of dropping them, it buffers all of them:
a message should not be dropped
a message should not be dropped
a message should not be dropped
a message should not be dropped
a message should not be dropped
hi
As you can see, the "a message should not be dropped" messages are buffered by the socket, and once it gets connected it flushes them to the SUB socket. If I bind the PUB socket and connect the SUB socket instead, it works correctly:
import zmq
import time
context = zmq.Context()
# create a PUB socket
pub = context.socket (zmq.PUB)
pub.bind("tcp://127.0.0.1:5566")
# push some message before connected
# they should be dropped
for i in range(5):
    pub.send('a message should not be dropped')
time.sleep(1)
# create a SUB socket
sub = context.socket (zmq.SUB)
sub.connect("tcp://127.0.0.1:5566")
sub.setsockopt(zmq.SUBSCRIBE, "")
time.sleep(1)
# this is the only message we should see in SUB
pub.send('hi')
while True:
    print repr(sub.recv())
And the only output you see is:
'hi'
This strange behavior causes a problem, because a connecting socket buffers all of its data. I have two servers, and server A publishes data to server B:
Server A -- publish --> Server B
It works fine while Server B is online. But what if I start Server A and do not start Server B?
In that case, the connecting PUB socket on Server A keeps all the data, and memory usage grows higher and higher.
So here is my question: is this behavior a bug or a feature? If it is a feature, where can I find documentation that mentions it? And how can I stop the connecting PUB socket from buffering all the data?
Thanks.
Whether the socket blocks or drops messages depends on the socket type as described in the ZMQ::Socket documentation (emphasis below is mine):
ZMQ::HWM: Retrieve high water mark
The ZMQ::HWM option shall retrieve the high water mark for the
specified socket. The high water mark is a hard limit on the maximum
number of outstanding messages 0MQ shall queue in memory for any
single peer that the specified socket is communicating with.
If this limit has been reached the socket shall enter an exceptional
state and depending on the socket type, 0MQ shall take appropriate
action such as blocking or dropping sent messages. Refer to the
individual socket descriptions in ZMQ::Socket for details on the exact
action taken for each socket type.
The default ZMQ::HWM value of zero means “no limit”.
You can see if it will block or drop by looking through the documentation for the socket type for the "ZMQ::HWM option action", which will either be Block or Drop.
The action for ZMQ::PUB is Drop, so if it is not dropping you should check the HWM (High Water Mark) value and heed the warning that the default ZMQ::HWM value of zero means “no limit”, meaning that it will not enter an exceptional state until the system runs out of memory (at which point I don't know how it behaves).
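To make the quoted high water mark rules concrete, here is a toy model in plain Python 3 of a single per-peer outgoing queue with the Drop action. It is only a sketch of the documented semantics, not how libzmq is implemented, and `ToyPeerQueue` is a hypothetical name. With pyzmq, the real limit would presumably be set on the PUB socket before connecting, through the `zmq.HWM` option on ZeroMQ 2.x or `zmq.SNDHWM` on 3.x and later.

```python
from collections import deque


class ToyPeerQueue:
    """Toy model of one per-peer outgoing queue (not libzmq's implementation)."""

    def __init__(self, hwm=0):
        self.hwm = hwm          # 0 means "no limit", as in the quoted docs
        self.pending = deque()  # messages waiting for the peer
        self.dropped = 0        # messages discarded at the high water mark

    def send(self, msg):
        # PUB-style action: drop (rather than block) once the limit is reached.
        if self.hwm and len(self.pending) >= self.hwm:
            self.dropped += 1
            return False
        self.pending.append(msg)
        return True


unlimited = ToyPeerQueue(hwm=0)   # default: grows until memory runs out
bounded = ToyPeerQueue(hwm=10)    # keeps at most 10 outstanding messages
for i in range(1000):
    unlimited.send(i)
    bounded.send(i)

print(len(unlimited.pending), unlimited.dropped)  # 1000 0
print(len(bounded.pending), bounded.dropped)      # 10 990
```

In this model the bounded queue keeps the oldest 10 messages and drops everything sent after the limit is hit, which is why a small HWM caps memory use but does not by itself get rid of stale messages.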
I think this behavior follows from the semantics of zmq_connect().
That is: once zmq_connect() returns successfully, the connection is conceptually established, so your connecting PUB socket starts queuing messages instead of dropping them.
The following excerpt from the "ZMQ Guide" hints at this:
In theory with ØMQ sockets, it does not matter which end connects, and
which end binds. However with PUB-SUB sockets, if you bind the SUB
socket and connect the PUB socket, the SUB socket may receive old
messages, i.e. messages sent before the SUB started up. This is an
artifact of the way bind/connect works. It's best to bind the PUB and
connect the SUB, if you can.
The following section in zmq_connect() also has some hints:
Key differences to conventional sockets
Generally speaking, conventional sockets present a synchronous
interface to either connection-oriented reliable byte streams
(SOCK_STREAM), or connection-less unreliable datagrams (SOCK_DGRAM).
In comparison, ØMQ sockets present an abstraction of an asynchronous
message queue, with the exact queueing semantics depending on the
socket type in use. Where conventional sockets transfer streams of
bytes or discrete datagrams, ØMQ sockets transfer discrete messages.
ØMQ sockets being asynchronous means that the timings of the physical
connection setup and tear down, reconnect and effective delivery are
transparent to the user and organized by ØMQ itself. Further, messages
may be queued in the event that a peer is unavailable to receive them.
Try setting the HWM option on the socket.
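The difference between the two setups in the question can be sketched with a toy publisher in plain Python 3. The key assumption, taken from the explanation above rather than from libzmq's source, is that connect() creates the peer's outgoing queue immediately, while bind() creates one only when a peer actually connects. `ToyPub` and its method names are hypothetical.

```python
class ToyPub:
    """Toy PUB socket: one outgoing queue per known peer (not libzmq code)."""

    def __init__(self):
        self.queues = {}  # peer name -> list of queued messages

    def connect(self, peer):
        # Assumption: connecting creates the peer's queue right away,
        # even though nobody is listening at the endpoint yet.
        self.queues[peer] = []

    def bind(self):
        # Binding creates no queue; one appears only when a peer connects.
        pass

    def peer_connected(self, peer):
        # Called when a subscriber connects to a bound publisher.
        self.queues.setdefault(peer, [])

    def send(self, msg):
        # With no known peers the message has nowhere to go and is dropped.
        for q in self.queues.values():
            q.append(msg)


early = ["a message should not be dropped"] * 5

connecting = ToyPub()
connecting.connect("sub")        # SUB has not bound yet
for msg in early:
    connecting.send(msg)         # queued for the future peer
connecting.send("hi")

binding = ToyPub()
binding.bind()
for msg in early:
    binding.send(msg)            # no peers yet: dropped
binding.peer_connected("sub")    # SUB connects now
binding.send("hi")

print(len(connecting.queues["sub"]))  # 6
print(binding.queues["sub"])          # ['hi']
```

The connecting publisher ends up holding all five early messages plus 'hi', while the binding publisher holds only 'hi', which matches the two outputs shown in the question.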
So bind() and connect() result in two different behaviors. Why don't you just choose which one you prefer (it seems like bind()) and use that?
It is indeed a feature of ZeroMQ in general that it buffers outgoing messages until a connection is made.
You should be able to set a high water mark on the PUB socket using its HWM setting. It lets you define how many messages are kept.
Here's a hack that might help...
Set your ZMQ::HWM to a fixed number, say 10. Upon connection, call the subscriber socket's recv method in a loop until it discards all the buffered messages, and only THEN start your main receiving loop.
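The drain step of this hack might look roughly like the sketch below, written for Python 3 with a standard-library `queue.Queue` standing in for the SUB socket. The `drain` helper is hypothetical. With pyzmq the equivalent loop would presumably call `sub.recv(zmq.NOBLOCK)` and stop when `zmq.Again` is raised, assuming a pyzmq version that provides that exception.

```python
import queue


def drain(inbox):
    """Discard everything already waiting in inbox; return how many were dropped."""
    discarded = 0
    while True:
        try:
            inbox.get_nowait()   # non-blocking receive
        except queue.Empty:      # nothing left to discard
            return discarded
        discarded += 1


# Stand-in for a subscriber that received 10 buffered (stale) messages.
inbox = queue.Queue()
for _ in range(10):
    inbox.put("stale message")

print(drain(inbox))   # 10

# Only now start the main receiving loop.
inbox.put("hi")
print(inbox.get())    # hi
```

Note that a non-blocking drain only discards messages that have already arrived, so stale messages still in flight when the loop ends can slip through into the main loop.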