Fastest & I/O efficient way to combine heterogeneous CSV files

Posted 2020-07-24 04:57

Question:

Given ten 1 MB CSV files, each with a slightly different layout, I need to combine them into a single normalized file with a common header. An empty string is fine for nulls.

Examples of columns:

1. FIELD1, FIELD2, FIELD3
2. FIELD2, FIELD1, FIELD3
3. FIELD1, FIELD3, FIELD4
4. FIELD3, FIELD4, FIELD5, FIELD6
5. FIELD2

The output would look like this (column order is not important, although my code puts the columns in the order they are discovered):

FIELD1, FIELD2, FIELD3, FIELD4, FIELD5, FIELD6

So basically the fields can come in any order, fields may be missing, and new fields may appear that have not been seen before; all of them must be included in the output file. No joining is required: in the end, the total count of data rows across the parts must equal the count of rows in the output.

Reading all 10 MB into memory is OK; somehow using 100 MB to do it would not be. You can open all the files at once if needed. There are plenty of file handles and memory available, but it will be running against a NAS, so it needs to be efficient for that (not too many NAS operations).

The method I have right now is to read each file into column lists, building new column lists as I discover new columns, then writing it all out to a single file. I'm hoping someone has something a bit cleverer, though, as I'm bottlenecked on this process, so any relief is helpful.

I have sample files here if anyone wants to try. I'll post my current code as a possible answer. I'm looking for the fastest time when I run it on my server (lots of cores, lots of memory) using local disk.

Answer 1:

Use a two-pass approach with csv.DictReader() and csv.DictWriter() objects. Pass one collects the set of headers used across all the files, and pass two then copies across data based on the headers.

Collecting the headers is as simple as accessing the fieldnames attribute on the reader objects:

import csv
import glob

files = []
readers = []
fields = set()

try:
    for filename in glob.glob('in*.csv'):
        try:
            fileobj = open(filename, 'rb')
        except IOError:
            print "Failed to open {}".format(filename)
            continue
        files.append(fileobj)  # for later closing

        reader = csv.DictReader(fileobj)
        fields.update(reader.fieldnames)  # reads the first row
        readers.append(reader)

    with open('result.csv', 'wb') as outf:
        writer = csv.DictWriter(outf, fieldnames=sorted(fields))
        writer.writeheader()
        for reader in readers:
            # copy across rows; missing fields will be left blank
            for row in reader:
                writer.writerow(row)
finally:
    # close out open file objects
    for fileobj in files:
        fileobj.close()

Each reader produces a dictionary containing a subset of all the fields, but DictWriter will use the value of the restval argument (which defaults to '' when omitted, as it is here) to fill in each missing key.
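As a toy illustration of restval (a hypothetical two-column file, not part of the code above):

import csv

# Rows that lack a key get restval ('NULL' here) instead of the default ''.
with open('restval_demo.csv', 'w') as outf:  # add newline='' on Python 3
    writer = csv.DictWriter(outf, fieldnames=['a', 'b'], restval='NULL')
    writer.writeheader()
    writer.writerow({'a': 1, 'b': 2})  # written as: 1,2
    writer.writerow({'a': 3})          # written as: 3,NULL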

I assumed Python 2 here. If this is Python 3, you could use an ExitStack() to manage the open files for the readers; omit the b from the file modes and add a newline='' argument to all open() calls to leave newline handling to the csv module.
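A minimal Python 3 sketch of that adaptation (the same in*.csv / result.csv names are assumed; otherwise it mirrors the code above):

import csv
import glob
from contextlib import ExitStack

fields = set()
readers = []

with ExitStack() as stack:
    # Pass one: open every input, record its header, keep the reader around.
    for filename in glob.glob('in*.csv'):
        fileobj = stack.enter_context(open(filename, newline=''))
        reader = csv.DictReader(fileobj)
        fields.update(reader.fieldnames)  # reads the first row
        readers.append(reader)

    # Pass two: copy rows across; ExitStack closes all inputs on exit.
    with open('result.csv', 'w', newline='') as outf:
        writer = csv.DictWriter(outf, fieldnames=sorted(fields))
        writer.writeheader()
        for reader in readers:
            for row in reader:
                writer.writerow(row)  # missing keys are filled with restval ('')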

The above code only ever buffers what it needs to read and write rows; rows are moved from each open reader to the writer one at a time.

Unfortunately, we cannot use writer.writerows(reader) here, because the DictWriter.writerows() implementation first converts everything in reader to a list of lists before passing it on to the underlying csv.writer.writerows() method; see issue 23495 in the Python bug tracker.
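If a single writerows() call is wanted anyway, one hedged workaround is to drop down to a plain csv.writer and feed it a generator that builds each row with dict.get(); in CPython, the plain writer's writerows() iterates its argument row by row instead of building a list first. A sketch, reusing readers, fields and outf from the code above:

# Sketch only: assumes `readers`, `fields` and `outf` as defined above.
field_list = sorted(fields)
plain_writer = csv.writer(outf)
plain_writer.writerow(field_list)
for reader in readers:
    # Generator expression: each row list is built and written on demand.
    plain_writer.writerows(
        [row.get(name, '') for name in field_list] for row in reader)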



Answer 2:

Use the pandas library and its concat() function:

import pandas
import glob
df = pandas.concat([pandas.read_csv(x) for x in glob.glob("in*.csv")])
df.to_csv("output.csv")
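Note that read_csv() will infer dtypes (so a value like 007 becomes 7) and to_csv() writes the DataFrame index as an extra unnamed column by default. A hedged variant that keeps values verbatim and drops the index:

import glob
import pandas

# Read every field as a string so values round-trip unchanged,
# then omit the index column when writing the combined file.
df = pandas.concat(
    [pandas.read_csv(x, dtype=str) for x in glob.glob("in*.csv")],
    ignore_index=True)
df.to_csv("output.csv", index=False)  # missing fields come out as empty strings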


Answer 3:

Here's a simple solution using only standard library modules. This is Python 3; use the commented-out alternative open() lines for Python 2:

import csv
import glob

rows = []
fields = set()

for filename in glob.glob('in*.csv'):
    #with open(filename,'rb') as f:
    with open(filename,newline='') as f:
        r = csv.DictReader(f)
        rows.extend(row for row in r)
        fields.update(r.fieldnames)

#with open('result.csv','wb') as f:
with open('result.csv','w',newline='') as f:
    w = csv.DictWriter(f,fieldnames=fields)
    w.writeheader()
    w.writerows(rows)

Edit

Per comment, adding file name and line number:

import csv
import glob

rows = []
fields = set(['filename','lineno'])

for filename in glob.glob('in*.csv'):
    with open(filename,newline='') as f:
        r = csv.DictReader(f)
        for lineno,row in enumerate(r,1):
            row.update({'filename':filename,'lineno':lineno})
            rows.append(row)
        fields.update(r.fieldnames)

with open('result.csv','w',newline='') as f:
    w = csv.DictWriter(f,fieldnames=fields)
    w.writeheader()
    w.writerows(rows)

The original took 8.8 s on my system; this update took 10.6 s.

Also note that if you order fields before passing it to DictWriter, you can put the columns in whatever order you want, as sketched below.
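For example (a sketch; the preferred names are taken from the question and assumed to occur in fields):

# Alphabetical order:
w = csv.DictWriter(f, fieldnames=sorted(fields))

# Or a preferred prefix order, with any other discovered fields appended:
preferred = ['FIELD1', 'FIELD2', 'FIELD3']
ordered = [c for c in preferred if c in fields] + sorted(fields - set(preferred))
w = csv.DictWriter(f, fieldnames=ordered)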



Answer 4:

It's not super short or anything, but basically I'm reading the files into column stores and then writing them all out. I'm hoping for something faster; the same speed with the same I/O and less memory would be good too... but faster is most important.

import csv
from os.path import join
from collections import OrderedDict


# Accumulators
# columnstore: OrderedDict mapping column name -> (data list, starting row count)
columnstore = OrderedDict()
total_rowcount = 0

def flush_to_merged_csv(merged_filename,delimiter):

    with open(merged_filename,'w') as f:
        writer = csv.writer(f, delimiter=bytes(delimiter) )

        # Write the header first for all columns
        writer.writerow(columnstore.keys())

        # Write each row
        for rowidx in range(0,total_rowcount):

            # Assemble row from columnstore
            row = []
            for col in columnstore.keys():
                if columnstore[col][1] <= rowidx:
                    row.append(columnstore[col][0][rowidx - columnstore[col][1]])
                else:
                    row.append('')

            writer.writerow(row)


def combine(location, files, mergefile, delimiter):
    global total_rowcount

    for filename in files:

        with open(join(location,filename),'rb') as f:
            file_rowcount = 0
            reader = csv.reader( f, delimiter=bytes(delimiter) )

            # Get the column names.
            # Normalize the names (all upper, strip)
            columns = [ x.strip().upper() for x in reader.next() ]


            # Columnstore maintenance. Add new columns to columnstore
            for col in columns:
                if col not in columnstore:
                    columnstore[col] = ( [], total_rowcount )


            # Loop through the rest of the file, adding each cell to the proper columnstore
            for row in reader:
                field_count = len(row)
                total_rowcount += 1

                # Add the columns that exist to the columnstore.
                for columnidx in range(0,len(columns)):
                    # Handle missing trailing fields as empty
                    if columnidx >= field_count:
                        columnstore[columns[columnidx]][0].append('')
                    else:
                        columnstore[columns[columnidx]][0].append(row[columnidx])

                # Add empty strings to any columnstores whose columns aren't in this file, to keep them all in sync
                for colname in set(columnstore.keys()) - set(columns):
                    columnstore[colname][0].append('')

    flush_to_merged_csv(join(location,mergefile),delimiter)

combine( './', ['in1.csv','in2.csv','in3.csv','in4.csv','in5.csv','in6.csv','in7.csv','in8.csv','in9.csv','in10.csv'],'output.csv',',')


Answer 5:

@MartijnPieters' answer is very helpful, but because it keeps the files open after reading the headers so they can be reused when reading the content, it crashes at around 255 files (I found). I needed to combine ~32,000 files, so I rewrote his code slightly so that it would not crash. I also chose to split it into two functions, so that I could analyse the column headers in between.

import csv
import glob


def collectColumnNamesInCSVs():
    fields = set()

    for filename in glob.glob('//path//to//files/*.csv'):
        try:
            fileobj = open(filename, 'rb')
        except IOError:
            print "Failed to open {}".format(filename)
            continue

        reader = csv.DictReader(fileobj)
        fields.update(reader.fieldnames)  # reads the first row
        fileobj.close()

    return fields


def combineCSVs(fields):
    with open('result.csv', 'wb') as outf:
        writer = csv.DictWriter(outf, fieldnames=sorted(fields))
        writer.writeheader()

        for filename in glob.glob('//path//to//files/*.csv'):
            try:
                fileobj = open(filename, 'rb')
            except IOError:
                print "Failed to open {}".format(filename)
                continue

            reader = csv.DictReader(fileobj)

            for row in reader:
                writer.writerow(row)

            fileobj.close()

When opening a very motley assortment of CSVs (<1k to 700k; 20-60 mixed columns each; ~130 headers in the total set), the second stage takes about 1 minute per 1,000 files on a 1.4 GHz MacBook Air. Not bad, and several orders of magnitude faster than pandas.
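As an aside, the ~255-file ceiling mentioned above is most likely the operating system's per-process limit on open file descriptors rather than anything in the csv module. On Unix-like systems the standard resource module can report the limit and raise the soft limit up to the hard limit; a sketch (the 4096 value is an arbitrary example):

import resource

# Unix-only: report the current open-file limits for this process.
soft, hard = resource.getrlimit(resource.RLIMIT_NOFILE)
print('soft limit: {}, hard limit: {}'.format(soft, hard))

# Optionally raise the soft limit (it cannot exceed the hard limit).
resource.setrlimit(resource.RLIMIT_NOFILE, (min(4096, hard), hard))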