PyMongo’s bulk write operation features with multi

Posted 2019-09-01 07:13

Question:

PyMongo supports generators for batch processing with sDB.insert(iter_something(converted)). Its bulk write feature executes write operations in batches in order to reduce the number of network round trips and increase write throughput.
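To make the batching idea concrete, here is a minimal sketch of how a generator can be consumed in fixed-size batches. It is not PyMongo's internal code; the helper name `batched`, the sample generator `numbers`, and the batch size of 1000 are assumptions chosen for illustration only.

```python
from itertools import islice


def batched(iterable, batch_size=1000):
    """Yield lists of up to batch_size items, pulling lazily from iterable."""
    it = iter(iterable)
    while True:
        # islice advances the underlying generator only as far as needed,
        # so the generator is effectively paused between batches.
        batch = list(islice(it, batch_size))
        if not batch:
            return
        yield batch


def numbers():
    # Hypothetical stand-in for a generator of documents
    for n in range(2500):
        yield {"n": n}


sizes = [len(batch) for batch in batched(numbers(), 1000)]
print(sizes)  # [1000, 1000, 500]
```

Each batch would correspond to one round trip to the server, which is where the throughput gain comes from.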

The following code seems to work, but I do not know whether PyMongo, when combined with multiprocessing, can still iterate the generator until it has yielded 1000 documents or 16MB of data, then pause the generator while it inserts the batch into MongoDB.

#!/usr/bin/env python
from __future__ import absolute_import, division, print_function
from itertools import groupby
from pymongo import MongoClient
from multiprocessing import Process, JoinableQueue
import csv

# > use test
# switched to db test
# > db.createCollection("abc")
# { "ok" : 1 }
# > db.abc.find()


parts = [["Test", "A", "B01", 828288,  1,    7, 'C', 5],
    ["Test", "A", "B01", 828288,  1,    7, 'T', 6],
    ["Test", "A", "B01", 171878,  3,    7, 'C', 5],
    ["Test", "A", "B01", 171878,  3,    7, 'T', 6],
    ["Test", "A", "B01", 871963,  3,    9, 'A', 5],
    ["Test", "A", "B01", 871963,  3,    9, 'G', 6],
    ["Test", "A", "B01", 1932523, 1,   10, 'T', 4],
    ["Test", "A", "B01", 1932523, 1,   10, 'A', 5],
    ["Test", "A", "B01", 1932523, 1,   10, 'X', 6],
    ["Test", "A", "B01", 667214,  1,   14, 'T', 4],
    ["Test", "A", "B01", 667214,  1,   14, 'G', 5],
    ["Test", "A", "B01", 667214,  1,   14, 'G', 6]]


def iter_something(rows):
    key_names = ['type', 'name', 'sub_name', 'pos', 's_type', 'x_type']
    chr_key_names = ['letter', 'no']
    for keys, group in groupby(rows, lambda row: row[:6]):
        result = dict(zip(key_names, keys))
        result['chr'] = [dict(zip(chr_key_names, row[6:])) for row in group]
        yield result

class Loading(Process):

    def __init__(self, task_queue):
        Process.__init__(self)
        self.task_queue = task_queue
        db = MongoClient().test
        self.sDB = db["abc"]

    def run(self):
        while True:
            doc = self.task_queue.get()
            if doc is None:  # None means shutdown
                self.task_queue.task_done()
                break
            else:
                self.sDB.insert(doc)
                # Mark the item as done so tasks.join() in main() can return
                self.task_queue.task_done()

def main():
    num_cores = 2

    tasks = JoinableQueue()

    threads = [Loading(tasks) for i in range(num_cores)]

    for i, w in enumerate(threads):
        w.start()
        print('Thread ' + str(i+1) + ' has started!')

    converters = [str, str, str, int, int, int, str, int]
    with open("/home/mic/tmp/test.txt") as f:
        reader = csv.reader(f, skipinitialspace=True)
        converted = ([conv(col) for conv, col in zip(converters, row)] for row in reader)
        # sDB.insert(iter_something(converted))

        # Enqueue jobs
        for i in iter_something(converted):
            tasks.put(i)

    # Add None to kill each thread
    for i in range(num_cores):
        tasks.put(None)

    # Wait for all of the tasks to finish
    tasks.join()


if __name__ == '__main__':
    main()

Answer 1:

In this case you are not taking advantage of batch inserts. Each call to "self.sDB.insert(doc)" immediately sends the document to MongoDB and waits for the reply from the server. You could try this:

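from pymongo.errors import InvalidOperation

def run(self):
    def gen():
        # Yield queued documents until the None sentinel arrives
        while True:
            doc = self.task_queue.get()
            if doc is None:  # None means shutdown
                self.task_queue.task_done()
                break
            else:
                yield doc
                # Runs when the consumer asks for the next document;
                # without it, tasks.join() in main() would never return
                self.task_queue.task_done()

    try:
        self.sDB.insert(gen())
    except InvalidOperation as e:
        # Perhaps "Empty bulk write", this process received no documents.
        print(e)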

Use mongosniff to verify that you're sending large batches to the server instead of inserting one document at a time. Depending on the number of documents and the number of processes, some processes might get no documents. PyMongo raises InvalidOperation if you try to insert from an empty iterator, so I wrap the insert in a "try / except".
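As an alternative to catching the exception, one could peek at the first item and skip the insert when the iterator is empty. The sketch below shows that idea without a database: `queue.Queue` stands in for the JoinableQueue, `stored.extend` stands in for the collection's insert call, and the names `drain` and `insert_if_any` are hypothetical helpers, not part of PyMongo.

```python
from itertools import chain
from queue import Queue  # stand-in for multiprocessing.JoinableQueue


def drain(task_queue):
    """Yield queued documents until the None sentinel arrives."""
    while True:
        doc = task_queue.get()
        if doc is None:
            return
        yield doc


def insert_if_any(docs, insert_batch):
    """Pass docs to insert_batch only if at least one document exists."""
    it = iter(docs)
    try:
        first = next(it)
    except StopIteration:
        return False  # nothing to insert, so skip the call entirely
    # Re-attach the peeked document in front of the remaining ones
    insert_batch(chain([first], it))
    return True


stored = []

q = Queue()
for n in range(3):
    q.put({"n": n})
q.put(None)
inserted = insert_if_any(drain(q), stored.extend)

empty = Queue()
empty.put(None)
skipped = insert_if_any(drain(empty), stored.extend)

print(inserted, skipped, stored)
```

The documents are still consumed lazily, so the batching behaviour of the real insert call would not change.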

By the way, you don't need to call createCollection with MongoDB: the first insert into a collection creates it automatically. createCollection is only necessary if you want special options, like a capped collection.
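For the special-options case, a capped collection could be created roughly like this. The helper name `make_capped_collection`, the collection name "abc_log" and the size are assumptions for illustration; `db` is expected to be a PyMongo database object such as MongoClient().test.

```python
def make_capped_collection(db, name, size_bytes):
    """Create a capped collection of at most size_bytes on a PyMongo database."""
    # capped and size are passed through as collection options
    return db.create_collection(name, capped=True, size=size_bytes)


# Example usage, assuming a MongoDB server is running locally:
# from pymongo import MongoClient
# db = MongoClient().test
# make_capped_collection(db, "abc_log", 1024 * 1024)
```

For an ordinary collection like "abc" none of this is needed; the first insert creates it.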