I'm facing the following issue. I'm trying to parallelize a function that updates a file, but I cannot start the Pool() because of an OSError: [Errno 12] Cannot allocate memory. I've started looking around on the server, and it's not as if I'm using an old, weak machine or running out of actual memory.
Both htop and free -m show that I have plenty of RAM available, in addition to the ~7GB of swap memory.
And the files I'm trying to work with aren't that big either. I'll paste my code (and the stack trace) below. The sizes are as follows:
The predictionmatrix dataframe takes up ca. 80MB according to the pandas DataFrame.memory_usage() method
The file geo.geojson is 2MB
How do I go about debugging this? What can I check and how? Thank you for any tips/tricks!
Code:
    import json
    from functools import partial
    from multiprocessing import Pool

    def parallelUpdateJSON(paramMatch, predictionmatrix, data):
        for feature in data['features']:
            # Rows for this cell that also match the requested hour and weekday
            currentfeature = predictionmatrix[(predictionmatrix['SId']==feature['properties']['cellId']) & paramMatch]
            if (len(currentfeature) > 0):
                feature['properties'].update({"style": {"opacity": currentfeature.AllActivity.item()}})
            else:
                feature['properties'].update({"style": {"opacity": 0}})

    def writeGeoJSON(weekdaytopredict, hourtopredict, predictionmatrix):
        with open('geo.geojson') as f:
            data = json.load(f)
        paramMatch = (predictionmatrix['Hour']==hourtopredict) & (predictionmatrix['Weekday']==weekdaytopredict)
        pool = Pool()
        func = partial(parallelUpdateJSON, paramMatch, predictionmatrix)
        pool.map(func, data)
        pool.close()
        pool.join()

        with open('output.geojson', 'w') as outfile:
            json.dump(data, outfile)
Stack Trace:
---------------------------------------------------------------------------
OSError Traceback (most recent call last)
<ipython-input-428-d6121ed2750b> in <module>()
----> 1 writeGeoJSON(6, 15, baseline)
<ipython-input-427-973b7a5a8acc> in writeGeoJSON(weekdaytopredict, hourtopredict, predictionmatrix)
14 print("Start loop")
15 paramMatch = (predictionmatrix['Hour']==hourtopredict) & (predictionmatrix['Weekday']==weekdaytopredict)
---> 16 pool = Pool(2)
17 func = partial(parallelUpdateJSON, paramMatch, predictionmatrix)
18 print(predictionmatrix.memory_usage())
/usr/lib/python3.5/multiprocessing/context.py in Pool(self, processes, initializer, initargs, maxtasksperchild)
116 from .pool import Pool
117 return Pool(processes, initializer, initargs, maxtasksperchild,
--> 118 context=self.get_context())
119
120 def RawValue(self, typecode_or_type, *args):
/usr/lib/python3.5/multiprocessing/pool.py in __init__(self, processes, initializer, initargs, maxtasksperchild, context)
166 self._processes = processes
167 self._pool = []
--> 168 self._repopulate_pool()
169
170 self._worker_handler = threading.Thread(
/usr/lib/python3.5/multiprocessing/pool.py in _repopulate_pool(self)
231 w.name = w.name.replace('Process', 'PoolWorker')
232 w.daemon = True
--> 233 w.start()
234 util.debug('added worker')
235
/usr/lib/python3.5/multiprocessing/process.py in start(self)
103 'daemonic processes are not allowed to have children'
104 _cleanup()
--> 105 self._popen = self._Popen(self)
106 self._sentinel = self._popen.sentinel
107 _children.add(self)
/usr/lib/python3.5/multiprocessing/context.py in _Popen(process_obj)
265 def _Popen(process_obj):
266 from .popen_fork import Popen
--> 267 return Popen(process_obj)
268
269 class SpawnProcess(process.BaseProcess):
/usr/lib/python3.5/multiprocessing/popen_fork.py in __init__(self, process_obj)
18 sys.stderr.flush()
19 self.returncode = None
---> 20 self._launch(process_obj)
21
22 def duplicate_for_child(self, fd):
/usr/lib/python3.5/multiprocessing/popen_fork.py in _launch(self, process_obj)
65 code = 1
66 parent_r, child_w = os.pipe()
---> 67 self.pid = os.fork()
68 if self.pid == 0:
69 try:
OSError: [Errno 12] Cannot allocate memory
UPDATE
According to @robyschek's solution, I've updated my code to:
    # Set once per worker process by worker_init
    g_predictionmatrix = None

    def worker_init(predictionmatrix):
        global g_predictionmatrix
        g_predictionmatrix = predictionmatrix

    def parallelUpdateJSON(paramMatch, data_item):
        for feature in data_item['features']:
            # Read the per-worker global rather than an undefined local name
            currentfeature = g_predictionmatrix[(g_predictionmatrix['SId']==feature['properties']['cellId']) & paramMatch]
            if (len(currentfeature) > 0):
                feature['properties'].update({"style": {"opacity": currentfeature.AllActivity.item()}})
            else:
                feature['properties'].update({"style": {"opacity": 0}})

    def use_the_pool(data, paramMatch, predictionmatrix):
        pool = Pool(initializer=worker_init, initargs=(predictionmatrix,))
        func = partial(parallelUpdateJSON, paramMatch)
        pool.map(func, data)
        pool.close()
        pool.join()

    def writeGeoJSON(weekdaytopredict, hourtopredict, predictionmatrix):
        with open('geo.geojson') as f:
            data = json.load(f)
        paramMatch = (predictionmatrix['Hour']==hourtopredict) & (predictionmatrix['Weekday']==weekdaytopredict)
        use_the_pool(data, paramMatch, predictionmatrix)
        with open('trentino-grid.geojson', 'w') as outfile:
            json.dump(data, outfile)
And I still get the same error. Also, according to the documentation, map() should divide my data into chunks, so I don't think it should replicate my 80MB rownum times. I may be wrong though... :)
Plus I've noticed that if I use smaller input (~11MB instead of 80MB) I don't get the error. So I guess I'm trying to use too much memory, but I can't imagine how it goes from 80MB to something that 16GB of RAM can't handle.
We have run into this a couple of times. According to my sysadmin, there is "a bug" in Unix that raises the same error whether you are out of memory or your process has reached the maximum file descriptor limit.
We had a file descriptor leak, and the error raised was [Errno 12] Cannot allocate memory#012OSError.
So you should look at your script and double-check whether the problem is the creation of too many file descriptors instead.
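One way to check this is to compare the number of descriptors the process currently holds against its limit. The following is a minimal sketch, assuming a system that exposes /proc/self/fd (Linux) or /dev/fd; the helper name count_open_fds is made up for illustration and is not part of any library:

```python
import os
import resource


def count_open_fds():
    """Return the number of open file descriptors, or None if unknown."""
    # Each entry in these directories corresponds to one open descriptor.
    # The count includes the descriptor used to list the directory itself.
    for path in ('/proc/self/fd', '/dev/fd'):
        if os.path.isdir(path):
            return len(os.listdir(path))
    return None


if __name__ == '__main__':
    # Soft limit is the one enforced; hard limit is the ceiling it can be raised to.
    soft, hard = resource.getrlimit(resource.RLIMIT_NOFILE)
    print('open fds: %s, soft limit: %s, hard limit: %s'
          % (count_open_fds(), soft, hard))
```

Calling this before creating the Pool (and again after a few runs of the function) should show whether the count keeps growing toward the soft limit, which would point to a descriptor leak rather than a real memory shortage.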
When using a multiprocessing.Pool, the default way to start the processes (on Linux) is fork. The issue with fork is that the entire process is duplicated (see details here). Thus, if your main process is already using a lot of memory, this memory will be duplicated, which leads to this memory error. For instance, if your main process uses 2GB of memory and you use 8 subprocesses, you may need up to 18GB of RAM.
You should try using a different start method such as 'forkserver' or 'spawn'.
These methods avoid duplicating the workspace of your Process but can be a bit slower to start, as you need to reload the modules you are using.
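A minimal sketch of selecting a start method, assuming Python 3.4 or later (where multiprocessing.get_context is available). The helper run_with_start_method and the use of math.sqrt as the worker are placeholders for illustration, not the code from the question:

```python
import math
import multiprocessing as mp


def run_with_start_method(method, items):
    # get_context() returns a context bound to one start method without
    # changing the global default for the rest of the program.
    ctx = mp.get_context(method)
    with ctx.Pool(processes=2) as pool:
        # math.sqrt stands in for the real worker; with 'spawn' or
        # 'forkserver' the worker must be importable by the child process,
        # so it should be defined at the top level of a module.
        return pool.map(math.sqrt, items)


if __name__ == '__main__':
    # The guard matters: 'spawn' re-imports the main module in each child.
    print(run_with_start_method('spawn', [1.0, 4.0, 9.0]))
```

With these start methods the arguments are pickled and sent to the workers instead of being inherited, so a large dataframe passed through initargs or partial is still copied once per worker; the saving is that the rest of the parent's memory is not carried along.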