I want to use multiprocessing on a model to get predictions using a data frame as input. I have the following code:
import logging
from multiprocessing import Pool

def perform_model_predictions(model, dataFrame, cores=4):
    try:
        with Pool(processes=cores) as pool:
            result = pool.map(model.predict, dataFrame)
        return result
        # return model.predict(dataFrame)
    except AttributeError:
        logging.error("AttributeError occurred", exc_info=True)
The error I'm getting is:
raise TypeError("sparse matrix length is ambiguous; use getnnz()"
TypeError: sparse matrix length is ambiguous; use getnnz() or shape[0]
I think the issue is that I'm passing a data frame as the second argument to the pool.map function. Any advice or help would be appreciated.
The trick is to split your dataframe into chunks. map expects a list of objects, each of which will be processed by model.predict. Here's a full working example, with the model obviously mocked:
import logging
import numpy as np
import pandas as pd
from multiprocessing import Pool

no_cores = 4

large_df = pd.concat([pd.Series(np.random.rand(1111)),
                      pd.Series(np.random.rand(1111))], axis=1)
chunk_size = len(large_df) // no_cores + no_cores
chunks = [df_chunk for g, df_chunk in
          large_df.groupby(np.arange(len(large_df)) // chunk_size)]

class model(object):
    @staticmethod
    def predict(df):
        # mocked prediction: one label per chunk
        return np.random.randint(0, 2)

def perform_model_predictions(model, dataFrame, cores):
    try:
        with Pool(processes=cores) as pool:
            result = pool.map(model.predict, dataFrame)
        return result
        # return model.predict(dataFrame)
    except AttributeError:
        logging.error("AttributeError occurred", exc_info=True)

perform_model_predictions(model, chunks, no_cores)
perform_model_predictions(model, chunks, no_cores)
Mind that the number of chunks here is selected so that it matches the number of cores (or simply any number you want to allocate). This way each core gets a fair share and multiprocessing does not spend much time on object serialization.
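If you prefer, the chunking step can also be done with np.array_split rather than the groupby trick above; a minimal sketch (the row count just mirrors the example):

```python
import numpy as np
import pandas as pd

# 1111 rows, 2 columns, as in the example above
large_df = pd.DataFrame(np.random.rand(1111, 2))

# four roughly equal DataFrame chunks, one per core
chunks = np.array_split(large_df, 4)
```

Each element of chunks is itself a DataFrame, so pool.map hands whole chunks to model.predict exactly as before.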
If you'd like to process each row (pd.Series) separately, time spent on serialization could be a concern. In such a case I'd recommend using joblib and reading its docs on the various backends. I did not write about it as it seemed you want to call predict on a pd.DataFrame.
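For the per-row case, a joblib version could look like the sketch below; predict_row is a hypothetical stand-in for your model's per-row prediction, not part of any real API:

```python
import numpy as np
import pandas as pd
from joblib import Parallel, delayed

def predict_row(row):
    # hypothetical stand-in for a real per-row model.predict
    return int(row.sum() > 1.0)

df = pd.DataFrame(np.random.rand(100, 2))

# two workers; joblib chooses a sensible backend by default
preds = Parallel(n_jobs=2)(delayed(predict_row)(row)
                           for _, row in df.iterrows())
```

Results come back in input order, so preds lines up with the rows of df.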
Extra warning
It can happen that multiprocessing, instead of improving performance, makes it worse. This occurs in rather rare situations where your model.predict calls external modules that themselves spawn threads. I wrote about the issue here. Long story short, joblib could again be an answer.
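As an illustration of why joblib helps here: it lets you switch to the threading backend, which avoids spawning extra processes on top of a model that already manages its own threads (the squaring task is just a placeholder workload):

```python
from joblib import Parallel, delayed

# threading backend: no process spawning, no pickling overhead
results = Parallel(n_jobs=2, backend="threading")(
    delayed(pow)(i, 2) for i in range(8)
)
```

Parallel preserves input order, so results is simply the squares of 0..7.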