I'm trying to process the contents of a tarfile using multiprocessing.Pool. I'm able to successfully use the ThreadPool implementation within the multiprocessing module, but I would like to use processes instead of threads, as that might be faster and would eliminate some changes made for Matplotlib to handle the multithreaded environment. I'm getting an error that I suspect is related to processes not sharing address space, but I'm not sure how to fix it:
Traceback (most recent call last):
  File "test_tarfile.py", line 32, in <module>
    test_multiproc()
  File "test_tarfile.py", line 24, in test_multiproc
    pool.map(read_file, files)
  File "/ldata/whitcomb/epd-7.1-2-rh5-x86_64/lib/python2.7/multiprocessing/pool.py", line 225, in map
    return self.map_async(func, iterable, chunksize).get()
  File "/ldata/whitcomb/epd-7.1-2-rh5-x86_64/lib/python2.7/multiprocessing/pool.py", line 522, in get
    raise self._value
ValueError: I/O operation on closed file
The actual program is more complicated, but this is an example of what I'm doing that reproduces the error:
from multiprocessing.pool import ThreadPool, Pool
import StringIO
import tarfile

def write_tar():
    tar = tarfile.open('test.tar', 'w')
    contents = 'line1'
    info = tarfile.TarInfo('file1.txt')
    info.size = len(contents)
    tar.addfile(info, StringIO.StringIO(contents))
    tar.close()

def test_multithread():
    tar = tarfile.open('test.tar')
    files = [tar.extractfile(member) for member in tar.getmembers()]
    pool = ThreadPool(processes=1)
    pool.map(read_file, files)
    tar.close()

def test_multiproc():
    tar = tarfile.open('test.tar')
    files = [tar.extractfile(member) for member in tar.getmembers()]
    pool = Pool(processes=1)
    pool.map(read_file, files)
    tar.close()

def read_file(f):
    print f.read()

write_tar()
test_multithread()
test_multiproc()
I suspect that something's wrong when the TarInfo object is passed into the other process but the parent TarFile is not, but I'm not sure how to fix it in the multiprocess case. Can I do this without having to extract files from the tarball and write them to disk?
You're not passing a TarInfo object into the other process; you're passing the result of tar.extractfile(member), where member is a TarInfo object. The extractfile(...) method returns a file-like object which has, among other things, a read() method that operates on the original tar file you opened with tar = tarfile.open('test.tar').

However, you can't use an open file from one process in another process; you have to re-open the file. I replaced your test_multiproc() with this:

And added this:

and was able to get your code working.
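
A minimal sketch of the re-open approach described above, not necessarily the exact code used in this answer. It assumes the workers can reach the same archive path as the parent. The helper names list_member_names, read_member, and read_all_members are hypothetical, and io.BytesIO replaces StringIO so that the sketch should run on both Python 2.7 and Python 3. The parent sends only (path, member name) pairs, which pickle cleanly, and each worker opens its own TarFile handle:

```python
from __future__ import print_function

import io
import tarfile
from multiprocessing import Pool

TAR_PATH = 'test.tar'  # same archive name as in the question


def write_tar(path=TAR_PATH):
    """Create a one-member archive, mirroring write_tar() from the question."""
    contents = b'line1'
    info = tarfile.TarInfo('file1.txt')
    info.size = len(contents)
    with tarfile.open(path, 'w') as tar:
        tar.addfile(info, io.BytesIO(contents))


def list_member_names(path=TAR_PATH):
    """Hypothetical helper: names of regular-file members (plain strings)."""
    with tarfile.open(path) as tar:
        return [m.name for m in tar.getmembers() if m.isfile()]


def read_member(args):
    """Hypothetical worker: re-open the archive in this process, read one member."""
    path, name = args
    with tarfile.open(path) as tar:
        # extractfile() accepts a member name as well as a TarInfo object.
        return tar.extractfile(name).read()


def read_all_members(path=TAR_PATH, processes=1):
    """Hypothetical stand-in for test_multiproc(): map over (path, name) pairs."""
    tasks = [(path, name) for name in list_member_names(path)]
    pool = Pool(processes=processes)
    try:
        return pool.map(read_member, tasks)
    finally:
        pool.close()
        pool.join()
```

Calling write_tar() and then print(read_all_members()) under an if __name__ == '__main__': guard should print a list holding the contents of file1.txt. Nothing is extracted to disk; the cost is that every task re-opens the archive, which may matter for large or compressed tarballs.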