Raise TimeOutError after given time for multiproce

Published 2019-05-10 04:10

Question:

I'm trying to interrupt multiprocessing.connection.Listener.accept(), but have so far been unsuccessful. Since it doesn't provide a timeout parameter, I thought I could use socket.setdefaulttimeout() to interrupt it, as suggested in a post here on SO that I can no longer find.

This didn't work. I then tried calling close() on the Listener object. According to this post's answer, this should have worked.

It appears, however, that these objects do not play along with the usual socket-related solutions.

I can confirm that the Listener is closed by the Timer object as expected, but the accept() call isn't interrupted.

The Code:

import logging
import socket
import os
from multiprocessing.connection import Listener
from queue import Queue, Empty
from threading import Thread, Event, Timer

class Node(Thread):
    """Base class providing an AF_INET, AF_UNIX or AF_PIPE connection to its
    data queue. It offers put() and get() method wrappers, and therefore
    behaves like a Queue as well as a Thread.

    Data from the internal queue is automatically fed to any connecting client.
    """
    def __init__(self, sock_name, max_q_size=None, timeout=None,
                 *thread_args, **thread_kwargs):
        """Initialize class.

        :param sock_name: UDS, TCP socket or pipe name
        :param max_q_size: maximum queue size for self.q, default infinite
        :param timeout: seconds to wait for a client before closing the Listener
        """
        self._sock_name = sock_name
        self.connector = Listener(sock_name)
        max_q_size = max_q_size if max_q_size else 0
        self.q = Queue(maxsize=max_q_size)
        self._running = Event()
        self.connection_timer = Timer(timeout, self.connection_timed_out)
        super(Node, self).__init__(*thread_args, **thread_kwargs)

    def connection_timed_out(self):
        """Closes the Listener and shuts down Node if no Client connected.

        :return:
        """
        self.connector.close()
        self.join()

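    def _start_connection_timer(self):
        # A Timer can only be started once, so create a fresh one per attempt.
        self.connection_timer = Timer(self.connection_timer.interval,
                                      self.connection_timed_out)
        self.connection_timer.start()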

    def start(self):
        self._running.set()
        super(Node, self).start()

    def join(self, timeout=None):
        print("clearing..")
        self._running.clear()
        print("internal join")
        super(Node, self).join(timeout=timeout)
        print("Done")

    def run(self):
        while self._running.is_set():
            print("Accepting connections..")
            self._start_connection_timer()
            try:
                client = self.connector.accept()
                self.connection_timer.cancel()
                self.feed_data(client)
            except (TimeoutError, socket.timeout):
                continue
            except Exception as e:
                raise
        print("Run() Terminated!")

    def feed_data(self, client):
        try:
            while self._running.is_set():
                try:
                    # Without a timeout, get() blocks forever and never raises Empty.
                    client.send(self.q.get(timeout=1))
                except Empty:
                    continue
        except EOFError:
            return


if __name__ == '__main__':
    import time
    n = Node('/home/nils/git/spab2/test.uds', timeout=10)
    n.start()
    print("Sleeping")
    time.sleep(15)
    print("Manual join")
    n.join()

I realize my question duplicates this question; however, that one is almost a year old and has not received even a comment. In addition, I'm using Unix domain sockets, whereas the linked post uses a TCP connection.

Answer 1:

I managed to set the timeout in the following way in Python 2.7:

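# mpc is assumed to be: import multiprocessing.connection as mpc
self.listener = mpc.Listener((address, port))
# _listener and _socket are private attributes and may change between versions.
self.listener._listener._socket.settimeout(3)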

With this, the accept() call is interrupted.

Result:

    conn = self.listener.accept()
  File "/usr/lib/python2.7/multiprocessing/connection.py", line 145, in accept
    c = self._listener.accept()
  File "/usr/lib/python2.7/multiprocessing/connection.py", line 275, in accept
    s, self._last_accepted = self._socket.accept()
  File "/usr/lib/python2.7/socket.py", line 202, in accept
    sock, addr = self._sock.accept()
timeout: timed out

Regards, Henri
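
The answer above targets Python 2.7. A minimal sketch of the same idea for Python 3 might look like the following. It assumes a socket-based Listener (AF_INET or AF_UNIX; a Windows named-pipe listener has no underlying socket) and, like the answer, relies on the private attributes _listener and _socket, which are not part of the documented API. The helper name accept_with_timeout is hypothetical.

```python
import socket
from multiprocessing.connection import Listener


def accept_with_timeout(listener, timeout):
    """Return a Connection, or None if no client connects within `timeout` seconds.

    Hypothetical helper. It relies on the private attributes `_listener` and
    `_socket` of CPython's socket-based Listener, so it may break between
    Python versions.
    """
    # Put the underlying listening socket into timeout mode.
    listener._listener._socket.settimeout(timeout)
    try:
        return listener.accept()
    except socket.timeout:  # alias of TimeoutError since Python 3.10
        return None


if __name__ == "__main__":
    # Port 0 lets the OS pick a free port; nobody connects, so accept() times out.
    with Listener(("127.0.0.1", 0)) as listener:
        print(accept_with_timeout(listener, 0.5))
```

In a loop such as Node.run() from the question, a None result gives the thread a chance to re-check its _running event before calling accept() again, so no separate Timer is needed to close the Listener.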