I've got two Python programs running. Program A connects to program B with the multiprocessing module:
# Connection code in program A
# -----------------------------
import multiprocessing
import multiprocessing.connection
...
connection = multiprocessing.connection.Client(
    ('localhost', 19191),                # <- address of program B
    authkey='embeetle'.encode('utf-8')   # <- authorization key
)
...
connection.send(send_data)
recv_data = connection.recv()
It works perfectly most of the time. However, sometimes program B is frozen (the details don't matter much, but it usually happens when the GUI from program B spawns a modal window).
While program B is frozen, program A hangs at the following line:
connection = multiprocessing.connection.Client(
    ('localhost', 19191),                # <- address of program B
    authkey='embeetle'.encode('utf-8')   # <- authorization key
)
It keeps waiting for a response. I would like to pass a timeout parameter, but the call to multiprocessing.connection.Client(..) does not have one.
How can I implement a timeout here?
Notes:
I'm working on a Windows 10 computer with Python 3.7.
I would like to put a timeout parameter, but the call to multiprocessing.connection.Client(..) does not have one. How can I implement a timeout here?
Looking at the source of multiprocessing.connection in Python 3.7, the Client() function is, for your use case, a fairly brief wrapper around SocketClient(), which in turn wraps Connection().
At first it looked fairly straightforward to write a ClientWithTimeout wrapper that does the same thing, but additionally calls settimeout() on the socket it creates for the connection. However, this does not have the correct effect, because:
- Python implements its own socket timeout behaviour by using select() and an underlying non-blocking OS socket; this behaviour is what is configured by settimeout().
- Connection operates directly on an OS socket handle, which is returned by calling detach() on the normal Python socket object.
- Since Python has set the OS socket handle to non-blocking mode, recv() calls on it return immediately rather than waiting for the timeout period.
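The first and third points can be checked directly. The following is a minimal sketch, assuming a POSIX system (os.get_blocking() is documented as Unix-only for sockets); the helper name os_socket_is_blocking is mine and is not part of any library.

```python
import os
import socket

def os_socket_is_blocking(sock):
    """Report whether the OS-level descriptor behind sock is in blocking mode."""
    # os.get_blocking() inspects the O_NONBLOCK flag on the descriptor itself,
    # independently of the timeout that the Python socket object tracks.
    return os.get_blocking(sock.fileno())

with socket.socket(socket.AF_INET) as s:
    blocking_by_default = os_socket_is_blocking(s)

    # settimeout() with a non-negative value switches the OS descriptor to
    # non-blocking mode; Python then emulates the timeout itself.
    s.settimeout(3)
    blocking_after_settimeout = os_socket_is_blocking(s)
    python_level_timeout = s.gettimeout()

print("blocking by default:", blocking_by_default)
print("blocking after settimeout(3):", blocking_after_settimeout)
print("timeout tracked by Python:", python_level_timeout)
```

Because the timeout lives only in the Python socket object, it is lost once detach() hands the raw handle to Connection.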
However, we can still set a receive timeout on the underlying OS socket handle by using the low-level SO_RCVTIMEO socket option.
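To see the option on its own, here is a small sketch that connects to a local listener which never sends anything and measures how long a blocking recv() waits. It assumes a POSIX platform where struct timeval packs as two native unsigned longs; the function name recv_with_os_timeout is hypothetical.

```python
import socket
import struct
import time

def recv_with_os_timeout(timeout):
    """Return (raised, elapsed) for a recv() from a peer that never sends."""
    with socket.socket(socket.AF_INET) as server, \
            socket.socket(socket.AF_INET) as client:
        # The server listens but never accepts or sends; the kernel still
        # completes the TCP handshake through the listen backlog.
        server.bind(("127.0.0.1", 0))
        server.listen(1)
        client.connect(server.getsockname())

        # Assumption: struct timeval is two native unsigned longs here.
        seconds = int(timeout)
        microseconds = int((timeout - seconds) * 1e6)
        timeval = struct.pack("@LL", seconds, microseconds)
        client.setsockopt(socket.SOL_SOCKET, socket.SO_RCVTIMEO, timeval)

        start = time.monotonic()
        try:
            client.recv(1)
            raised = False
        except OSError:
            # The OS-level timeout surfaces as an OSError from recv().
            raised = True
        return raised, time.monotonic() - start

raised, elapsed = recv_with_os_timeout(0.2)
print("recv raised:", raised, "after %.3f seconds" % elapsed)
```

The socket stays in blocking mode throughout, so the timeout is enforced by the OS and keeps working after detach().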
Hence the second version of my solution:
from multiprocessing.connection import Connection, answer_challenge, deliver_challenge
import socket, struct

def ClientWithTimeout(address, authkey, timeout):

    with socket.socket(socket.AF_INET) as s:
        s.setblocking(True)
        s.connect(address)

        # We'd like to call s.settimeout(timeout) here, but that won't work.

        # Instead, prepare a C "struct timeval" to specify timeout. Note that
        # these field sizes may differ by platform.
        seconds = int(timeout)
        microseconds = int((timeout - seconds) * 1e6)
        timeval = struct.pack("@LL", seconds, microseconds)

        # And then set the SO_RCVTIMEO (receive timeout) option with this.
        s.setsockopt(socket.SOL_SOCKET, socket.SO_RCVTIMEO, timeval)

        # Now create the connection as normal.
        c = Connection(s.detach())

    # The following code will now fail if a socket timeout occurs.

    answer_challenge(c, authkey)
    deliver_challenge(c, authkey)

    return c
For brevity, I have assumed the parameters are as per your example, i.e.:
- address is a tuple (implying the address family is AF_INET).
- authkey is a byte string.
If you need to handle cases where these assumptions don't hold, then you will need to copy a little more logic from Client() and SocketClient().
Although I looked at the multiprocessing.connection source to find out how to do this, my solution does not use any private implementation details. Connection, answer_challenge and deliver_challenge are all public and documented parts of the API. This function should therefore be safe to use with future versions of multiprocessing.connection.
Note that SO_RCVTIMEO may not be supported on all platforms, but it is present on at least Windows, Linux and macOS. The format of the option value is also platform-specific. I have assumed a struct timeval whose two fields are both of the native unsigned long type. This matches the Linux system I tested on, but it is not guaranteed elsewhere; in particular, the Winsock documentation describes the Windows value as a DWORD count of milliseconds rather than a struct timeval, so the packing would need adjusting there. Unfortunately Python does not currently provide a platform-independent way to do this.
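Since the question mentions Windows 10, here is a hedged sketch of how the option value could be built per platform. The helper name pack_rcvtimeo and its platform parameter are hypothetical; the Windows branch follows the Winsock documentation (a 32-bit DWORD in milliseconds) and I have not tested it on Windows, while the other branch keeps the two-unsigned-long assumption used above.

```python
import struct
import sys

def pack_rcvtimeo(timeout, platform=sys.platform):
    """Build an SO_RCVTIMEO option value for a timeout given in seconds."""
    if platform == "win32":
        # Assumption from the Winsock docs: a 32-bit DWORD of milliseconds.
        return struct.pack("=L", int(timeout * 1000))
    # Assumption for POSIX systems: struct timeval as two native unsigned
    # longs (seconds, microseconds), as in ClientWithTimeout.
    seconds = int(timeout)
    microseconds = int((timeout - seconds) * 1e6)
    return struct.pack("@LL", seconds, microseconds)

# The platform argument is only there so both branches can be inspected
# from any machine.
print(len(pack_rcvtimeo(1.5, platform="win32")), "bytes for win32")
print(len(pack_rcvtimeo(1.5, platform="linux")), "bytes for linux")
```

With a helper like this, the result would be passed as the third argument to s.setsockopt(socket.SOL_SOCKET, socket.SO_RCVTIMEO, ...) in place of the hand-packed timeval.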
Below is a test program which shows this working. It assumes the above code is saved as client_timeout.py.
from multiprocessing.connection import Client, Listener
from client_timeout import ClientWithTimeout
from threading import Thread
from time import time, sleep

addr = ('localhost', 19191)
key = 'embeetle'.encode('utf-8')

# Provide a listener which either does or doesn't accept connections.
class ListenerThread(Thread):

    def __init__(self, accept):
        Thread.__init__(self)
        self.accept = accept

    def __enter__(self):
        if self.accept:
            print("Starting listener, accepting connections")
        else:
            print("Starting listener, not accepting connections")
        self.active = True
        self.start()
        sleep(0.1)

    def run(self):
        listener = Listener(addr, authkey=key)
        self.active = True
        if self.accept:
            listener.accept()
        while self.active:
            sleep(0.1)
        listener.close()

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.active = False
        self.join()
        print("Stopped listener")
        return True

for description, accept, name, function in [
        ("ClientWithTimeout succeeds when the listener accepts connections.",
         True, "ClientWithTimeout", lambda: ClientWithTimeout(addr, timeout=3, authkey=key)),
        ("ClientWithTimeout fails after 3s when listener doesn't accept connections.",
         False, "ClientWithTimeout", lambda: ClientWithTimeout(addr, timeout=3, authkey=key)),
        ("Client succeeds when the listener accepts connections.",
         True, "Client", lambda: Client(addr, authkey=key)),
        ("Client hangs when the listener doesn't accept connections (use ctrl-C to stop).",
         False, "Client", lambda: Client(addr, authkey=key))]:

    print("Expected result:", description)

    with ListenerThread(accept):
        start_time = time()
        try:
            print("Creating connection using %s... " % name)
            client = function()
            print("Client created:", client)
        except Exception as e:
            print("Failed:", e)
        print("Time elapsed: %f seconds" % (time() - start_time))

    print()
Running this on Linux produces the following output:
Expected result: ClientWithTimeout succeeds when the listener accepts connections.
Starting listener, accepting connections
Creating connection using ClientWithTimeout...
Client created: <multiprocessing.connection.Connection object at 0x7fad536884e0>
Time elapsed: 0.003276 seconds
Stopped listener
Expected result: ClientWithTimeout fails after 3s when listener doesn't accept connections.
Starting listener, not accepting connections
Creating connection using ClientWithTimeout...
Failed: [Errno 11] Resource temporarily unavailable
Time elapsed: 3.157268 seconds
Stopped listener
Expected result: Client succeeds when the listener accepts connections.
Starting listener, accepting connections
Creating connection using Client...
Client created: <multiprocessing.connection.Connection object at 0x7fad53688c50>
Time elapsed: 0.001957 seconds
Stopped listener
Expected result: Client hangs when the listener doesn't accept connections (use ctrl-C to stop).
Starting listener, not accepting connections
Creating connection using Client...
^C
Stopped listener