Process vs. Thread with regards to using Queue()/d

Posted 2019-02-15 11:40

Question:

I would like to create either a Thread or a Process which runs forever in a while True loop.

I need to send data to and receive data from the worker in the form of queues, either a multiprocessing.Queue() or a collections.deque(). I prefer to use collections.deque() as it is significantly faster.

I also need to be able to kill the worker eventually (as it runs in a while True loop). Here is some test code I've put together to try to understand the differences between Threads, Processes, Queues, and deque:

import time
from multiprocessing import Process, Queue
from threading import Thread
from collections import deque

class ThreadingTest(Thread):

    def __init__(self, q):
        super(ThreadingTest, self).__init__()
        self.q = q
        self.toRun = False

    def run(self):
        print("Started Thread")
        self.toRun = True
        while self.toRun:
            if type(self.q) == type(deque()):
                if self.q:
                    i = self.q.popleft()
                    print("Thread deque: " + str(i))
            elif type(self.q) == type(Queue()):
                if not self.q.empty():
                    i = self.q.get_nowait()
                    print("Thread Queue: " + str(i))

    def stop(self):
        print("Trying to stop Thread")
        self.toRun = False
        while self.is_alive():  # isAlive() was removed in Python 3.9
            time.sleep(0.1)
        print("Stopped Thread")

class ProcessTest(Process):

    def __init__(self, q):
        super(ProcessTest, self).__init__()
        self.q = q
        self.toRun = False
        self.ctr = 0

    def run(self):
        print("Started Process")
        self.toRun = True
        while self.toRun:
            if type(self.q) == type(deque()):
                if self.q:
                    i = self.q.popleft()
                    print("Process deque: " + str(i))
            elif type(self.q) == type(Queue()):
                if not self.q.empty():
                    i = self.q.get_nowait()
                    print("Process Queue: " + str(i))

    def stop(self):
        print("Trying to stop Process")
        self.toRun = False
        while self.is_alive():
            time.sleep(0.1)
        print("Stopped Process")

if __name__ == '__main__':
    q = Queue()
    t1 = ProcessTest(q)
    t1.start()

    for i in range(10):
        if type(q) == type(deque()):
            q.append(i)
        elif type(q) == type(Queue()):
            q.put_nowait(i)
        time.sleep(1)
    t1.stop()
    t1.join()

    if type(q) == type(deque()):
        print(q)
    elif type(q) == type(Queue()):
        while q.qsize() > 0:
            print(str(q.get_nowait()))

As you can see, t1 can be either ThreadingTest or ProcessTest. Also, the queue passed to it can be either a multiprocessing.Queue or a collections.deque.

ThreadingTest works with a Queue or deque(). It also kills run() properly when the stop() method is called.

Started Thread
Thread deque: 0
Thread deque: 1
Thread deque: 2
Thread deque: 3
Thread deque: 4
Thread deque: 5
Thread deque: 6
Thread deque: 7
Thread deque: 8
Thread deque: 9
Trying to stop Thread
Stopped Thread
deque([])

ProcessTest is only able to read from the queue if it is of type multiprocessing.Queue. It doesn't work with collections.deque. Furthermore, I am unable to kill the process using stop().

Process Queue: 0
Process Queue: 1
Process Queue: 2
Process Queue: 3
Process Queue: 4
Process Queue: 5
Process Queue: 6
Process Queue: 7
Process Queue: 8
Process Queue: 9
Trying to stop Process

I'm trying to figure out why. Also, what would be the best way to use a deque with a process? And how would I go about killing the process using some sort of stop() method?

Answer 1:

You can't use a collections.deque to pass data between two multiprocessing.Process instances, because collections.deque is not process-aware. multiprocessing.Queue writes its contents to a multiprocessing.Pipe internally, which means that data in it can be enqueued in one process and retrieved in another. collections.deque doesn't have that kind of plumbing, so it won't work. When you write to the deque in one process, the deque instance in the other process won't be affected at all; they're completely separate instances.
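A minimal sketch of that separation, using only the standard library. The names child and results are made up for illustration. The child appends to the deque it was handed and reports what it sees through a multiprocessing.Queue, while the parent's deque is left untouched:

```python
from collections import deque
from multiprocessing import Process, Queue


def child(d, results):
    # Runs in the child process: d is the child's own copy of the deque.
    d.append("from child")
    # Report the child's view back through a process-aware queue.
    results.put(list(d))


if __name__ == "__main__":
    d = deque(["from parent"])
    results = Queue()

    p = Process(target=child, args=(d, results))
    p.start()
    child_view = results.get()  # blocks until the child has reported
    p.join()

    print("child saw: ", child_view)  # ['from parent', 'from child']
    print("parent has:", list(d))     # ['from parent']
```

The append made in the child never shows up in the parent, which is the same reason setting toRun in the parent has no effect on the running child.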

A similar issue is happening to your stop() method. You're changing the value of toRun in the main process, but this won't affect the child at all. They're completely separate instances. The best way to end the child would be to send some sentinel to the Queue. When you get the sentinel in the child, break out of the infinite loop:

def run(self):
    print("Started Process")
    self.toRun = True
    while self.toRun:
        if type(self.q) == type(deque()):
            if self.q:
                i = self.q.popleft()
                print("Process deque: " + str(i))
        elif type(self.q) == type(Queue()):
            if not self.q.empty():
                i = self.q.get_nowait()
                if i is None:  
                    break  # Got sentinel, so break
                print("Process Queue: " + str(i))

def stop(self):
    print("Trying to stop Process")
    self.q.put(None)  # Send sentinel
    while self.is_alive():
        time.sleep(0.1)
    print("Stopped Process")
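Because the sentinel already wakes the worker up, the loop does not have to poll with empty() and get_nowait(); a blocking get() avoids spinning the CPU while the queue is empty. A sketch of that variant, assuming a plain function as the Process target rather than a subclass (worker, inbox and outbox are illustrative names, not taken from the code above):

```python
from multiprocessing import Process, Queue


def worker(inbox, outbox):
    # iter(callable, sentinel) calls inbox.get() repeatedly and stops as
    # soon as it returns None, so no explicit break is needed.
    for item in iter(inbox.get, None):
        outbox.put(item * 2)


if __name__ == "__main__":
    inbox, outbox = Queue(), Queue()
    p = Process(target=worker, args=(inbox, outbox))
    p.start()

    for i in range(3):
        inbox.put(i)
    inbox.put(None)  # send the sentinel

    # Drain the results before join(), so the child is never left
    # blocked trying to flush data nobody reads.
    results = [outbox.get() for _ in range(3)]
    p.join()
    print(results)  # [0, 2, 4]
```

With this shape, stop() reduces to putting the sentinel on the queue and calling join().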

Edit:

If you actually do need deque semantics between two processes, you can use a custom multiprocessing.Manager() to create a shared deque in a Manager process, and each of your Process instances will get a Proxy to it:

import time
from multiprocessing import Process
from multiprocessing.managers import SyncManager
from collections import deque

SyncManager.register('deque', deque)

def Manager():
    m = SyncManager()
    m.start()
    return m

class ProcessTest(Process):
    def __init__(self, q):
        super(ProcessTest, self).__init__()
        self.q = q
        self.ctr = 0

    def run(self):
        print("Started Process")
        self.toRun = True
        while self.toRun:
            if self.q._getvalue():
                i = self.q.popleft()
                if i is None:
                    break
                print("Process deque: " + str(i))

    def stop(self):
        print("Trying to stop Process")
        self.q.append(None)
        while self.is_alive():
            time.sleep(0.1)
        print("Stopped Process")

if __name__ == '__main__':
    m = Manager()
    q = m.deque()
    t1 = ProcessTest(q)
    t1.start()

    for i in range(10):
        q.append(i)
        time.sleep(1)
    t1.stop()
    t1.join()

    print(q)

Note that this probably isn't going to be faster than a multiprocessing.Queue, though, since there's an IPC cost every time you access the deque. It's also a much less natural data structure for passing messages the way you are.
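If you want to check the cost on your own machine, a rough timing sketch along these lines may help. It is an assumption-laden micro-benchmark, not a definitive measurement: DequeManager and time_round_trips are hypothetical names, every put/get pair is issued from a single process, and the numbers will vary by platform.

```python
import time
from collections import deque
from multiprocessing import Queue
from multiprocessing.managers import BaseManager


class DequeManager(BaseManager):
    """Hypothetical manager subclass, only here to host a shared deque."""


DequeManager.register("deque", deque)


def time_round_trips(put, get, n):
    # Time n put/get pairs and return the elapsed seconds.
    start = time.perf_counter()
    for i in range(n):
        put(i)
        get()
    return time.perf_counter() - start


if __name__ == "__main__":
    n = 2000
    local = deque()
    q = Queue()

    print("local deque  :", time_round_trips(local.append, local.popleft, n))
    with DequeManager() as m:
        shared = m.deque()  # proxy; each call goes to the manager process
        print("managed deque:", time_round_trips(shared.append, shared.popleft, n))
    print("mp.Queue     :", time_round_trips(q.put, q.get, n))
```

The local deque line is only a baseline for comparison; it cannot be shared between processes, so the meaningful comparison is between the managed deque and the multiprocessing.Queue.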