I’d like to skip results that are returned from map_async
. They are growing in memory but I don’t need them.
Here is some code:
def processLine(line):
#process something
print "result"
pool = Pool(processes = 8)
for line in sys.stdin:
lines.append(line)
if len(lines) >= 100000:
pool.map_async(processLine, lines, 2000)
pool.close()
pool.join()
When I have to process file with hundreds of millions of rows, the python process grows in memory to a few gigabytes. How can I resolve that?
Thanks for your help :)
Your code has a bug:
for line in sys.stdin:
lines.append(line)
if len(lines) >= 100000:
pool.map_async(processLine, lines, 2000)
This is going to wait until lines
accumulates more than 100000 lines. After that, pool.map_async
is being called on the entire list of 100000+ lines for each additional line.
It is not clear exactly what you are really trying to do, but
if you don't want the return value, use pool.apply_async
, not pool.map_async
. Maybe something like this:
import multiprocessing as mp
def processLine(line):
#process something
print "result"
if __name__ == '__main__':
pool = mp.Pool(processes = 8)
for line in sys.stdin:
pool.apply_async(processLine, args = (line, ))
pool.close()
pool.join()
Yes you're right. There is some bug
I mean:
def processLine(line):
#process something
print "result"
pool = Pool(processes = 8)
if __name__ == '__main__':
for line in sys.stdin:
lines.append(line)
if len(lines) >= 100000:
pool.map_async(processLine, lines, 2000)
lines = [] #to clear buffer
pool.map_async(processLine, lines, 2000)
pool.close()
pool.join()
I used map_async because it has configurable chunk_size so it is more efficient if there are lots of lines which processing time is quite short.