I am trying to set up syncing from MongoDB to Kudu with the Debezium MongoDB connector. But, as the Debezium documentation says and as I confirmed myself, the Debezium MongoDB CDC message for an update ($set) contains no filter (no _id value):
{
"after": null,
"patch": "{\"$v\" : 1,\"$set\" : {\"_upts_ratio_average_points\" : {\"$numberLong\" : \"1564645156749\"},\"updatets\" : {\"$numberLong\" : \"1564645156749\"}}}",
"source": {
"version": "0.9.5.Final",
"connector": "mongodb",
"name": "promongodbdeb05",
"rs": "mgset-13056897",
"ns": "strtest.mg_jsd_result_all",
"sec": 1564645156,
"ord": 855,
"h": -1570214265415439167,
"initsync": false
},
"op": "u",
"ts_ms": 1564648181536
}
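The source block also tells you which Kafka topic this event was published to. Debezium's MongoDB connector writes each collection's events to a topic named <logical name>.<database>.<collection>, and ns already holds <database>.<collection>. A small sketch that derives the topic to inspect with a consumer (TopicNames and topicFor are hypothetical names used only for illustration):

```java
// Hypothetical helper: derives the Kafka topic that Debezium's MongoDB connector
// uses for a collection, following its <logical name>.<database>.<collection> scheme.
final class TopicNames {
    private TopicNames() {}

    static String topicFor(String logicalName, String ns) {
        // "ns" in the source block is already "<database>.<collection>"
        return logicalName + "." + ns;
    }

    public static void main(String[] args) {
        // Values taken from the "source" block of the event above
        System.out.println(topicFor("promongodbdeb05", "strtest.mg_jsd_result_all"));
        // prints promongodbdeb05.strtest.mg_jsd_result_all
    }
}
```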
I don't understand why it is designed like this: without a filter, there is no way to know which document was updated. I downloaded the source code of the connector and tried to fix it. It looks like the class io.debezium.connector.mongodb.transforms.UnwrapFromMongoDbEnvelope is where the MongoDB oplog message is extracted, in code like the following. The file has confusing manipulations of both _id and id, and it looks like the committer of the connector did try to include the _id value in the CDC update message. I tried changing valueDocument.append("id", keyDocument.get("id")); to valueDocument.append("id", keyDocument.get("_id"));, but there is still no _id value in the CDC message after the connector is rebuilt and deployed.
Can anyone familiar with Debezium help me with this?
// Excerpt of getUpdateDocument from io.debezium.connector.mongodb.transforms.UnwrapFromMongoDbEnvelope
// (flattenStruct and delimiter are fields of that class).
private BsonDocument getUpdateDocument(R patchRecord, BsonDocument keyDocument) {
    BsonDocument valueDocument = new BsonDocument();
    // The "patch" value holds the raw oplog update, e.g. {"$v": 1, "$set": {...}}
    BsonDocument document = BsonDocument.parse(patchRecord.value().toString());

    if (document.containsKey("$set")) {
        valueDocument = document.getDocument("$set");
    }

    if (document.containsKey("$unset")) {
        Set<Entry<String, BsonValue>> unsetDocumentEntry = document.getDocument("$unset").entrySet();
        for (Entry<String, BsonValue> valueEntry : unsetDocumentEntry) {
            // In case unset of a key is false we don't have to do anything with it,
            // if it's true we want to set the value to null
            if (!valueEntry.getValue().asBoolean().getValue()) {
                continue;
            }
            valueDocument.append(valueEntry.getKey(), new BsonNull());
        }
    }

    if (!document.containsKey("$set") && !document.containsKey("$unset")) {
        if (!document.containsKey("_id")) {
            throw new ConnectException("Unable to process Mongo Operation, a '$set' or '$unset' is necessary " +
                    "for partial updates or '_id' is expected for full Document replaces.");
        }
        // In case of a full update we can use the whole Document as it is
        // see https://docs.mongodb.com/manual/reference/method/db.collection.update/#replace-a-document-entirely
        valueDocument = document;
        valueDocument.remove("_id");
    }

    // The document id is taken from keyDocument (the record key), whose field is "id"
    if (!valueDocument.containsKey("id")) {
        valueDocument.append("id", keyDocument.get("id"));
    }

    if (flattenStruct) {
        final BsonDocument newDocument = new BsonDocument();
        valueDocument.forEach((fKey, fValue) -> newDocument.put(fKey.replace(".", delimiter), fValue));
        valueDocument = newDocument;
    }

    return valueDocument;
}
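In this method the id added to the update appears to come from keyDocument, i.e. the record key, and that document's field is named id, so keyDocument.get("_id") finds nothing. Note also that the message shown earlier still carries the raw "patch" field, which suggests (an inference, not confirmed in the post) that this transform was not applied to the connector's output at all, so edits to getUpdateDocument would not show up there. Below is a simplified, hypothetical model of the $set/$unset branch: plain Maps stand in for org.bson.BsonDocument so it runs on the JDK alone, and the full-replacement and flattenStruct branches are left out.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical, simplified model of the $set/$unset handling in getUpdateDocument.
// Plain Maps replace BsonDocument; full-document replacement and flattening are omitted.
final class PatchFlattener {
    private PatchFlattener() {}

    @SuppressWarnings("unchecked")
    static Map<String, Object> toRow(Map<String, Object> patch, Map<String, Object> key) {
        Map<String, Object> row = new LinkedHashMap<>();
        if (patch.containsKey("$set")) {
            // Changed fields become entries of the output row
            row.putAll((Map<String, Object>) patch.get("$set"));
        }
        if (patch.containsKey("$unset")) {
            Map<String, Object> unset = (Map<String, Object>) patch.get("$unset");
            for (Map.Entry<String, Object> e : unset.entrySet()) {
                // As in the transform, only fields unset with "true" are emitted as null
                if (Boolean.TRUE.equals(e.getValue())) {
                    row.put(e.getKey(), null);
                }
            }
        }
        // The document id is copied from the record KEY, whose field is named "id";
        // key.get("_id") would return null because the key has no "_id" field.
        if (!row.containsKey("id")) {
            row.put("id", key.get("id"));
        }
        return row;
    }
}
```

The point of the sketch is that the patch itself never carries the document id; it only becomes part of the flattened row because the key is passed in alongside it.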
@jiri, thanks a lot for your reply. The message I get always looks like the one above: "after" is null and there is only a "patch" field with the $set.
I searched and found that others do get the _id in Debezium MongoDB CDC events, for example in this article: https://rmoff.net/2018/03/27/streaming-data-from-mongodb-into-kafka-with-kafka-connect-and-debezium/ where the message looks like this: { "after": {\"_id\" : {\"$oid\" : \"58385328e4b001431e4e497a\"}, ....
As you can see, I can't get the _id, so there is no way for me to know which document/record the change applies to. But according to the post above, the author does get the _id, and judging from the code, _id should be there. I tried both 0.9.5.Final and 0.7.4, the version used in that post; neither worked for me, and the _id value is always missing.
When consuming the topic with the console consumer, you need to add --property print.key=true so that the record key is printed as well. The _id value is in the key part. Thanks
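Once the key is visible, a consumer can pair each record's key with its patch to know which document changed. A minimal sketch, assuming the consumer has already extracted the key's id string and the changed fields from the value (for example with a Kafka consumer and a JSON/BSON parser, not shown); KeyedSink, applyUpdate and row are hypothetical names, and the in-memory map only stands in for a Kudu table:

```java
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical in-memory stand-in for a Kudu table, keyed by the MongoDB document id.
// The id comes from the Kafka record KEY; the record value only carries changed fields.
final class KeyedSink {
    private final Map<String, Map<String, Object>> rows = new HashMap<>();

    // Upsert semantics (an assumption): an update for an unseen id creates the row.
    void applyUpdate(String recordKeyId, Map<String, Object> changedFields) {
        rows.computeIfAbsent(recordKeyId, id -> new LinkedHashMap<>()).putAll(changedFields);
    }

    // Returns the current state of the row for this id, or null if never updated.
    Map<String, Object> row(String recordKeyId) {
        return rows.get(recordKeyId);
    }
}
```

In a real pipeline applyUpdate would presumably map to a Kudu upsert keyed on the id column; that mapping is an assumption here, not something the answer describes.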