Cannot subclass multiprocessing Queue in Python 3.

Posted 2019-07-16 03:08

Question:

My eventual goal is to redirect the stdout from several subprocesses to some queues, and print those out somewhere (maybe in a little GUI).

The first step is to subclass Queue into an object that behaves much like the stdout. But that is where I got stuck. Subclassing the multiprocessing Queue seems impossible in Python v3.5.

# This is a Queue that behaves like stdout
# Unfortunately, doesn't work in Python 3.5   :-(
class StdoutQueue(Queue):
    def __init__(self,*args,**kwargs):
        Queue.__init__(self,*args,**kwargs, ctx='')

    def write(self,msg):
        self.put(msg)

    def flush(self):
        sys.__stdout__.flush()

I found this snippet in the following post (Python 3.5 probably did not exist yet at that time): Python multiprocessing redirect stdout of a child process to a Tkinter Text

In Python v3.5 you stumble on strange error messages when subclassing the multiprocessing Queue class. I found two bug reports describing the issue:

https://bugs.python.org/issue21367

https://bugs.python.org/issue19895

I have 2 questions:

  1. Suppose I want to stick to Python v3.5 - going to a previous version is not really an option. What workaround can I use to subclass the multiprocessing Queue somehow?
  2. Is the bug still around if I upgrade to Python v3.6?

EDIT :

There is a known issue when you try to subclass the Queue class imported like this:

from multiprocessing import Queue    # <- known issue: you cannot subclass
                                     #    this Queue class, because it is
                                     #    not a genuine python class.

But the following should work:

from multiprocessing.queues import Queue   # <- from this Queue class, you
                                           #    should be able to make a
                                           #    subclass. But Python 3.5
                                           #    refuses :-(

Sadly, even that doesn't work in Python v3.5. You get the following error:

    C:\Users\..\myFolder > python myTest.py

        Traceback (most recent call last):
            File "myTest.py", line 49, in <module>
              q = StdoutQueue()
            File "myTest.py", line 22, in __init__
              super(StdoutQueue,self).__init__(*args,**kwargs)
        TypeError: __init__() missing 1 required keyword-only argument: 'ctx'
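The error can be reproduced without any GUI code. The following is only a minimal sketch: `NaiveQueue` is a hypothetical name used for illustration, and the exact wording of the message may differ between Python versions.

```python
import multiprocessing.queues as mpq


class NaiveQueue(mpq.Queue):  # hypothetical name, for illustration only
    def __init__(self, *args, **kwargs):
        # No ctx keyword is forwarded, so the base class rejects the call.
        super().__init__(*args, **kwargs)


try:
    NaiveQueue()
    error_message = None
except TypeError as exc:
    # The message names the missing keyword-only argument 'ctx'.
    error_message = str(exc)

print(error_message)
```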

EDIT :

Thank you Darth Kotik for solving the problem! Here is the complete code, updated with his solution. Now it works.

import sys
import time
import multiprocessing as mp
import multiprocessing.queues as mpq
from threading import Thread
from tkinter import *

'''-------------------------------------------------------------------'''
'''                SUBCLASSING THE MULTIPROCESSING QUEUE              '''
'''                                                                   '''
'''         ..and make it behave as a general stdout io               '''
'''-------------------------------------------------------------------'''
# The StdoutQueue is a Queue that behaves like stdout.
# We will subclass the Queue class from the multiprocessing package
# and give it the typical stdout functions.
#
# (1) First issue
# Subclassing multiprocessing.Queue or multiprocessing.SimpleQueue
# will not work, because these classes are not genuine
# python classes.
# Therefore, you need to subclass multiprocessing.queues.Queue or
# multiprocessing.queues.SimpleQueue . This issue is known, and is not
# the reason for asking this question. But I mention it here, for
# completeness.
#
# (2) Second issue
# There is another problem that arises only in Python 3.5 (and beyond).
# When subclassing multiprocessing.queues.Queue, you have to provide
# a 'multiprocessing context'. Not doing that, leads to an obscure error
# message, which is in fact the main topic of this question. Darth Kotik
# solved it.
# His solution is visible in this code:
class StdoutQueue(mpq.Queue):

    def __init__(self,*args,**kwargs):
        ctx = mp.get_context()
        super(StdoutQueue, self).__init__(*args, **kwargs, ctx=ctx)

    def write(self,msg):
        self.put(msg)

    def flush(self):
        sys.__stdout__.flush()


'''-------------------------------------------------------------------'''
'''                           TEST SETUP                              '''
'''-------------------------------------------------------------------'''

# This function takes the text widget and a queue as inputs.
# It blocks until new data arrives in the queue, then inserts
# that data into the text widget.
def text_catcher(text_widget,queue):
    while True:
        text_widget.insert(END, queue.get())

def test_child(q):
    # This line only redirects stdout inside the current process
    sys.stdout = q
    # or sys.stdout = sys.__stdout__ if you want to print the child to the terminal
    print('child running')

def test_parent(q):
    # Again this only redirects inside the current (main) process
    # commenting this line out will cause only the child to write to the widget
    sys.stdout = q
    print('parent running')
    time.sleep(0.5)
    mp.Process(target=test_child,args=(q,)).start()

if __name__ == '__main__':
    gui_root = Tk()
    gui_txt = Text(gui_root)
    gui_txt.pack()
    q = StdoutQueue()
    gui_btn = Button(gui_root, text='Test', command=lambda:test_parent(q),)
    gui_btn.pack()

    # Instantiate and start the text monitor
    monitor = Thread(target=text_catcher,args=(gui_txt,q))
    monitor.daemon = True
    monitor.start()

    gui_root.mainloop()

Answer 1:

>>> import multiprocessing
>>> type(multiprocessing.Queue)
<class 'method'>
>>> type(multiprocessing.queues.Queue)
AttributeError: module 'multiprocessing' has no attribute 'queues'
>>> import multiprocessing.queues
>>> type(multiprocessing.queues.Queue)
<class 'type'>

So, as you can see, multiprocessing.Queue is just a constructor method for the multiprocessing.queues.Queue class. If you want to make a child class, just write class MyQueue(multiprocessing.queues.Queue).

You can see the source of this method here
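The same distinction can be checked programmatically instead of in the interactive shell. This is a small sketch using only the standard `inspect` module; the variable names are illustrative.

```python
import inspect
import multiprocessing
import multiprocessing.queues

# The top-level name is a bound method of the default context object...
factory_is_method = inspect.ismethod(multiprocessing.Queue)

# ...while the class that can actually be subclassed lives in the submodule.
real_is_class = inspect.isclass(multiprocessing.queues.Queue)

print(factory_is_method, real_is_class)
```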

EDIT: Okay, I understand your problem now. As the source linked above shows, multiprocessing.Queue passes a ctx argument to Queue. So I got it working by passing ctx myself in the __init__ method. I don't completely understand where the BaseContext object is supposed to get its _name attribute, so I set it manually.

def __init__(self,*args,**kwargs):
    from multiprocessing.context import BaseContext
    ctx = BaseContext()
    ctx._name = "Name"
    super(StdoutQueue,self).__init__(*args,**kwargs, ctx=ctx)

EDIT2: It turns out the docs have some information about contexts here. So instead of creating the context manually as I did, you can do:

import multiprocessing
ctx = multiprocessing.get_context()

This returns a proper context with _name already set to the platform's default start method (for example 'fork' on Linux under Python 3.5, or 'spawn' on Windows), and you can pass it to your queue.
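Putting it together, a minimal sketch of the subclass might look like the following. It assumes the default context is acceptable; using `setdefault` so that a caller can still supply their own `ctx`, and making `flush` a no-op, are choices made for this sketch rather than part of the fix itself. The start method is read through the public `get_start_method()` call instead of the private `_name` attribute.

```python
import multiprocessing as mp
import multiprocessing.queues as mpq


class StdoutQueue(mpq.Queue):
    def __init__(self, *args, **kwargs):
        # Assumption: fall back to the default context unless the caller
        # passed a ctx of their own.
        kwargs.setdefault('ctx', mp.get_context())
        super().__init__(*args, **kwargs)

    def write(self, msg):
        # File-like API: every write becomes one item on the queue.
        self.put(msg)

    def flush(self):
        # Nothing to flush in this sketch; put() already hands the
        # message over to the queue.
        pass


q = StdoutQueue()
q.write('hello from the queue\n')
received = q.get(timeout=5)

# Public way to see which start method the default context uses.
start_method = mp.get_context().get_start_method()
print(repr(received), start_method)

# Shut down the queue's background feeder thread cleanly.
q.close()
q.join_thread()
```

Note that `print(..., file=q)` would typically put the text and the trailing newline on the queue as two separate items, since `print` calls `write` once for each.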