Why does multiprocessing.Lock() not lock shared re

Posted 2020-08-15 12:04

Question:

Suppose I have a very large text file consisting of many lines, and I would like to reverse each line. I don't care about the final order of the lines. The input file contains Cyrillic characters. I use multiprocessing to spread the work over several cores.

I wrote this program:

# task.py

import multiprocessing as mp


POOL_NUMBER = 2


lock_read = mp.Lock()
lock_write = mp.Lock()

fi = open('input.txt', 'r')
fo = open('output.txt', 'w')

def handle(line):
    # In the future I want to do
    # some more complicated operations over the line
    return line.strip()[::-1]  # Reversing

def target():
    while True:
        try:
            with lock_read:
                line = next(fi)
        except StopIteration:
            break

        line = handle(line)

        with lock_write:
            print(line, file=fo)

pool = [mp.Process(target=target) for _ in range(POOL_NUMBER)]
for p in pool:
    p.start()
for p in pool:
    p.join()

fi.close()
fo.close()

This program fails with the following error:

Process Process-2:
Process Process-1:
Traceback (most recent call last):
  File "/usr/lib/python3.5/multiprocessing/process.py", line 249, in _bootstrap
    self.run()
  File "/usr/lib/python3.5/multiprocessing/process.py", line 93, in run
    self._target(*self._args, **self._kwargs)
  File "task.py", line 22, in target
    line = next(fi)
  File "/usr/lib/python3.5/codecs.py", line 321, in decode
    (result, consumed) = self._buffer_decode(data, self.errors, final)
UnicodeDecodeError: 'utf-8' codec can't decode byte 0x8b in position 0: invalid start byte
Traceback (most recent call last):
  File "/usr/lib/python3.5/multiprocessing/process.py", line 249, in _bootstrap
    self.run()
  File "/usr/lib/python3.5/multiprocessing/process.py", line 93, in run
    self._target(*self._args, **self._kwargs)
  File "task.py", line 22, in target
    line = next(fi)
  File "/usr/lib/python3.5/codecs.py", line 321, in decode
    (result, consumed) = self._buffer_decode(data, self.errors, final)
UnicodeDecodeError: 'utf-8' codec can't decode byte 0xd1 in position 0: invalid continuation byte

On the other hand, everything works fine if I set POOL_NUMBER = 1. But that defeats the purpose, since I want to improve overall performance.

Why does that error happen? And how can I fix it?

I use Python 3.5.2.

I generated data using this script:

# gen_file.py

from random import randint


LENGTH = 100
SIZE = 100000


def gen_word(length):
    return ''.join(
        chr(randint(ord('а'), ord('я')))
        for _ in range(length)
    )


if __name__ == "__main__":
    with open('input.txt', 'w') as f:
        for _ in range(SIZE):
            print(gen_word(LENGTH), file=f)

Answer 1:

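The issue here is that reading a file from multiple processes doesn't work the way you think: you can't share an open file object between processes. After a fork, each process has its own copy of the file object with its own read buffer, while the underlying file offset is shared. The lock therefore cannot stop a process from filling its buffer with a chunk that starts in the middle of a multi-byte UTF-8 character.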
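To make the failure concrete, here is a minimal sketch that reproduces the same kind of decoding error without any processes. It assumes the cause is a read that starts at a byte offset inside a two-byte Cyrillic character. The helper name decode_from and the sample text are made up for illustration.

```python
def decode_from(data, offset):
    """Try to decode UTF-8 bytes starting at the given byte offset.

    Returns the decoded text, or the UnicodeDecodeError if decoding fails.
    """
    try:
        return data[offset:].decode('utf-8')
    except UnicodeDecodeError as exc:
        return exc


# Each Cyrillic letter below takes two bytes in UTF-8.
data = 'привет\n'.encode('utf-8')

aligned = decode_from(data, 2)     # offset 2 is a character boundary
misaligned = decode_from(data, 1)  # offset 1 is the second byte of the first letter

print(repr(aligned))
print(repr(misaligned))
```

The aligned read decodes cleanly, while the misaligned one fails at position 0, just like the traceback in the question.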

You could keep a global current_line variable, read the file each time, and process only the current line, but that is not ideal.

Here is a different approach that uses a process pool and its map method. I read the lines of the file and hand each one to your target function:

from multiprocessing import Pool
import time

POOL_NUMBER = 8

def target(line):
    # Placeholder for some real processing
    for _ in range(2**10):
        pass
    # Reverse the text but keep the newline at the end of the line
    return line.rstrip('\n')[::-1] + '\n'


if __name__ == '__main__':
    pool = Pool(processes=POOL_NUMBER)
    with open('input.txt', 'r') as fi:
        t0 = time.time()
        # readlines() loads the whole file into memory
        processed_lines = pool.map(target, fi.readlines())
        print('Total time', time.time() - t0)
    pool.close()
    pool.join()

    # Opening with 'w' creates the file or truncates an existing one
    with open('output.txt', 'w') as fo:
        fo.writelines(processed_lines)

With 8 processes on my machine: Total time 1.3367934226989746

And with 1 process: Total time 4.324501991271973

This works best if your target function is CPU bound. A different approach would be to split the file into POOL_NUMBER chunks and have each process write its processed chunk of data (with a lock!) to the output file.
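One possible way to split the file into chunks is to work with byte offsets and move every boundary forward to the next newline, so that no process starts in the middle of a line or of a multi-byte character. This is only a sketch under that assumption. The names chunk_offsets and read_chunk are hypothetical, and the file is assumed to be UTF-8 with '\n' line endings.

```python
import os


def chunk_offsets(path, n_chunks):
    """Return (start, end) byte ranges that begin and end on line boundaries."""
    size = os.path.getsize(path)
    bounds = [0]
    with open(path, 'rb') as f:  # binary mode: seeking to any byte is safe
        for i in range(1, n_chunks):
            f.seek(size * i // n_chunks)
            f.readline()  # move forward to the end of the current line
            bounds.append(f.tell())
    bounds.append(size)
    # Two boundaries can coincide on very long lines; drop the empty ranges
    return [(s, e) for s, e in zip(bounds, bounds[1:]) if s < e]


def read_chunk(path, start, end):
    """Read the lines stored in the byte range [start, end)."""
    with open(path, 'rb') as f:
        f.seek(start)
        return f.read(end - start).decode('utf-8').splitlines()
```

Each worker would then receive one (start, end) pair, open the file itself, and process only the lines returned by read_chunk for that range.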

Another approach is to create a master process that does the writing on behalf of the other processes.
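A rough sketch of the single-writer idea, assuming the parent process plays the master role: it is the only process that reads the input and writes the output, while the pool workers only transform lines. The function name reverse_file, its parameters, and the explicit UTF-8 encoding are assumptions for illustration, not part of the original code.

```python
from multiprocessing import Pool


def handle(line):
    # Reverse the text of the line, keeping the newline at the end
    return line.rstrip('\n')[::-1] + '\n'


def reverse_file(src, dst, workers=4, chunksize=1000):
    """Process lines in worker processes; only this process touches the files."""
    with open(src, 'r', encoding='utf-8') as fi, \
            open(dst, 'w', encoding='utf-8') as fo, \
            Pool(processes=workers) as pool:
        # Results arrive in arbitrary order, which the question allows
        for result in pool.imap_unordered(handle, fi, chunksize=chunksize):
            fo.write(result)
```

Call reverse_file('input.txt', 'output.txt') from under an if __name__ == '__main__': guard. Because no worker ever opens a file, no read or write lock is needed.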

EDIT

After your comment I realized that you can't fit the file into memory. In that case you can iterate over the file object, which reads it into memory line by line. But then we need to modify the code a little bit:

from multiprocessing import Pool
import time

POOL_NUMBER = 8
CHUNK_SIZE = 50000

def target(line):
    # This is not a measurable task, since most of the time will be spent writing the data;
    # with a CPU-bound task this code makes more sense
    return line.rstrip('\n')[::-1] + '\n'  # Reverse the text, keep the newline last


def write_results(processed_lines, fo):
    # Wait for each pending task and write its result
    for processed_line in processed_lines:
        fo.write(processed_line.get())


if __name__ == '__main__':
    pool = Pool(processes=POOL_NUMBER)
    processed_lines = []

    # Open the output once: reopening it with 'w' for every chunk would erase the earlier chunks
    with open('input.txt', 'r') as fi, open('output.txt', 'w') as fo:
        t0 = time.time()
        for line in fi:  # Reads the file line by line
            processed_lines.append(pool.apply_async(target, (line,)))  # Keep a reference to this task

            if len(processed_lines) == CHUNK_SIZE:
                write_results(processed_lines, fo)
                processed_lines = []  # Clear the list so the garbage collector can free the results; otherwise we would run out of memory
        write_results(processed_lines, fo)  # Write the final, partial chunk
        print('Total time', time.time() - t0)

    pool.close()
    pool.join()

Keep in mind that you can play with the CHUNK_SIZE variable to control how much memory you use. For me 5000 is about 10K max for each process.

P.S

I think it would be best to split the big file into smaller files. That way you avoid the read/write locking on a single file, and the processing also becomes scalable (even across different machines!).
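Splitting the file could look roughly like the sketch below. The function name split_file, the part_00000.txt naming scheme, and the explicit UTF-8 encoding are assumptions made for illustration.

```python
import os
from itertools import islice


def split_file(src, out_dir, lines_per_part):
    """Split src into numbered part files of at most lines_per_part lines each."""
    os.makedirs(out_dir, exist_ok=True)
    parts = []
    with open(src, 'r', encoding='utf-8') as fi:
        while True:
            # Take the next batch of lines without loading the whole file
            chunk = list(islice(fi, lines_per_part))
            if not chunk:
                break
            part_path = os.path.join(out_dir, 'part_{:05d}.txt'.format(len(parts)))
            with open(part_path, 'w', encoding='utf-8') as fo:
                fo.writelines(chunk)
            parts.append(part_path)
    return parts
```

Each part file can then be handed to a separate process (or machine), and every process writes its own output file, so nothing is shared.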



Answer 2:

It looks like line = next(fi) does not work correctly when it is called from different processes.

You can avoid calling next(fi) in the workers by using a temporary buffer of lines that is filled by the main process and read by each worker process. multiprocessing.Queue is a good fit for this role.

So this is my script:

from time import sleep, time
import multiprocessing as mp
import queue


MAX_QUEUE_SIZE = 1000
QUEUE_TIMEOUT = 0.000001
POOL_NUMBER = 4


def handle(line):
    sleep(0.00001)  # Some processing here that takes time
    return line.strip()[::-1]


def target(fout, write_lock, lines_queue):
    while True:
        try:
            line = lines_queue.get(timeout=1.0)
            line = handle(line)
            with write_lock:
                print(line, file=fout)
                fout.flush()
        except queue.Empty:
            break


if __name__ == "__main__":
    time_begin = time()

    with open('output.txt', 'w') as fout:
        write_lock = mp.Lock()
        lines_queue = mp.Queue()

        processes = [
            mp.Process(target=target, args=(fout, write_lock, lines_queue))
            for _ in range(POOL_NUMBER)
        ]
        for p in processes:
            p.start()

        with open('input.txt', 'r') as fin:
            while True:
                try:
                    while lines_queue.qsize() < MAX_QUEUE_SIZE:
                        line = next(fin)
                        lines_queue.put(line)
                    sleep(QUEUE_TIMEOUT)
                except StopIteration:
                    break

        for p in processes:
            p.join()

    time_end = time()
    print("Time:", time_end - time_begin)

On my CPU I got this result:

POOL_NUMBER = 1 -> Time: 17.877086400985718
POOL_NUMBER = 2 -> Time: 8.611438989639282
POOL_NUMBER = 3 -> Time: 6.332395553588867
POOL_NUMBER = 4 -> Time: 5.321753978729248
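
The script above polls qsize() and relies on a one-second get() timeout to stop the workers. A possible variation, sketched here and not benchmarked, uses a bounded mp.Queue(maxsize=...) so that put() blocks while the queue is full, plus one end-of-input marker per worker. The names SENTINEL, worker and run are hypothetical. As a further assumption, each worker opens the output file itself in append mode instead of receiving the parent's file object.

```python
import multiprocessing as mp

SENTINEL = None  # hypothetical end-of-input marker


def handle(line):
    return line.strip()[::-1]


def worker(lines_queue, write_lock, out_path):
    """Reverse lines from the queue until the sentinel arrives."""
    with open(out_path, 'a', encoding='utf-8') as fout:
        while True:
            line = lines_queue.get()  # blocks until an item is available
            if line is SENTINEL:
                break
            result = handle(line)
            with write_lock:
                print(result, file=fout)
                fout.flush()  # push the line out before releasing the lock


def run(src, dst, workers=4, max_queue_size=1000):
    open(dst, 'w').close()  # start from an empty output file
    lines_queue = mp.Queue(maxsize=max_queue_size)
    write_lock = mp.Lock()
    processes = [
        mp.Process(target=worker, args=(lines_queue, write_lock, dst))
        for _ in range(workers)
    ]
    for p in processes:
        p.start()
    with open(src, 'r', encoding='utf-8') as fin:
        for line in fin:
            lines_queue.put(line)  # blocks while the queue is full
    for _ in processes:
        lines_queue.put(SENTINEL)  # one marker per worker
    for p in processes:
        p.join()
```

Call run('input.txt', 'output.txt') from under an if __name__ == "__main__": guard. With sentinels the workers stop as soon as the input is exhausted, and a slow reader can no longer make them exit early on a timeout.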