From b4a87d45a7c3c7572d5b9471c2ec5a98ef9d748d Mon Sep 17 00:00:00 2001 From: Xinbin Huang Date: Mon, 19 Sep 2022 17:28:02 -0700 Subject: [PATCH] DBZ-5628 Added support for Mongo pre-image in change stream --- COPYRIGHT.txt | 1 + .../mongodb/MongoDbCollectionSchema.java | 25 ++++++++++++++++--- .../mongodb/MongoDbConnectorConfig.java | 25 ++++++++++++++++--- .../connector/mongodb/MongoDbSchema.java | 1 + .../MongoDbStreamingChangeEventSource.java | 4 +++ .../converters/MongoDbRecordParser.java | 2 +- jenkins-jobs/scripts/config/Aliases.txt | 1 + pom.xml | 2 +- 8 files changed, 52 insertions(+), 9 deletions(-) diff --git a/COPYRIGHT.txt b/COPYRIGHT.txt index b28e5214d..d04a91971 100644 --- a/COPYRIGHT.txt +++ b/COPYRIGHT.txt @@ -419,6 +419,7 @@ Willie Zhu Wout Scheepers Xiao Fu Xiaopu Zhu +Xinbin Huang Xinquan She Xuan Shen Yang Wu diff --git a/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/MongoDbCollectionSchema.java b/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/MongoDbCollectionSchema.java index 0c7baf5fa..5442062a9 100644 --- a/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/MongoDbCollectionSchema.java +++ b/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/MongoDbCollectionSchema.java @@ -107,14 +107,17 @@ public Struct valueFromDocumentChangeStream(ChangeStreamDocument d Struct value = new Struct(valueSchema); switch (operation) { case CREATE: - final String jsonStr = valueGenerator.apply(fieldFilter.apply(document.getFullDocument())); - value.put(FieldName.AFTER, jsonStr); + extractFullDocument(document, value); break; case UPDATE: + // Not null when full documents before change are enabled + if (document.getFullDocumentBeforeChange() != null) { + extractFullDocumentBeforeChange(document, value); + } + // Not null when full documents are enabled for updates if (document.getFullDocument() != null) { - final String fullDocStr = valueGenerator.apply(fieldFilter.apply(document.getFullDocument())); - value.put(FieldName.AFTER, fullDocStr); + extractFullDocument(document, value); } if (document.getUpdateDescription() != null) { @@ -156,11 +159,25 @@ public Struct valueFromDocumentChangeStream(ChangeStreamDocument d } break; case DELETE: + // Not null when full documents before change are enabled + if (document.getFullDocumentBeforeChange() != null) { + extractFullDocumentBeforeChange(document, value); + } break; } return value; } + private void extractFullDocument(ChangeStreamDocument document, Struct value) { + final String fullDocStr = valueGenerator.apply(fieldFilter.apply(document.getFullDocument())); + value.put(FieldName.AFTER, fullDocStr); + } + + private void extractFullDocumentBeforeChange(ChangeStreamDocument document, Struct value) { + final String fullDocBeforeChangeStr = valueGenerator.apply(fieldFilter.apply(document.getFullDocumentBeforeChange())); + value.put(FieldName.BEFORE, fullDocBeforeChangeStr); + } + @Override public int hashCode() { return valueSchema().hashCode(); diff --git a/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/MongoDbConnectorConfig.java b/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/MongoDbConnectorConfig.java index 813b6f1fe..4721848bb 100644 --- a/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/MongoDbConnectorConfig.java +++ b/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/MongoDbConnectorConfig.java @@ -127,22 +127,37 @@ public static enum CaptureMode implements EnumeratedValue { /** * Change capture based on MongoDB Change Streams support. */ - CHANGE_STREAMS("change_streams", true, false), + CHANGE_STREAMS("change_streams", true, false, false), /** * Change capture based on MongoDB change Streams support. * The update message will contain the full document. */ - CHANGE_STREAMS_UPDATE_FULL("change_streams_update_full", true, true); + CHANGE_STREAMS_UPDATE_FULL("change_streams_update_full", true, true, false), + + /** + * Change capture based on MongoDB Change Streams support with pre-image. + * When applicable, the change event will include the full document before change. + */ + CHANGE_STREAMS_WITH_PRE_IMAGE("change_streams_with_pre_image", true, false, true), + + /** + * Change capture based on MongoDB change Streams support with pre-image. + * When applicable, the change event will include the full document before change. + * The update message will contain the full document. + */ + CHANGE_STREAMS_UPDATE_FULL_WITH_PRE_IMAGE("change_streams_update_full_with_pre_image", true, true, true); private final String value; private final boolean changeStreams; private final boolean fullUpdate; + private final boolean includePreImage; - private CaptureMode(String value, boolean changeStreams, boolean fullUpdate) { + private CaptureMode(String value, boolean changeStreams, boolean fullUpdate, boolean includePreImage) { this.value = value; this.changeStreams = changeStreams; this.fullUpdate = fullUpdate; + this.includePreImage = includePreImage; } @Override @@ -191,6 +206,10 @@ public static CaptureMode parse(String value, String defaultValue) { public boolean isFullUpdate() { return fullUpdate; } + + public boolean isIncludePreImage() { + return includePreImage; + } } protected static final int DEFAULT_SNAPSHOT_FETCH_SIZE = 0; diff --git a/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/MongoDbSchema.java b/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/MongoDbSchema.java index 99d358975..021c022f7 100644 --- a/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/MongoDbSchema.java +++ b/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/MongoDbSchema.java @@ -72,6 +72,7 @@ public DataCollectionSchema schemaFor(CollectionId collectionId) { final Schema valueSchema = SchemaBuilder.struct() .name(adjuster.adjust(Envelope.schemaName(topicName))) + .field(FieldName.BEFORE, Json.builder().optional().build()) .field(FieldName.AFTER, Json.builder().optional().build()) // Oplog fields .field(MongoDbFieldName.PATCH, Json.builder().optional().build()) diff --git a/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/MongoDbStreamingChangeEventSource.java b/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/MongoDbStreamingChangeEventSource.java index c324478a1..571f36512 100644 --- a/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/MongoDbStreamingChangeEventSource.java +++ b/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/MongoDbStreamingChangeEventSource.java @@ -35,6 +35,7 @@ import com.mongodb.client.model.Filters; import com.mongodb.client.model.changestream.ChangeStreamDocument; import com.mongodb.client.model.changestream.FullDocument; +import com.mongodb.client.model.changestream.FullDocumentBeforeChange; import io.debezium.connector.mongodb.ConnectionContext.MongoPrimary; import io.debezium.data.Envelope.Operation; @@ -214,6 +215,9 @@ private void readChangeStream(MongoClient primary, MongoPrimary primaryClient, R if (taskContext.getCaptureMode().isFullUpdate()) { rsChangeStream.fullDocument(FullDocument.UPDATE_LOOKUP); } + if (taskContext.getCaptureMode().isIncludePreImage()) { + rsChangeStream.fullDocumentBeforeChange(FullDocumentBeforeChange.WHEN_AVAILABLE); + } if (rsOffsetContext.lastResumeToken() != null) { LOGGER.info("Resuming streaming from token '{}'", rsOffsetContext.lastResumeToken()); diff --git a/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/converters/MongoDbRecordParser.java b/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/converters/MongoDbRecordParser.java index dfd806825..630c0f999 100644 --- a/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/converters/MongoDbRecordParser.java +++ b/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/converters/MongoDbRecordParser.java @@ -33,7 +33,7 @@ public class MongoDbRecordParser extends RecordParser { COLLECTION); public MongoDbRecordParser(Schema schema, Struct record) { - super(schema, record, Envelope.FieldName.AFTER, MongoDbFieldName.PATCH, MongoDbFieldName.FILTER, + super(schema, record, Envelope.FieldName.BEFORE, Envelope.FieldName.AFTER, MongoDbFieldName.PATCH, MongoDbFieldName.FILTER, MongoDbFieldName.UPDATE_DESCRIPTION); } diff --git a/jenkins-jobs/scripts/config/Aliases.txt b/jenkins-jobs/scripts/config/Aliases.txt index fa9485a3d..1c8b78de7 100644 --- a/jenkins-jobs/scripts/config/Aliases.txt +++ b/jenkins-jobs/scripts/config/Aliases.txt @@ -130,6 +130,7 @@ miphik,Andrey Savchuk narzdavid,Narz David LarsWerkman,Lars Werkman chadthamn,Chad Marmon +xinbinhuang,Xinbin Huang xqshe,Xinquan She wangminchao,Wang Min Chao markallanson,Mark Allanson diff --git a/pom.xml b/pom.xml index ef6b43627..214527545 100644 --- a/pom.xml +++ b/pom.xml @@ -121,7 +121,7 @@ 42.4.1 8.0.29 0.27.2 - 4.6.1 + 4.7.1 10.2.1.jre8 21.6.0.0 11.5.0.0