How to parallelize a for in python inside a class?

2020-04-10 01:49发布

I have a python function funz that returns every time a different array of length p. I need to run this function different times and then to compute the mean of each value.

I can do this with a for loop but it takes a lot of times.

I am trying to use the library multiprocessing but I get into an error.

import sklearn as sk
import numpy as np
from sklearn.base import BaseEstimator, TransformerMixin
from sklearn import preprocessing,linear_model, cross_validation
from scipy import stats
from multiprocessing import Pool


class stabilize(BaseEstimator,TransformerMixin):

    def __init__(self,sim=3,n_folds=3):
        self.sim=sim
        self.n_folds=n_folds

    def fit(self,X,y):
        self.n,self.p=X.shape
        self.X=X
        self.y=y        
        self.beta=np.zeros(shape=(self.sim,self.p))
        self.alpha_min=[]        
        self.mapper=p.map(self.multiple_cv,[1]*self.sim)    

    def multiple_cv(self,o):
        kf=sk.cross_validation.KFold(self.n,n_folds=self.n_folds,shuffle=True)
        cv=sk.linear_model.LassoCV(cv=kf).fit(self.X,self.y)
        beta=cv.coef_
        alpha_min=cv.alpha_
        return alpha_min

I used a dummy variable o to tell how many parallel process I would like to use. This is not very elegant and maybe is part of the error. The variables X and y are already part of the class so I do not have argument to pass to the function multiple_cv.

When I run the program I get this error

Exception in thread Thread-3:
Traceback (most recent call last):
  File "/usr/lib/python2.7/threading.py", line 551, in __bootstrap_inner
    self.run()
  File "/usr/lib/python2.7/threading.py", line 504, in run
    self.__target(*self.__args, **self.__kwargs)
  File "/usr/lib/python2.7/multiprocessing/pool.py", line 319, in _handle_tasks
    put(task)
PicklingError: Can't pickle <type 'instancemethod'>: attribute lookup __builtin__.instancemethod failed

1条回答
ら.Afraid
2楼-- · 2020-04-10 02:02

Your problem is that the function you want to call is a instance method of an object. This can not be serialized and sent to another process. I see two solutions:

  1. use a different globally available function:

    class stabilize(BaseEstimator,TransformerMixin):
        ...
    def multiple_cv((self,o)):
        ...
    

    and

        self.mapper=p.map(self.multiple_cv,[(self, 1)]*self.sim)
    
  2. make the methods of objects serializable using VeryPicklableObject and its dependencies.

        @picklableInstancemethod
        def multiple_cv(self, o):
            ...
    
查看更多
登录 后发表回答