Python multiprocessing - Debugging OSError: [Errno

Posted 2019-02-15 13:38

Question:

I'm facing the following issue. I'm trying to parallelize a function that updates a file, but I cannot start the Pool() because of an OSError: [Errno 12] Cannot allocate memory. I've looked around on the server, and according to htop it is neither an old, weak machine nor actually out of memory. free -m also shows plenty of RAM available, in addition to the ~7GB of swap. The files I'm trying to work with aren't that big either. I'll paste my code (and the stack trace) below; the sizes are as follows:

- The predictionmatrix dataframe takes up ca. 80MB according to pandas DataFrame.memory_usage()
- The file geo.geojson is 2MB

How do I go about debugging this? What can I check and how? Thank you for any tips/tricks!

Code:

def parallelUpdateJSON(paramMatch, predictionmatrix, data):
    for feature in data['features']: 
        currentfeature = predictionmatrix[(predictionmatrix['SId']==feature['properties']['cellId']) & paramMatch]
        if (len(currentfeature) > 0):
            feature['properties'].update({"style": {"opacity": currentfeature.AllActivity.item()}})
        else:
            feature['properties'].update({"style": {"opacity": 0}})

def writeGeoJSON(weekdaytopredict, hourtopredict, predictionmatrix):
    with open('geo.geojson') as f:
        data = json.load(f)
    paramMatch = (predictionmatrix['Hour']==hourtopredict) & (predictionmatrix['Weekday']==weekdaytopredict)
    pool = Pool()
    func = partial(parallelUpdateJSON, paramMatch, predictionmatrix)
    pool.map(func, data)
    pool.close()
    pool.join()

    with open('output.geojson', 'w') as outfile:
        json.dump(data, outfile)

Stack Trace:

---------------------------------------------------------------------------
OSError                                   Traceback (most recent call last)
<ipython-input-428-d6121ed2750b> in <module>()
----> 1 writeGeoJSON(6, 15, baseline)

<ipython-input-427-973b7a5a8acc> in writeGeoJSON(weekdaytopredict, hourtopredict, predictionmatrix)
     14     print("Start loop")
     15     paramMatch = (predictionmatrix['Hour']==hourtopredict) & (predictionmatrix['Weekday']==weekdaytopredict)
---> 16     pool = Pool(2)
     17     func = partial(parallelUpdateJSON, paramMatch, predictionmatrix)
     18     print(predictionmatrix.memory_usage())

/usr/lib/python3.5/multiprocessing/context.py in Pool(self, processes, initializer, initargs, maxtasksperchild)
    116         from .pool import Pool
    117         return Pool(processes, initializer, initargs, maxtasksperchild,
--> 118                     context=self.get_context())
    119 
    120     def RawValue(self, typecode_or_type, *args):

/usr/lib/python3.5/multiprocessing/pool.py in __init__(self, processes, initializer, initargs, maxtasksperchild, context)
    166         self._processes = processes
    167         self._pool = []
--> 168         self._repopulate_pool()
    169 
    170         self._worker_handler = threading.Thread(

/usr/lib/python3.5/multiprocessing/pool.py in _repopulate_pool(self)
    231             w.name = w.name.replace('Process', 'PoolWorker')
    232             w.daemon = True
--> 233             w.start()
    234             util.debug('added worker')
    235 

/usr/lib/python3.5/multiprocessing/process.py in start(self)
    103                'daemonic processes are not allowed to have children'
    104         _cleanup()
--> 105         self._popen = self._Popen(self)
    106         self._sentinel = self._popen.sentinel
    107         _children.add(self)

/usr/lib/python3.5/multiprocessing/context.py in _Popen(process_obj)
    265         def _Popen(process_obj):
    266             from .popen_fork import Popen
--> 267             return Popen(process_obj)
    268 
    269     class SpawnProcess(process.BaseProcess):

/usr/lib/python3.5/multiprocessing/popen_fork.py in __init__(self, process_obj)
     18         sys.stderr.flush()
     19         self.returncode = None
---> 20         self._launch(process_obj)
     21 
     22     def duplicate_for_child(self, fd):

/usr/lib/python3.5/multiprocessing/popen_fork.py in _launch(self, process_obj)
     65         code = 1
     66         parent_r, child_w = os.pipe()
---> 67         self.pid = os.fork()
     68         if self.pid == 0:
     69             try:

OSError: [Errno 12] Cannot allocate memory

UPDATE

According to @robyschek's solution, I've updated my code to:

global g_predictionmatrix 

def worker_init(predictionmatrix):
    global g_predictionmatrix
    g_predictionmatrix = predictionmatrix    

def parallelUpdateJSON(paramMatch, data_item):
    for feature in data_item['features']: 
        currentfeature = predictionmatrix[(predictionmatrix['SId']==feature['properties']['cellId']) & paramMatch]
        if (len(currentfeature) > 0):
            feature['properties'].update({"style": {"opacity": currentfeature.AllActivity.item()}})
        else:
            feature['properties'].update({"style": {"opacity": 0}})

def use_the_pool(data, paramMatch, predictionmatrix):
    pool = Pool(initializer=worker_init, initargs=(predictionmatrix,))
    func = partial(parallelUpdateJSON, paramMatch)
    pool.map(func, data)
    pool.close()
    pool.join()


def writeGeoJSON(weekdaytopredict, hourtopredict, predictionmatrix):
    with open('geo.geojson') as f:
        data = json.load(f)
    paramMatch = (predictionmatrix['Hour']==hourtopredict) & (predictionmatrix['Weekday']==weekdaytopredict)
    use_the_pool(data, paramMatch, predictionmatrix)     
    with open('trentino-grid.geojson', 'w') as outfile:
        json.dump(data, outfile)

And I still get the same error. Also, according to the documentation, map() should divide my data into chunks, so I don't think it should replicate my 80MBs rownum times. I may be wrong though... :) Plus I've noticed that if I use smaller input (~11MB instead of 80MB) I don't get the error. So I guess I'm trying to use too much memory, but I can't imagine how it goes from 80MB to something 16GBs of RAM can't handle.

Answer 1:

We have had this a couple of times. According to my sysadmin, there is "a bug" in Unix that raises the same error whether you are out of memory or your process has reached its maximum file descriptor limit.

We had a file descriptor leak, and the error raised was [Errno 12] Cannot allocate memory#012OSError.

So you should look at your script and double-check whether the problem is the creation of too many file descriptors instead.
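One way to check the file descriptor theory is to compare how many descriptors the process currently holds against its limit. The following is only a minimal sketch, assuming Linux (it reads /proc/self/fd); the helper name fd_usage is made up for illustration.

```python
import os
import resource


def fd_usage():
    """Return (open_fds, soft_limit) for the current process.

    open_fds is None when /proc/self/fd is not available (non-Linux).
    """
    soft_limit, _hard_limit = resource.getrlimit(resource.RLIMIT_NOFILE)
    try:
        # Each entry in /proc/self/fd is one open descriptor (Linux only).
        open_fds = len(os.listdir('/proc/self/fd'))
    except FileNotFoundError:
        open_fds = None
    return open_fds, soft_limit


open_fds, soft_limit = fd_usage()
print("open file descriptors:", open_fds, "soft limit:", soft_limit)
```

Calling this before and after the code that creates the Pool, or periodically in a long-running script, should show whether the count creeps toward the limit.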



Answer 2:

When using a multiprocessing.Pool, the default way to start the processes in this setup is fork (the traceback goes through popen_fork). The issue with fork is that the entire process is duplicated. Thus, if your main process is already using a lot of memory, that memory is duplicated as well, which leads to this error. For instance, if your main process uses 2GB of memory and you use 8 subprocesses, you will need 18GB of RAM.
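The arithmetic in that example can be written out as follows. This is only a sketch of the worst case described here, where every worker ends up holding a full copy of the parent's memory. The function name is hypothetical, and note that resource.getrusage reports ru_maxrss in kilobytes on Linux but in bytes on macOS.

```python
import resource


def worst_case_fork_memory(parent_bytes, n_workers):
    """Upper bound if each forked worker held a full copy of the parent."""
    return parent_bytes * (n_workers + 1)


GB = 1024 ** 3

# The example from the text: a 2GB parent process and 8 workers.
estimate = worst_case_fork_memory(2 * GB, 8)
print(estimate / GB)  # 18.0

# Peak resident set size of the current process (kilobytes on Linux).
peak_rss = resource.getrusage(resource.RUSAGE_SELF).ru_maxrss
print("peak RSS as reported by the OS:", peak_rss)
```

Printing the peak RSS right before creating the Pool gives a rough idea of how much memory the parent process holds at the moment it forks.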

You should try using a different start method such as 'forkserver' or 'spawn':

from functools import partial
from multiprocessing import set_start_method, Pool

# Call this once, before creating any Pool
set_start_method('forkserver')

# You can then start your Pool without each process
# cloning your entire memory
pool = Pool()
func = partial(parallelUpdateJSON, paramMatch, predictionmatrix)
pool.map(func, data)

These methods avoid duplicating the workspace of your process but can be a bit slower to start, as the modules you are using need to be reloaded.
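If changing the start method for the whole program is undesirable, multiprocessing.get_context returns a context object whose Pool uses the chosen method for that pool only. The following is a minimal sketch; the helper run_in_pool and its parameters are hypothetical, and 'forkserver' is assumed to be available on the platform (it is not on Windows).

```python
import multiprocessing as mp


def run_in_pool(func, items, method='forkserver', processes=2):
    """Map func over items in a pool started with the given start method."""
    # The context applies the start method to this pool only,
    # leaving the global default untouched.
    ctx = mp.get_context(method)
    with ctx.Pool(processes=processes) as pool:
        return pool.map(func, items)
```

With 'forkserver' or 'spawn', the call should sit under an if __name__ == '__main__': guard in a script, and func must be picklable, for example a module-level function or a builtin such as abs.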