Multiprocessing on a model with data frame as inpu

2019-05-02 11:03发布

问题:

I want to use multiprocessing on a model to get predictions using a data frame as input. I have the following code:

def perform_model_predictions(model, dataFrame, cores=4): 
    try:
        with Pool(processes=cores) as pool:
            result = pool.map(model.predict, dataFrame)
            return result
        # return model.predict(dataFrame)
    except AttributeError:
        logging.error("AttributeError occurred", exc_info=True)

The error I'm getting is:

raise TypeError("sparse matrix length is ambiguous; use getnnz()"
TypeError: sparse matrix length is ambiguous; use getnnz() or shape[0]

I think the issue is with the fact that I'm passing in a data frame as the second parameter to the pool.map function. Any advice or help would be appreciated.

回答1:

The trick is to split your dataframe into chunks. map expects a list of objects that are going to be processed by the model.predict. Here's a full working example, with model obviously mocked:

import numpy as np
import pandas as pd
from multiprocessing import Pool

no_cores = 4

large_df = pd.concat([pd.Series(np.random.rand(1111)), pd.Series(np.random.rand(1111))], axis = 1)
chunk_size = len(large_df) // no_cores + no_cores
chunks = [df_chunk for g, df_chunk in large_df.groupby(np.arange(len(large_df)) // chunk_size)]

class model(object):
    @staticmethod
    def predict(df):
        return np.random.randint(0,2)

def perform_model_predictions(model, dataFrame, cores): 
    try:
        with Pool(processes=cores) as pool:
            result = pool.map(model.predict, dataFrame)
            return result
        # return model.predict(dataFrame)
    except AttributeError:
        logging.error("AttributeError occurred", exc_info=True)

perform_model_predictions(model, chunks, no_cores)

Mind that the number of chunks here is selected such that it matches number of cores (or simply any number you want to allocate). This way each core gets a fair share and multiprocessing does not spend much time on object serialization.

If you'd like to process each row (pd.Series) separately, time spent on serialization could be a concern. In such case I'd recommend using joblib and reading docs on its various backends. I did not write on it as it seemed you want to call predict on pd.Dataframe.

Extra warning

It can happen that multiprocessing, instead of getting you better performance, will make it worse. It happens in rather rare situations when your model.predict calls external modules that themselves spawn threads. I wrote about the issue here. Long story short, joblib again could be an answer.