I am launching a Process that fetches a couple of gigs of data from a database into a DataFrame with a date index. From there I create a Manager to store that data and call a function using a Pool to utilize CPU cores. Since I have so much data, I need to use shared memory for the pooled methods to work on.
import multiprocessing as mp
import pandas as pd
# Launched from __main__
class MyProcess(mp.Process):
def calculate():
# Get and preprocess data
allMyData = <fetch from Mongo>
aggs = <resample data into multiple aggregations>
manager = mp.Manager()
aggregateData = manager.dict()
for key, value in aggs.items():
aggregateData[key] = value
# Setup the pool and methods
NUM_PROCESSES = 16
pool = mp.Pool(processes=NUM_PROCESSES)
procs = []
for thing in thousandsOfThingsIWantToCalculate:
# Run the method asynchronously
proc = pool.apply_async(method, args=(aggregateData))
procs.append({proc:[]})
# Wait so all of the pool methods are not loaded created in memory at once, without this there was a different memory problem
while len(pool._cache) > NUM_PROCESSES:
sleep(0.01)
for dict in procs:
p, res = next(item(dict.items()))
res = p.get()
pool.close()
pool.join()
# Do stuff with results
def method(data):
...
# Loop through all the data by row
for row in data.itertuples():
# Can be empty
...
Eventually I will get this error and the Process that runs the pool quits. This occurs before the "for thing in ..." loop completes.
OSError: [Errno 12] Cannot allocate memory
I have narrowed everything down to this line in method, which I believe is creating a copy of the memory in the row variable that is not efficiently released. If I comment out the line, other memory is consumed but released by the pooled method. If I run the line, even with an empty body and nothing done afterwards, memory is consumed completely.
for row in data.itertuples():
- Is the row actually a copy of the dataframe data, which would account for the excess memory consumption? Update: YES, also anything increasing retain count is copied (i.e. local variables)
- I have noticed in other places that creating local variables shows memory trouble and to access data when needed from the shared memory structure directly. How can I iterate over the dataframe without creating a local variable? I can't really vectorize the itertuples loop, as it is sequential in nature (the next iteration relies on results from the previous).
Thank you for your time.
Update: I can currently get around this problem by reducing the number of processes (16 was a stress test) and setting maxtasksperchild on the Pool. I had tried that property before and saw CPU slowdowns (when starting up and memory was being copied now that I think about it), so I scrapped it. CPU seems normal now with it.
pool = mp.Pool(processes=NUM_PROCESSES, maxtasksperchild=1)
Similar behavior is also achieved by garbage collecting after the loop completes, but since that behavior is less desirable, I will stick to maxtasksperchild.
As the processes loop, I see a decent size memory drain, but then it is reclaimed by the system when the processes complete. Obviously, as my data set grows and my memory doesn't, this solution will eventually be invalid. So the question remains, is there a way to iterate over a DataFrame without copying memory?