I'm trying to interrupt multiprocessing.connection.Listener.accept()
, but have thus far been unsuccessful. Since it doesn't provide a timeout
parameter, I thought perhaps I could use socket.setdefaulttimeout()
to interrupt it, as suggested in post I cannot find anymore, here on SO.
This didnt work. I then tried calling close()
on the Listener()
object. according to this post's answer, this should have worked.
It appears, however, that these objects to not play along with the usual socket
-related solutions.
I can confirm that that the Listener
is closed by the Timer
object as expected, but the accept()
call isn't interrupted.
The Code:
import logging
import socket
import os
from multiprocessing.connection import Listener
from queue import Queue, Empty
from threading import Thread, Event, Timer
class Node(Thread):
"""Base Class providing a AF_INET, AF_UNIX or AF_PIPE connection to its
data queue. It offers put() and get() method wrappers, and therefore
behaves like a Queue as well as a Thread.
Data from the internal queue is automatically fed to any connecting client.
"""
def __init__(self, sock_name, max_q_size=None, timeout=None,
*thread_args, **thread_kwargs):
"""Initialize class.
:param sock_name: UDS, TCP socket or pipe name
:param max_q_size: maximum queue size for self.q, default infinite
"""
self._sock_name = sock_name
self.connector = Listener(sock_name)
max_q_size = max_q_size if max_q_size else 0
self.q = Queue(maxsize=max_q_size)
self._running = Event()
self.connection_timer = Timer(timeout, self.connection_timed_out)
super(Node, self).__init__(*thread_args, **thread_kwargs)
def connection_timed_out(self):
"""Closes the Listener and shuts down Node if no Client connected.
:return:
"""
self.connector.close()
self.join()
def _start_connection_timer(self):
self.connection_timer.start()
def start(self):
self._running.set()
super(Node, self).start()
def join(self, timeout=None):
print("clearing..")
self._running.clear()
print("internal join")
super(Node, self).join(timeout=timeout)
print("Done")
def run(self):
while self._running.is_set():
print("Accepting connections..")
self._start_connection_timer()
try:
client = self.connector.accept()
self.connection_timer.cancel()
self.feed_data(client)
except (TimeoutError, socket.timeout):
continue
except Exception as e:
raise
print("Run() Terminated!")
def feed_data(self, client):
try:
while self._running.is_set():
try:
client.send(self.q.get())
except Empty:
continue
except EOFError:
return
if __name__ == '__main__':
import time
n = Node('/home/nils/git/spab2/test.uds', timeout=10)
n.start()
print("Sleeping")
time.sleep(15)
print("Manual join")
n.join()
I realize my question is a duplicate of this question - however, it is almost one year old and has not even received a comment. In addition, I'm using Unix Domain Socket
s, as opposed to the linked post's TCP
connection.