read multiple files using multiprocessing

2019-01-19 05:53发布

I need to read some very huge text files (100+ Mb), process every lines with regex and store the data into a structure. My structure inherits from defaultdict, it has a read(self) method that read self.file_name file.

Look at this very simple (but not real) example, I'm not using regex, but I'm splitting lines:


import multiprocessing
from collections import defaultdict

def SingleContainer():
    return list()

class Container(defaultdict):
    """
    this class store odd line in self["odd"] and even line in self["even"].
    It is stupid, but it's only an example. In the real case the class
    has additional methods that do computation on readen data.
    """
    def __init__(self,file_name):
        if type(file_name) != str:
            raise AttributeError, "%s is not a string" % file_name
        defaultdict.__init__(self,SingleContainer)
        self.file_name = file_name
        self.readen_lines = 0
    def read(self):
        f = open(self.file_name)
        print "start reading file %s" % self.file_name
        for line in f:
            self.readen_lines += 1
            values = line.split()
            key = {0: "even", 1: "odd"}[self.readen_lines %2]
            self[key].append(values)
        print "readen %d lines from file %s" % (self.readen_lines, self.file_name)

def do(file_name):
    container = Container(file_name)
    container.read()
    return container.items()

if __name__ == "__main__":
    file_names = ["r1_200909.log", "r1_200910.log"]
    pool = multiprocessing.Pool(len(file_names))
    result = pool.map(do,file_names)
    pool.close()
    pool.join()
    print "Finish"      

At the end I need to join every results in a single Container. It is important that the order of the lines is preserved. My approach is too slow when returning values. Better solution? I'm using python 2.6 on Linux

3条回答
闹够了就滚
2楼-- · 2019-01-19 06:12

Multiprocessing is more suited to CPU- or memory-oriented processes since the seek time of rotational drives kills performance when switching between files. Either load your log files into a fast flash drive or some sort of memory disk (physical or virtual), or give up on multiprocessing.

查看更多
走好不送
3楼-- · 2019-01-19 06:22

You're creating a pool with as many workers as files. That may be too many. Usually, I aim to have the number of workers around the same as the number of cores.

The simple fact is that your final step is going to be a single process merging all the results together. There is no avoiding this, given your problem description. This is known as a barrier synchronization: all tasks have to reach the same point before any can proceed.

You should probably run this program multiple times, or in a loop, passing a different value to multiprocessing.Pool() each time, starting at 1 and going to the number of cores. Time each run, and see which worker count does best.

The result will depend on how CPU-intensive (as opposed to disk-intensive) your task is. I would not be surprised if 2 were best if your task is about half CPU and half disk, even on an 8-core machine.

查看更多
走好不送
4楼-- · 2019-01-19 06:38

You're probably hitting two problems.

One of them was mentioned: you're reading multiple files at once. Those reads will end up being interleaved, causing disk thrashing. You want to read whole files at once, and then only multithread the computation on the data.

Second, you're hitting the overhead of Python's multiprocessing module. It's not actually using threads, but instead starting multiple processes and serializing the results through a pipe. That's very slow for bulk data--in fact, it seems to be slower than the work you're doing in the thread (at least in the example). This is the real-world problem caused by the GIL.

If I modify do() to return None instead of container.items() to disable the extra data copy, this example is faster than a single thread, as long as the files are already cached:

Two threads: 0.36elapsed 168%CPU

One thread (replace pool.map with map): 0:00.52elapsed 98%CPU

Unfortunately, the GIL problem is fundamental and can't be worked around from inside Python.

查看更多
登录 后发表回答