Read, format, then write large CSV files

Published 2019-06-11 09:57

Question:

I have fairly large CSV files that I need to manipulate/amend line by line (each line may require different amending rules) and then write out to another CSV with the proper formatting.

Currently, I have:

import csv
import multiprocessing

def read(buffer):
    pool = multiprocessing.Pool(4)
    with open("/path/to/file.csv", 'r') as f:
        while True:
            lines = pool.map(format_data, f.readlines(buffer))
            if not lines:
                break
            yield lines

def format_data(row):
    row = row.rstrip('\n').split(',')  # each line from readlines() is a raw string
    # Do formatting via list comprehension
    return row

def main():
    buf = 65535
    rows = read(buf)
    with open("/path/to/new.csv",'w') as out:
        writer = csv.writer(out, lineterminator='\n')
        while rows:
            try:
                writer.writerows(next(rows))
            except StopIteration:
                break

Even though I'm using multiprocessing via map and preventing memory overload with a generator, it still takes well over 2 minutes to process 40,000 lines. It honestly shouldn't take that long. I've even built a nested list from the generator output and tried writing the data out in one large write instead of chunk by chunk, and it still takes just as long. What am I doing wrong here?

Answer 1:

I have figured it out.

First, the issue was in my format_data() function: it made a database call that opened a new connection and closed it again on every single row.

I fixed it by building a basic dictionary mapping up front, which gives a far faster lookup table and works fine across the worker processes.

So, my code looks like this:

import csv
import multiprocessing

def read(buffer):
    pool = multiprocessing.Pool(4)
    with open("/path/to/file.csv", 'r') as f:
        while True:
            lines = pool.map(format_data, f.readlines(buffer))
            if not lines:
                break
            yield lines

def format_data(row):
    row = row.rstrip('\n').split(',')  # each line from readlines() is a raw string
    # Do formatting via list comprehension AND a dictionary lookup
    # instead of a database connection
    return row

def main():
    rows = read(1024*1024)
    with open("/path/to/new.csv",'w') as out:
        while rows:
            try:
                csv.writer(out, lineterminator='\n').writerows(next(rows))
            except StopIteration:
                break
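
To illustrate the dictionary lookup (which the comment in format_data() only hints at), here's a rough sketch of what it could look like, using SQLite purely as an example source. The database path, table name, column names, and the column being replaced are placeholders, not my actual schema:

import sqlite3

def build_lookup(db_path="/path/to/lookup.db"):
    # Query the lookup data ONCE instead of opening a connection per row.
    with sqlite3.connect(db_path) as conn:
        pairs = conn.execute("SELECT key, value FROM lookup_table").fetchall()
    return {key: value for key, value in pairs}

# Built at module import time, so it's available in every worker process.
KEY_TO_VALUE = build_lookup()

def format_data(line):
    row = line.rstrip('\n').split(',')
    # Replace, say, column 2 via the in-memory dict instead of a database query,
    # falling back to the original value if the key is missing.
    row[2] = KEY_TO_VALUE.get(row[2], row[2])
    return row

The point is simply that the expensive I/O happens once, before the pool starts, and every row after that is a plain dict lookup.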

I was able to parse a ~150 MB file in under 30 seconds. Hopefully there are some lessons here for others to learn from.