PyMongo supports passing a generator to an insert for batch processing, e.g. sDB.insert(iter_something(converted)). Bulk write operations execute the writes in batches, which reduces the number of network round trips and increases write throughput.
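For reference, a minimal sketch of a bulk insert driven by a generator, assuming a local mongod and PyMongo 3.x, where insert_many() (the replacement for the deprecated insert()) accepts an iterable of documents; gen_docs and the document shape are made up for illustration:

    from pymongo import MongoClient

    def gen_docs(n):
        # hypothetical generator yielding simple documents
        for i in range(n):
            yield {'i': i}

    client = MongoClient()          # assumes mongod on localhost:27017
    coll = client.test.abc
    # insert_many() consumes the iterable and sends the documents to the
    # server in batches that respect the server's size limits
    result = coll.insert_many(gen_docs(5000), ordered=False)
    print(len(result.inserted_ids))  # 5000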
The following code seems to work, but I do not know whether PyMongo, combined with multiprocessing, is still able to iterate the generator until it has yielded 1000 documents or 16 MB of data, pause the generator while it inserts that batch into MongoDB, and then resume.
#!/usr/bin/env python
from __future__ import absolute_import, division, print_function
from itertools import groupby
from pymongo import MongoClient
from multiprocessing import Process, JoinableQueue
import csv
# > use test
# switched to db test
# > db.createCollection("abc")
# { "ok" : 1 }
# > db.abc.find()
# example rows (same shape as the converted rows read from test.txt below)
parts = [["Test", "A", "B01", 828288, 1, 7, 'C', 5],
         ["Test", "A", "B01", 828288, 1, 7, 'T', 6],
         ["Test", "A", "B01", 171878, 3, 7, 'C', 5],
         ["Test", "A", "B01", 171878, 3, 7, 'T', 6],
         ["Test", "A", "B01", 871963, 3, 9, 'A', 5],
         ["Test", "A", "B01", 871963, 3, 9, 'G', 6],
         ["Test", "A", "B01", 1932523, 1, 10, 'T', 4],
         ["Test", "A", "B01", 1932523, 1, 10, 'A', 5],
         ["Test", "A", "B01", 1932523, 1, 10, 'X', 6],
         ["Test", "A", "B01", 667214, 1, 14, 'T', 4],
         ["Test", "A", "B01", 667214, 1, 14, 'G', 5],
         ["Test", "A", "B01", 667214, 1, 14, 'G', 6]]
def iter_something(rows):
    key_names = ['type', 'name', 'sub_name', 'pos', 's_type', 'x_type']
    chr_key_names = ['letter', 'no']
    # group consecutive rows that share the first six columns into one document
    for keys, group in groupby(rows, lambda row: row[:6]):
        result = dict(zip(key_names, keys))
        result['chr'] = [dict(zip(chr_key_names, row[6:])) for row in group]
        yield result
class Loading(Process):

    def __init__(self, task_queue):
        Process.__init__(self)
        self.task_queue = task_queue

    def run(self):
        # create the client inside the child process; MongoClient is not fork-safe
        db = MongoClient().test
        sDB = db["abc"]
        while True:
            doc = self.task_queue.get()
            if doc is None:  # None means shutdown
                self.task_queue.task_done()
                break
            else:
                sDB.insert(doc)
                self.task_queue.task_done()  # so tasks.join() can return
def main():
    num_cores = 2
    tasks = JoinableQueue()
    workers = [Loading(tasks) for i in range(num_cores)]

    for i, w in enumerate(workers):
        w.start()
        print('Worker ' + str(i + 1) + ' has started!')

    converters = [str, str, str, int, int, int, str, int]
    with open("/home/mic/tmp/test.txt") as f:
        reader = csv.reader(f, skipinitialspace=True)
        converted = ([conv(col) for conv, col in zip(converters, row)] for row in reader)
        # sDB.insert(iter_something(converted))
        # Enqueue jobs
        for doc in iter_something(converted):
            tasks.put(doc)

    # Add one None per worker to signal shutdown
    for i in range(num_cores):
        tasks.put(None)

    # Wait for all of the tasks to finish
    tasks.join()

if __name__ == '__main__':
    main()
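If the goal is to control the batch size explicitly rather than rely on the driver's internal splitting, one possible approach (a sketch only; batched() is a made-up helper and the connection and collection details are assumptions) is to chunk the generator with itertools.islice and perform one bulk insert per chunk:

    from itertools import islice
    from pymongo import MongoClient

    def batched(iterable, size=1000):
        # hypothetical helper: yield lists of up to `size` items from `iterable`
        it = iter(iterable)
        while True:
            chunk = list(islice(it, size))
            if not chunk:
                break
            yield chunk

    if __name__ == '__main__':
        coll = MongoClient().test["abc"]             # assumes a local mongod
        docs = ({'n': n} for n in range(10000))      # stand-in for iter_something(converted)
        for batch in batched(docs, 1000):
            # one bulk insert per chunk of up to 1000 documents
            coll.insert_many(batch, ordered=False)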