I have some code that performs some operations using the pathos extension of the multiprocessing library. My question is how to employ a more complex worker function - in this case named New_PP. How should I format the thpool line to handle a dictionary that my worker function requires in order to give me a result? I had assumed the dictionary, being defined at module level, would be visible as a global, but within the scope of the worker function I get an error about this dictionary (access_dict) not being found. How can I send the dictionary in, or otherwise ensure it is available to my worker thread?
import pathos

Nchunks = 10
thpool = pathos.multiprocessing.ThreadingPool()
mppool = pathos.multiprocessing.ProcessingPool()
Lchunk = int(len(readinfiles) / Nchunks)
filechunks = chunks(readinfiles, 10)
for fnames in filechunks:
    files = (open(name, 'r') for name in fnames)
    res = thpool.map(mppool.map, [New_PP]*len(fnames), files)
    print res[0][0]
And the worker function:
def New_PP(line):
    split_line = line.rstrip()
    if len(split_line) > 1:
        access_dict[4] ....
How can the worker function get at access_dict?
I have also tried to wrap up my function inside a class as follows:
class MAPPP:
    def New_PP(self, line):
        self.mytype = access_dict
        return my_type
    def __init__(self, value_dict):
        self.access_dict = access_dict
and:
mapp = MAPPP(value_dict)
print mapp.value_dict
res = thpool.map(mppool.map, [mapp.New_PP]*len(fnames), files)
However I get the same issue.
There are a few issues going on here:
- Your code above has a bunch of errors/typos.
- When you send off mapp.New_PP, it makes a copy of mapp.New_PP… so it does not share access_dict between instances, because those instances are created and destroyed in a different interpreter session on a different processor.
Maybe the following will demonstrate a bit more clearly...
>>> class MAPPP(object):
... access_dict = {}
... def __init__(self, value_dict):
... MAPPP.access_dict.update(value_dict)
... return
... def New_PP(self, line):
... MAPPP.access_dict[line] = len(line)
... return len(line)
...
>>>
>>> mapp = MAPPP({})
>>> mapp.access_dict
{}
>>> import pathos
>>> thpool = pathos.multiprocessing.ThreadingPool()
>>> mppool = pathos.multiprocessing.ProcessingPool()
>>> fnames = ['foo.txt', 'bar.txt']
>>> files = (open(name, 'r') for name in fnames)
>>> res = thpool.map(mppool.map, [mapp.New_PP]*len(fnames), files)
>>> res
[[21, 21, 21, 21, 21, 21, 20, 21, 19], [17, 18, 17, 17, 50, 82, 98]]
>>> mapp.access_dict
{}
So what happened? The files were read, line by line… and the length of each line was computed… and returned to the main process. However, the line and its length were not added to the instance of mapp.access_dict that belongs to mapp in the main process… and that's because mapp is not passed to the other threads and processors… it's copied. So, it did work… and the lines were added to the relevant copies of the class's dict… but then they were garbage collected when each process/thread did its job, passed the line lengths back through the map, and shut down.
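If it helps, here is a minimal sketch of the same copy-on-send behavior with plain multiprocessing (the worker and the shared name are made up for illustration, not from the code above): each child mutates its own copy of the dict, and the parent's dict is untouched afterward.

import os
from multiprocessing import Pool

shared = {}  # lives in the parent process

def worker(line):
    shared[line] = len(line)          # mutates the child's *copy* of the dict
    return (os.getpid(), len(line))   # the pid shows which process did the work

if __name__ == '__main__':
    pool = Pool(2)
    print pool.map(worker, ['foo', 'quux'])  # e.g. [(1234, 3), (1235, 4)]
    print shared                             # still {} in the parent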
There is no "super-easy" way to do this in pathos or multiprocessing right now. However, you can do it if you use multiprocessing and ctypes.
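For instance, here is a minimal sketch of the ctypes-backed route (illustrative names; it relies on the children inheriting the array via fork, so on Windows you would hand it to the workers through a Pool initializer instead): a multiprocessing.Array of C longs is genuinely shared, so writes made in the children are visible to the parent. A dict itself can't live in a ctypes array; for a dict you need a proxy, as in the links below.

from multiprocessing import Pool, Array

lengths = Array('l', 4)  # four shared C longs, zero-initialized

def record(args):
    i, line = args
    lengths[i] = len(line)  # this write lands in shared memory, not in a copy

if __name__ == '__main__':
    lines = ['a', 'bb', 'ccc', 'dddd']
    Pool(2).map(record, list(enumerate(lines)))
    print list(lengths)  # [1, 2, 3, 4] -- the children's writes are visible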
You might want to look at working with multiprocessing with shared memory and/or proxies (a proxy-based sketch follows these links):
- How to synchronize a python dict with multiprocessing
- How can I share a class between processes?
- How to combine Pool.map with Array (shared memory) in Python multiprocessing?
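As a rough sketch of the proxy approach from those links (assuming it's acceptable to pass the worker its inputs as (dict, line) pairs): a Manager dict lives in a server process, so updates made by the workers persist and are visible back in the parent.

from multiprocessing import Pool, Manager

def tally(args):
    d, line = args
    d[line] = len(line)  # routed through the proxy to the manager's process

if __name__ == '__main__':
    mgr = Manager()  # keep a reference so the manager process stays alive
    d = mgr.dict()
    lines = ['foo bar', 'baz']
    Pool(2).map(tally, [(d, line) for line in lines])
    print dict(d)  # {'foo bar': 7, 'baz': 3}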
As the pathos author, I plan to make the functionality to do the above more high-level… but I don't have a timeline for that at the moment.