I am writing an application to collect UDP messages and process them every 1sec.
Application prototype looks like:
from twisted.internet.protocol import DatagramProtocol
from twisted.internet import reactor
import threading
import time
class UdpListener(DatagramProtocol):
messages = []
def datagramReceived(self, data, (host, port)):
self.messages.append(data)
class Messenger(threading.Thread):
listener = None
def __init__(self):
threading.Thread.__init__(self)
def run(self):
while True:
time.sleep(1)
recivedMessages = self.listener.messages
length = len(recivedMessages)
messagesToProccess = recivedMessages[0:length]
#doSomethingWithMessages(messagesToProccess)
del self.listener.messages[0:length]
print(length)
listener = UdpListener()
messenger = Messenger()
messenger.listener = listener
messenger.start()
reactor.listenUDP(5556, listener)
reactor.run()
I am not sure if I can easily remove beginning values from list (del self.listener.messages[0:length]) without any risk that incoming message changes list and application crashes.
Update - version with lock
class Messenger(threading.Thread):
listener = None
lock = threading.Lock()
def __init__(self):
threading.Thread.__init__(self)
def run(self):
while True:
time.sleep(1)
recivedMessages = self.listener.messages
self.lock.acquire()
try:
length = len(recivedMessages)
messagesToProccess = recivedMessages[0:length]
del self.listener.messages[0:length]
except Exception as e:
raise e
finally:
self.lock.release()
#doSomethingWithMessages(messagesToProccess)
print(length)
Why not implement this with a DeferredQueue, which is exactly meant for this purpose. If you wish to use threads you need to take some extra care.
Here is an example with a DeferredQueue, allowing for threading:
It is very important to note that in case you choose to defer your message processing to Twisted thread pool (using
threads.deferToThread
) your code will be running in a different thread. You'll be likely responding to messages from a different thread and, in Twisted, protocols are not thread-safe objects (http://twistedmatrix.com/documents/13.2.0/core/howto/threading.html#auto0).For this case you'd use
reactor.callFromThread()
to protect the critical resourcetransport
, as in this example:Other changes done:
messages
variable as_messages
, as it should be considered entirely private._messages
initialization inside the__init__()
method and assigned toself._messages
, otherwise the messages list would be shared among all instances! I guess you had only one instance of the class but... (Variables inside and outside of a class __init__() function)Your code isn't thread-safe, no. You'd need to have a lock around
messages
.However, you don't need a thread here. Why not do this?
UPDATE: Here is how the original version would work with a lock, for educational purposes only. Note that this is not as efficient and also more prone to bugs. EDIT: Separated all the message logic out into
UdpListener
so the classes using it don't need to know its gooey internal details.