How to set MongoDB Change Stream 'OperationTyp

2020-07-11 05:20发布

问题:

When running the new MongDB Server, version 3.6, and trying to add a Change Stream watch to a collection to get notifications of new inserts and updates of documents, I only receive notifications for updates, not for inserts.

This is the default way I have tried to add the watch:

IMongoDatabase mongoDatabase = mongoClient.GetDatabase("Sandbox");
IMongoCollection<BsonDocument> collection = mongoDatabase.GetCollection<BsonDocument>("TestCollection");
var changeStream = collection.Watch().ToEnumerable().GetEnumerator();
changeStream.MoveNext();
var next = changeStream.Current;

Then I downloaded the C# source code from MongoDB to see how they did this. Looking at their test code for change stream watches, they create a new document(Insert) and then change that document right away(Update) and THEN set up the Change Stream watch to receive an 'update' notification. No example is given on how to watch for 'insert' notifications.

I have looked at the Java and NodeJS examples, both on MongoDB website and SO, which seems to be straight forward and defines a way to see both Inserts and Updates:

var changeStream = collection.watch({ '$match': { $or: [ { 'operationType': 'insert' }, { 'operationType': 'update' } ] } });

The API for the C# driver is vastly different, I would have assumed they would have kept the same API for C# as Java and NodeJS. I found no or very few examples for C# to do the same thing.

The closest I have come is with the following attempt but still fails and the documentation for the C# version is very limited (or I have not found the right location). Setup is as follows:

String json = "{ '$match': { 'operationType': { '$in': ['insert', 'update'] } } }";
var options = new ChangeStreamOptions { FullDocument = ChangeStreamFullDocumentOption.UpdateLookup };

PipelineDefinition<ChangeStreamDocument<BsonDocument>, ChangeStreamDocument<BsonDocument>> pipeline = new EmptyPipelineDefinition<ChangeStreamDocument<BsonDocument>>().Match(Builders<ChangeStreamDocument<BsonDocument>>.Filter.Text(json,"json"));

Then running the statement below throws an Exception:

{"Command aggregate failed: $match with $text is only allowed as the first pipeline stage."}

No other Filter options has worked either, and I have not found a way to just enter the JSON as a string to set the 'operationType'.

var changeStream = collection.Watch(pipeline, options).ToEnumerable().GetEnumerator();
changeStream.MoveNext();
var next = changeStream.Current;

My only goal here is to be able to set the 'operationType' using the C# driver. Does anyone know what I am doing wrong or have tried this using the C# driver and had success?

After reading though a large number of webpages, with very little info on the C# version of the MongoDB driver, I am very stuck! Any help would be much appreciated.

回答1:

Here is a sample of code I've used to update the collection Watch to retrieve "events" other than just document updates.

IMongoDatabase sandboxDB = mongoClient.GetDatabase("Sandbox");
IMongoCollection<BsonDocument> collection = sandboxDB.GetCollection<BsonDocument>("TestCollection");

//Get the whole document instead of just the changed portion
ChangeStreamOptions options = new ChangeStreamOptions() { FullDocument = ChangeStreamFullDocumentOption.UpdateLookup };

//The operationType can be one of the following: insert, update, replace, delete, invalidate
var pipeline = new EmptyPipelineDefinition<ChangeStreamDocument<BsonDocument>>().Match("{ operationType: { $in: [ 'replace', 'insert', 'update' ] } }");

var changeStream = collection.Watch(pipeline, options).ToEnumerable().GetEnumerator();
changeStream.MoveNext();    //Blocks until a document is replaced, inserted or updated in the TestCollection
ChangeStreamDocument<BsonDocument> next = changeStream.Current;
enumerator.Dispose();

The EmptyPiplineDefinition...Match() argument could also be:

"{ $or: [ {operationType: 'replace' }, { operationType: 'insert' }, { operationType: 'update' } ] }"

If you wanted to use the $or command, or

"{ operationType: /^[^d]/  }"

to throw a little regex in there. This last one is saying, I want all operationTypes unless they start with the letter 'd'.