Supposing I have a very big text file consisting of many lines that I would like to reverse. And I don't care of the final order. The input file contains Cyrillic symbols. I use multiprocessing
to process on several cores.
I wrote such program:
# task.py
import multiprocessing as mp
POOL_NUMBER = 2
lock_read = mp.Lock()
lock_write = mp.Lock()
fi = open('input.txt', 'r')
fo = open('output.txt', 'w')
def handle(line):
# In the future I want to do
# some more complicated operations over the line
return line.strip()[::-1] # Reversing
def target():
while True:
try:
with lock_read:
line = next(fi)
except StopIteration:
break
line = handle(line)
with lock_write:
print(line, file=fo)
pool = [mp.Process(target=target) for _ in range(POOL_NUMBER)]
for p in pool:
p.start()
for p in pool:
p.join()
fi.close()
fo.close()
This program fails with error:
Process Process-2:
Process Process-1:
Traceback (most recent call last):
File "/usr/lib/python3.5/multiprocessing/process.py", line 249, in _bootstrap
self.run()
File "/usr/lib/python3.5/multiprocessing/process.py", line 93, in run
self._target(*self._args, **self._kwargs)
File "task.py", line 22, in target
line = next(fi)
File "/usr/lib/python3.5/codecs.py", line 321, in decode
(result, consumed) = self._buffer_decode(data, self.errors, final)
UnicodeDecodeError: 'utf-8' codec can't decode byte 0x8b in position 0: invalid start byte
Traceback (most recent call last):
File "/usr/lib/python3.5/multiprocessing/process.py", line 249, in _bootstrap
self.run()
File "/usr/lib/python3.5/multiprocessing/process.py", line 93, in run
self._target(*self._args, **self._kwargs)
File "task.py", line 22, in target
line = next(fi)
File "/usr/lib/python3.5/codecs.py", line 321, in decode
(result, consumed) = self._buffer_decode(data, self.errors, final)
UnicodeDecodeError: 'utf-8' codec can't decode byte 0xd1 in position 0: invalid continuation byte
On the other hand, everything works fine if I set POOL_NUMBER = 1
. But it doesn't make a sense if I want to gain the total performance.
Why does that error happen? And how can I fix it?
I use Python 3.5.2
.
I generated data using this script:
# gen_file.py
from random import randint
LENGTH = 100
SIZE = 100000
def gen_word(length):
return ''.join(
chr(randint(ord('а'), ord('я')))
for _ in range(length)
)
if __name__ == "__main__":
with open('input.txt', 'w') as f:
for _ in range(SIZE):
print(gen_word(LENGTH), file=f)
The issue here is reading a file from multi processes isn't working as you think, you can't share the open
object between processes.
You could make a global current_line
variable, and each time read the file and process the current line, not ideal.
Here is a different approach, using processes pool, and map
method, I'm iterating over the file, and for each line I enqueue your target method:
from multiprocessing import Lock
from multiprocessing import Pool
import time
import os
POOL_NUMBER = 8
def target(line):
# Really need some processing here
for _ in range(2**10):
pass
return line[::-1]
pool = Pool(processes=POOL_NUMBER)
os.truncate('output.txt', 0) # Just to make sure we have plan new file
with open('input.txt', 'r') as fi:
t0 = time.time()
processed_lines = pool.map(target, fi.readlines())
print('Total time', time.time() - t0)
with open('output.txt', 'w') as fo:
for processed_line in processed_lines:
fo.writelines(processed_line)
With 8 process on my machine:
Total time 1.3367934226989746
And with 1 process:
Total time 4.324501991271973
This works best if your target function is CPU bound, a different approach would be to split the file into POOL_NUMBER
chunks and make each process write a processed chunk of data(with lock!) to the output file.
Another approach, is to create a master process that does the write job for the rest of the processes, here is an example.
EDIT
After you comment i figured you can't fit the file into memory.
For this, you can just iterate over the file object which will read line by line into memory. But than we need to modify the code a little big:
POOL_NUMBER = 8
CHUNK_SIZE = 50000
def target(line):
# This is not a measurable task, since most of the time wil spent on writing the data
# if you have a CPU bound task, this code will make sense
return line[::-1]
pool = Pool(processes=POOL_NUMBER)
os.truncate('output.txt', 0) # Just to make sure we have plan new file
processed_lines = []
with open('input.txt', 'r') as fi:
t0 = time.time()
for line in fi:
processed_lines.append(pool.apply_async(target, (line,))) # Keep a refernce to this task, but don't
if len(processed_lines) == CHUNK_SIZE:
with open('output.txt', 'w') as fo: # reading the file line by line
for processed_line in processed_lines:
fo.writelines(processed_line.get())
processed_lines = [] # truncate the result list, and let the garbage collector collect the unused memory, if we don't clear the list we will ran out of memory!
print('Total time', time.time() - t0)
Keep in mind that you can play with the CHUNK_SIZE
variable to control how much memory you use. For me 5000 is about 10K max for each process.
P.S
I think it would be best the split the big file into smaller files, this way you solve the read/write lock on the file, and also make it scalable to process(even on a different machine!)
It looks like line = next(fi)
is not processed correctly under different Process
.
It's possible to bypass the need of using next(fi)
with the help of temporary buffer of lines filled by the main thread of program and read by each process. For this role it's better to use multiprocessing.Queue
.
So this is my script:
from time import sleep, time
import multiprocessing as mp
import queue
MAX_QUEUE_SIZE = 1000
QUEUE_TIMEOUT = 0.000001
POOL_NUMBER = 4
def handle(line):
sleep(0.00001) # Some processing here that takes time
return line.strip()[::-1]
def target(fout, write_lock, lines_queue):
while True:
try:
line = lines_queue.get(timeout=1.0)
line = handle(line)
with write_lock:
print(line, file=fout)
fout.flush()
except queue.Empty:
break
if __name__ == "__main__":
time_begin = time()
with open('output.txt', 'w') as fout:
write_lock = mp.Lock()
lines_queue = mp.Queue()
processes = [
mp.Process(target=target, args=(fout, write_lock, lines_queue))
for _ in range(POOL_NUMBER)
]
for p in processes:
p.start()
with open('input.txt', 'r') as fin:
while True:
try:
while lines_queue.qsize() < MAX_QUEUE_SIZE:
line = next(fin)
lines_queue.put(line)
sleep(QUEUE_TIMEOUT)
except StopIteration:
break
for p in processes:
p.join()
time_end = time()
print("Time:", time_end - time_begin)
On my CPU I got this result:
POOL_NUMBER = 1 -> Time: 17.877086400985718
POOL_NUMBER = 2 -> Time: 8.611438989639282
POOL_NUMBER = 3 -> Time: 6.332395553588867
POOL_NUMBER = 4 -> Time: 5.321753978729248