I have a large CSV file that I would like to split into as many pieces as there are CPU cores in the system. I then want to use multiprocessing to have all the cores work on the file together. However, I am having trouble even splitting the file into parts. I've looked all over Google and found some sample code that appears to do what I want. Here is what I have so far:
import csv
import multiprocessing
import os
import tempfile

def split(infilename, num_cpus=multiprocessing.cpu_count()):
    READ_BUFFER = 2**13
    total_file_size = os.path.getsize(infilename)
    print total_file_size
    files = list()
    with open(infilename, 'rb') as infile:
        for i in xrange(num_cpus):
            files.append(tempfile.TemporaryFile())
            this_file_size = 0
            while this_file_size < 1.0 * total_file_size / num_cpus:
                files[-1].write(infile.read(READ_BUFFER))
                this_file_size += READ_BUFFER
            files[-1].write(infile.readline()) # get the possible remainder
            files[-1].seek(0, 0)
    return files
files = split("sample_simple.csv")
print len(files)
for ifile in files:
    reader = csv.reader(ifile)
    for row in reader:
        print row
The two prints show the correct file size and that it was split into 4 pieces (my system has 4 CPU cores).
However, the last section of the code that prints all the rows in each of the pieces gives the error:
for row in reader:
_csv.Error: line contains NULL byte
I tried printing the rows without running the split function and it prints all the values correctly. I suspect the split function has added some NULL bytes to the resulting 4 file pieces but I'm not sure why.
Does anyone know if this is a correct and fast method to split the file? I just want resulting pieces that can be read successfully by csv.reader.
As I said in a comment, CSV files need to be split on row (or line) boundaries. Your code doesn't do this and potentially breaks rows up somewhere in the middle, which I suspect is the cause of your _csv.Error.
The following avoids doing that by processing the input file as a series of lines. I've tested it and it seems to work standalone, in the sense that it divided the sample file up into approximately equal-sized chunks. The sizes are only approximately equal because it's unlikely that a whole number of rows will fit exactly into a chunk.
Update
This is a substantially faster version of the code than the one I originally posted. The improvement comes from using the temp file's own tell() method to determine the constantly changing length of the file as it's being written, instead of calling os.path.getsize(). That eliminates the need to flush() the file and call os.fsync() on it after each row is written.
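As a rough illustration of the approach described above (a sketch, not the answer's original code), the following Python 3 version copies the input one line at a time and uses each temp file's tell() to decide when to start the next chunk. The names split_csv_by_lines and read_rows are hypothetical. The sketch assumes the file is UTF-8 encoded and that no quoted field contains an embedded newline, since such a field could still be cut in two by a line-based split.

```python
import csv
import io
import os
import tempfile


def split_csv_by_lines(infilename, num_chunks):
    """Split a file into at most num_chunks temp files, breaking only at line ends."""
    # Target size of each chunk in bytes; chunks overshoot it by at most one line.
    chunk_size = os.path.getsize(infilename) / num_chunks
    files = []
    with open(infilename, 'rb') as infile:
        outfile = tempfile.TemporaryFile()
        files.append(outfile)
        for line in infile:
            # tell() reports how many bytes have been written to the current
            # chunk so far, so no flush()/fsync()/getsize() calls are needed.
            if outfile.tell() >= chunk_size and len(files) < num_chunks:
                outfile = tempfile.TemporaryFile()
                files.append(outfile)
            outfile.write(line)
    for f in files:
        f.seek(0)  # rewind so each chunk can be read from its beginning
    return files


def read_rows(chunk):
    """Yield the CSV rows of one binary chunk (assumes UTF-8 text)."""
    text = io.TextIOWrapper(chunk, encoding='utf-8', newline='')
    for row in csv.reader(text):
        yield row
```

Calling split_csv_by_lines("sample_simple.csv", multiprocessing.cpu_count()) and passing each returned file to read_rows should then yield only complete rows, because every chunk ends on a line boundary. The last chunk takes whatever remains, so it may be somewhat smaller than the others.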