I couldn't find a better way to describe the error I'm facing, but it seems to come up every time I try to use multiprocessing to parallelize a loop.
I've tried both sklearn.externals.joblib and multiprocessing.Process, and the errors are similar, though not identical.
Here is the original loop I want to parallelize; currently every iteration runs one after another in a single process:
for dd in final_col_dates:
    idx1 = final_col_dates.tolist().index(dd)
    dataObj = GetPrevDataByDate(d1, a, dd, self.start_hour_of_day)
    data2 = dataObj.fit()
    dataObj = GetAppointmentControlsSchedule(data2, idx1, d, final_col_dates_mod, dd, self.DC, frgt_typ_filter)
    data3 = dataObj.fit()
    if idx1 > 0:
        data3['APPT_SCHD_ARVL_D_{}'.format(idx1)] = np.nan
    iter += 1
    days_out_vars.append(data3)
To run this with multiprocessing, I moved the loop body (everything except the for statement) into a method, ParallelLoopTest.
With joblib, I call it like this:
Parallel(n_jobs=2)(
    delayed(self.ParallelLoopTest)(dd, final_col_dates, d1, a, d, final_col_dates_mod, iter, return_list)
    for dd in final_col_dates)
The variable return_list is a shared list that ParallelLoopTest appends to. It is declared as:
manager = Manager()
return_list = manager.list()
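For comparison, here is a minimal sketch of the same loop-to-parallel pattern without a shared list, assuming the per-date work can be written as a module-level function that returns its result. It swaps in the standard library's concurrent.futures.ProcessPoolExecutor instead of joblib; `process_date` and `run_parallel` are hypothetical names, and the placeholder computation stands in for the GetPrevDataByDate / GetAppointmentControlsSchedule steps. Because each task returns its value, results come back in input order and no Manager proxy is needed. Note that a counter such as `iter` incremented inside a worker only changes that process's copy, so it cannot count iterations across workers.

```python
from concurrent.futures import ProcessPoolExecutor
from functools import partial


def process_date(dd, all_dates):
    # Hypothetical stand-in for ParallelLoopTest: it returns the per-date
    # result instead of appending to a shared list.
    idx = all_dates.index(dd)
    # Placeholder work; the real code would build a DataFrame here.
    return {"date": dd, "index": idx}


def run_parallel(all_dates, workers=2):
    # partial() of a module-level function is picklable, so it can be sent
    # to worker processes; map() yields results in the order of all_dates.
    task = partial(process_date, all_dates=all_dates)
    with ProcessPoolExecutor(max_workers=workers) as executor:
        return list(executor.map(task, all_dates))


if __name__ == "__main__":
    # The guard matters on Windows, where workers re-import this module.
    print(run_parallel(["2017-01-01", "2017-01-02", "2017-01-03"]))
```

Every argument bound with partial() is pickled and sent along with each task, so large objects passed this way still cost a full serialization per call.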
With this joblib code, I get the following error:
Process SpawnPoolWorker-3:
Traceback (most recent call last):
  File "C:\Users\dkanhar\Anaconda3\lib\multiprocessing\process.py", line 249, in _bootstrap
    self.run()
  File "C:\Users\dkanhar\Anaconda3\lib\multiprocessing\process.py", line 93, in run
    self._target(*self._args, **self._kwargs)
  File "C:\Users\dkanhar\Anaconda3\lib\multiprocessing\pool.py", line 108, in worker
    task = get()
  File "C:\Users\dkanhar\Anaconda3\lib\site-packages\sklearn\externals\joblib\pool.py", line 359, in get
    return recv()
  File "C:\Users\dkanhar\Anaconda3\lib\multiprocessing\connection.py", line 251, in recv
    return ForkingPickler.loads(buf.getbuffer())
TypeError: function takes at most 0 arguments (1 given)
I also tried running the same code with the multiprocessing module directly and got a similar error. This is the code I used:
for dd in final_col_dates:
    # multiprocessing.Pipe(False)
    p = multiprocessing.Process(target=self.ParallelLoopTest, args=(dd, final_col_dates, d1, a, d, final_col_dates_mod, iter, return_list))
    jobs.append(p)
    p.start()
for proc in jobs:
    proc.join()
This produces the following traceback:
  File "<string>", line 1, in <module>
  File "C:\Users\dkanhar\Anaconda3\lib\multiprocessing\spawn.py", line 106, in spawn_main
    exitcode = _main(fd)
  File "C:\Users\dkanhar\Anaconda3\lib\multiprocessing\spawn.py", line 116, in _main
    self = pickle.load(from_parent)
TypeError: function takes at most 0 arguments (1 given)
Traceback (most recent call last):
  File "E:/Projects/Predictive Inbound Cartoon Estimation-MLO/Python/dataprep/DataPrep.py", line 457, in <module>
    print(obj.fit())
  File "E:/Projects/Predictive Inbound Cartoon Estimation-MLO/Python/dataprep/DataPrep.py", line 39, in fit
    return self.__driver__()
  File "E:/Projects/Predictive Inbound Cartoon Estimation-MLO/Python/dataprep/DataPrep.py", line 52, in __driver__
    final = self.process_()
  File "E:/Projects/Predictive Inbound Cartoon Estimation-MLO/Python/dataprep/DataPrep.py", line 135, in process_
    sch_dat = self.inline_apply_(all_dates_schd, d1, d2, a)
  File "E:/Projects/Predictive Inbound Cartoon Estimation-MLO/Python/dataprep/DataPrep.py", line 297, in inline_apply_
    p.start()
  File "C:\Users\dkanhar\Anaconda3\lib\multiprocessing\process.py", line 105, in start
    self._popen = self._Popen(self)
  File "C:\Users\dkanhar\Anaconda3\lib\multiprocessing\context.py", line 212, in _Popen
    return _default_context.get_context().Process._Popen(process_obj)
  File "C:\Users\dkanhar\Anaconda3\lib\multiprocessing\context.py", line 313, in _Popen
    return Popen(process_obj)
  File "C:\Users\dkanhar\Anaconda3\lib\multiprocessing\popen_spawn_win32.py", line 66, in __init__
    reduction.dump(process_obj, to_child)
  File "C:\Users\dkanhar\Anaconda3\lib\multiprocessing\reduction.py", line 59, in dump
    ForkingPickler(file, protocol).dump(obj)
BrokenPipeError: [Errno 32] Broken pipe
Thinking the problem might be related to the pipe, I tried uncommenting the line multiprocessing.Pipe(False), but the problem persists and I get the same error.
In case it helps, here is my ParallelLoopTest method:
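On Windows, multiprocessing starts children with the "spawn" method: the target and every argument are pickled in the parent and unpickled in a fresh interpreter that re-imports the main module, which is what the spawn_main / pickle.load frames show. When the child fails while unpickling, the parent, which may still be writing to the pipe, likely sees the BrokenPipeError. Calling multiprocessing.Pipe(False) only creates a new one-way pipe and does not change how Process sends its arguments. Below is a minimal sketch of the Process + Manager pattern that works under spawn, assuming the target is a module-level function and all arguments are picklable; `worker` and `run_processes` are hypothetical names.

```python
import multiprocessing


def worker(idx, dd, results):
    # Hypothetical stand-in for ParallelLoopTest. As a module-level function
    # it is pickled by name, so a spawned child can import and run it.
    results.append((idx, dd.upper()))


def run_processes(dates):
    with multiprocessing.Manager() as manager:
        results = manager.list()  # proxy that child processes can append to
        jobs = []
        for idx, dd in enumerate(dates):
            p = multiprocessing.Process(target=worker, args=(idx, dd, results))
            jobs.append(p)
            p.start()
        for p in jobs:
            p.join()
        # Appends arrive in completion order: copy the proxy's contents out
        # before the manager shuts down, then restore the input order.
        return [value for _, value in sorted(results[:])]


if __name__ == "__main__":
    # Without this guard, each spawned child would re-run the launch code.
    print(run_processes(["a", "b", "c"]))
```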
def ParallelLoopTest(self, dd, final_col_dates, d1, a, d, final_col_dates_mod, iter, days_out_vars):
    idx1 = final_col_dates.tolist().index(dd)
    dataObj = GetPrevDataByDate(d1, a, dd, self.start_hour_of_day)
    data2 = dataObj.fit()
    dataObj = GetAppointmentControlsSchedule(data2, idx1, d, final_col_dates_mod, dd, self.DC, frgt_typ_filter)
    data3 = dataObj.fit()
    if idx1 > 0:
        data3['APPT_SCHD_ARVL_D_{}'.format(idx1)] = np.nan
    print("Iter ", iter)
    iter += 1
    days_out_vars.append(data3)
I call the errors similar because both tracebacks contain the same line, raised while loading from pickle:
TypeError: function takes at most 0 arguments (1 given)
I don't know why this happens.
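Since the TypeError is raised inside the child's pickle load, one way to narrow it down is to round-trip each argument through pickle in the parent before starting any workers. A minimal sketch; `find_unpicklable` is a hypothetical helper, and plain pickle only approximates multiprocessing's ForkingPickler (which registers a few extra reducers, for connections and sockets for example), so a clean result is a hint rather than a guarantee.

```python
import pickle


def find_unpicklable(**kwargs):
    """Round-trip each keyword argument through pickle, roughly as
    multiprocessing does when sending arguments to a child process, and
    return {name: error message} for every argument that fails."""
    failures = {}
    for name, value in kwargs.items():
        try:
            # Loading matters too: the reported TypeError occurs on load.
            pickle.loads(pickle.dumps(value, protocol=pickle.HIGHEST_PROTOCOL))
        except Exception as exc:  # pickling failures come in several types
            failures[name] = f"{type(exc).__name__}: {exc}"
    return failures
```

Usage would look like `print(find_unpicklable(dd=dd, d1=d1, a=a, d=d, final_col_dates=final_col_dates))`, run in the parent right before the Parallel or Process call.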
Note that I have used both modules successfully in other projects without this issue, so I don't know why this problem appears now or what exactly it means.
Any help would be appreciated; I have spent three days trying to debug this.
Thanks
Edit 1 (following the answer):
Following the answer, I tried two things. First, I added the @staticmethod decorator, removed self, and called the method as DataPrep.ParallelLoopTest(args).
Second, I moved the method out of the DataPrep class and called it simply as ParallelLoopTest(args).
In both cases the error stays the same.
PS: I used joblib for both attempts, and neither solution worked.
New method definition:
def ParallelLoopTest(dd, final_col_dates, d1, a, d, final_col_dates_mod, iter, days_out_vars, DC, start_hour):
    idx1 = final_col_dates.tolist().index(dd)
    dataObj = GetPrevDataByDate(d1, a, dd, start_hour)
    data2 = dataObj.fit()
    dataObj = GetAppointmentControlsSchedule(data2, idx1, d, final_col_dates_mod, dd, DC, frgt_typ_filter)
    data3 = dataObj.fit()
    if idx1 > 0:
        data3['APPT_SCHD_ARVL_D_{}'.format(idx1)] = np.nan
    print("Iter ", iter)
    iter += 1
    days_out_vars.append(data3)
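The difference between these variants comes down to how pickle handles callables: a module-level function is pickled by reference (module plus name), whereas a bound method such as self.ParallelLoopTest is pickled together with its instance, so every attribute of self travels to the worker and must itself be picklable. Nested functions and lambdas cannot be pickled at all. A small sketch with hypothetical names that checks this using the standard pickle module; a threading.Lock stands in for any unpicklable attribute.

```python
import pickle
import threading


def module_level(x):
    # Pickled by reference (module name + function name), so it is cheap.
    return x * 2


class Holder:
    def __init__(self):
        self.lock = threading.Lock()  # stands in for any unpicklable attribute

    def method(self, x):
        return x * 2


def make_nested():
    def nested(x):  # local function: pickle cannot look it up by name
        return x * 2
    return nested


def can_pickle(obj):
    """Return True if pickle.dumps succeeds on obj."""
    try:
        pickle.dumps(obj)
        return True
    except Exception:
        return False


if __name__ == "__main__":
    print(can_pickle(module_level), can_pickle(Holder().method), can_pickle(make_nested()))
```

Moving the function to module level only removes the instance from the payload; the arguments themselves are still pickled, so a problematic argument fails the same way in every variant.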
Edit 2:
The error turned out to be caused by Python failing to pickle some large DataFrames. Two of my arguments are DataFrames, one about 20 MB and the other about 200 MB in pickle format. But that shouldn't be an issue, right? We should be able to pass pandas DataFrames. Correct me if I'm wrong.
As a workaround, I save each DataFrame to a CSV file with a random name before the method call, pass the file name, and read the CSV inside the method, but this is slow because it involves reading huge CSV files. Any suggestions?