I am writing an application that collects UDP messages and processes them once per second.
The prototype looks like this:
from twisted.internet.protocol import DatagramProtocol
from twisted.internet import reactor
import threading
import time

class UdpListener(DatagramProtocol):
    messages = []

    def datagramReceived(self, data, addr):
        # addr is a (host, port) tuple
        self.messages.append(data)

class Messenger(threading.Thread):
    listener = None

    def __init__(self):
        threading.Thread.__init__(self)

    def run(self):
        while True:
            time.sleep(1)
            receivedMessages = self.listener.messages
            length = len(receivedMessages)
            messagesToProcess = receivedMessages[0:length]
            #doSomethingWithMessages(messagesToProcess)
            del self.listener.messages[0:length]
            print(length)

listener = UdpListener()
messenger = Messenger()
messenger.listener = listener
messenger.start()
reactor.listenUDP(5556, listener)
reactor.run()
I am not sure whether I can safely remove the leading items from the list (del self.listener.messages[0:length]) without the risk that an incoming message modifies the list at the same moment and crashes the application.
Update - version with lock
class Messenger(threading.Thread):
    listener = None
    lock = threading.Lock()

    def __init__(self):
        threading.Thread.__init__(self)

    def run(self):
        while True:
            time.sleep(1)
            receivedMessages = self.listener.messages
            self.lock.acquire()
            try:
                length = len(receivedMessages)
                messagesToProcess = receivedMessages[0:length]
                del self.listener.messages[0:length]
            except Exception as e:
                raise e
            finally:
                self.lock.release()
            #doSomethingWithMessages(messagesToProcess)
            print(length)
No, your code isn't thread-safe. You'd need a lock around every access to messages, in the listener as well as in the thread.
However, you don't need a thread here. Why not do this?
from twisted.internet.protocol import DatagramProtocol
from twisted.internet import reactor

class UdpListener(DatagramProtocol):
    callingLater = False
    messages = []

    def process(self):
        # doSomethingWithMessages is a placeholder for your own processing
        doSomethingWithMessages(self.messages)
        self.messages = []
        self.callingLater = False

    def datagramReceived(self, data, addr):
        # addr is a (host, port) tuple
        self.messages.append(data)
        if not self.callingLater:
            # Schedule one processing pass 1 second after the first message of a batch
            reactor.callLater(1.0, self.process)
            self.callingLater = True

listener = UdpListener()
reactor.listenUDP(5556, listener)
reactor.run()
UPDATE: Here is how the original version would work with a lock, for educational purposes only. Note that this is less efficient and more prone to bugs. EDIT: I moved all the message logic into UdpListener so the classes using it don't need to know its gooey internal details.
from twisted.internet.protocol import DatagramProtocol
from twisted.internet import reactor
import threading
import time

class UdpListener(DatagramProtocol):
    message_lock = threading.Lock()
    messages = []

    def datagramReceived(self, data, addr):
        # addr is a (host, port) tuple
        with self.message_lock:
            self.messages.append(data)

    def getAndClearMessages(self):
        # Swap in a fresh list under the lock and hand the old one to the caller
        with self.message_lock:
            res = self.messages
            self.messages = []
        return res

class Messenger(threading.Thread):
    listener = None

    def __init__(self):
        threading.Thread.__init__(self)

    def run(self):
        while True:
            time.sleep(1)
            receivedMessages = self.listener.getAndClearMessages()
            length = len(receivedMessages)
            #doSomethingWithMessages(receivedMessages)
            print(length)

listener = UdpListener()
messenger = Messenger()
messenger.listener = listener
messenger.start()
reactor.listenUDP(5556, listener)
reactor.run()
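The swap-under-lock idea in getAndClearMessages can be tried out without Twisted. The sketch below is only an illustration under assumed names: MessageBuffer, add, get_and_clear, and run_demo are hypothetical stand-ins, and a plain producer thread plays the role of datagramReceived. It checks that every message is handed out exactly once while the buffer is being drained concurrently.

```python
import threading


class MessageBuffer:
    """Hypothetical stand-in for UdpListener's message handling."""

    def __init__(self):
        self._lock = threading.Lock()
        self._messages = []

    def add(self, data):
        # Plays the role of datagramReceived().
        with self._lock:
            self._messages.append(data)

    def get_and_clear(self):
        # Swap the list out under the lock; the caller then owns the old list.
        with self._lock:
            batch, self._messages = self._messages, []
        return batch


def run_demo(total=10000):
    buf = MessageBuffer()
    collected = []

    def produce():
        for i in range(total):
            buf.add(i)

    producer = threading.Thread(target=produce)
    producer.start()
    # Drain concurrently while the producer is still appending.
    while producer.is_alive():
        collected.extend(buf.get_and_clear())
    producer.join()
    collected.extend(buf.get_and_clear())  # pick up anything left over
    return collected


if __name__ == "__main__":
    result = run_demo()
    print(len(result), result == list(range(10000)))
```

Because the consumer only ever touches a list that has already been detached from the buffer, it can process a batch for as long as it likes without holding the lock.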
Why not implement this with a DeferredQueue, which is meant for exactly this purpose? If you wish to use threads, you need to take some extra care.
Here is an example with a DeferredQueue, allowing for threading:
from twisted.internet import reactor, threads
from twisted.internet.defer import DeferredQueue, inlineCallbacks
from twisted.internet.protocol import DatagramProtocol

class UdpListener(DatagramProtocol):
    def __init__(self):
        self._messages = DeferredQueue()

    def datagramReceived(self, data, addr):
        # addr is a (host, port) tuple
        self._messages.put(data)

    @inlineCallbacks
    def _wait_for_and_process_next_message(self):
        # Get the next message from the DeferredQueue. With @inlineCallbacks,
        # yield returns the new message and "blocks" (actually releasing
        # control to Twisted) until a message gets in.
        message = yield self._messages.get()
        # Do something with your message here, and ensure you catch any exceptions!
        # If your message processing may be long, you may wish to run it in another thread.
        # Because of @inlineCallbacks, this call will "block" until your function finishes.
        # If you do this, ensure you read the notes below.
        # my_long_function is a placeholder for your own processing function.
        yield threads.deferToThread(my_long_function, message)
        # Schedule an immediate call to this method again in order to process the next message
        self.wait_for_and_process_next_message()

    def wait_for_and_process_next_message(self):
        reactor.callLater(0, self._wait_for_and_process_next_message)

    def initialize(self):
        # Call this during your application bootstrapping, so you start processing messages
        self.wait_for_and_process_next_message()
It is very important to note that if you choose to defer your message processing to the Twisted thread pool (using threads.deferToThread), your code will be running in a different thread. You'll likely be responding to messages from that thread and, in Twisted, protocols are not thread-safe objects (http://twistedmatrix.com/documents/13.2.0/core/howto/threading.html#auto0).
In this case you'd use reactor.callFromThread() to protect the critical resource, transport, as in this example:
def _send_message_critical_section(self, message):
    self.transport.write(message, (self.host, self.port))

def send_message(self, message):
    reactor.callFromThread(self._send_message_critical_section, message)
Other changes made:
- Renamed the messages variable to _messages, as it should be considered entirely private.
- Moved the _messages initialization inside the __init__() method and assigned it to self._messages; otherwise the messages list would be shared among all instances! I guess you had only one instance of the class, but... (see "Variables inside and outside of a class __init__() function")
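The sharing problem from the last point is easy to reproduce in plain Python. In this small sketch, SharedListener and PerInstanceListener are made-up names for illustration only and are not part of the code above:

```python
class SharedListener:
    messages = []  # class attribute: one list shared by every instance

    def receive(self, data):
        self.messages.append(data)  # mutates the shared class-level list


class PerInstanceListener:
    def __init__(self):
        self._messages = []  # instance attribute: a fresh list per instance

    def receive(self, data):
        self._messages.append(data)


a, b = SharedListener(), SharedListener()
a.receive("hello")
print(b.messages)   # ['hello'], because b sees what a received

c, d = PerInstanceListener(), PerInstanceListener()
c.receive("hello")
print(d._messages)  # [], because each instance has its own list
```

Rebinding the attribute (self.messages = []) creates a per-instance list from that point on, but until then every instance appends to the same class-level list.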