How can Python Observe Changes to Mongodb's Op

Posted 2019-02-10 08:01

I have multiple Python scripts writing to MongoDB using PyMongo. How can another Python script observe changes to a MongoDB query and run some function when a change occurs? MongoDB is set up with the oplog enabled.

4 answers
可以哭但决不认输i
#2 · 2019-02-10 08:28

Query the oplog with a tailable cursor.

Oplog monitoring is exactly what the tailable-cursor feature was originally added for. I find it extremely useful for other things as well (e.g. implementing a MongoDB-based pub/sub, see this post for example), but that was the original purpose.

够拽才男人
#3 · 2019-02-10 08:43

I ran into this issue today and haven't found an updated answer anywhere.

The Cursor class changed in PyMongo 3.0 and no longer accepts the tailable and await_data arguments. This example tails the oplog and prints each record that is newer than the last one it saw.

# Adapted from the example here: https://jira.mongodb.org/browse/PYTHON-735
# to work with pymongo 3.0

import pymongo
from pymongo.cursor import CursorType

c = pymongo.MongoClient()

# Use this for master/slave.
oplog = c.local.oplog['$main']
# Use this instead for replica sets.
#oplog = c.local.oplog.rs

first = next(oplog.find().sort('$natural', pymongo.DESCENDING).limit(-1))
ts = first['ts']

while True:
    cursor = oplog.find({'ts': {'$gt': ts}}, cursor_type=CursorType.TAILABLE_AWAIT, oplog_replay=True)
    while cursor.alive:
        for doc in cursor:
            ts = doc['ts']
            print(doc)
            # Work with doc here
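
As a rough sketch of what "work with doc here" could look like, the helper below classifies an oplog entry by its op field and optionally filters on one namespace. It assumes the commonly seen entry layout (op is 'i', 'u' or 'd'; ns is "db.collection"; o holds the document or update spec; o2 identifies the updated document). The oplog format is internal to MongoDB and can differ between server versions, and describe_change is a hypothetical name, not part of PyMongo.

```python
def describe_change(entry, namespace=None):
    """Return (kind, ns, payload) for a data-changing oplog entry, else None.

    `namespace` is an optional "db.collection" string; entries for any
    other namespace are ignored. Hypothetical helper, not a PyMongo API.
    """
    kinds = {'i': 'insert', 'u': 'update', 'd': 'delete'}
    kind = kinds.get(entry.get('op'))
    if kind is None:
        # No-ops ('n'), commands ('c') and anything else are skipped.
        return None

    ns = entry.get('ns')
    if namespace is not None and ns != namespace:
        return None

    if kind == 'update':
        # Assumed layout: 'o2' selects the document, 'o' describes the change.
        payload = {'target': entry.get('o2'), 'change': entry.get('o')}
    else:
        payload = entry.get('o')
    return kind, ns, payload


# Example with a hand-written entry (no server needed).
sample = {'op': 'i', 'ns': 'test.users', 'o': {'_id': 1, 'name': 'ann'}}
print(describe_change(sample, namespace='test.users'))
```

Inside the tailing loop you could call describe_change(doc, namespace='mydb.mycollection') and run your own function whenever the result is not None.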
我只想做你的唯一
#4 · 2019-02-10 08:46

I wrote an incremental backup tool for MongoDB some time ago, in Python. The tool monitors data changes by tailing the oplog. Here is the relevant part of the code.

Updated answer, pymongo 3

from time import sleep

from pymongo import MongoClient, ASCENDING
from pymongo.cursor import CursorType
from pymongo.errors import AutoReconnect

# Time to wait for data or connection.
_SLEEP = 1.0

if __name__ == '__main__':
    oplog = MongoClient().local.oplog.rs
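    # ASCENDING starts from the oldest oplog entry still available;
    # sort with pymongo.DESCENDING instead to start from the newest one.
    stamp = oplog.find().sort('$natural', ASCENDING).limit(-1).next()['ts']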

    while True:
        kw = {}

        kw['filter'] = {'ts': {'$gt': stamp}}
        kw['cursor_type'] = CursorType.TAILABLE_AWAIT
        kw['oplog_replay'] = True

        cursor = oplog.find(**kw)

        try:
            while cursor.alive:
                for doc in cursor:
                    stamp = doc['ts']

                    print(doc)  # Do something with doc.

                sleep(_SLEEP)

        except AutoReconnect:
            sleep(_SLEEP)

Also see http://api.mongodb.com/python/current/examples/tailable.html.
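
The loop above keeps stamp only in memory, so a restarted script starts over. One possible way to resume, sketched here under assumptions, is to persist the last processed timestamp as its two integer parts (a bson Timestamp exposes them as .time and .inc). The function names and the JSON file format are hypothetical, not taken from the backup tool.

```python
import json
import os
import tempfile


def save_checkpoint(path, time, inc):
    """Write the (time, inc) pair of the last processed 'ts' to `path`."""
    directory = os.path.dirname(os.path.abspath(path))
    # Write to a temporary file first, then rename it into place, so a
    # crash mid-write never leaves a half-written checkpoint behind.
    fd, tmp_path = tempfile.mkstemp(dir=directory)
    with os.fdopen(fd, 'w') as handle:
        json.dump({'time': time, 'inc': inc}, handle)
    os.replace(tmp_path, path)


def load_checkpoint(path):
    """Return the saved (time, inc) pair, or None if no checkpoint exists."""
    try:
        with open(path) as handle:
            data = json.load(handle)
    except FileNotFoundError:
        return None
    return data['time'], data['inc']
```

In the loop you could call save_checkpoint(path, stamp.time, stamp.inc) after handling each document, and at startup rebuild the value with bson.timestamp.Timestamp(*saved) when load_checkpoint(path) returns a pair.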

Original answer, pymongo 2

from time import sleep

from pymongo import MongoClient
from pymongo.cursor import _QUERY_OPTIONS
from pymongo.errors import AutoReconnect
from bson.timestamp import Timestamp

# Tailable cursor options.
_TAIL_OPTS = {'tailable': True, 'await_data': True}

# Time to wait for data or connection.
_SLEEP = 10

if __name__ == '__main__':
    db = MongoClient().local

    while True:
        query = {'ts': {'$gt': Timestamp(some_timestamp, 0)}}  # Replace with your query.
        cursor = db.oplog.rs.find(query, **_TAIL_OPTS)

        cursor.add_option(_QUERY_OPTIONS['oplog_replay'])

        try:
            while cursor.alive:
                try:
                    doc = next(cursor)

                    # Do something with doc.

                except (AutoReconnect, StopIteration):
                    sleep(_SLEEP)

        finally:
            cursor.close()
Animai°情兽
#5 · 2019-02-10 08:52

I had the same issue. I put together this rescommunes/oplog.py. Check comments and see __main__ for an example of how you could use it with your script.
