I would like to create either a Thread or a Process which runs forever in a While True loop.
I need to send and receive data to the worker in the form for queues, either a multiprocessing.Queue() or a collections.deque(). I prefer to use collections.deque() as it is significantly faster.
I also need to be able to kill the worker eventually (as it runs in a while True loop. Here is some test code I've put together to try and understand the differences between Threads, Processes, Queues, and deque ..
import time
from multiprocessing import Process, Queue
from threading import Thread
from collections import deque
class ThreadingTest(Thread):
def __init__(self, q):
super(ThreadingTest, self).__init__()
self.q = q
self.toRun = False
def run(self):
print("Started Thread")
self.toRun = True
while self.toRun:
if type(self.q) == type(deque()):
if self.q:
i = self.q.popleft()
print("Thread deque: " + str(i))
elif type(self.q) == type(Queue()):
if not self.q.empty():
i = self.q.get_nowait()
print("Thread Queue: " + str(i))
def stop(self):
print("Trying to stop Thread")
self.toRun = False
while self.isAlive():
time.sleep(0.1)
print("Stopped Thread")
class ProcessTest(Process):
def __init__(self, q):
super(ProcessTest, self).__init__()
self.q = q
self.toRun = False
self.ctr = 0
def run(self):
print("Started Process")
self.toRun = True
while self.toRun:
if type(self.q) == type(deque()):
if self.q:
i = self.q.popleft()
print("Process deque: " + str(i))
elif type(self.q) == type(Queue()):
if not self.q.empty():
i = self.q.get_nowait()
print("Process Queue: " + str(i))
def stop(self):
print("Trying to stop Process")
self.toRun = False
while self.is_alive():
time.sleep(0.1)
print("Stopped Process")
if __name__ == '__main__':
q = Queue()
t1 = ProcessTest(q)
t1.start()
for i in range(10):
if type(q) == type(deque()):
q.append(i)
elif type(q) == type(Queue()):
q.put_nowait(i)
time.sleep(1)
t1.stop()
t1.join()
if type(q) == type(deque()):
print(q)
elif type(q) == type(Queue()):
while q.qsize() > 0:
print(str(q.get_nowait()))
As you can see, t1 can either be ThreadingTest, or ProcessTest. Also, the queue passed to it can either be a multiprocessing.Queue or a collections.deque.
ThreadingTest works with a Queue or deque(). It also kills run() properly when the stop() method is called.
Started Thread
Thread deque: 0
Thread deque: 1
Thread deque: 2
Thread deque: 3
Thread deque: 4
Thread deque: 5
Thread deque: 6
Thread deque: 7
Thread deque: 8
Thread deque: 9
Trying to stop Thread
Stopped Thread
deque([])
ProcessTest is only able to read from the queue if it is of type multiprocessing.Queue. It doesn't work with collections.deque. Furthermore, I am unable to kill the process using stop().
Process Queue: 0
Process Queue: 1
Process Queue: 2
Process Queue: 3
Process Queue: 4
Process Queue: 5
Process Queue: 6
Process Queue: 7
Process Queue: 8
Process Queue: 9
Trying to stop Process
I'm trying to figure out why? Also, what would be the best way to use deque with a process? And, how would I go about killing the process using some sort of stop() method.
You can't use a
collections.deque
to pass data between twomultiprocessing.Process
instances, becausecollections.deque
is not process-aware.multiprocessing.Queue
writes its contents to amultiprocessing.Pipe
internally, which means that data in it can be enqueued in once process and retrieved in another.collections.deque
doesn't have that kind of plumbing, so it won't work. When you write to thedeque
in one process, thedeque
instance in the other process won't be affected at all; they're completely separate instances.A similar issue is happening to your
stop()
method. You're changing the value oftoRun
in the main process, but this won't affect the child at all. They're completely separate instances. The best way to end the child would be to send some sentinel to theQueue
. When you get the sentinel in the child, break out of the infinite loop:Edit:
If you actually do need
deque
semantics between two process, you can use a custommultiprocessing.Manager()
to create a shareddeque
in aManager
process, and each of yourProcess
instances will get aProxy
to it:Note that this probably isn't going to be faster than a
multiprocessing.Queue
, though, since there's an IPC cost for every time you access thedeque
. It's also a much less natural data structure for passing messages the way you are.