How to best share static data between ipyparallel client and remote engines?

Posted 2019-04-18 14:34

Question:

I am running the same simulation in a loop with different parameters. Each simulation makes use of a pandas DataFrame (data) which is only read, never modified. Using ipyparallel (IPython parallel), I can put this DataFrame into the global variable space of each engine in my view before simulations start:

view['data'] = data

The engines then have access to the DataFrame for all the simulations which get run on them. Copying the data (40MB when pickled) takes only a few seconds. However, it appears that as the number of simulations grows, memory usage grows very large. I imagine this shared data is getting copied for each task rather than just for each engine. What's the best practice for sharing static read-only data from a client with engines? Copying it once per engine is acceptable, but ideally it would only have to be copied once per host (I have 4 engines on host1 and 8 engines on host2).

Here's my code:

from ipyparallel import Client
import pandas as pd

rc = Client()
view = rc[:]  # use all engines
view.scatter('id', rc.ids, flatten=True)  # So we can track which engine performed what task

def do_simulation(tweaks):
    """ Run simulation with specified tweaks """
    #  Do sim stuff using the global data DataFrame
    return results, id, tweaks

if __name__ == '__main__':
    data = pd.read_sql("SELECT * FROM my_table", engine)
    threads = []  # store list of tweaks dicts
    for i in range(4):
        for j in range(5):
            for k in range(6):
                threads.append(dict(i=i, j=j, k=k))

    # Set up globals for each engine.  This is the read-only DataFrame
    view['data'] = data
    ar = view.map_async(do_simulation, threads)

    # Our async results should pop up over time.  Let's measure our progress:
    for idx, (results, id, tweaks) in enumerate(ar):
        print('Progress: {}%: Simulation {} finished on engine {}'.format(100.0 * ar.progress / len(ar), idx, id))
        # Store results as a pickle for the future
        pfile = '{}_{}_{}.pickle'.format(tweaks['i'], tweaks['j'], tweaks['k'])
        # Save our results to a pickle file
        pd.to_pickle(results, out_file_path + pfile)

    print('Total execution time: {} (serial time: {})'.format(ar.wall_time, ar.serial_time))

If simulation counts are small (~50), then it takes a while to get started, but I start to see progress print statements. Strangely, multiple tasks will get assigned to the same engine and I don't see a response until all of those assigned tasks are completed for that engine. I would expect to see a response from enumerate(ar) every time a single simulation task completes.

If simulation counts are large (~1000), it takes a long time to get started. I see the CPUs throttle up on all engines, but no progress print statements appear for a long time (~40 minutes), and when I do see progress, it appears that a large block (>100) of tasks went to the same engine and the client waited for that one engine to finish before reporting any progress. When that one engine did complete, I saw the ar object provide new responses every 4 seconds; this may have been the time taken to write the output pickle files.

Lastly, host1 also runs the ipcontroller task, and its memory usage goes up like crazy (a Python task shows using >6GB RAM, a kernel task shows using 3GB). The host2 engine doesn't really show much memory usage at all. What would cause this spike in memory?

Answer 1:

I used this approach in some code a couple of years ago, and it worked for me. My code was something like:

shared_dict = {
    # big dict with ~10k keys, each with a list of dicts
}

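from ipyparallel import Client

engines = Client()  # connect to the running cluster
balancer = engines.load_balanced_view()

with engines[:].sync_imports():  # engines[:] is your 'view' variable
    import pandas as pd
    import ujson as json

# push() takes a {name: value} mapping, so wrap the dict under its variable name
engines[:].push({'shared_dict': shared_dict})

# 'id' stands for the iterable of task inputs in my code
results = balancer.map(lambda i: (i, my_func(i)), id)
results_data = results.get()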
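
A minimal sketch of the same pattern with progress reporting, assuming a running cluster: the data is pushed once to every engine, and the tasks then go through a load-balanced view so that each result can be consumed as soon as it finishes. The names run_simulations, simulate and tweaks_list are hypothetical and not from my original code; push, load_balanced_view and map_async with ordered=False are regular ipyparallel calls.

```python
def simulate(tweaks):
    """Runs on an engine; 'data' is the global pushed there beforehand."""
    # Placeholder computation: a real simulation would read from 'data' here.
    return {"tweaks": tweaks, "rows": len(data)}


def run_simulations(client, data, tweaks_list):
    """Push 'data' once per engine, then yield results as tasks complete."""
    # One copy per engine, stored under the global name 'data'.
    client[:].push({"data": data}, block=True)

    # The load-balanced view hands each task to whichever engine is free.
    balancer = client.load_balanced_view()
    async_result = balancer.map_async(simulate, tweaks_list, ordered=False)

    # With ordered=False, iteration yields results in completion order.
    for result in async_result:
        yield result
```

From the client script this would be used as `for result in run_simulations(Client(), data, tweaks_list): ...`, with Client imported from ipyparallel.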

You wrote: "If simulation counts are small (~50), then it takes a while to get started, but I start to see progress print statements. Strangely, multiple tasks will get assigned to the same engine and I don't see a response until all of those assigned tasks are completed for that engine. I would expect to see a response from enumerate(ar) every time a single simulation task completes."

In my case, my_func() was a complex function that wrote lots of logging messages to a file, so that is how I got my progress output.

As for the task assignment, I used load_balanced_view() and left the scheduling to the library, which worked well.
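
To illustrate why a direct view can appear to report results in blocks: as far as I understand, a DirectView's map creates one task per engine and splits the inputs into chunks, whereas a load-balanced view's map submits one task per item by default (chunksize=1). The sketch below only mimics that partitioning in plain Python. split_contiguous is a hypothetical helper, not ipyparallel's actual partitioning code, and the 12 engines are taken from the question (4 on host1 plus 8 on host2).

```python
def split_contiguous(items, n_engines):
    """Split items into n_engines contiguous chunks of near-equal size."""
    base, extra = divmod(len(items), n_engines)
    chunks, start = [], 0
    for engine in range(n_engines):
        size = base + (1 if engine < extra else 0)
        chunks.append(items[start:start + size])
        start += size
    return chunks


# 4 * 5 * 6 = 120 parameter combinations, as in the question's loops.
tasks = [dict(i=i, j=j, k=k)
         for i in range(4) for j in range(5) for k in range(6)]

# Direct-view style: one big task per engine (12 engines, 10 items each).
direct_chunks = split_contiguous(tasks, 12)
print([len(chunk) for chunk in direct_chunks])

# Load-balanced style with chunksize=1: every item is its own task.
balanced_tasks = [[item] for item in tasks]
print(len(balanced_tasks))
```

With one chunk per engine, nothing can be reported for an engine until its whole chunk is done, which would match the block-wise progress described in the question.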

You wrote: "If simulation counts are large (~1000), it takes a long time to get started. I see the CPUs throttle up on all engines, but no progress print statements appear for a long time (~40 minutes), and when I do see progress, it appears that a large block (>100) of tasks went to the same engine and the client waited for that one engine to finish before reporting any progress. When that one engine did complete, I saw the ar object provide new responses every 4 seconds; this may have been the time taken to write the output pickle files."

As for the long startup time, I haven't experienced that, so I can't comment.

I hope this sheds some light on your problem.


PS: As I said in the comment, you could try multiprocessing.Pool. I don't think I have tried sharing a big, read-only dataset as a global variable with it, but I would give it a try, because it seems to work.



Answer 2:

Sometimes you need to scatter your data grouped by a category, so that you can be sure each subgroup is held entirely by a single engine (called a cluster in the code below).

This is how I usually do it:

# Connect to the clusters
import ipyparallel as ipp
client = ipp.Client()
lview  = client.load_balanced_view()
lview.block = True
CORES = len(client[:])

# Define the scatter_by function
def scatter_by(df,grouper,name='df'):
    sz = df.groupby([grouper]).size().sort_values().index.unique()
    for core in range(CORES):
        ids = sz[core::CORES]
        print("Pushing {0} {1}s into cluster {2}...".format(len(ids),grouper,core))
        client[core].push({name:df[df[grouper].isin(ids)]})

# Scatter the dataframe df grouping by `year`
scatter_by(df,'year')

Note that the function I'm suggesting makes sure each cluster (engine) hosts a similar number of observations, which is usually a good idea.
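
To make the slicing in scatter_by concrete, here is a small pure-Python sketch of the same idea without pandas or a running cluster: groups are sorted by size and dealt out round-robin with the same `[core::CORES]` slicing, so each engine receives whole groups and a comparable number of rows. The sample year values and the helper name assign_groups are made up for illustration.

```python
from collections import Counter


def assign_groups(keys, n_engines):
    """Deal whole groups to engines round-robin, smallest groups first."""
    sizes = Counter(keys)
    # Sort group labels by ascending size (ties broken by label).
    ordered = sorted(sizes, key=lambda label: (sizes[label], label))
    # Same slicing as sz[core::CORES] in scatter_by.
    return [ordered[core::n_engines] for core in range(n_engines)]


# Hypothetical 'year' column: one entry per observation.
years = [2001] * 2 + [2002] * 3 + [2003] * 5 + [2004] * 6
assignment = assign_groups(years, 2)
rows_per_engine = [sum(years.count(y) for y in group) for group in assignment]
print(assignment)
print(rows_per_engine)
```

Because the groups are dealt in size order, neighbouring engines end up with groups of similar size, which keeps the row counts close without ever splitting a group.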