I'm using the following code to split a CSV file into multiple chunks (sourced from here):
import csv
import itertools
import multiprocessing as mp
import time

def worker(chunk):
    # For now, just report how many rows are in the chunk.
    print len(chunk)

def keyfunc(row):
    return row[0]

def main():
    pool = mp.Pool()
    largefile = 'Counseling.csv'
    num_chunks = 10
    start_time = time.time()
    results = []
    with open(largefile) as f:
        reader = csv.reader(f)
        reader.next()  # skip the header row
        chunks = itertools.groupby(reader, keyfunc)
        while True:
            # make a list of num_chunks chunks
            groups = [list(chunk) for key, chunk in
                      itertools.islice(chunks, num_chunks)]
            if groups:
                result = pool.map(worker, groups)
                results.extend(result)
            else:
                break
    pool.close()
    pool.join()
    print "--- %s seconds ---" % (time.time() - start_time)

if __name__ == '__main__':
    main()
However, the sizes of the chunks seem to stay the same no matter how many chunks I ask for. For example, whether I choose 1 or 10 chunks, I always get this output when processing a sample file. Ideally, I'd like to chunk the file so that the rows are equitably distributed.
Note: the real file I am chunking is over 13 million rows long, which is why I am processing it piece by piece. That is a must!
6
7
1
...
1
1
94
--- 0.101687192917 seconds ---
Per the comments, we wish to have each process work on a 10000-row chunk. That's not too hard to do; see the iter/islice recipe below. However, the problem with using pool.map is that it will attempt to put all the chunks in a task queue at once. If this requires more memory than is available, then you get a MemoryError. (Note: pool.imap suffers from the same problem.) So instead, we need to call pool.map iteratively, on pieces of each chunk.

Each chunk will consist of up to chunksize*num_procs lines from the file. This is enough data to give all the workers in the pool something to work on, but not so big as to cause a MemoryError -- provided chunksize is not set too large.

Each chunk is then broken into pieces, with each piece consisting of up to chunksize rows from the file. These pieces are then sent to pool.map. A sketch of this approach is shown below.
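Here is a minimal sketch of that recipe (it reuses the Counseling.csv file from the question; the worker just reports piece sizes, and chunksize and num_procs are illustrative values you would tune):

import csv
import itertools as IT
import multiprocessing as mp

def worker(piece):
    # Placeholder work: report how many rows are in this piece.
    return len(piece)

def main():
    num_procs = mp.cpu_count()   # number of worker processes
    chunksize = 10000            # rows per piece; tune to taste

    pool = mp.Pool(num_procs)
    results = []
    with open('Counseling.csv') as f:
        reader = csv.reader(f)
        next(reader)  # skip the header row
        # Each chunk holds up to chunksize * num_procs rows.
        for chunk in iter(lambda: list(IT.islice(reader, chunksize * num_procs)), []):
            # Break the chunk into pieces of up to chunksize rows each.
            it = iter(chunk)
            pieces = list(iter(lambda: list(IT.islice(it, chunksize)), []))
            # Only this chunk's pieces are queued at once, keeping memory bounded.
            results.extend(pool.map(worker, pieces))
    pool.close()
    pool.join()
    print(results)

if __name__ == '__main__':
    main()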
How does iter(lambda: list(IT.islice(iterator, chunksize)), []) work? It is an idiom for grouping an iterator into chunks of length chunksize. Let's see how it works on an example:
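For instance, take a hypothetical iterator over the numbers 0 through 9 and a chunksize of 3:

>>> import itertools as IT
>>> iterator = iter(range(10))
>>> list(IT.islice(iterator, 3))
[0, 1, 2]
>>> list(IT.islice(iterator, 3))
[3, 4, 5]
>>> list(IT.islice(iterator, 3))
[6, 7, 8]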
Notice that each time IT.islice(iterator, 3) is called, a new chunk of 3 items is sliced off of the iterator. When there are fewer than 3 items left in the iterator, only what remains is returned:
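Continuing the same hypothetical session, only the item 9 is left:

>>> list(IT.islice(iterator, 3))
[9]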
And if you call it again, you get an empty list:
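Still in the same hypothetical session:

>>> list(IT.islice(iterator, 3))
[]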
lambda: list(IT.islice(iterator, chunksize)) is a function which returns list(IT.islice(iterator, chunksize)) when called. It is a "one-liner" which is equivalent to the def form shown below.
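For illustration, the lambda could be written as a named function (get_chunk is a hypothetical name; iterator and chunksize are assumed to already be defined):

def get_chunk():
    # Same effect as the lambda: slice the next chunksize items off the iterator.
    return list(IT.islice(iterator, chunksize))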
Finally, iter(callable, sentinel) returns another iterator. The values yielded by this iterator are the values returned by the callable. It keeps on yielding values until the callable returns a value equal to the sentinel. So iter(lambda: list(IT.islice(iterator, chunksize)), []) will keep on returning the values of list(IT.islice(iterator, chunksize)) until that value is the empty list:
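Continuing the hypothetical range(10) example with chunksize = 3:

>>> iterator = iter(range(10))
>>> chunksize = 3
>>> for chunk in iter(lambda: list(IT.islice(iterator, chunksize)), []):
...     print(chunk)
...
[0, 1, 2]
[3, 4, 5]
[6, 7, 8]
[9]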
First of all, itertools.groupby will not make any real sense if the records are not already sorted on the key column. Moreover, if your requirement is just to chunk the csv file into a predetermined number of rows and give each chunk to a worker, then you don't have to do all of this.
A simple implementation would be:
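Something along these lines (row_chunks, the 10000-row batch size, and the reuse of worker are illustrative assumptions, not the answer's original code):

import csv
import multiprocessing as mp

def worker(chunk):
    # Placeholder work: report how many rows are in the chunk.
    return len(chunk)

def row_chunks(path, rows_per_chunk):
    # Read the file sequentially and yield lists of up to rows_per_chunk rows.
    with open(path) as f:
        reader = csv.reader(f)
        next(reader)  # skip the header row
        chunk = []
        for row in reader:
            chunk.append(row)
            if len(chunk) == rows_per_chunk:
                yield chunk
                chunk = []
        if chunk:
            yield chunk

if __name__ == '__main__':
    pool = mp.Pool()
    # Per the edit note below, imap is used instead of map; it yields
    # results one at a time, in order, as the workers finish them.
    for result in pool.imap(worker, row_chunks('Counseling.csv', 10000)):
        print(result)
    pool.close()
    pool.join()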
*Edit: changed to pool.imap instead of pool.map