ZeroMQ REQ/REP on ipc:// and concurrency

Posted 2019-07-21 01:56

I implemented a JSON-RPC server on a ZeroMQ REQ/REP socket over ipc://, and I'm seeing strange behavior. I suspect the cause is that the Unix socket underlying ipc:// is not a real socket but a single pipe.

According to the documentation, a REQ/REP socket must strictly alternate zmq_send() and zmq_recv(); an out-of-order zmq_send() returns an error (-1, with errno set to EFSM).

However, I expected the alternation to be enforced per client, not per socket. Of course, with a Unix socket there is just one pipeline from multiple clients to the server, so the server won't know which client it is talking to. Two clients could call zmq_send() simultaneously, and the server would see this as an alternation violation.

The sequence could be:

  • ClientA: zmq_send()
  • ClientB: zmq_send(): will it block until the other send/receive completes, or will it return -1? (I suspect it fails on ipc:// because of the low-level limitation described above, whereas over TCP the server could distinguish the two clients.)
  • ClientA: zmq_recv()
  • ClientB: zmq_recv()

So what about tcp:// sockets? Will they handle concurrent clients? Should I use some other locking mechanism to work around this?

Example server:

#include <stdlib.h>
#include <stdio.h>
#include <unistd.h>

#include <czmq.h>

/* Echo server using the czmq 2.x zctx/zsocket API (deprecated in later czmq). */
int main(void)
{
    zctx_t *zctx;
    void *zsocket_rpc;

    printf ("rpcserver create context\n");
    zctx = zctx_new();
    printf ("rpcserver create socket\n");
    zsocket_rpc = zsocket_new (zctx, ZMQ_REP);
    if (!zsocket_rpc) {
        fprintf (stderr, "zsocket_rpc is NULL\n");
        exit(1);
    }
    if (zsocket_bind (zsocket_rpc, "ipc:///tmp/rpcserver") < 0) {
        fprintf (stderr, "rpcserver bind failed\n");
        exit(1);
    }

    for (;;) {
        int rc;

        /* zstr_recv returns NULL if the call is interrupted. */
        char *msg = zstr_recv(zsocket_rpc);
        if (!msg)
            break;
        printf ("rpcserver received %s\n", msg);

        printf ("rpcserver sleep\n");
        usleep(200000);

        printf ("rpcserver send %s\n", msg);
        rc = zstr_send(zsocket_rpc, msg);
        if (rc < 0)
            fprintf (stderr, "rpcserver zstr_send returned %d\n", rc);
        free(msg);  /* release the request on every path */
    }

    zctx_destroy(&zctx);
    return 0;
}

Example client (launch as ./rpcclient letter):

#include <stdlib.h>
#include <stdio.h>
#include <unistd.h>
#include <czmq.h>

int main(int argc, char *argv[])
{
    char msg[32];   /* room for "X:MESSAGE nnn" plus the terminating NUL */
    zctx_t *zctx;
    void *zsocket_rpc;

    if (argc != 2) {
        fprintf (stderr, "Usage: rpcclient letter\n");
        exit(1);
    }

    zctx = zctx_new();
    printf ("rpcclient new socket\n");
    zsocket_rpc = zsocket_new (zctx, ZMQ_REQ);
    if (!zsocket_rpc) {
        fprintf (stderr, "zsocket_rpc is NULL\n");
        exit(1);
    }
    printf ("rpcclient connect\n");
    if (zsocket_connect (zsocket_rpc, "ipc:///tmp/rpcserver") < 0) {
        fprintf (stderr, "rpcclient connect failed\n");
        exit(1);
    }

    for (int cnt = 0; cnt < 1000; cnt++) {
        int rc;

        snprintf (msg, sizeof msg, "%c:MESSAGE %03d", argv[1][0], cnt);
        printf ("rpcclient send %s\n", msg);
        rc = zstr_send(zsocket_rpc, msg);
        if (rc < 0) {
            fprintf (stderr, "rpcclient zstr_send returned %d\n", rc);
            continue;
        }
        printf ("rpcclient sleep...\n");
        usleep(200000);

        /* zstr_recv returns NULL if the call is interrupted. */
        char *reply = zstr_recv(zsocket_rpc);
        if (!reply)
            break;
        printf ("rpcclient recv %s\n", reply);

        free(reply);
    }

    zctx_destroy(&zctx);
    return 0;
}

2 answers
姐就是有狂的资本
Reply #2 · 2019-07-21 02:51

Thank you, Pieter. I apologize: the extensive tests I ran today confirmed that REQ/REP is rock solid with multiple clients, on ipc:// as well. I consider the question closed.

(The problem was caused by multiple threads calling RPC functions without locking... as usual.)
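A rough sketch of the kind of locking that was missing, assuming all threads share one request channel. ZeroMQ sockets are not thread-safe, so a send/receive pair on a shared REQ socket has to run as one critical section. To keep the sketch independent of ZeroMQ, the socket is replaced by a hypothetical stand-in (rpc_channel_t, fake_send, fake_recv) that only tracks the send/receive alternation; rpc_call, run_locked_demo and the constants are illustrative names too, not the code from my server.

```c
#include <pthread.h>

#define CALLS_PER_THREAD 1000
#define MAX_THREADS 16

/* Hypothetical stand-in for a shared REQ socket: it only remembers whether
   the next legal operation is a send or a receive. */
typedef struct {
    pthread_mutex_t lock;
    int awaiting_reply;   /* 0: must send next, 1: must receive next */
    int violations;       /* out-of-order operations seen */
    int completed;        /* successful send/receive pairs */
} rpc_channel_t;

static int fake_send(rpc_channel_t *ch)
{
    if (ch->awaiting_reply) {
        ch->violations++;
        return -1;
    }
    ch->awaiting_reply = 1;
    return 0;
}

static int fake_recv(rpc_channel_t *ch)
{
    if (!ch->awaiting_reply) {
        ch->violations++;
        return -1;
    }
    ch->awaiting_reply = 0;
    return 0;
}

/* One RPC round trip; the mutex keeps the send/receive pair atomic. */
int rpc_call(rpc_channel_t *ch)
{
    int rc;

    pthread_mutex_lock(&ch->lock);
    rc = fake_send(ch);
    if (rc == 0)
        rc = fake_recv(ch);
    if (rc == 0)
        ch->completed++;
    pthread_mutex_unlock(&ch->lock);
    return rc;
}

static void *worker(void *arg)
{
    for (int i = 0; i < CALLS_PER_THREAD; i++)
        rpc_call(arg);
    return NULL;
}

/* Runs nthreads workers on one channel; returns the number of completed
   calls, or -1 on a setup failure or any alternation violation. */
int run_locked_demo(int nthreads)
{
    rpc_channel_t ch = { .awaiting_reply = 0, .violations = 0, .completed = 0 };
    pthread_t tid[MAX_THREADS];
    int started = 0;

    if (nthreads < 1 || nthreads > MAX_THREADS)
        return -1;
    if (pthread_mutex_init(&ch.lock, NULL) != 0)
        return -1;
    for (; started < nthreads; started++)
        if (pthread_create(&tid[started], NULL, worker, &ch) != 0)
            break;
    for (int i = 0; i < started; i++)
        pthread_join(tid[i], NULL);
    pthread_mutex_destroy(&ch.lock);

    if (started != nthreads || ch.violations != 0)
        return -1;
    return ch.completed;
}
```

In a real client the two fake_* calls would be the actual send and receive on the shared socket. Giving each thread its own REQ socket is the other common way to avoid the problem.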

Animai°情兽
Reply #3 · 2019-07-21 02:53

You have to explain what sequence of events fails to do what you want. This way it's not obvious what your question is about.
