How to synchronize the publishers and subscribers

2020-07-24 05:16发布

Extended PUB/SUB topology enter image description here

I have multiple publishers and multiple subscribers in a use case with 1 intermediary.

In the ZeroMQ guide, I learnt about synchronizing 1 publisher and 1 subscriber, using additional REQ/REP sockets. I tried to write a synchronization code for my use case, but it is getting messy if I try to write code according to logic given for 1-1 PUB/SUB.

The publisher code when we have only 1 publisher is :

//Socket to receive sync request
zmq::socket_t syncservice (context, ZMQ_REP);
syncservice.bind("tcp://*:5562");

//  Get synchronization from subscribers
int subscribers = 0;
while (subscribers < SUBSCRIBERS_EXPECTED) {

    //  - wait for synchronization request
    s_recv (syncservice);

    //  - send synchronization reply
    s_send (syncservice, "");

    subscribers++;
}

The subscriber code when we have only 1 subscriber is:

zmq::socket_t syncclient (context, ZMQ_REQ);
syncclient.connect("tcp://localhost:5562");

//  - send a synchronization request
s_send (syncclient, "");

//  - wait for synchronization reply
s_recv (syncclient);

Now, when I have multiple subscribers, then does each subscriber need to send a request to every publisher?

The publishers in my use case come and go. Their number is not fixed.

So, a subscriber won't have any knowledge about how many nodes to connect to and which publishers are present or not.

Please suggest a logic to synchronize an extended PUB/SUB code

1条回答
Root(大扎)
2楼-- · 2020-07-24 05:31

Given the XPUB/XSUB mediator node is present,

the actual PUB-node discovery may be completely effort-less for the XSUB-mediator-side ( actually principally avoided as such ).

Just use the reversed the XSUB.bind()-s / PUB.connect()-s and the problem ceased to exist at all.

Smart, isn't it?

PUB-nodes may come and go, yet the XSUB-side of the Policy-mediator node need not bother ( except for a few initial .setsockopt( { LINGER, IMMEDIATE, CONFLATE, RCVHWM, MAXSIZE } ) performance tuning and robustness increasing settings ), enjoying the still valid and working composition of the actual Topic-filter(s) .setsockopt( zmq.SUBSCRIBE, ** ) settings in-service and may centrally maintain such composition remaining principally agnostic about the state/dynamic of the semi-temporal group of the now / later .connect()-ed live / dysfunctional PUB-side Agent-nodes.

Even better, isn't it?

查看更多
登录 后发表回答