Python Multiprocessing Loop

Posted 2019-06-08 13:08

Question:

I'm hoping to use multiprocessing to speed up a sluggish loop. However, from the multiprocessing examples I've seen, I'm not sure whether this sort of implementation is good practice, or even feasible.

There are broadly two parts to the loop: data ingestion and data processing. I would like to have the next part of data ingestion starting while processing is going on, so the data is available as soon as possible.

Pseudo code:

d = get_data(0)
for n in range(N):
    p = process_data(d)
    d = get_data(n+1)  # prepare data for the next loop iteration

  1. Does multiprocessing lend itself to this sort of task?
  2. How would one do this?

Thanks in advance.

Answer 1:

As you said, multiprocessing is basically about dispatching and collecting work. And as you clarified, you want process_data and get_data to run in parallel.

Here's my solution for you:

import multiprocessing as mp

# create a pool for dispatching work
pool = mp.Pool()

# call your functions asynchronously; apply_async returns
# AsyncResult handles immediately while the work runs in the pool
process_data_async = pool.apply_async(process_data, (d,))
get_data_async = pool.apply_async(get_data, (n+1,))

# after both calls are dispatched, wait for the results
process_data_result = process_data_async.get()
get_data_result = get_data_async.get()

# Note: get_data_result is not fetched until process_data_result is ready,
# but that should be fine since you can't start the next batch
# until this batch is done

And you can just wrap this in your loop. Hope that answers your question!
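
To make that concrete, here is a minimal runnable sketch of the pattern wrapped in the loop from the question. It's only a sketch: get_data and process_data below are stand-in stubs for your real functions, and N is an assumed batch count.

import multiprocessing as mp

def get_data(n):
    # stand-in for your real (likely I/O-bound) ingestion step
    return list(range(n * 100, (n + 1) * 100))

def process_data(d):
    # stand-in for your real (likely CPU-bound) processing step
    return sum(d)

if __name__ == "__main__":
    N = 10  # assumed number of batches
    results = []
    with mp.Pool() as pool:
        d = get_data(0)
        for n in range(N):
            # dispatch both calls; they run in separate worker processes
            processing = pool.apply_async(process_data, (d,))
            fetching = pool.apply_async(get_data, (n + 1,))
            results.append(processing.get())
            d = fetching.get()  # data for the next iteration
    print(results)

The __main__ guard matters on platforms that spawn rather than fork worker processes (e.g. Windows).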



Answer 2:

Let's assume that you want to have a single thread/process ingesting the data because it will be I/O rather than CPU bound. You're doing only minimal parsing and/or validation of the data before passing it to your processing layer.

Let's further assume that you can do your data processing on each input item completely in parallel; that there are no sorting or time/sequencing dependencies among the input items.

In that case your task is basically a poster child for the "fan out" processing model. You create a multiprocessing.Queue object and a pool of worker processes. The process that runs this initialization code then becomes the ingestion task (the "producer" for the Queue), and the pool of processes all become consumers, performing the processing.
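
Here is a rough sketch of that fan-out shape. Note it uses plain multiprocessing.Process workers pulling from a shared Queue (rather than a Pool, since a Queue can be handed directly to Process workers); the ingest and worker bodies are placeholders for your real I/O and processing.

import multiprocessing as mp

SENTINEL = None  # marks end of input for the workers

def ingest(work_q, n_workers):
    # producer: read/parse input items and feed the work queue
    for item in range(100):          # stand-in for your real data source
        work_q.put(item)
    for _ in range(n_workers):       # one sentinel per consumer
        work_q.put(SENTINEL)

def worker(work_q):
    # consumer: pull items until the sentinel arrives
    while True:
        item = work_q.get()
        if item is SENTINEL:
            break
        result = item * item         # stand-in for your real processing
        print(result)

if __name__ == "__main__":
    n_workers = 4
    work_q = mp.Queue()
    workers = [mp.Process(target=worker, args=(work_q,)) for _ in range(n_workers)]
    for w in workers:
        w.start()
    ingest(work_q, n_workers)        # the main process acts as the producer
    for w in workers:
        w.join()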

There are numerous examples of this pattern online, and a quick search will turn up several.

The remaining question, of course, is how you're going to handle the results.

If they need to be serialized back out to some single file then the obvious approach would be to create two Queue objects: one for the work queue (the ingestion process feeds it, the worker processes consume from it) and the other for the output queue (the workers feed into it, and one process then consumes from it to write the results coherently to your output). Note that it is possible, and sometimes quite efficient, to have your master (ingesting) process multiplex: it can interleave input data reads with polls on the output Queue, writing results out as they arrive. But, of course, you can also just spin up another process devoted to output handling.
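
A sketch of that two-Queue variant, under the same assumptions as above (the processing body, the item count, and the results.txt output path are all made up for illustration); a single writer process keeps the file writes coherent:

import multiprocessing as mp

SENTINEL = None

def worker(work_q, out_q):
    while True:
        item = work_q.get()
        if item is SENTINEL:
            break
        out_q.put(item * item)       # stand-in for your real processing

def writer(out_q, n_results, path):
    # sole consumer of the output queue, so writes stay coherent
    with open(path, "w") as f:
        for _ in range(n_results):
            f.write(f"{out_q.get()}\n")

if __name__ == "__main__":
    n_workers = 4
    items = list(range(100))         # stand-in for your real input
    work_q, out_q = mp.Queue(), mp.Queue()
    workers = [mp.Process(target=worker, args=(work_q, out_q)) for _ in range(n_workers)]
    out_proc = mp.Process(target=writer, args=(out_q, len(items), "results.txt"))
    for w in workers:
        w.start()
    out_proc.start()
    for item in items:               # the master process acts as the producer
        work_q.put(item)
    for _ in range(n_workers):
        work_q.put(SENTINEL)
    for w in workers:
        w.join()
    out_proc.join()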

On the other hand, it's possible that your results can be written in parallel, perhaps by the worker processes themselves. This is fine if you're writing the results to many files, posting them as INSERT or UPDATE statements to some SQL database, or feeding them into Hadoop HDFS or a Spark DataSet. Many forms of output are amenable to parallel writes.
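
For instance, a minimal sketch of per-worker parallel writes, where each worker appends to its own file so no cross-process locking is needed (the pid-based naming scheme here is just illustrative):

import multiprocessing as mp
import os

def worker(item):
    result = item * item                       # stand-in for real processing
    # each worker process appends to its own file, keyed by its pid
    with open(f"results_{os.getpid()}.txt", "a") as f:
        f.write(f"{result}\n")

if __name__ == "__main__":
    with mp.Pool(4) as pool:
        pool.map(worker, range(100))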

It's also possible that you'll want to decouple your processing and output/results handling layers. It might be that your application will be tuned optimally with a larger number of processes in the data-processing layer and a smaller number in the output layer. (If the processing of each item is CPU-intensive and you have many cores, for example, then you could have issues with too many processes choking your I/O channels while the CPUs sit idle.)

Again, use Queues. They are designed to support multi-producer and multi-consumer coherence, freeing you from a minefield of concerns about locking, deadlocks, livelocks, and so on.