Problem
I want to do feature engineering with the multiprocessing module (multiprocessing.Pool.starmap()).
However, it fails with the error message below. I guess the error is related to the size of the inputs (2147483647 = 2^31 − 1?), since the same code ran smoothly on a fraction (frac=0.05) of the input DataFrames (train_scala, test, ts). I converted the column types of the DataFrames to the smallest ones possible, but it did not help.
The Anaconda version is 4.3.30 and the Python version is 3.6 (64-bit). The system has more than 128 GB of memory and more than 20 cores. Could you suggest any pointers or a solution to overcome this problem? If it is caused by the data being too large for the multiprocessing module, how much smaller does the data need to be for multiprocessing to work in Python 3?
Code:
from multiprocessing import Pool, cpu_count
from itertools import repeat
p = Pool(8)
is_train_seq = [True]*len(historyCutoffs)+[False]
config_zip = zip(historyCutoffs, repeat(train_scala), repeat(test), repeat(ts), ul_parts_path, repeat(members), is_train_seq)
p.starmap(multiprocess_FE, config_zip)
Error Message:
Traceback (most recent call last):
  File "main_1210_FE_scala_multiprocessing.py", line 705, in <module>
    print('----Pool starmap start----')
  File "/home/dmlab/ksedm1/anaconda3/envs/py36/lib/python3.6/multiprocessing/pool.py", line 274, in starmap
    return self._map_async(func, iterable, starmapstar, chunksize).get()
  File "/home/dmlab/ksedm1/anaconda3/envs/py36/lib/python3.6/multiprocessing/pool.py", line 644, in get
    raise self._value
  File "/home/dmlab/ksedm1/anaconda3/envs/py36/lib/python3.6/multiprocessing/pool.py", line 424, in _handle_tasks
    put(task)
  File "/home/dmlab/ksedm1/anaconda3/envs/py36/lib/python3.6/multiprocessing/connection.py", line 206, in send
    self._send_bytes(_ForkingPickler.dumps(obj))
  File "/home/dmlab/ksedm1/anaconda3/envs/py36/lib/python3.6/multiprocessing/connection.py", line 393, in _send_bytes
    header = struct.pack("!i", n)
struct.error: 'i' format requires -2147483648 <= number <= 2147483647
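The last frame of the traceback can be reproduced in isolation, which may help confirm the guess about 2^31 − 1. This is a minimal sketch, assuming (as the traceback suggests) that the length of the pickled task is packed into a signed 32-bit header with the "!i" format. `fits_in_header` is a hypothetical helper name and not part of multiprocessing.

```python
import struct

INT32_MAX = 2**31 - 1  # 2147483647, the upper bound quoted in the error


def fits_in_header(n):
    """Return True if n can be packed with the "!i" format seen in the traceback."""
    try:
        struct.pack("!i", n)
        return True
    except struct.error:
        return False


print(fits_in_header(INT32_MAX))      # a length of exactly 2**31 - 1 still packs
print(fits_in_header(INT32_MAX + 1))  # one byte more raises struct.error
```

If this is indeed the cause, the limit applies to the pickled size of what is sent to a worker in one message, not to the total memory of the machine.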
Extra info
- historyCutoffs is a list of integers
- train_scala is a pandas DataFrame (377MB)
- test is a pandas DataFrame (15MB)
- ts is a pandas DataFrame (547MB)
- ul_parts_path is a list of directories (string)
- is_train_seq is a list of booleans
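The sizes listed above are not necessarily the sizes of the objects once pickled, and the pickled size is what the traceback's `_ForkingPickler.dumps(obj)` call produces. A rough way to check is to pickle each argument and compare the byte count with 2^31 − 1. This is only a sketch: `pickled_size` and `exceeds_limit` are hypothetical helpers, small stand-in objects replace the real DataFrames, and using the default pickle protocol is an assumption about what the pool does.

```python
import pickle

INT32_MAX = 2**31 - 1  # bound from the struct.error message


def pickled_size(obj):
    """Number of bytes obj occupies after pickling with the default protocol."""
    return len(pickle.dumps(obj))


def exceeds_limit(task_args):
    """True if the pickled argument tuple would not fit a signed 32-bit length."""
    return pickled_size(task_args) > INT32_MAX


if __name__ == "__main__":
    # Stand-ins for (historyCutoff, train_scala, test, ts, ...); replace with real objects.
    args = (20170101, b"x" * 1000, b"y" * 100, b"z" * 2000)
    for position, value in enumerate(args):
        print(position, pickled_size(value))
    print("over limit:", exceeds_limit(args))
```

Running the same check on the real argument tuple would show which input dominates the size of a task.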
Extra Code: Method multiprocess_FE
def multiprocess_FE(historyCutoff, train_scala, test, ts, ul_part_path, members, is_train):
    train_dict = {}
    ts_dict = {}
    msno_dict = {}
    ul_dict = {}
    if is_train == True:
        train_dict[historyCutoff] = train_scala[train_scala.historyCutoff == historyCutoff]
    else:
        train_dict[historyCutoff] = test
    msno_dict[historyCutoff] = set(train_dict[historyCutoff].msno)
    print('length of msno is {:d} in cutoff {:d}'.format(len(msno_dict[historyCutoff]), historyCutoff))
    ts_dict[historyCutoff] = ts[(ts.transaction_date <= historyCutoff) & (ts.msno.isin(msno_dict[historyCutoff]))]
    print('length of transaction is {:d} in cutoff {:d}'.format(len(ts_dict[historyCutoff]), historyCutoff))
    ul_part = pd.read_csv(gzip.open(ul_part_path, mode="rt"))  ##.sample(frac=0.01, replace=False)
    ul_dict[historyCutoff] = ul_part[ul_part.msno.isin(msno_dict[historyCutoff])]
    train_dict[historyCutoff] = enrich_by_features(historyCutoff, train_dict[historyCutoff], ts_dict[historyCutoff], ul_dict[historyCutoff], members, is_train)
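Since the function only reads the large frames, one direction that might sidestep the size limit is to let the workers inherit them instead of passing them as arguments, so that only small per-task values are pickled. The following is a sketch under stated assumptions, not a tested fix for the code above: it assumes a Unix system where the "fork" start method is available, uses plain lists as stand-ins for the DataFrames, and the names `_shared`, `worker`, and `run` are hypothetical.

```python
import multiprocessing as mp

# Large, read-only inputs are stored in a module-level dict before the pool
# is created; forked workers inherit it without any pickling.
_shared = {}


def worker(cutoff, is_train):
    # Only cutoff and is_train travel through the pool's pipe.
    data = _shared["train"] if is_train else _shared["test"]
    return cutoff, len(data)


def run(cutoffs, train, test, processes=2):
    _shared["train"] = train
    _shared["test"] = test
    # Assumption: "fork" is available; with "spawn" the workers would not see _shared.
    ctx = mp.get_context("fork")
    with ctx.Pool(processes) as pool:
        return pool.starmap(worker, [(c, True) for c in cutoffs])


if __name__ == "__main__":
    print(run([1, 2, 3], train=list(range(10)), test=list(range(4))))
    # [(1, 10), (2, 10), (3, 10)]
```

The results returned by each worker still go back through a pipe, so large return values would need the same care.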