How do I clear the buffer upon start/exit in ZMQ s

2019-05-08 06:49发布

I am using a REQ/REP type socket for ZMQ communication in python. There are multiple clients that attempt to connect to one server. Timeouts have been added in the client script to prevent indefinite wait.

The problem is that when the server is not running, and a client attempts to establish connection, it's message gets added to the queue buffer, which should not even exist at this moment ideally. When the script starts running and a new client connects, the previous client's data is taken in first by the server. This should not happen.

When the server starts, it assumes a client is connected to it since it had tried to connect previously, and could not exit cleanly (since the server was down).

In the code below, when the client tries the first time, it gets ERR 03: Server down which is correct, followed by Error disconnecting. When server is up, I get ERR 02: Server Busy for the first client which connects. This should not occur. The client should be able to seamlessly connect with the server now that it's up and running.

Server Code:

import zmq

def server_fn():

context = zmq.Context()
socket = context.socket(zmq.REP)
socket.bind("tcp://192.168.1.14:5555")
one=1
while one == 1:
    message = socket.recv()
    #start process if valid new connection
    if message == 'hello':
        socket.send(message) #ACK
        #keep session alive until application ends it.
        while one == 1:
            message = socket.recv()
            print("Received request: ", message)
            #exit connection
            if message == 'bye':
                socket.send(message)
                break
            #don't allow any client to connect if already busy
            if message == 'hello':
                socket.send ('ERR 00')
                continue
            #do all data communication here
    else:
        socket.send('ERR 01: Connection Error')
return

server_fn() 

Client Code:

import zmq 

class client:
    def clientInit(self):
        hello='hello'
        #zmq connection
        self.context = zmq.Context()
        print("Connecting to hello world server...")
        self.socket = self.context.socket(zmq.REQ)
        self.socket.connect("tcp://192.168.1.14:5555")
        #RCVTIMEO to prevent forever block 
        self.socket.setsockopt(zmq.RCVTIMEO, 5000)   
        #SNDTIME0 is needed since script may not up up yet
        self.socket.setsockopt(zmq.SNDTIMEO, 5000)   
        try:
            self.socket.send(hello)
        except:
            print "Sending hello failed."
        try:        
            echo = self.socket.recv() 
            if hello == echo:
                #connection established.
                commStatus = 'SUCCESS'
            elif echo == 'ERR 00':
                #connection busy
                commStatus = "ERR 00. Server busy."            
            else:
                #connection failed
                commStatus="ERR 02"            
        except:
            commStatus = "ERR 03. Server down."
        return commStatus   

    def clientQuit(self):
            try:
                self.socket.send('bye')
                self.socket.recv()              
            except:
                print "Error disconnecting."    

cObj = client()            
commStatus=cObj.clientInit()
print commStatus
cObj.clientQuit()

PS - I have a feeling the solution may lie in the correct usage of socket.bind and socket.connect.

1条回答
孤傲高冷的网名
2楼-- · 2019-05-08 07:23

Answering my own question-

The problem is that the first client sends a message which the server accepts when it starts running, regardless of the status of the client.

To prevent this, 2 things have to be done. The most important thing is to use socket.close() to close the client connection. Secondly, the LINGER parameter can be set to a low value or zero. This clears the buffer after the timeout value from the time the socket is closed.

class client:
    def clientInit(self):
...
            self.socket.setsockopt(zmq.LINGER, 100)   
...

def clientQuit(self):
        try:
            self.socket.send('bye')
            self.socket.recv() 
        except:
            print "Error disconnecting."    
        self.socket.close() 
查看更多
登录 后发表回答