Non-blocking multiprocessing.connection.Listener?

Posted 2020-05-31 05:45

Question:

I use multiprocessing.connection.Listener for communication between processes, and it works like a charm for me. Now I would really love my main loop to do something else between commands from the client. Unfortunately, listener.accept() blocks execution until a connection from the client process is established.

Is there a simple way to do a non-blocking check with multiprocessing.connection? A timeout? Or should I use a dedicated thread?

    # Simplified code:

    from multiprocessing.connection import Listener

    def mainloop():
        listener = Listener(address=('localhost', 6000), authkey=b'secret')

        while True:
            conn = listener.accept()  # <---  This blocks!
            msg = conn.recv()
            print('got message: %r' % msg)
            conn.close()

Answer 1:

I've not used the Listener object myself; for this task I normally use multiprocessing.Queue. The link below documents Queue.Queue, whose interface multiprocessing.Queue closely follows:

https://docs.python.org/2/library/queue.html#Queue.Queue

That object can be used to send and receive any pickle-able object between Python processes with a nice API; I think you'll be most interested in:

  • in process A
    • .put('some message')
  • in process B
    • .get_nowait() # raises queue.Empty (Queue.Empty on Python 2) if nothing is available; handle that to move on with your execution

The only limitation is that you need control of both Process objects at some point in order to hand the queue to them, something like this:

import time
from queue import Empty  # Python 3; on Python 2 this was "from Queue import Empty"
from multiprocessing import Queue, Process


def receiver(q):
    while True:
        try:
            message = q.get_nowait()
            print('receiver got', message)
        except Empty:
            # Nothing queued yet: this is where other work could happen.
            print('nothing to receive, sleeping')
            time.sleep(1)


def sender(q):
    while True:
        message = 'some message'
        q.put(message)
        print('sender sent', message)
        time.sleep(1)


# The guard is required on platforms that spawn child processes (Windows, macOS).
if __name__ == '__main__':
    some_queue = Queue()

    process_a = Process(target=receiver, args=(some_queue,))
    process_b = Process(target=sender, args=(some_queue,))

    process_a.start()
    process_b.start()

    print('ctrl + c to exit')
    try:
        while True:
            time.sleep(1)
    except KeyboardInterrupt:
        pass

    process_a.terminate()
    process_b.terminate()

    process_a.join()
    process_b.join()

Queues are nice because you can actually have as many consumers and as many producers for that exact same Queue object as you like (handy for distributing tasks).
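To illustrate the point about several consumers sharing one queue, here is a minimal sketch. The names worker, run_pool and STOP are made up for this example, and squaring numbers stands in for whatever real task you would distribute.

```python
import multiprocessing as mp

STOP = None  # hypothetical sentinel telling a worker to exit


def worker(tasks, results):
    """Square numbers taken from the shared task queue until STOP arrives."""
    while True:
        n = tasks.get()  # blocking get: a worker has nothing else to do
        if n is STOP:
            break
        results.put((n, n * n))


def run_pool(numbers, n_workers=3):
    """Distribute numbers across n_workers processes that share one queue."""
    numbers = list(numbers)
    tasks, results = mp.Queue(), mp.Queue()
    workers = [mp.Process(target=worker, args=(tasks, results))
               for _ in range(n_workers)]
    for w in workers:
        w.start()
    for n in numbers:
        tasks.put(n)
    for _ in workers:
        tasks.put(STOP)  # one sentinel per worker
    # Collect every result before joining so no child blocks on a full pipe.
    squares = dict(results.get() for _ in numbers)
    for w in workers:
        w.join()
    return squares


if __name__ == '__main__':
    print(run_pool(range(5)))
```

Which worker picks up which number is not deterministic; the queue simply hands each item to whichever consumer asks first.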

I should point out that just calling .terminate() on a Process is bad form; you should use your shiny new messaging system to pass a shutdown message or something of that nature.
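A shutdown message could look roughly like the sketch below. The SHUTDOWN value and the idle_sleep parameter are assumptions made for illustration; any message both sides agree on would do.

```python
import time
from multiprocessing import Process, Queue
from queue import Empty

SHUTDOWN = 'shutdown'  # hypothetical sentinel; any agreed-upon message works


def receiver(q, idle_sleep=0.1):
    """Handle messages until SHUTDOWN arrives, then return cleanly."""
    handled = []
    while True:
        try:
            message = q.get_nowait()
        except Empty:
            time.sleep(idle_sleep)  # nothing pending; other work could go here
            continue
        if message == SHUTDOWN:
            break  # leave the loop instead of being terminated
        handled.append(message)
    return handled


if __name__ == '__main__':
    q = Queue()
    p = Process(target=receiver, args=(q,))
    p.start()
    q.put('some message')
    q.put(SHUTDOWN)
    p.join()  # returns once the receiver has exited on its own
    print('receiver exit code:', p.exitcode)
```

Because the receiver leaves its loop by itself, join() is enough and terminate() is never needed.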



Answer 2:

The multiprocessing module comes with a nice feature called Pipe(). It is a nice way to exchange data between two processes (I have never tried more than two). Python 3.8 added shared memory support to the multiprocessing module (multiprocessing.shared_memory), but I have not really tested it, so I cannot vouch for it. You would use Pipe something like this:

from multiprocessing import Pipe, Process

.....

def sending(conn):
    message = 'some message'
    # perform some code
    conn.send(message)
    conn.close()

receiver, sender = Pipe()
p = Process(target=sending, args=(sender,))
p.start()
print(receiver.recv())   # prints "some message"
p.join()

With this, the processes can run independently until you reach the point where you need input from the other process. If the data has not been sent yet, you can sleep, or use a while loop that keeps checking until the other process finishes its task and sends the result:

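while not receiver.poll():   # poll() returns immediately; recv() would block here
    time.sleep(5)
result = receiver.recv()     # data is ready, so this returns at once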

This keeps the loop running until the other process is done and sends the result. It is also about 2-3 times faster than Queue. Queue is a good option too, but personally I do not use it.
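Put together as a complete script, a non-blocking check on a pipe might look like the sketch below. It relies on Connection.poll(), which returns immediately when called without a timeout; the helper names slow_sender and wait_for_message, and the tick and delay values, are made up for this example.

```python
import time
from multiprocessing import Pipe, Process


def slow_sender(conn, delay=0.5):
    """Pretend to work for a while, then send one result."""
    time.sleep(delay)
    conn.send('some message')
    conn.close()


def wait_for_message(conn, tick=0.1):
    """Poll conn without blocking and count the idle passes through the loop."""
    idle_ticks = 0
    while not conn.poll():  # poll() with no timeout returns immediately
        idle_ticks += 1  # the main loop's other work would go here
        time.sleep(tick)
    return conn.recv(), idle_ticks


if __name__ == '__main__':
    receiver, sender = Pipe()
    p = Process(target=slow_sender, args=(sender,))
    p.start()
    message, idle_ticks = wait_for_message(receiver)
    p.join()
    print(message, 'after', idle_ticks, 'idle ticks')
```

The object returned by listener.accept() is the same kind of Connection, so poll() can be used on it as well; accept() itself still blocks until a client connects.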