I couldn't find a better way to describe the error I'm facing, but it seems to come up every time I try to apply multiprocessing to a loop.
I've tried both sklearn.externals.joblib and multiprocessing.Process, and the errors are similar, though not identical.
Here is the original loop I want to parallelize; currently every iteration runs sequentially in a single process:
for dd in final_col_dates:
    idx1 = final_col_dates.tolist().index(dd)
    dataObj = GetPrevDataByDate(d1, a, dd, self.start_hour_of_day)
    data2 = dataObj.fit()
    dataObj = GetAppointmentControlsSchedule(data2, idx1, d, final_col_dates_mod, dd, self.DC, frgt_typ_filter)
    data3 = dataObj.fit()
    if idx1 > 0:
        data3['APPT_SCHD_ARVL_D_{}'.format(idx1)] = np.nan
    iter += 1
    days_out_vars.append(data3)
To run this with multiprocessing, I moved the loop body (everything except the for statement) into a method, ParallelLoopTest.
With joblib, I call it like this:
Parallel(n_jobs=2)(
    delayed(self.ParallelLoopTest)(dd, final_col_dates, d1, a, d, final_col_dates_mod, iter, return_list)
    for dd in final_col_dates)
The variable return_list is a shared list that ParallelLoopTest appends to. It is declared as:
manager = Manager()
return_list = manager.list()
With this code, I get the following error:
Process SpawnPoolWorker-3:
Traceback (most recent call last):
  File "C:\Users\dkanhar\Anaconda3\lib\multiprocessing\process.py", line 249, in _bootstrap
    self.run()
  File "C:\Users\dkanhar\Anaconda3\lib\multiprocessing\process.py", line 93, in run
    self._target(*self._args, **self._kwargs)
  File "C:\Users\dkanhar\Anaconda3\lib\multiprocessing\pool.py", line 108, in worker
    task = get()
  File "C:\Users\dkanhar\Anaconda3\lib\site-packages\sklearn\externals\joblib\pool.py", line 359, in get
    return recv()
  File "C:\Users\dkanhar\Anaconda3\lib\multiprocessing\connection.py", line 251, in recv
    return ForkingPickler.loads(buf.getbuffer())
TypeError: function takes at most 0 arguments (1 given)
I also tried running the same code with the multiprocessing module and got a similar error. This is the multiprocessing version:
for dd in final_col_dates:
    # multiprocessing.Pipe(False)
    p = multiprocessing.Process(target=self.ParallelLoopTest, args=(dd, final_col_dates, d1, a, d, final_col_dates_mod, iter, return_list))
    jobs.append(p)
    p.start()
for proc in jobs:
    proc.join()
It fails with the following traceback:
  File "<string>", line 1, in <module>
  File "C:\Users\dkanhar\Anaconda3\lib\multiprocessing\spawn.py", line 106, in spawn_main
    exitcode = _main(fd)
  File "C:\Users\dkanhar\Anaconda3\lib\multiprocessing\spawn.py", line 116, in _main
    self = pickle.load(from_parent)
TypeError: function takes at most 0 arguments (1 given)
Traceback (most recent call last):
  File "E:/Projects/Predictive Inbound Cartoon Estimation-MLO/Python/dataprep/DataPrep.py", line 457, in <module>
    print(obj.fit())
  File "E:/Projects/Predictive Inbound Cartoon Estimation-MLO/Python/dataprep/DataPrep.py", line 39, in fit
    return self.__driver__()
  File "E:/Projects/Predictive Inbound Cartoon Estimation-MLO/Python/dataprep/DataPrep.py", line 52, in __driver__
    final = self.process_()
  File "E:/Projects/Predictive Inbound Cartoon Estimation-MLO/Python/dataprep/DataPrep.py", line 135, in process_
    sch_dat = self.inline_apply_(all_dates_schd, d1, d2, a)
  File "E:/Projects/Predictive Inbound Cartoon Estimation-MLO/Python/dataprep/DataPrep.py", line 297, in inline_apply_
    p.start()
  File "C:\Users\dkanhar\Anaconda3\lib\multiprocessing\process.py", line 105, in start
    self._popen = self._Popen(self)
  File "C:\Users\dkanhar\Anaconda3\lib\multiprocessing\context.py", line 212, in _Popen
    return _default_context.get_context().Process._Popen(process_obj)
  File "C:\Users\dkanhar\Anaconda3\lib\multiprocessing\context.py", line 313, in _Popen
    return Popen(process_obj)
  File "C:\Users\dkanhar\Anaconda3\lib\multiprocessing\popen_spawn_win32.py", line 66, in __init__
    reduction.dump(process_obj, to_child)
  File "C:\Users\dkanhar\Anaconda3\lib\multiprocessing\reduction.py", line 59, in dump
    ForkingPickler(file, protocol).dump(obj)
BrokenPipeError: [Errno 32] Broken pipe
I then tried uncommenting the line multiprocessing.Pipe(False), thinking the pipe might be causing the problem, but it persists and I get the same error.
In case it helps, here is my ParallelLoopTest method:
def ParallelLoopTest(self, dd, final_col_dates, d1, a, d, final_col_dates_mod, iter, days_out_vars):
    idx1 = final_col_dates.tolist().index(dd)
    dataObj = GetPrevDataByDate(d1, a, dd, self.start_hour_of_day)
    data2 = dataObj.fit()
    dataObj = GetAppointmentControlsSchedule(data2, idx1, d, final_col_dates_mod, dd, self.DC, frgt_typ_filter)
    data3 = dataObj.fit()
    if idx1 > 0:
        data3['APPT_SCHD_ARVL_D_{}'.format(idx1)] = np.nan
    print("Iter ", iter)
    iter += 1
    days_out_vars.append(data3)
I call the errors similar because both tracebacks contain the same line, raised while unpickling:
TypeError: function takes at most 0 arguments (1 given)
I don't know why this happens.
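A quick way to narrow this down is to round-trip every argument through pickle in the parent process before starting any workers, since multiprocessing sends the target and its arguments to a spawned child with pickle. The sketch below does that; find_unpicklable is a hypothetical helper, not part of multiprocessing or joblib. It only catches failures that also happen in the parent: an object whose class the child cannot import (for example, a class defined only under if __name__ == "__main__":) can still round-trip here and fail in the worker.

```python
import pickle


def find_unpicklable(**named_args):
    """Round-trip each keyword argument through pickle, roughly what
    multiprocessing does when it sends a task to a spawned worker,
    and return {name: error message} for the ones that fail."""
    failures = {}
    for name, value in named_args.items():
        try:
            pickle.loads(pickle.dumps(value))
        except Exception as exc:  # dumps and loads can raise several exception types
            failures[name] = "{}: {}".format(type(exc).__name__, exc)
    return failures


# Usage sketch: plain data round-trips, a lambda does not.
report = find_unpicklable(dd="2017-01-01", idx=0, callback=lambda x: x)
print(report)
```

With the real arguments, calling find_unpicklable(dd=dd, final_col_dates=final_col_dates, d1=d1, a=a, d=d, ...) would list the ones that fail and the exception each raised.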
Note also that I've used both of these modules successfully in earlier projects without any issue, so I don't know why this problem appears now or what exactly it means.
Any help would be much appreciated; I've spent three days trying to debug this.
Thanks
Edit 1 (after the answer below)
Following the answer, I added the @staticmethod decorator, removed self, and called the method as DataPrep.ParallelLoopTest(args).
I also tried moving the method out of the DataPrep class and calling it simply as ParallelLoopTest(args),
but in both cases the error stays the same.
PS: I used joblib in both cases, and neither solution worked.
New method definition:
def ParallelLoopTest(dd, final_col_dates, d1, a, d, final_col_dates_mod, iter, days_out_vars, DC, start_hour):
    idx1 = final_col_dates.tolist().index(dd)
    dataObj = GetPrevDataByDate(d1, a, dd, start_hour)
    data2 = dataObj.fit()
    dataObj = GetAppointmentControlsSchedule(data2, idx1, d, final_col_dates_mod, dd, DC, frgt_typ_filter)
    data3 = dataObj.fit()
    if idx1 > 0:
        data3['APPT_SCHD_ARVL_D_{}'.format(idx1)] = np.nan
    print("Iter ", iter)
    iter += 1
    days_out_vars.append(data3)
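To see what the static-method suggestion changes, the sketch below pickles a bound method and a static method of a small class holding an unpicklable attribute. DataPrepSketch and its lock attribute are hypothetical stand-ins for DataPrep and whatever unpicklable state it might carry. A bound method is pickled together with its instance, so every attribute of self has to be picklable; a static method (or a module-level function) is pickled only as a reference to its qualified name. If the error survives this change, as it does here, the culprit is more likely one of the arguments.

```python
import pickle
import threading


class DataPrepSketch:
    """Hypothetical stand-in for DataPrep with one unpicklable attribute."""

    def __init__(self):
        self.lock = threading.Lock()  # lock objects cannot be pickled

    def instance_work(self, x):
        return x * 2

    @staticmethod
    def static_work(x):
        return x * 2


obj = DataPrepSketch()

# A bound method is pickled as (instance, method name), so the instance
# and its lock attribute must be pickled too, which raises TypeError.
try:
    pickle.dumps(obj.instance_work)
    bound_ok = True
except TypeError:
    bound_ok = False

# Looked up on the class, a static method is a plain function that pickle
# stores by module and qualified name; nothing from the instance is included.
static_ok = pickle.loads(pickle.dumps(DataPrepSketch.static_work))(21) == 42

print(bound_ok, static_ok)  # False True
```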
Edit 2:
It turns out I was getting the error because Python was unable to pickle some large DataFrames. Two of my arguments are DataFrames, one about 20 MB and the other about 200 MB when pickled. But that shouldn't be an issue, right? We should be able to pass pandas DataFrames. Correct me if I'm wrong.
As a workaround, I save each DataFrame to a CSV file with a random name before the method call, pass the file name instead, and read the CSV inside the method, but that is slow because it involves reading huge CSV files. Any suggestions?
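Along the lines of the DataFrame.to_pickle / pandas.read_pickle suggestion in the answer, the CSV round trip can be replaced with pickle files, which pandas writes and reads in binary without parsing text, so they are usually much faster than CSV for large frames. A minimal sketch, assuming pandas is installed; save_frame and worker_from_path are hypothetical names. The worker receives only a short path string, so nothing large travels through the multiprocessing pipe.

```python
import os
import tempfile

import pandas as pd


def save_frame(df):
    """Write df to a temporary pickle file and return its path; the caller deletes it."""
    fd, path = tempfile.mkstemp(suffix=".pkl")
    os.close(fd)  # close our handle so to_pickle can reopen the path (matters on Windows)
    df.to_pickle(path)
    return path


def worker_from_path(path):
    """Hypothetical worker: it gets a short path string and loads the frame itself."""
    df = pd.read_pickle(path)
    return int(df["value"].sum())


frame = pd.DataFrame({"value": range(1000)})
path = save_frame(frame)
try:
    total = worker_from_path(path)  # in real use this call would happen in the child process
finally:
    os.remove(path)
print(total)  # 499500
```

Each worker still loads its own full copy of the frame into memory, so with a 200 MB frame that cost is paid once per process.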
You actually get the exact same error in both cases, but because you use a Pool in one example (joblib) and a Process in the other, you don't get the same failure/traceback in your main thread: they do not handle a process failure the same way.

In both cases, your new Process seems to fail to unpickle its child job. The Pool gives you back the unpickling error, whereas with Process, the subprocess dies from this unpickling error and closes the pipe the main thread uses to write data to it, which causes the BrokenPipeError in the main process.

My first idea would be that the error is caused by trying to pickle an instance method, whereas you should use a static method here (using an instance method does not seem right, as the object is not shared between processes, and pickling a bound method also pickles the whole instance it is bound to).
Use the @staticmethod decorator when you declare ParallelLoopTest and remove the self argument.

EDIT: Another possibility is that one of the arguments dd, final_col_dates, d1, a, d, final_col_dates_mod, iter, return_list cannot be unpickled. Apparently, it comes from a pandas.DataFrame. I do not see any reason why unpickling would fail in this case, but I don't know pandas that well.

One workaround is to dump the data to a temporary file. You can look at this link here for efficient serialization of pandas.DataFrame. Another solution is to use the DataFrame.to_pickle method and pandas.read_pickle to dump it to and retrieve it from a file.

Note that it would be better to compare joblib.Parallel with multiprocessing.Pool, not with multiprocessing.Process.
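Following the note about multiprocessing.Pool above, here is a sketch of the loop from the question written for a Pool. process_one_date and run_parallel are hypothetical, and a dict stands in for the DataFrame that GetPrevDataByDate / GetAppointmentControlsSchedule produce. The worker is a module-level function that returns its result, so the shared Manager list and the iter counter are no longer needed, and enumerate supplies the index that final_col_dates.tolist().index(dd) computed. The if __name__ == "__main__": guard is required under the spawn start method, which is the default on Windows.

```python
import multiprocessing


def process_one_date(idx, dd, start_hour):
    """Hypothetical worker for one date: it depends only on its arguments
    and returns its result instead of appending to a shared list."""
    # The real code would build data3 with GetPrevDataByDate(...).fit() and
    # GetAppointmentControlsSchedule(...).fit(); a dict stands in for it here.
    data3 = {"date": dd, "start_hour": start_hour}
    if idx > 0:
        data3["APPT_SCHD_ARVL_D_{}".format(idx)] = float("nan")
    return data3


def run_parallel(final_col_dates, start_hour, processes=2):
    # enumerate() yields each index directly instead of list.index(dd)
    tasks = [(idx, dd, start_hour) for idx, dd in enumerate(final_col_dates)]
    with multiprocessing.Pool(processes=processes) as pool:
        # starmap unpacks each tuple into arguments and returns results in input order
        return pool.starmap(process_one_date, tasks)


if __name__ == "__main__":
    # Under spawn, children re-import this module; the guard stops them
    # from starting pools of their own.
    days_out_vars = run_parallel(["2017-01-01", "2017-01-02", "2017-01-03"], start_hour=6)
    print(len(days_out_vars))
```

joblib.Parallel collects return values the same way: Parallel(n_jobs=2)(delayed(process_one_date)(idx, dd, start_hour) for idx, dd in enumerate(final_col_dates)) returns the list of results in input order.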