How to return values from Process- or Thread insta

2020-02-07 07:33发布

So I want to run a function which can either search for information on the web or directly from my own mysql database. The first process will be time-consuming, the second relatively fast.

With this in mind I create a process which starts this compound search (find_compound_view). If the process finishes relatively fast it means it's present on the database so I can render the results immediately. Otherwise, I will render "drax_retrieving_data.html".

The stupid solution I came up with was to run the function twice, once to check if the process takes a long time, the other to actually get the return values of the function. This is pretty much because I don't know how to return the values of my find_compound_view function. I've tried googling but I can't seem to find how to return the values from the class Process specifically.

   p = Process(target=find_compound_view, args=(form,))
        p.start()
        is_running = p.is_alive()
        start_time=time.time()
        while is_running:
            time.sleep(0.05)
            is_running = p.is_alive()
            if time.time() - start_time > 10 :
                print('Timer exceeded, DRAX is retrieving info!',time.time() - start_time)
                return render(request,'drax_internal_dbs/drax_retrieving_data.html')
        compound = find_compound_view(form,use_email=False)

   if compound:
      data=*****
      return  render(request, 'drax_internal_dbs/result.html',data)

1条回答
劫难
2楼-- · 2020-02-07 07:54

You will need a multiprocessing.Pipe or a multiprocessing.Queue to send the results back to your parent-process. If you just do I/0, you should use a Thread instead of a Process, since it's more lightweight and most time will be spend on waiting. I'm showing you how it's done for Process and Threads in general.


Process with Queue

The multiprocessing queue is build on top of a pipe and access is synchronized with locks/semaphores. Queues are thread- and process-safe, meaning you can use one queue for multiple producer/consumer-processes and even multiple threads in these processes. Adding the first item on the queue will also start a feeder-thread in the calling process. The additional overhead of a multiprocessing.Queue makes using a pipe for single-producer/single-consumer scenarios preferable and more performant.

Here's how to send and retrieve a result with a multiprocessing.Queue:

from multiprocessing import Process, Queue

SENTINEL = 'SENTINEL'

def sim_busy(out_queue, x):
    for _ in range(int(x)):
        assert 1 == 1
    result = x
    out_queue.put(result)
    # If all results are enqueued, send a sentinel-value to let the parent know
    # no more results will come.
    out_queue.put(SENTINEL)


if __name__ == '__main__':

    out_queue = Queue()

    p = Process(target=sim_busy, args=(out_queue, 150e6))  # 150e6 == 150000000.0
    p.start()

    for result in iter(out_queue.get, SENTINEL):  # sentinel breaks the loop
        print(result)

The queue is passed as argument into the function, results are .put() on the queue and the parent get.()s from the queue. .get() is a blocking call, execution does not resume until something is to get (specifying timeout parameter is possible). Note the work sim_busy does here is cpu-intensive, that's when you would choose processes over threads.


Process & Pipe

For one-to-one connections a pipe is enough. The setup is nearly identical, just the methods are named differently and a call to Pipe() returns two connection objects. In duplex mode, both objects are read-write ends, with duplex=False (simplex) the first connection object is the read-end of the pipe, the second is the write-end. In this basic scenario we just need a simplex-pipe:

from multiprocessing import Process, Pipe

SENTINEL = 'SENTINEL'


def sim_busy(write_conn, x):
    for _ in range(int(x)):
        assert 1 == 1
    result = x
    write_conn.send(result)
    # If all results are send, send a sentinel-value to let the parent know
    # no more results will come.
    write_conn.send(SENTINEL)


if __name__ == '__main__':

    # duplex=False because we just need one-way communication in this case.
    read_conn, write_conn = Pipe(duplex=False)

    p = Process(target=sim_busy, args=(write_conn, 150e6))  # 150e6 == 150000000.0
    p.start()

    for result in iter(read_conn.recv, SENTINEL):  # sentinel breaks the loop
        print(result)

Thread & Queue

For use with threading, you want to switch to queue.Queue. queue.Queue is build on top of a collections.deque, adding some locks to make it thread-safe. Unlike with multiprocessing's queue and pipe, objects put on a queue.Queue won't get pickled. Since threads share the same memory address-space, serialization for memory-copying is unnecessary, only pointers are transmitted.

from threading import Thread
from queue import Queue
import time

SENTINEL = 'SENTINEL'


def sim_io(out_queue, query):
    time.sleep(1)
    result = query + '_result'
    out_queue.put(result)
    # If all results are enqueued, send a sentinel-value to let the parent know
    # no more results will come.
    out_queue.put(SENTINEL)


if __name__ == '__main__':

    out_queue = Queue()

    p = Thread(target=sim_io, args=(out_queue, 'my_query'))
    p.start()

    for result in iter(out_queue.get, SENTINEL):  # sentinel-value breaks the loop
        print(result)

  • Read here why for result in iter(out_queue.get, SENTINEL): should be prefered over a while True...break setup, where possible.
  • Read here why you should use if __name__ == '__main__': in all your scripts and especially in multiprocessing.
  • More about get()-usage here.
查看更多
登录 后发表回答