DBZ-5628 Added support for Mongo pre-image in change stream

This commit is contained in:
Xinbin Huang 2022-09-19 17:28:02 -07:00 committed by Jiri Pechanec
parent 991dd7ce81
commit b4a87d45a7
8 changed files with 52 additions and 9 deletions

View File

@ -419,6 +419,7 @@ Willie Zhu
Wout Scheepers
Xiao Fu
Xiaopu Zhu
Xinbin Huang
Xinquan She
Xuan Shen
Yang Wu

View File

@ -107,14 +107,17 @@ public Struct valueFromDocumentChangeStream(ChangeStreamDocument<BsonDocument> d
Struct value = new Struct(valueSchema);
switch (operation) {
case CREATE:
final String jsonStr = valueGenerator.apply(fieldFilter.apply(document.getFullDocument()));
value.put(FieldName.AFTER, jsonStr);
extractFullDocument(document, value);
break;
case UPDATE:
// Not null when full documents before change are enabled
if (document.getFullDocumentBeforeChange() != null) {
extractFullDocumentBeforeChange(document, value);
}
// Not null when full documents are enabled for updates
if (document.getFullDocument() != null) {
final String fullDocStr = valueGenerator.apply(fieldFilter.apply(document.getFullDocument()));
value.put(FieldName.AFTER, fullDocStr);
extractFullDocument(document, value);
}
if (document.getUpdateDescription() != null) {
@ -156,11 +159,25 @@ public Struct valueFromDocumentChangeStream(ChangeStreamDocument<BsonDocument> d
}
break;
case DELETE:
// Not null when full documents before change are enabled
if (document.getFullDocumentBeforeChange() != null) {
extractFullDocumentBeforeChange(document, value);
}
break;
}
return value;
}
private void extractFullDocument(ChangeStreamDocument<BsonDocument> document, Struct value) {
final String fullDocStr = valueGenerator.apply(fieldFilter.apply(document.getFullDocument()));
value.put(FieldName.AFTER, fullDocStr);
}
private void extractFullDocumentBeforeChange(ChangeStreamDocument<BsonDocument> document, Struct value) {
final String fullDocBeforeChangeStr = valueGenerator.apply(fieldFilter.apply(document.getFullDocumentBeforeChange()));
value.put(FieldName.BEFORE, fullDocBeforeChangeStr);
}
@Override
public int hashCode() {
return valueSchema().hashCode();

View File

@ -127,22 +127,37 @@ public static enum CaptureMode implements EnumeratedValue {
/**
* Change capture based on MongoDB Change Streams support.
*/
CHANGE_STREAMS("change_streams", true, false),
CHANGE_STREAMS("change_streams", true, false, false),
/**
* Change capture based on MongoDB change Streams support.
* The update message will contain the full document.
*/
CHANGE_STREAMS_UPDATE_FULL("change_streams_update_full", true, true);
CHANGE_STREAMS_UPDATE_FULL("change_streams_update_full", true, true, false),
/**
* Change capture based on MongoDB Change Streams support with pre-image.
* When applicable, the change event will include the full document before change.
*/
CHANGE_STREAMS_WITH_PRE_IMAGE("change_streams_with_pre_image", true, false, true),
/**
* Change capture based on MongoDB change Streams support with pre-image.
* When applicable, the change event will include the full document before change.
* The update message will contain the full document.
*/
CHANGE_STREAMS_UPDATE_FULL_WITH_PRE_IMAGE("change_streams_update_full_with_pre_image", true, true, true);
private final String value;
private final boolean changeStreams;
private final boolean fullUpdate;
private final boolean includePreImage;
private CaptureMode(String value, boolean changeStreams, boolean fullUpdate) {
private CaptureMode(String value, boolean changeStreams, boolean fullUpdate, boolean includePreImage) {
this.value = value;
this.changeStreams = changeStreams;
this.fullUpdate = fullUpdate;
this.includePreImage = includePreImage;
}
@Override
@ -191,6 +206,10 @@ public static CaptureMode parse(String value, String defaultValue) {
public boolean isFullUpdate() {
return fullUpdate;
}
public boolean isIncludePreImage() {
return includePreImage;
}
}
protected static final int DEFAULT_SNAPSHOT_FETCH_SIZE = 0;

View File

@ -72,6 +72,7 @@ public DataCollectionSchema schemaFor(CollectionId collectionId) {
final Schema valueSchema = SchemaBuilder.struct()
.name(adjuster.adjust(Envelope.schemaName(topicName)))
.field(FieldName.BEFORE, Json.builder().optional().build())
.field(FieldName.AFTER, Json.builder().optional().build())
// Oplog fields
.field(MongoDbFieldName.PATCH, Json.builder().optional().build())

View File

@ -35,6 +35,7 @@
import com.mongodb.client.model.Filters;
import com.mongodb.client.model.changestream.ChangeStreamDocument;
import com.mongodb.client.model.changestream.FullDocument;
import com.mongodb.client.model.changestream.FullDocumentBeforeChange;
import io.debezium.connector.mongodb.ConnectionContext.MongoPrimary;
import io.debezium.data.Envelope.Operation;
@ -214,6 +215,9 @@ private void readChangeStream(MongoClient primary, MongoPrimary primaryClient, R
if (taskContext.getCaptureMode().isFullUpdate()) {
rsChangeStream.fullDocument(FullDocument.UPDATE_LOOKUP);
}
if (taskContext.getCaptureMode().isIncludePreImage()) {
rsChangeStream.fullDocumentBeforeChange(FullDocumentBeforeChange.WHEN_AVAILABLE);
}
if (rsOffsetContext.lastResumeToken() != null) {
LOGGER.info("Resuming streaming from token '{}'", rsOffsetContext.lastResumeToken());

View File

@ -33,7 +33,7 @@ public class MongoDbRecordParser extends RecordParser {
COLLECTION);
public MongoDbRecordParser(Schema schema, Struct record) {
super(schema, record, Envelope.FieldName.AFTER, MongoDbFieldName.PATCH, MongoDbFieldName.FILTER,
super(schema, record, Envelope.FieldName.BEFORE, Envelope.FieldName.AFTER, MongoDbFieldName.PATCH, MongoDbFieldName.FILTER,
MongoDbFieldName.UPDATE_DESCRIPTION);
}

View File

@ -130,6 +130,7 @@ miphik,Andrey Savchuk
narzdavid,Narz David
LarsWerkman,Lars Werkman
chadthamn,Chad Marmon
xinbinhuang,Xinbin Huang
xqshe,Xinquan She
wangminchao,Wang Min Chao
markallanson,Mark Allanson

View File

@ -121,7 +121,7 @@
<version.postgresql.driver>42.4.1</version.postgresql.driver>
<version.mysql.driver>8.0.29</version.mysql.driver>
<version.mysql.binlog>0.27.2</version.mysql.binlog>
<version.mongo.driver>4.6.1</version.mongo.driver>
<version.mongo.driver>4.7.1</version.mongo.driver>
<version.sqlserver.driver>10.2.1.jre8</version.sqlserver.driver>
<version.oracle.driver>21.6.0.0</version.oracle.driver>
<version.db2.driver>11.5.0.0</version.db2.driver>