I'm processing a very large amount of data, stored in a dictionary, using multiprocessing. All I'm doing is loading some signatures into a dictionary, building a shared dict object out of it (the 'proxy' object returned by Manager.dict()), and passing this proxy as an argument to the function that has to be executed by the worker processes.
Just to clarify:
signatures = dict()
load_signatures(signatures)
[...]
manager = Manager()
signaturesProxy = manager.dict(signatures)
[...]
result = pool.map(myfunction, [signaturesProxy] * NUM_CORES)
Now, everything works perfectly if signatures has fewer than 2 million entries or so. However, I have to process a dictionary with 5.8M keys (pickling signatures in binary format generates a 4.8 GB file). In this case, the process dies during the creation of the proxy object:
Traceback (most recent call last):
  File "matrix.py", line 617, in <module>
    signaturesProxy = manager.dict(signatures)
  File "/usr/lib/python2.6/multiprocessing/managers.py", line 634, in temp
    token, exp = self._create(typeid, *args, **kwds)
  File "/usr/lib/python2.6/multiprocessing/managers.py", line 534, in _create
    id, exposed = dispatch(conn, None, 'create', (typeid,)+args, kwds)
  File "/usr/lib/python2.6/multiprocessing/managers.py", line 79, in dispatch
    raise convert_to_error(kind, result)
multiprocessing.managers.RemoteError:
---------------------------------------------------------------------------
Traceback (most recent call last):
  File "/usr/lib/python2.6/multiprocessing/managers.py", line 173, in handle_request
    request = c.recv()
EOFError
---------------------------------------------------------------------------
I know the data structure is huge, but I'm working on a machine with 32 GB of RAM, and in top I see that the process occupies 7 GB after loading the signatures. It then starts building the proxy object and RAM usage climbs to about 17 GB, but it never gets close to 32. At that point RAM usage drops quickly and the process terminates with the above error. So I guess this is not an out-of-memory error...
Any idea or suggestion?
Thank you,
Davide
If the dictionaries are read-only, you don't need proxy objects on most operating systems.
Just load the dictionaries before starting the workers, and put them somewhere the workers can reach; the simplest place is a module-level global. They'll be readable from the workers.
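from multiprocessing import Pool

# Module-level global: forked workers inherit it without copying.
buf = ""

def f(x):
    # Read-only access to the inherited buffer; nothing is pickled or sent.
    buf.find("x")
    return 0

if __name__ == '__main__':
    # Allocate the 1 GB buffer BEFORE the pool forks its workers.
    buf = "a" * 1024 * 1024 * 1024
    pool = Pool(processes=1)
    result = pool.apply_async(f, [10])
    print(result.get(timeout=5))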
This only uses 1GB of memory combined, not 1GB for each process, because any modern OS will make a copy-on-write shadow of the data created before the fork. Just remember that changes to the data won't be seen by other workers, and memory will, of course, be allocated for any data you change.
It will still use some extra memory: accessing an object updates its reference count, which modifies the memory page holding that object, so that page gets copied for the worker. Whether this matters depends on the data.
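To illustrate the reference-count point, here is a minimal sketch, assuming CPython. The function name refcount_delta is made up for the example. It shows that simply binding another name to an object changes the count stored inside the object itself, which is the kind of write that forces a copy-on-write page to be copied.

```python
import sys


def refcount_delta():
    """Return how much an object's refcount changes when one more name refers to it."""
    payload = ["signature"]            # a fresh, ordinary heap object
    before = sys.getrefcount(payload)
    alias = payload                    # no data is modified, only a new reference
    after = sys.getrefcount(payload)
    return after - before


if __name__ == "__main__":
    print(refcount_delta())
```

The count lives in the object's header, so even "read-only" workers end up touching the pages their objects sit on.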
This will work on any OS that implements ordinary forking. It won't work on Windows; its (crippled) process model requires relaunching the entire process for each worker, so it's not very good at sharing data.
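Applied to a dictionary rather than a string, the same idea might look like the sketch below. It is written for Python 3 (get_context does not exist in 2.6) and assumes a POSIX system where the "fork" start method is available. SIGNATURES, load_signatures, sum_slice and parallel_sum are placeholder names, and the loader just fills in dummy values.

```python
import multiprocessing as mp

# Placeholder for the real signatures; filled in before the pool is created.
SIGNATURES = {}


def load_signatures(n):
    """Populate the module-level dict (stand-in for the real loader)."""
    for i in range(n):
        SIGNATURES[i] = i * i


def sum_slice(bounds):
    """Worker: read-only access to the dict inherited from the parent."""
    start, stop = bounds
    return sum(SIGNATURES[k] for k in range(start, stop))


def parallel_sum(n, workers=2):
    load_signatures(n)                      # load BEFORE creating the pool
    ctx = mp.get_context("fork")            # assumption: fork is available
    step = max(1, (n + workers - 1) // workers)
    chunks = [(lo, min(lo + step, n)) for lo in range(0, n, step)]
    with ctx.Pool(processes=workers) as pool:
        # Only the small (start, stop) tuples are pickled, never the dict.
        return sum(pool.map(sum_slice, chunks))


if __name__ == "__main__":
    print(parallel_sum(1000))
```

The point of the design is that each task carries only a pair of integers; the large dictionary never goes through pickle or a manager process.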
Why don't you try this with a database? Databases are not limited to addressable/physical RAM and are safe for multithreaded or multiprocess use.
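As a rough sketch of what that could look like with the standard-library sqlite3 module: the table name, the column layout and the helper names build_db and lookup are all assumptions for illustration, and the real signatures may need a different schema. Each worker would open its own connection to the same file instead of receiving the dictionary.

```python
import sqlite3


def build_db(path, items):
    """Write (key, value) pairs to an on-disk table; run once in the parent."""
    conn = sqlite3.connect(path)
    conn.execute(
        "CREATE TABLE IF NOT EXISTS signatures (key TEXT PRIMARY KEY, value TEXT)"
    )
    conn.executemany("INSERT OR REPLACE INTO signatures VALUES (?, ?)", items)
    conn.commit()
    conn.close()


def lookup(path, key):
    """Fetch one value; each worker process opens its own connection."""
    conn = sqlite3.connect(path)
    try:
        row = conn.execute(
            "SELECT value FROM signatures WHERE key = ?", (key,)
        ).fetchone()
        return row[0] if row else None
    finally:
        conn.close()
```

In a real worker you would probably keep one connection open per process rather than reconnecting on every lookup; it is opened per call here only to keep the sketch short.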
In the interest of saving time and not having to debug system-level issues, maybe you could split your 5.8 million record dictionary into three sets of ~2 million each, and run the job 3 times.
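A minimal sketch of the splitting step, assuming the per-entry work is independent so that results from separate runs can be combined afterwards (that depends on what myfunction does). split_dict is a made-up helper name.

```python
from itertools import islice


def split_dict(d, parts):
    """Split d into at most `parts` smaller dicts of roughly equal size."""
    size = -(-len(d) // parts)          # ceiling division
    it = iter(d.items())
    chunks = []
    while True:
        chunk = dict(islice(it, size))  # take the next `size` items
        if not chunk:
            break
        chunks.append(chunk)
    return chunks
```

Each chunk could then be turned into its own proxy and processed in a separate pass, keeping every proxy below the size that worked.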
I think the problem you were encountering was the dict or hash table resizing itself as it grows. Initially, the dict has a set number of buckets available. I'm not sure about Python, but I know Perl starts with 8, and when the buckets fill up the hash is rebuilt with twice as many (i.e. 8, 16, 32, ...).
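For what it's worth, the resizing can be observed from Python itself. The sketch below assumes CPython, where sys.getsizeof reports the dict's own table (not the keys and values it refers to); resize_points is a made-up name. It records the entry count each time the reported size jumps.

```python
import sys


def resize_points(n):
    """Return (entry_count, size_in_bytes) each time the dict's footprint changes."""
    table = {}
    last = sys.getsizeof(table)
    points = [(0, last)]
    for i in range(n):
        table[i] = None
        size = sys.getsizeof(table)
        if size != last:                # the table was reallocated
            points.append((len(table), size))
            last = size
    return points


if __name__ == "__main__":
    for count, size in resize_points(100000):
        print(count, size)
```

The exact thresholds and sizes vary between Python versions, so the printed numbers are best read as a pattern rather than as fixed values.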
A bucket is a landing location for the hash algorithm. The 8 slots do not mean 8 entries; they mean 8 memory locations. When a new item is added, a hash is generated for its key, and the item is then stored in the corresponding bucket.
This is where collisions come into play. The more items that are in a bucket, the slower the function will get, because items are appended sequentially due to dynamic sizing of the slot.
One problem that may occur is that your keys are very similar and produce the same hash result, meaning a majority of keys land in one slot. Pre-allocating the hash buckets will help eliminate this and actually improve processing time and key management, plus the hash no longer needs to do all that swapping.
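The cost of keys that all hash to the same value can be shown with a small experiment. This is only a sketch: count_comparisons and the inner Key class are made up for the demonstration, and it counts equality checks rather than measuring time. Note that CPython resolves collisions by probing other slots rather than by chaining items inside a bucket, but the effect of identical hashes is similar.

```python
def count_comparisons(n, constant_hash):
    """Insert n distinct keys and return how many __eq__ calls the dict made."""
    calls = [0]

    class Key(object):
        def __init__(self, value):
            self.value = value

        def __hash__(self):
            # constant_hash=True forces every key to collide with every other.
            return 1 if constant_hash else hash(self.value)

        def __eq__(self, other):
            calls[0] += 1
            return self.value == other.value

    table = {}
    for i in range(n):
        table[Key(i)] = i
    return calls[0]


if __name__ == "__main__":
    print(count_comparisons(200, constant_hash=False))
    print(count_comparisons(200, constant_hash=True))
```

With well-spread hashes the dict rarely needs to compare keys at all, while with a constant hash each insert has to compare against the keys already stored, so the work grows much faster than the number of entries.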
However, I think you are still limited to the amount of free contiguous memory and will eventually need to go to the database solution.
Side note: I'm still new to Python. I know that in Perl you can see hash stats by doing print scalar(%HASHNAME), which shows how many buckets are in use out of those allocated. It helps you identify collision counts, in case you need to pre-allocate buckets. Can this be done in Python as well?
Rich