I get this strange deadlock when I try to synchronize two python3 scripts using 0mq (ZeroMQ
). The scripts run fine for several thousand iterations, but sooner or later they both stop and wait for each other. I am running both scripts from different CMD-Windows on Windows 7.
I cannot figure out
why such a deadlock is even possible.
What can go wrong here?
Script A:
while (1):
context = zmq.Context()
socket = context.socket(zmq.REP)
socket.bind('tcp://127.0.0.1:10001')
msg = socket.recv() # Waiting for script B to send done
# ............................................................................
# ... do something useful (takes only a few millisecs)
# ............................................................................
context = zmq.Context()
socket = context.socket(zmq.REQ)
socket.connect('tcp://127.0.0.1:10002')
socket.send_string("done") # Tell script B we are done
Script B
while (1):
# ............................................................................
# ... do something useful (takes only a few millisecs)
# ............................................................................
context = zmq.Context()
socket = context.socket(zmq.REQ)
socket.connect('tcp://127.0.0.1:10001')
socket.send_string("done") # Tell script A we are done
context = zmq.Context()
socket = context.socket(zmq.REP)
socket.bind('tcp://127.0.0.1:10002')
msg = socket.recv() # Waiting for script A to send done
This is not a DeadLock case
The code, sure, still needs some care.
Disambiguation: your scenario does not hit into a resources mutual locking state, aka a DeadLock. Yes, sure, your code crashes, but most probably not due to a REQ/REP
DeadLock ( where it might and does appear on a lossy network tcp:
transport-class ). The posted code is crashing due to unmanaged resource handling, not due to reaching a mutual-blocking state of a DeadLock / LiveLock.
How to fix it?
First, let's assume your ultra-low latency-motivated system does not allow to repetitively instantiate anything. There are exceptions to this, but let's be profi.
move your .Context()
resource setup ( or inheritance from an outer call ) out of the loop
review, whether you need and your latency constraints allow you to setup / tear-down a .socket()
resource twice in each loop-run.
decide, whether you can live with real REQ/REP
deadlock once a first message gets lost in the transport-path
enforce graceful resources-use termination ( .socket()
-s, O/S port#
s, .Context()
-s ). Do not let them hanging unterminated forever, while creating infinite amount of others instead, that devastates any "fault-resilient" system. Resources are never infinite.
design both signalling and transmission behaviours in a non-blocking manner. This allows you to detect and handle remote-process timeouts and introduce a chance for local remedy / responsive actions.
redesign the code to a level of secure code you need ( the below example works a few years in a soft-realtime controlled endless loop 24/7/365 in a distributed processing framework with a remote keyboard and some other local- and remote-diagnostic tools ).
What is missing for production-grade code?
Your code has to "envisage" what might have gone wrong, in any part of your distributed system. Yes, it is hard, but necessary. Your remote node -- a communicating counterparty -- stopped responding, lost a message, went rebooted, stalled due to O/S crash, whatever imaginable ( plus a few rather nasty surprised you will find only on-the-fly ... ). This is another Pandora's Box to cover in this small post, which does not mean it is not necessary. It is your life-saving vest.
Design in a non-blocking manner wherever you can, this way you remain in control of events ...
Anyways, always release system resources and .term()
all ZeroMQ .Context()
instances in a graceful manner -- "tidy up" is a fair practice -- both in real life and the more in the code-empires.
# /\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\
#NONSTOP RESPONDER RAW EXAMPLE:
def aMiniRESPONDER( aTarget2Bind2_URL = "tcp://A.B.C.D:8889",
anExternalPREDICTOR = None,
anExternallyManagedZmqCONTEXT = None,
aSpreadMinSafetyMUL = 3.0,
aSilentMODE = True
):
try: # RESOURCES LAYER
# ... SETUP
# ------------------------------------------------- .Context()
# can setup a locally-managed context or re-use
# anExternallyManagedZmqCONTEXT obtained upon a func Call
aZmqCONTEXT = anExternallyManagedZmqCONTEXT or zmq.Context( 1 )
# localhost:8887 [REP] ... remote [REQ] peer .connect() + .send()
aCtrlPORT_URL = "tcp://*:8887"
# localhost:8890 [PUB] ... remote [SUB] peers .connect() +
# .subscribe + .recv( zmq.NOBLOCK ) ( MQL4 cannot .poll() so far ...)
aSIGsPORT_URL = "tcp://*:8890"
aXmitPORT_URL = aTarget2Bind2_URL
aListOfSOCKETs = []
pass # -------------------------------------------------------------# ZMQ
try: # -------------------------------------------------------------#
# try: XmitPORT
aXmitSOCKET = aZmqCONTEXT.socket( zmq.PAIR )
# XmitPORT
aXmitSOCKET.bind( aXmitPORT_URL )
aListOfSOCKETs.append( aXmitSOCKET )
except:
# EXC: XmitPORT on Failure: GRACEFUL CLEARING XmitPORT
msg = "\nEXC. ZmqError({0:s}) on aXmitSOCKET setup / .bind( {1:s} )"
print msg.format( repr( zmq.ZMQError() ), aTarget2Bind2_URL )
raise ValueError( "ZMQ_EXC_EXIT @ XmitPORT SETUP" )
pass # -------------------------------------------------------------# ZMQ
try: # -------------------------------------------------------------#
# try: CtrlPORT
# CtrlSOCKET [REP] .recv()s<--[REQ] + .send()s--> [REQ]
aCtrlSOCKET = aZmqCONTEXT.socket( zmq.REP )
# CtrlPORT <-REQ/REP means a remote peer [REQ] has to
# .send()+.recv() before sending another CtrlCMD
aCtrlSOCKET.bind( aCtrlPORT_URL )
aListOfSOCKETs.append( aCtrlSOCKET )
except:
# EXC: CtrlPORT on Failure: GRACEFUL CLEARING both CtrlPORT
# and XmitPORT
msg = "\nEXC. ZmqError({0:s}) on aCtrlSOCKET setup / .bind( {1:s} )"
print msg.format( repr( zmq.ZMQError() ), aCtrlPORT_URL )
raise ValueError( "ZMQ_EXC_EXIT @ CtrlPORT SETUP" )
pass # -------------------------------------------------------------# ZMQ
try: # -------------------------------------------------------------#
# try: SIGsPORT
# SIGsPORT [PUB] .send()s--> [SUB]s
aSIGsSOCKET= aZmqCONTEXT.socket( zmq.PUB )
# SIGsPORT --> PUB/SUB means a remote peer(s) [SUB] .subscribe() + .recv()
aSIGsSOCKET.bind( aSIGsPORT_URL )
aListOfSOCKETs.append( aSIGsSOCKET )
except:
# EXC: SIGsPORT on Failure: GRACEFUL CLEARING both CtrlPORT
# and XmitPORT and SIGsPORT
msg = "\nEXC. ZmqError({0:s}) on aSIGsSOCKET setup / .bind( {1:s} )"
print msg.format( repr( zmq.ZMQError() ), aSIGsPORT_URL )
raise ValueError( "ZMQ_EXC_EXIT @ SIGsPORT SETUP" )
pass # -------------------------------------------------------------# ZMQ
# vvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvv
# ... SETUP YOUR APPLICATION CODE
try: # APP LAYER ___________________________________________
# what you want to do
# here you go ...
except: # APP LAYER ___________________________________________
# handle EXCs
finally: # APP LAYER ___________________________________________
# your own application post-mortem / pre-exit code
# ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
except: # RESOURCES LAYER .............................................
# ... code shall handle it's own exceptions + externally caused events
finally: # RESOURCES LAYER .............................................
# ... always, ALWAYS gracefully exit ( avoid leakages and dirty things )
[ allSOCKETs.setsockopt( zmq.LINGER, 0 ) for allSOCKETs in aListOfSOCKETs ]
[ allSOCKETs.close( ) for allSOCKETs in aListOfSOCKETs ]
# --------------------------------------------------------------#
# RESOURCES dismantled, may .term()
# .TERM(), NOP otherwise
if not ( aZmqCONTEXT is anExternallyManagedZmqCONTEXT ): #
aZmqCONTEXT.term() #
return
You shall handle context
and socket
creation only once, not with each iteration.
Also, you shall reuse the context (unless you are going to use it from another
thread in your code).
try:
context = zmq.Context()
rep_sck = context.socket(zmq.REP)
rep_sck.bind('tcp://127.0.0.1:10001')
rq_sck = context.socket(zmq.REQ)
rq_sck.connect('tcp://127.0.0.1:10002')
while (1):
msg = rep_sck.recv() # Waiting for script B to send done
do something useful (takes only a few millisecs)
rq_sck.send_string("done") # Tell script B we are done
finally:
rep_sck.close()
rq_sck.close()
The same applies to the 2nd script.
try:
context = zmq.Context()
rq_sck = context.socket(zmq.REQ)
rq_sck.connect('tcp://127.0.0.1:10001')
rep_sck = context.socket(zmq.REP)
rep_sck.bind('tcp://127.0.0.1:10002')
while (1):
do something useful (takes only a few millisecs)
rq_sck.send_string("done") # Tell script A we are done
msg = rep_sck.recv() # Waiting for script A to send done
finally:
rq_sck.close()
rep_sck.close()
EDIT: Updated code to call Socket.close()
Since pyzmq version 14.3.0 the Socket.close()
and Context.term()
are not called automatically
during garbage collection, proper closing of sockets was added.