python struct.error: 'i' format requires -

2019-02-16 15:03发布

Problem

I'm willing to do a feature engineering using multiprocessing module (multiprocessing.Pool.starmap(). However, it gives an error message as follows. I guess this error message is about the size of inputs (2147483647 = 2^31 − 1?), since the same code worked smoothly for a fraction(frac=0.05) of input dataframes(train_scala, test, ts). I convert types of data frame as smallest as possible, however it does not get better.

The anaconda version is 4.3.30 and the Python version is 3.6 (64 bit). And the memory size of the system is over 128GB with more than 20 cores. Would you like to suggest any pointer or solution to overcome this problem? If this problem is caused by a large data for a multiprocessing module, How much smaller data should I use to utilize the multiprocessing module on Python3?

Code:

from multiprocessing import Pool, cpu_count
from itertools import repeat    
p = Pool(8)
is_train_seq = [True]*len(historyCutoffs)+[False]
config_zip = zip(historyCutoffs, repeat(train_scala), repeat(test), repeat(ts), ul_parts_path, repeat(members), is_train_seq)
p.starmap(multiprocess_FE, config_zip)

Error Message:

Traceback (most recent call last):
  File "main_1210_FE_scala_multiprocessing.py", line 705, in <module>
    print('----Pool starmap start----')
  File "/home/dmlab/ksedm1/anaconda3/envs/py36/lib/python3.6/multiprocessing/pool.py", line 274, in starmap
    return self._map_async(func, iterable, starmapstar, chunksize).get()
  File "/home/dmlab/ksedm1/anaconda3/envs/py36/lib/python3.6/multiprocessing/pool.py", line 644, in get
    raise self._value
  File "/home/dmlab/ksedm1/anaconda3/envs/py36/lib/python3.6/multiprocessing/pool.py", line 424, in _handle_tasks
    put(task)
  File "/home/dmlab/ksedm1/anaconda3/envs/py36/lib/python3.6/multiprocessing/connection.py", line 206, in send
    self._send_bytes(_ForkingPickler.dumps(obj))
  File "/home/dmlab/ksedm1/anaconda3/envs/py36/lib/python3.6/multiprocessing/connection.py", line 393, in _send_bytes
    header = struct.pack("!i", n)
struct.error: 'i' format requires -2147483648 <= number <= 2147483647

Extra infos

  • historyCutoffs is a list of integers
  • train_scala is a pandas DataFrame (377MB)
  • test is a pandas DataFrame (15MB)
  • ts is a pandas DataFrame (547MB)
  • ul_parts_path is a list of directories (string)
  • is_train_seq is a list of booleans

Extra Code: Method multiprocess_FE

def multiprocess_FE(historyCutoff, train_scala, test, ts, ul_part_path, members, is_train):
    train_dict = {}
    ts_dict = {}
    msno_dict = {}
    ul_dict = {}
    if is_train == True:
        train_dict[historyCutoff] = train_scala[train_scala.historyCutoff == historyCutoff]
    else:
        train_dict[historyCutoff] = test
    msno_dict[historyCutoff] = set(train_dict[historyCutoff].msno)
    print('length of msno is {:d} in cutoff {:d}'.format(len(msno_dict[historyCutoff]), historyCutoff))
    ts_dict[historyCutoff] = ts[(ts.transaction_date <= historyCutoff) & (ts.msno.isin(msno_dict[historyCutoff]))]
    print('length of transaction is {:d} in cutoff {:d}'.format(len(ts_dict[historyCutoff]), historyCutoff))    
    ul_part = pd.read_csv(gzip.open(ul_part_path, mode="rt"))  ##.sample(frac=0.01, replace=False)
    ul_dict[historyCutoff] = ul_part[ul_part.msno.isin(msno_dict[historyCutoff])]
    train_dict[historyCutoff] = enrich_by_features(historyCutoff, train_dict[historyCutoff], ts_dict[historyCutoff], ul_dict[historyCutoff], members, is_train)

2条回答
做自己的国王
2楼-- · 2019-02-16 15:51

The communication protocol between processes uses pickling, and the pickled data is prefixed with the size of the pickled data. For your method, all arguments together are pickled as one object.

You produced an object that when pickled is larger than fits in a i struct formatter (a four-byte signed integer), which breaks the assumptions the code has made.

You could delegate reading of your dataframes to the child process instead, only sending across the metadata needed to load the dataframe. Their combined size is nearing 1GB, way too much data to share over a pipe between your processes.

Quoting from the Programming guidelines section:

Better to inherit than pickle/unpickle

When using the spawn or forkserver start methods many types from multiprocessing need to be picklable so that child processes can use them. However, one should generally avoid sending shared objects to other processes using pipes or queues. Instead you should arrange the program so that a process which needs access to a shared resource created elsewhere can inherit it from an ancestor process.

If you are not running on Windows and use either the spawn or forkserver methods, you could load your dataframes as globals before starting your subprocesses, at which point the child processes will 'inherit' the data via the normal OS copy-on-write memory page sharing mechanisms.

查看更多
▲ chillily
3楼-- · 2019-02-16 15:57

this problem was fixed in a recent PR to python https://github.com/python/cpython/pull/10305

if you want, you can make this change locally to make it work for you right away, without waiting for a python and anaconda release.

查看更多
登录 后发表回答