Is this Python code thread safe (thread with twist

Posted 2019-07-18 18:44

I am writing an application that collects UDP messages and processes them once per second.

The application prototype looks like this:

from twisted.internet.protocol import DatagramProtocol
from twisted.internet import reactor
import threading
import time

class UdpListener(DatagramProtocol):

    messages = []

    def datagramReceived(self, data, addr):  # addr is a (host, port) tuple
        self.messages.append(data)

class Messenger(threading.Thread):

    listener = None

    def __init__(self):
        threading.Thread.__init__(self)

    def run(self):
        while True:
            time.sleep(1)
            recivedMessages = self.listener.messages
            length = len(recivedMessages)
            messagesToProccess = recivedMessages[0:length]
            #doSomethingWithMessages(messagesToProccess)
            del self.listener.messages[0:length]
            print(length)

listener = UdpListener()

messenger = Messenger()
messenger.listener = listener
messenger.start()

reactor.listenUDP(5556, listener)
reactor.run()

I am not sure whether I can safely delete the leading items of the list (del self.listener.messages[0:length]) without the risk that an incoming message modifies the list at the same time and crashes the application.

Update - version with lock

class Messenger(threading.Thread):

    listener = None
    lock = threading.Lock()

    def __init__(self):
        threading.Thread.__init__(self)

    def run(self):
        while True:
            time.sleep(1)
            recivedMessages = self.listener.messages
            self.lock.acquire()
            try:
                length = len(recivedMessages)
                messagesToProccess = recivedMessages[0:length]
                del self.listener.messages[0:length]
            except Exception as e:
                raise e
            finally:
                self.lock.release()

            #doSomethingWithMessages(messagesToProccess)
            print(length)

2 answers
地球回转人心会变
#2 · 2019-07-18 19:30

Why not implement this with a DeferredQueue, which is meant for exactly this purpose? If you want to use threads, you need to take some extra care.

Here is an example with a DeferredQueue that also allows for threading:

from twisted.internet import reactor, threads
from twisted.internet.defer import DeferredQueue, inlineCallbacks
from twisted.internet.protocol import DatagramProtocol

class UdpListener(DatagramProtocol):

    def __init__(self):
        self._messages = DeferredQueue()

    def datagramReceived(self, data, addr):
        # addr is a (host, port) tuple
        self._messages.put(data)

    @inlineCallbacks
    def _wait_for_and_process_next_message(self):

        # Get the next message from the DeferredQueue. With @inlineCallbacks,
        # the yield hands control back to Twisted until a message arrives
        # (it "blocks" this method, not the reactor) and then evaluates to that message.
        message = yield self._messages.get()

        # Do something with your message here, and ensure you catch any exceptions!
        # If processing may take long, you may wish to run it in another thread;
        # because of @inlineCallbacks, this yield "blocks" until your function finishes.
        # If you do this, be sure to read the notes below.
        # my_long_function is your own (blocking) processing function.
        yield threads.deferToThread(my_long_function, message)

        # Schedule an immediate call to this method again in order to process the next message
        self.wait_for_and_process_next_message()

    def wait_for_and_process_next_message(self):
        reactor.callLater(0, self._wait_for_and_process_next_message)

    def initialize(self):
        # Call this during your application bootstrapping, so you start processing messages
        self.wait_for_and_process_next_message()

It is very important to note that if you choose to defer your message processing to the Twisted thread pool (using threads.deferToThread), your code will be running in a different thread. You will likely be responding to messages from that thread, and in Twisted, protocols are not thread-safe objects (http://twistedmatrix.com/documents/13.2.0/core/howto/threading.html#auto0).

In this case, use reactor.callFromThread() so that the critical resource, the transport, is only touched from the reactor thread, as in this example:

def _send_message_critical_section(self, message):
    self.transport.write(message, (self.host, self.port))

def send_message(self, message):
    reactor.callFromThread(self._send_message_critical_section, message)

Other changes made:

  • Renamed the messages variable to _messages, as it should be considered entirely private.
  • Moved the _messages initialization into the __init__() method and assigned it to self._messages; otherwise the messages list would be shared among all instances! I guess you had only one instance of the class, but still. (See "Variables inside and outside of a class __init__() function".)
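
The second point can be illustrated without Twisted at all. The following is a minimal sketch using two hypothetical classes, SharedBuffer and PrivateBuffer (neither name comes from the code above), to show how a list defined in the class body is one object shared by every instance, while a list created in __init__() is not:

```python
class SharedBuffer:
    # Class attribute: a single list object shared by every instance.
    messages = []


class PrivateBuffer:
    def __init__(self):
        # Instance attribute: each instance gets its own fresh list.
        self.messages = []


a, b = SharedBuffer(), SharedBuffer()
a.messages.append("datagram")
shared_leak = b.messages      # b sees the message appended through a

c, d = PrivateBuffer(), PrivateBuffer()
c.messages.append("datagram")
private_leak = d.messages     # d is unaffected by c
```

With a single listener the difference never shows, which is presumably why the original code appeared to work.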
萌系小妹纸
#3 · 2019-07-18 19:42

Your code isn't thread-safe, no. You'd need to have a lock around messages.
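
To see why the lock has to wrap every access to messages, including the append in datagramReceived, consider this small sketch. It assumes a stand-in producer function (hypothetical, playing the role of the reactor thread) that never takes the lock, as in the question's updated Messenger. A lock that only one side acquires excludes nobody:

```python
import threading

messages = []
lock = threading.Lock()


def producer():
    # Deliberately does NOT take the lock, like an unguarded datagramReceived.
    messages.append("datagram")


with lock:  # the consumer holds the lock...
    t = threading.Thread(target=producer)
    t.start()
    t.join(timeout=5)
    # ...yet the append went through while the lock was held.
    appended_while_locked = len(messages)
```

The append succeeds even though the lock is held, so the consumer's critical section gives no protection against it.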

However, you don't need a thread here. Why not do this?

from twisted.internet.protocol import DatagramProtocol
from twisted.internet import reactor

class UdpListener(DatagramProtocol):
    callingLater = False

    messages = []

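    def process(self):
        # Runs in the reactor thread, so nothing can append while we process
        doSomethingWithMessages(self.messages)
        self.messages = []
        self.callingLater = False

    def datagramReceived(self, data, addr):
        # addr is a (host, port) tuple
        self.messages.append(data)
        # Schedule at most one pending process() call at a time
        if not self.callingLater:
            reactor.callLater(1.0, self.process)
            self.callingLater = True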

listener = UdpListener()

reactor.listenUDP(5556, listener)
reactor.run()

UPDATE: Here is how the original version would work with a lock, for educational purposes only. Note that it is less efficient and more prone to bugs. EDIT: Moved all the message logic into UdpListener so that the classes using it don't need to know its gooey internal details.

from twisted.internet.protocol import DatagramProtocol
from twisted.internet import reactor
import threading
import time

class UdpListener(DatagramProtocol):
    message_lock = threading.Lock()
    messages = []

    def datagramReceived(self, data, addr):
        # Called in the reactor thread; addr is a (host, port) tuple
        with self.message_lock:
            self.messages.append(data)

    def getAndClearMessages(self):
        # Called from the Messenger thread: swap in a fresh list under the lock
        with self.message_lock:
            res = self.messages
            self.messages = []
        return res

class Messenger(threading.Thread):

    listener = None

    def __init__(self):
        threading.Thread.__init__(self)

    def run(self):
        while True:
            time.sleep(1)
            recivedMessages = self.listener.getAndClearMessages()
            length = len(recivedMessages)
            #doSomethingWithMessages(recivedMessages)
            print(length)

listener = UdpListener()

messenger = Messenger()
messenger.listener = listener
messenger.start()

reactor.listenUDP(5556, listener)
reactor.run()
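
If the worker thread is kept, one alternative to a hand-written lock (not what the code above uses) is the standard library's queue.Queue, which does its own locking. The sketch below is only an illustration under that assumption: drain, inbox and fake_receiver are hypothetical names, and fake_receiver stands in for datagramReceived so the example runs without a network.

```python
import queue
import threading


def drain(q):
    """Remove and return everything currently in the queue without blocking."""
    batch = []
    while True:
        try:
            batch.append(q.get_nowait())
        except queue.Empty:
            return batch


inbox = queue.Queue()


def fake_receiver(n):
    # Stand-in for datagramReceived: the producer side only calls put().
    for i in range(n):
        inbox.put(b"msg %d" % i)


producer = threading.Thread(target=fake_receiver, args=(1000,))
producer.start()
producer.join()

# The consumer side: in Messenger.run() this would follow time.sleep(1).
batch = drain(inbox)
```

In the listener, datagramReceived would call inbox.put(data), and the Messenger loop would call drain(inbox) once per second in place of getAndClearMessages().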