Python: How to run nested parallel process in pyth

2019-03-02 09:01发布

问题:

I have a dataset df of trader transactions. I have 2 levels of for loops as follows:

smartTrader =[]

for asset in range(len(Assets)):
    df = df[df['Assets'] == asset]
    # I have some more calculations here
    for trader in range(len(df['TraderID'])):
        # I have some calculations here, If trader is successful, I add his ID  
        # to the list as follows
        smartTrader.append(df['TraderID'][trader])

    # some more calculations here which are related to the first for loop.

I would like to parallelise the calculations for each asset in Assets, and I also want to parallelise the calculations for each trader for every asset. After ALL these calculations are done, I want to do additional analysis based on the list of smartTrader.

This is my first attempt at parallel processing, so please be patient with me, and I appreciate your help.

回答1:

If you use pathos, which provides a fork of multiprocessing, you can easily nest parallel maps. pathos is built for easily testing combinations of nested parallel maps -- which are direct translations of nested for loops. It provides a selection of maps that are blocking, non-blocking, iterative, asynchronous, serial, parallel, and distributed.

>>> from pathos.pools import ProcessPool, ThreadPool
>>> amap = ProcessPool().amap
>>> tmap = ThreadPool().map
>>> from math import sin, cos
>>> print amap(tmap, [sin,cos], [range(10),range(10)]).get()
[[0.0, 0.8414709848078965, 0.9092974268256817, 0.1411200080598672, -0.7568024953079282, -0.9589242746631385, -0.27941549819892586, 0.6569865987187891, 0.9893582466233818, 0.4121184852417566], [1.0, 0.5403023058681398, -0.4161468365471424, -0.9899924966004454, -0.6536436208636119, 0.2836621854632263, 0.9601702866503661, 0.7539022543433046, -0.14550003380861354, -0.9111302618846769]]

Here this example uses a processing pool and a thread pool, where the thread map call is blocking, while the processing map call is asynchronous (note the get at the end of the last line).

Get pathos here: https://github.com/uqfoundation or with: $ pip install git+https://github.com/uqfoundation/pathos.git@master



回答2:

Nested parallelism can be done elegantly with Ray, a system that allows you to easily parallelize and distribute your Python code.

Assume you want to parallelize the following nested program

def inner_calculation(asset, trader):
    return trader

def outer_calculation(asset):
    return  asset, [inner_calculation(asset, trader) for trader in range(5)]

inner_results = []
outer_results = []

for asset in range(10):
    outer_result, inner_result = outer_calculation(asset)
    outer_results.append(outer_result)
    inner_results.append(inner_result)

# Then you can filter inner_results to get the final output.

Bellow is the Ray code parallelizing the above code:

  • Use the @ray.remote decorator for each function that we want to execute concurrently in its own process. A remote function returns a future (i.e., an identifier to the result) rather than the result itself.
  • When invoking a remote function f() the remote modifier, i.e., f.remote()
  • Use the ids_to_vals() helper function to convert a nested list of ids to values.

Note the program structure is identical. You only need to add remote and then convert the futures (ids) returned by the remote functions to values using the ids_to_vals() helper function.

import ray

ray.init()

# Define inner calculation as a remote function.
@ray.remote
def inner_calculation(asset, trader):
    return trader

# Define outer calculation to be executed as a remote function.
@ray.remote(num_return_vals = 2)
def outer_calculation(asset):
    return  asset, [inner_calculation.remote(asset, trader) for trader in range(5)]

# Helper to convert a nested list of object ids to a nested list of corresponding objects.
def ids_to_vals(ids):
    if isinstance(ids, ray.ObjectID):
        ids = ray.get(ids)
    if isinstance(ids, ray.ObjectID):
        return ids_to_vals(ids)
    if isinstance(ids, list):
        results = []
        for id in ids:
            results.append(ids_to_vals(id))
        return results
    return ids

outer_result_ids = []
inner_result_ids = []

for asset in range(10):
    outer_result_id, inner_result_id = outer_calculation.remote(asset)
    outer_result_ids.append(outer_result_id)
    inner_result_ids.append(inner_result_id)

outer_results = ids_to_vals(outer_result_ids)
inner_results = ids_to_vals(inner_result_ids)

There are a number of advantages of using Ray over the multiprocessing module. In particular, the same code will run on a single machine as well as on a cluster of machines. For more advantages of Ray see this related post.



回答3:

Instead of using for, use map:

import functools
smartTrader =[]

m=map( calculations_as_a_function, 
        [df[df['Assets'] == asset] \
                for asset in range(len(Assets))])
functools.reduce(smartTradder.append, m)

From then on, you can try different parallel map implementations s.a. multiprocessing's, or stackless'



回答4:

Probably threading, from standard python library, is most convenient approach:

import threading

def worker(id):
    #Do you calculations here
    return

threads = []
for asset in range(len(Assets)):
    df = df[df['Assets'] == asset]
    for trader in range(len(df['TraderID'])):
        t = threading.Thread(target=worker, args=(trader,))
        threads.append(t)
        t.start()
    #add semaphore here if you need synchronize results for all traders.