Similar errors in MultiProcessing. Mismatch number

Published 2019-07-14 09:01

Question:

I couldn't find a better way to describe the error I'm facing, but it seems to come up every time I try to parallelize a loop with multiprocessing.

I've tried both sklearn.externals.joblib and multiprocessing.Process; the errors differ slightly but look related.

Here is the original loop I want to parallelize, where each iteration currently executes in a single thread/process:

for dd in final_col_dates:
    idx1 = final_col_dates.tolist().index(dd)

    dataObj = GetPrevDataByDate(d1, a, dd, self.start_hour_of_day)
    data2 = dataObj.fit()

    dataObj = GetAppointmentControlsSchedule(data2, idx1, d, final_col_dates_mod, dd, self.DC, frgt_typ_filter)
    data3 = dataObj.fit()

    if idx1 > 0:
       data3['APPT_SCHD_ARVL_D_{}'.format(idx1)] = np.nan

    iter += 1

    days_out_vars.append(data3)

To run the above snippet with multiprocessing, I moved its body (everything except the for loop itself) into a method.

With joblib, my code looks like this:

Parallel(n_jobs=2)(
            delayed(self.ParallelLoopTest)(dd, final_col_dates, d1, a, d, final_col_dates_mod, iter, return_list)
                    for dd in final_col_dates)
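(For comparison, here is a minimal joblib pattern that does work, using a hypothetical module-level function in place of ParallelLoopTest; instance state and unpicklable arguments are the usual things that break this:)

```python
from joblib import Parallel, delayed

def process_date(dd, all_dates):
    # Hypothetical stand-in for ParallelLoopTest: a module-level
    # function whose arguments are all plain, picklable objects.
    return all_dates.index(dd)

dates = ["2019-07-01", "2019-07-02", "2019-07-03"]
results = Parallel(n_jobs=2)(delayed(process_date)(dd, dates) for dd in dates)
print(results)  # [0, 1, 2]
```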

The variable return_list is a shared list that ParallelLoopTest appends to. It is declared as:

manager = Manager()
return_list = manager.list()
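For reference, a minimal self-contained sketch of this Manager().list() pattern (with a hypothetical worker function) that works because the target is a module-level function:

```python
from multiprocessing import Manager, Process

def worker(item, shared):
    # Module-level functions pickle cleanly under the Windows "spawn"
    # start method; `shared` is a Manager-backed proxy list.
    shared.append(item * 2)

if __name__ == "__main__":
    manager = Manager()
    return_list = manager.list()
    jobs = [Process(target=worker, args=(i, return_list)) for i in range(4)]
    for p in jobs:
        p.start()
    for p in jobs:
        p.join()
    print(sorted(return_list))  # [0, 2, 4, 6]
```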

With the above code, I get the following error:

Process SpawnPoolWorker-3:
Traceback (most recent call last):
File "C:\Users\dkanhar\Anaconda3\lib\multiprocessing\process.py", line 249, in _bootstrap
  self.run()
File "C:\Users\dkanhar\Anaconda3\lib\multiprocessing\process.py", line 93, in run
  self._target(*self._args, **self._kwargs)
File "C:\Users\dkanhar\Anaconda3\lib\multiprocessing\pool.py", line 108, in worker
  task = get()
File "C:\Users\dkanhar\Anaconda3\lib\site-packages\sklearn\externals\joblib\pool.py", line 359, in get
  return recv()
File "C:\Users\dkanhar\Anaconda3\lib\multiprocessing\connection.py", line 251, in recv
  return ForkingPickler.loads(buf.getbuffer())
TypeError: function takes at most 0 arguments (1 given)

I also tried the multiprocessing module to execute the same code and hit a similar error. This is the code I used:

for dd in final_col_dates:
    # multiprocessing.Pipe(False)
    p = multiprocessing.Process(target=self.ParallelLoopTest, args=(dd, final_col_dates, d1, a, d, final_col_dates_mod, iter, return_list))
    jobs.append(p)
    p.start()

for proc in jobs:
    proc.join()

That produces the following traceback:

File "<string>", line 1, in <module>
File "C:\Users\dkanhar\Anaconda3\lib\multiprocessing\spawn.py", line 106, in spawn_main
   exitcode = _main(fd)
File "C:\Users\dkanhar\Anaconda3\lib\multiprocessing\spawn.py", line 116, in _main
   self = pickle.load(from_parent)
TypeError: function takes at most 0 arguments (1 given)
Traceback (most recent call last):
File "E:/Projects/Predictive Inbound Cartoon Estimation-MLO/Python/dataprep/DataPrep.py", line 457, in <module>
   print(obj.fit())
File "E:/Projects/Predictive Inbound Cartoon Estimation-MLO/Python/dataprep/DataPrep.py", line 39, in fit
return self.__driver__()
File "E:/Projects/Predictive Inbound Cartoon Estimation-MLO/Python/dataprep/DataPrep.py", line 52, in __driver__
   final = self.process_()
File "E:/Projects/Predictive Inbound Cartoon Estimation-MLO/Python/dataprep/DataPrep.py", line 135, in process_
   sch_dat = self.inline_apply_(all_dates_schd, d1, d2, a)
File "E:/Projects/Predictive Inbound Cartoon Estimation-MLO/Python/dataprep/DataPrep.py", line 297, in inline_apply_
   p.start()
File "C:\Users\dkanhar\Anaconda3\lib\multiprocessing\process.py", line 105, in start
   self._popen = self._Popen(self)
File "C:\Users\dkanhar\Anaconda3\lib\multiprocessing\context.py", line 212, in _Popen
   return _default_context.get_context().Process._Popen(process_obj)
File "C:\Users\dkanhar\Anaconda3\lib\multiprocessing\context.py", line 313, in _Popen
   return Popen(process_obj)
File "C:\Users\dkanhar\Anaconda3\lib\multiprocessing\popen_spawn_win32.py", line 66, in __init__
   reduction.dump(process_obj, to_child)
File "C:\Users\dkanhar\Anaconda3\lib\multiprocessing\reduction.py", line 59, in dump
   ForkingPickler(file, protocol).dump(obj)
BrokenPipeError: [Errno 32] Broken pipe

I also tried toggling the commented-out multiprocessing.Pipe(False) line, thinking the pipe might be the cause, but the problem persists and I get the same error.

In case it helps, here is my ParallelLoopTest method:

def ParallelLoopTest(self, dd, final_col_dates, d1, a, d, final_col_dates_mod, iter, days_out_vars):
    idx1 = final_col_dates.tolist().index(dd)

    dataObj = GetPrevDataByDate(d1, a, dd, self.start_hour_of_day)
    data2 = dataObj.fit()

    dataObj = GetAppointmentControlsSchedule(data2, idx1, d, final_col_dates_mod, dd, self.DC, frgt_typ_filter)
    data3 = dataObj.fit()

    if idx1 > 0:
        data3['APPT_SCHD_ARVL_D_{}'.format(idx1)] = np.nan

    print("Iter ", iter)
    iter += 1

    days_out_vars.append(data3)

The reason I said the errors are similar is that both tracebacks contain the same line:

TypeError: function takes at most 0 arguments (1 given)

It occurs while loading from pickle, and I don't know why it is happening.

Note that I've used both of these modules successfully in earlier projects without any issue, so I don't know why this problem started coming up now or what the error actually means.

Any help would be really appreciated; I've been stuck debugging this for three days.

Thanks

Edit 1 after last answer

After the answer below, I tried the following: I added the @staticmethod decorator, removed self, and called the method as DataPrep.ParallelLoopTest(args).

I also tried moving the method out of the DataPrep class and calling it simply as ParallelLoopTest(args),

but in both cases the error remains the same.

PS: I tried joblib in both cases as well, so neither solution worked.

New method definition:

def ParallelLoopTest(dd, final_col_dates, d1, a, d, final_col_dates_mod, iter, days_out_vars, DC, start_hour):
    idx1 = final_col_dates.tolist().index(dd)

    dataObj = GetPrevDataByDate(d1, a, dd, start_hour)  # parameter is named start_hour
    data2 = dataObj.fit()

    dataObj = GetAppointmentControlsSchedule(data2, idx1, d, final_col_dates_mod, dd, DC, frgt_typ_filter)
    data3 = dataObj.fit()

    if idx1 > 0:
        data3['APPT_SCHD_ARVL_D_{}'.format(idx1)] = np.nan

    print("Iter ", iter)
    iter += 1

    days_out_vars.append(data3)

Edit 2:

It turns out the error occurred because Python was unable to pickle some large DataFrames. Two of my arguments were DataFrames, one around 20 MB and the other around 200 MB in pickled form. But that shouldn't be an issue, right? We should be able to pass a pandas DataFrame. Correct me if I'm wrong.

As a workaround, I saved each DataFrame to a CSV file with a random name before the method call, passed the file name, and read the CSV back inside the method. But that is a slow process, as it involves reading huge CSV files. Any suggestions?

Answer 1:

You actually get the exact same error in both cases, but because one example uses a Pool (joblib) and the other uses a Process, the failure surfaces differently in your main thread: the two APIs do not handle a worker's failure the same way.
In both cases, the new process fails to unpickle the child job. The Pool passes the unpickling error back to you, whereas with Process, the subprocess dies from the unpickling error and closes the pipe the main thread is writing to, causing the BrokenPipeError in the main process.

My first idea is that the error is caused by trying to pickle an instance method; you should use a static method here (an instance method does not seem right anyway, as the object is not shared between processes).
Add the @staticmethod decorator before the ParallelLoopTest declaration and remove the self argument.
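A minimal sketch of that change (the class and method names follow the question; the body is simplified to keep the example self-contained):

```python
import pickle

class DataPrep:
    @staticmethod
    def ParallelLoopTest(dd, final_col_dates):
        # No `self`: nothing from the DataPrep instance has to be
        # pickled when this task is shipped to a worker process.
        return final_col_dates.index(dd)

# The staticmethod is pickled by qualified name, so it can be used
# as a multiprocessing / joblib target.
func = pickle.loads(pickle.dumps(DataPrep.ParallelLoopTest))
print(func("b", ["a", "b", "c"]))  # 1
```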

EDIT: Another possibility is that one of the arguments dd, final_col_dates, d1, a, d, final_col_dates_mod, iter, return_list cannot be unpickled. Apparently the culprit is a pandas.DataFrame.
I do not see any reason why unpickling would fail in this case, but I don't know pandas that well.
One workaround is to dump the data to a temporary file; there are resources on efficient serialization of pandas.DataFrames. Another solution is to use the DataFrame.to_pickle method and pandas.read_pickle to dump/retrieve it to/from a file.
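A sketch of the to_pickle/read_pickle workaround, assuming pandas is available (the worker function and file handling here are hypothetical):

```python
import os
import tempfile

import pandas as pd

def worker_from_file(path):
    # The worker loads the DataFrame from disk instead of receiving it
    # through the pickle-over-pipe channel that was failing.
    df = pd.read_pickle(path)
    return len(df)

df = pd.DataFrame({"a": range(5)})
fd, path = tempfile.mkstemp(suffix=".pkl")
os.close(fd)
try:
    df.to_pickle(path)             # parent dumps the frame once
    print(worker_from_file(path))  # 5
finally:
    os.remove(path)
```

This also avoids the lossy, slow CSV round-trip mentioned in Edit 2, since to_pickle preserves dtypes and indexes.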

Note that it would be better to compare joblib.Parallel with multiprocessing.Pool and not with multiprocessing.Process.
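For example, a minimal multiprocessing.Pool version of the same fan-out (with a hypothetical square function as the task):

```python
from multiprocessing import Pool

def square(x):
    return x * x

if __name__ == "__main__":
    # Pool, like joblib.Parallel, dispatches tasks to a fixed set of
    # worker processes, so the two pickle (and fail) in the same way.
    with Pool(processes=2) as pool:
        print(pool.map(square, [1, 2, 3]))  # [1, 4, 9]
```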