Question:
Given ten 1MB csv files, each with slightly different layouts, I need to combine them into a normalized single file with the same header. Empty string is fine for nulls.
Examples of columns:
1. FIELD1, FIELD2, FIELD3
2. FIELD2, FIELD1, FIELD3
3. FIELD1, FIELD3, FIELD4
4. FIELD3, FIELD4, FIELD5, FIELD6
5. FIELD2
The output would look like (although order not important, my code puts them in order discovered):
FIELD1, FIELD2, FIELD3, FIELD4, FIELD5, FIELD6
So basically the fields can come in any order, fields may be missing, and there may be new fields not seen before. All must be included in the output file. No joining is required; in the end, the count of data rows in the parts must equal the count of rows in the output.
Reading all 10MB into memory is OK. Somehow using 100MB to do it would not be. You can open all files at once if needed as well. There are lots of file handles and memory available, but it will be running against a NAS, so it needs to be efficient for that (not too many NAS ops).
The method I have right now is to read each file into column lists, build new column lists as I discover new columns, then write it all out to a single file. I'm hoping someone has something a bit more clever, though, as I'm bottlenecking on this process, so any relief is helpful.
I have sample files here if anyone wants to try. I'll post my current code as a possible answer. Looking for the fastest time when I run it on my server (lots of cores, lots of memory) using local disk.
Answer 1:
Use a two-pass approach with csv.DictReader() and csv.DictWriter() objects. Pass one collects the set of headers used across all the files, and pass two then copies across data based on the headers.
Collecting headers is as simple as accessing the fieldnames attribute on the reader objects:
import csv
import glob

files = []
readers = []
fields = set()

try:
    for filename in glob.glob('in*.csv'):
        try:
            fileobj = open(filename, 'rb')
        except IOError:
            print "Failed to open {}".format(filename)
            continue
        files.append(fileobj)  # for later closing

        reader = csv.DictReader(fileobj)
        fields.update(reader.fieldnames)  # reads the first row
        readers.append(reader)

    with open('result.csv', 'wb') as outf:
        writer = csv.DictWriter(outf, fieldnames=sorted(fields))
        writer.writeheader()
        for reader in readers:
            # copy across rows; missing fields will be left blank
            for row in reader:
                writer.writerow(row)
finally:
    # close out open file objects
    for fileobj in files:
        fileobj.close()
Each reader produces a dictionary with a subset of all fields, but DictWriter will use the value of the restval argument (defaulting to '' when omitted like I did here) to fill in the value of each missing key.
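To make the restval behaviour concrete, here is a minimal Python 3 sketch that is not part of the original answer. The helper name write_rows and the sample rows are made up for illustration, and it writes to an in-memory buffer so that it runs without any input files.

```python
import csv
import io


def write_rows(fieldnames, rows, restval=""):
    """Write dict rows as CSV text; keys missing from a row get restval."""
    buf = io.StringIO(newline="")
    writer = csv.DictWriter(buf, fieldnames=fieldnames, restval=restval)
    writer.writeheader()
    for row in rows:
        writer.writerow(row)
    return buf.getvalue()


# Two rows that each carry only a subset of the three output columns.
text = write_rows(
    ["FIELD1", "FIELD2", "FIELD3"],
    [{"FIELD1": "a", "FIELD3": "c"}, {"FIELD2": "b"}],
)
print(text)
```

Each row comes out with all three columns, and the columns a row did not supply are written as empty strings.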
I assumed Python 2 here; if this is Python 3 you could use an ExitStack() to manage the open files for the readers, omit the b from the file modes, and add a newline='' argument to all open calls to leave newline handling to the CSV module.
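The Python 3 variant described above could look roughly like the following. This is a sketch rather than the answer author's own code: the function name merge_csvs, its parameters, and the sorted() call on the glob results are assumptions added for illustration.

```python
import csv
import glob
from contextlib import ExitStack


def merge_csvs(pattern, out_path):
    """Two-pass merge: collect all headers first, then copy the rows."""
    with ExitStack() as stack:
        readers = []
        fields = set()
        for filename in sorted(glob.glob(pattern)):
            try:
                # ExitStack closes every registered file when the block exits.
                fileobj = stack.enter_context(open(filename, newline=""))
            except OSError:
                print("Failed to open {}".format(filename))
                continue
            reader = csv.DictReader(fileobj)
            # Accessing fieldnames reads the header row (None for an empty file).
            fields.update(reader.fieldnames or [])
            readers.append(reader)

        with open(out_path, "w", newline="") as outf:
            writer = csv.DictWriter(outf, fieldnames=sorted(fields))
            writer.writeheader()
            for reader in readers:
                # Copy across rows; missing fields are left blank.
                for row in reader:
                    writer.writerow(row)
    return sorted(fields)
```

A call such as merge_csvs('in*.csv', 'result.csv') would mirror the Python 2 script above. The output path should not match the input pattern, otherwise a result file left over from an earlier run would be picked up as input.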
The above code only ever uses a buffer to read and write rows; rows are mostly moved from one open reader to the writer one row at a time.
Unfortunately, we cannot use writer.writerows(reader) as the DictWriter.writerows() implementation first converts everything in reader to a list of lists before passing it on to the underlying csv.writer.writerows() method; see issue 23495 in the Python bug tracker.
Answer 2:
Using the pandas library and the concat function:
import pandas
import glob
df = pandas.concat([pandas.read_csv(x) for x in glob.glob("in*.csv")])
# index=False keeps the per-file row index out of the output as an extra column
df.to_csv("output.csv", index=False)
Answer 3:
Here's a simple solution using standard library modules. This is Python 3; for Python 2, use the alternate "with" lines that are commented out:
import csv
import glob

rows = []
fields = set()

for filename in glob.glob('in*.csv'):
    #with open(filename,'rb') as f:
    with open(filename,newline='') as f:
        r = csv.DictReader(f)
        rows.extend(row for row in r)
        fields.update(r.fieldnames)

#with open('result.csv','wb') as f:
with open('result.csv','w',newline='') as f:
    w = csv.DictWriter(f,fieldnames=fields)
    w.writeheader()
    w.writerows(rows)
Edit
Per comment, adding file name and line number:
import csv
import glob

rows = []
fields = set(['filename','lineno'])

for filename in glob.glob('in*.csv'):
    with open(filename,newline='') as f:
        r = csv.DictReader(f)
        for lineno,row in enumerate(r,1):
            row.update({'filename':filename,'lineno':lineno})
            rows.append(row)
        fields.update(r.fieldnames)

with open('result.csv','w',newline='') as f:
    w = csv.DictWriter(f,fieldnames=fields)
    w.writeheader()
    w.writerows(rows)
Original on my system took 8.8s. This update took 10.6s.
Also note that if you order fields before passing it to DictWriter, you can put the columns in the order you want.
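One possible ordering is the "order discovered" that the question mentions. The sketch below is an illustration, not part of the answer: the helper name fields_in_discovery_order is hypothetical, and it relies on dictionaries preserving insertion order, which is guaranteed from Python 3.7 onward. The sample headers are the five layouts from the question.

```python
def fields_in_discovery_order(header_rows):
    """Return the union of all header names, in first-seen order."""
    ordered = {}
    for header in header_rows:
        for name in header:
            # setdefault leaves the position of an already-seen name unchanged.
            ordered.setdefault(name, None)
    return list(ordered)


headers = [
    ["FIELD1", "FIELD2", "FIELD3"],
    ["FIELD2", "FIELD1", "FIELD3"],
    ["FIELD1", "FIELD3", "FIELD4"],
    ["FIELD3", "FIELD4", "FIELD5", "FIELD6"],
    ["FIELD2"],
]
fieldnames = fields_in_discovery_order(headers)
print(fieldnames)
```

The resulting list can be passed as the fieldnames argument of csv.DictWriter in place of the unordered set.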
Answer 4:
It's not super short or anything, but basically I'm reading these into column stores then writing them all out. I'm hoping for something faster, or same speed, same i/o and less memory is good too... but faster is most important.
import csv
from os.path import join
from collections import OrderedDict

# Accumulators
#columnstore = OrderedDict of tuples ( Data List, Starting rowcount)
columnstore = OrderedDict()
total_rowcount = 0

def flush_to_merged_csv(merged_filename,delimiter):
    # Python 2: the csv module expects files opened in binary mode
    with open(merged_filename,'wb') as f:
        writer = csv.writer(f, delimiter=bytes(delimiter) )
        # Write the header first for all columns
        writer.writerow(columnstore.keys())
        # Write each row
        for rowidx in range(0,total_rowcount):
            # Assemble row from columnstore
            row = []
            for col in columnstore.keys():
                if columnstore[col][1] <= rowidx:
                    row.append(columnstore[col][0][rowidx - columnstore[col][1]])
                else:
                    row.append('')
            writer.writerow(row)

def combine(location, files, mergefile, delimiter):
    global total_rowcount
    for filename in files:
        with open(join(location,filename),'rb') as f:
            file_rowcount = 0
            reader = csv.reader( f, delimiter=bytes(delimiter) )
            # Get the column names.
            # Normalize the names (all upper, strip)
            columns = [ x.strip().upper() for x in reader.next() ]
            # Columnstore maintenance. Add new columns to columnstore
            for col in columns:
                if not columnstore.has_key(col):
                    columnstore[col] = ( [], total_rowcount )
            # Loop through the remaining file, adding each cell to the proper columnstore
            for row in reader:
                field_count = len(row)
                total_rowcount += 1
                # Add the columns that exist to the columnstore.
                for columnidx in range(0,len(columns)):
                    # Handle missing trailing fields as empty
                    if columnidx >= field_count:
                        columnstore[columns[columnidx]][0].append('')
                    else:
                        columnstore[columns[columnidx]][0].append(row[columnidx])
                # Add empty strings to any columnstores that don't exist in this file to keep them all in sync
                for colname in set(columnstore.keys()) - set(columns):
                    columnstore[colname][0].append('')
    flush_to_merged_csv(join(location,mergefile),delimiter)

combine( './', ['in1.csv','in2.csv','in3.csv','in4.csv','in5.csv','in6.csv','in7.csv','in8.csv','in9.csv','in10.csv'],'output.csv',',')
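The code above normalizes header names (strip, upper-case) before merging, which the DictReader-based answers do not do. As a hedged Python 3 sketch of how that step might be combined with csv.DictReader instead of a column store: the helper name normalized_reader is hypothetical, and the sample data is made up for illustration.

```python
import csv
import io


def normalized_reader(fileobj):
    """Return a DictReader whose header names are stripped and upper-cased."""
    reader = csv.DictReader(fileobj)
    # Accessing fieldnames consumes the header row (None for an empty file).
    header = reader.fieldnames or []
    # Assigning fieldnames replaces the keys used for every following row.
    reader.fieldnames = [name.strip().upper() for name in header]
    return reader


sample = io.StringIO(" field1 ,Field2\r\na,b\r\n", newline="")
rows = list(normalized_reader(sample))
print(rows)
```

With this, differently spelled headers such as " field1 " and "FIELD1" end up in the same output column.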
Answer 5:
@MartijnPieters' answer is very helpful, but because it keeps the files open after reading the headers so they can be re-used when reading the content, it crashes at around 255 files (in my testing). I needed to combine ~32,000 files, so I rewrote his code slightly to avoid the crash. I also chose to split it into two functions, so that I could analyse the column headers in between.
import csv
import glob

def collectColumnNamesInCSVs():
    fields = set()

    for filename in glob.glob('//path//to//files/*.csv'):
        try:
            fileobj = open(filename, 'rb')
        except IOError:
            print "Failed to open {}".format(filename)
            continue

        reader = csv.DictReader(fileobj)
        fields.update(reader.fieldnames)  # reads the first row
        fileobj.close()

    return fields

def combineCSVs(fields):
    # the with statement closes outf; no explicit close() is needed
    with open('result.csv', 'wb') as outf:
        writer = csv.DictWriter(outf, fieldnames=sorted(fields))
        writer.writeheader()

        for filename in glob.glob('//path//to//files/*.csv'):
            try:
                fileobj = open(filename, 'rb')
            except IOError:
                print "Failed to open {}".format(filename)
                continue

            reader = csv.DictReader(fileobj)
            for row in reader:
                writer.writerow(row)
            fileobj.close()
When opening a very motley assortment of CSVs (<1k - 700k; 20-60 mixed columns each; ~130 headers in the total set) the second stage is taking ~1 minute per 1000 files on a 1.4GHz MacBook Air. Not bad, and several orders of magnitude faster than Pandas.
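For Python 3, the same one-file-at-a-time idea might be sketched as follows. This is an assumption-laden illustration, not the answer author's code: the snake_case function names, the pattern and out_path parameters, and the returned row count (added so the total can be checked against the inputs, as the question requires) are all introduced here.

```python
import csv
import glob


def collect_fieldnames(pattern):
    """Pass one: read only the header row of each file, closing it right away."""
    fields = set()
    for filename in glob.glob(pattern):
        try:
            with open(filename, newline="") as fileobj:
                fields.update(csv.DictReader(fileobj).fieldnames or [])
        except OSError:
            print("Failed to open {}".format(filename))
    return fields


def combine_csvs(pattern, fields, out_path):
    """Pass two: copy rows with at most one input file open at any time."""
    rows_written = 0
    # out_path must not match pattern, or the output would be read as input.
    with open(out_path, "w", newline="") as outf:
        writer = csv.DictWriter(outf, fieldnames=sorted(fields))
        writer.writeheader()
        for filename in sorted(glob.glob(pattern)):
            try:
                fileobj = open(filename, newline="")
            except OSError:
                print("Failed to open {}".format(filename))
                continue
            with fileobj:
                for row in csv.DictReader(fileobj):
                    writer.writerow(row)
                    rows_written += 1
    return rows_written
```

Because each input is closed before the next one is opened, the number of simultaneously open file descriptors stays constant no matter how many files match the pattern.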