DBZ-6131: Support change stream filtering using MongoDB's aggregation pipeline.

Bobby Tiernay 2023-02-23 15:01:21 -05:00 committed by Jakub Cechacek
parent a46bc2903b
commit fe3135f28d
9 changed files with 271 additions and 34 deletions


@@ -0,0 +1,77 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.connector.mongodb;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import org.bson.Document;
import org.bson.conversions.Bson;
import com.mongodb.BasicDBObject;
/**
* A change stream aggregation pipeline, used to modify the output of a MongoDB change stream.
*
* @see <a href="https://www.mongodb.com/docs/manual/changeStreams/#modify-change-stream-output">Modify Change Stream Output</a>
*/
public class ChangeStreamPipeline {
private final List<? extends Bson> stages;
public ChangeStreamPipeline(String json) {
this.stages = parse(json);
}
public ChangeStreamPipeline(List<? extends Bson> stages) {
this.stages = stages;
}
public ChangeStreamPipeline(Bson... stages) {
this(List.of(stages));
}
public List<? extends Bson> getStages() {
return stages;
}
/**
* Creates a new pipeline that combines this pipeline's stages with those of the supplied pipeline, in order.
*
* @param pipeline the pipeline whose stages are appended
* @return the combined pipeline
*/
public ChangeStreamPipeline then(ChangeStreamPipeline pipeline) {
var stages = new ArrayList<Bson>();
stages.addAll(this.getStages());
stages.addAll(pipeline.getStages());
return new ChangeStreamPipeline(stages);
}
@Override
public String toString() {
return format(stages);
}
private static String format(List<? extends Bson> stages) {
return new BasicDBObject("stages", stages)
.toBsonDocument()
.getArray("stages")
.getValues()
.toString();
}
private static List<? extends Bson> parse(String json) {
if (json == null || json.isEmpty()) {
return Collections.emptyList();
}
// The top-level value passed to `parse` must be a document, not a list, hence this trick
return Document.parse("{stages: " + json + "}")
.getList("stages", Document.class);
}
}
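As a rough usage sketch (illustrative only, not part of this commit), the class parses a JSON array of stages, tolerates null or empty input, and composes serially via then():

// Illustrative usage, assuming the ChangeStreamPipeline class above
var empty = new ChangeStreamPipeline("");  // null or empty JSON parses to an empty stage list
var match = new ChangeStreamPipeline("[{\"$match\": {\"operationType\": \"insert\"}}]");
var combined = empty.then(match);
System.out.println(combined);  // renders the stages as a JSON array via format()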


@@ -41,7 +41,17 @@ class ChangeStreamPipelineFactory {
this.filterConfig = filterConfig;
}
List<Bson> create() {
ChangeStreamPipeline create() {
// Resolve and combine internal and user pipelines serially
var internalPipeline = createInternalPipeline();
var userPipeline = createUserPipeline();
var effectivePipeline = internalPipeline.then(userPipeline);
LOGGER.info("Effective change stream pipeline: {}", effectivePipeline);
return effectivePipeline;
}
private ChangeStreamPipeline createInternalPipeline() {
// Resolve the leaf filters
var filters = Stream
.of(
@@ -60,7 +70,7 @@ List<Bson> create() {
// - https://www.mongodb.com/docs/manual/administration/change-streams-production-recommendations/#indexes-and-performance
// Note that `$addFields` must be used over `$set`/`$unset` to support MongoDB 4.0, which doesn't support those operators:
// - https://www.mongodb.com/docs/manual/changeStreams/#modify-change-stream-output
var pipeline = List.of(
return new ChangeStreamPipeline(
// Materialize a "namespace" field so that we can do qualified collection name matching per
// the configuration requirements
// Note that per the docs, if `$ns` doesn't exist, `$concat` will return `null`
@@ -74,9 +84,11 @@ List<Bson> create() {
// > Failed to decode 'ChangeStreamDocument'. Decoding 'namespace' errored with:
// > readStartDocument can only be called when CurrentBSONType is DOCUMENT, not when CurrentBSONType is STRING.
addFields("namespace", "$$REMOVE"));
}
LOGGER.info("Change stream pipeline: {}", new BasicDBObject("pipeline", pipeline).toBsonDocument().toJson());
return pipeline;
private ChangeStreamPipeline createUserPipeline() {
// Delegate to the configuration
return filterConfig.getUserPipeline();
}
private static Optional<Bson> createCollectionFilter(FilterConfig filterConfig) {
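As a worked illustration of the serial composition performed by create() (stage values assumed, for exposition only), internal stages always precede whatever the user supplies via cursor.pipeline:

// Illustrative only: internal stages run first, then user stages are appended
var internal = new ChangeStreamPipeline(
        "[{\"$addFields\": {\"namespace\": {\"$concat\": [\"$ns.db\", \".\", \"$ns.coll\"]}}}]");
var user = new ChangeStreamPipeline("[{\"$match\": {\"fullDocument.eventId\": 1404}}]");
var effective = internal.then(user);  // the internal $addFields stage, then the user's $match stage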


@@ -125,15 +125,17 @@ public static class FilterConfig {
private final String fieldRenames;
private final String fieldExcludeList;
private final String signalDataCollection;
private final ChangeStreamPipeline userPipeline;
public FilterConfig(Configuration config) {
this.dbIncludeList = resolve(config, MongoDbConnectorConfig.DATABASE_INCLUDE_LIST);
this.dbExcludeList = resolve(config, MongoDbConnectorConfig.DATABASE_EXCLUDE_LIST);
this.collectionIncludeList = resolve(config, MongoDbConnectorConfig.COLLECTION_INCLUDE_LIST);
this.collectionExcludeList = resolve(config, MongoDbConnectorConfig.COLLECTION_EXCLUDE_LIST);
this.fieldRenames = resolve(config, MongoDbConnectorConfig.FIELD_RENAMES);
this.fieldExcludeList = resolve(config, MongoDbConnectorConfig.FIELD_EXCLUDE_LIST);
this.signalDataCollection = resolve(config, MongoDbConnectorConfig.SIGNAL_DATA_COLLECTION);
this.dbIncludeList = resolveString(config, MongoDbConnectorConfig.DATABASE_INCLUDE_LIST);
this.dbExcludeList = resolveString(config, MongoDbConnectorConfig.DATABASE_EXCLUDE_LIST);
this.collectionIncludeList = resolveString(config, MongoDbConnectorConfig.COLLECTION_INCLUDE_LIST);
this.collectionExcludeList = resolveString(config, MongoDbConnectorConfig.COLLECTION_EXCLUDE_LIST);
this.fieldRenames = resolveString(config, MongoDbConnectorConfig.FIELD_RENAMES);
this.fieldExcludeList = resolveString(config, MongoDbConnectorConfig.FIELD_EXCLUDE_LIST);
this.signalDataCollection = resolveString(config, MongoDbConnectorConfig.SIGNAL_DATA_COLLECTION);
this.userPipeline = resolveChangeStreamPipeline(config, MongoDbConnectorConfig.CURSOR_PIPELINE);
}
public String getDbIncludeList() {
@@ -168,21 +170,30 @@ public Set<String> getBuiltInDbNames() {
return BUILT_IN_DB_NAMES;
}
private static String resolve(Configuration config, Field key) {
public ChangeStreamPipeline getUserPipeline() {
return userPipeline;
}
private static String resolveString(Configuration config, Field key) {
return normalize(config.getString(key));
}
private static String normalize(String value) {
if (value == null) {
private static ChangeStreamPipeline resolveChangeStreamPipeline(Configuration config, Field field) {
var text = config.getString(field);
return new ChangeStreamPipeline(text);
}
private static String normalize(String text) {
if (text == null) {
return null;
}
value = value.trim();
if (value.isEmpty()) {
text = text.trim();
if (text.isEmpty()) {
return null;
}
return value;
return text;
}
}


@@ -509,6 +509,17 @@ public boolean isIncludePreImage() {
.withImportance(Importance.LOW)
.withDescription("The maximum processing time in milliseconds to wait for the oplog cursor to process a single poll request");
public static final Field CURSOR_PIPELINE = Field.create("cursor.pipeline")
.withDisplayName("Pipeline expression apply to the change stream cursor")
.withType(Type.STRING)
.withWidth(Width.SHORT)
.withImportance(Importance.LOW)
.withValidation(MongoDbConnectorConfig::validateChangeStreamPipeline)
.withDescription("Applies processing to change events as part of the the standard MongoDB aggregation stream pipeline. " +
"A pipeline is a MongoDB aggregation pipeline composed of instructions to the database to filter or transform data. " +
"This can be used customize the data that the connector consumes. " +
"Note that this comes after the internal pipelines used to support the connector (e.g. filtering database and collection names).");
public static final Field TOPIC_NAMING_STRATEGY = Field.create("topic.naming.strategy")
.withDisplayName("Topic naming strategy class")
.withType(Type.CLASS)
@@ -614,6 +625,19 @@ private static int validateConnectionString(Configuration config, Field field, ValidationOutput problems) {
return 0;
}
private static int validateChangeStreamPipeline(Configuration config, Field field, ValidationOutput problems) {
String value = config.getString(field);
try {
new ChangeStreamPipeline(value);
}
catch (Exception e) {
problems.accept(field, value, "Change stream pipeline JSON is invalid: " + e.getMessage());
return 1;
}
return 0;
}
private static int validateAutodiscovery(Configuration config, Field field, ValidationOutput problems) {
boolean value = config.getBoolean(field);
if (!value && config.hasKey(CONNECTION_STRING)) {
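For reference, a hedged configuration sketch (the connection string, topic prefix, and property values are assumptions for illustration) showing how cursor.pipeline is set through Debezium's Configuration builder; the validator above rejects any value that does not parse as a JSON array of stages:

// Sketch only; endpoint and values are assumed, not part of this commit
var config = Configuration.create()
        .with("mongodb.connection.string", "mongodb://localhost:27017/?replicaSet=rs0")
        .with("topic.prefix", "mongo")
        .with(MongoDbConnectorConfig.CURSOR_PIPELINE, "[{\"$match\": {\"operationType\": \"insert\"}}]")
        .build();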


@@ -18,7 +18,6 @@
import org.bson.BsonDocument;
import org.bson.BsonString;
import org.bson.BsonTimestamp;
import org.bson.conversions.Bson;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -179,8 +178,9 @@ private void readChangeStream(MongoClient client, MongoPreferredNode mongo, Repl
final ServerAddress nodeAddress = MongoUtil.getPreferredAddress(client, mongo.getPreference());
LOGGER.info("Reading change stream for '{}'/{} from {} starting at {}", replicaSet, mongo.getPreference().getName(), nodeAddress, oplogStart);
final List<Bson> pipeline = new ChangeStreamPipelineFactory(rsOffsetContext, taskContext.getConnectorConfig(), taskContext.filters().getConfig()).create();
final ChangeStreamIterable<BsonDocument> rsChangeStream = client.watch(pipeline, BsonDocument.class);
final ChangeStreamPipeline pipeline = new ChangeStreamPipelineFactory(rsOffsetContext, taskContext.getConnectorConfig(), taskContext.filters().getConfig())
.create();
final ChangeStreamIterable<BsonDocument> rsChangeStream = client.watch(pipeline.getStages(), BsonDocument.class);
if (taskContext.getCaptureMode().isFullUpdate()) {
rsChangeStream.fullDocument(FullDocument.UPDATE_LOOKUP);
}
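Outside the connector, the same driver call can be sketched in isolation; the class name, endpoint, and stage below are assumptions, but MongoClient.watch(pipeline, resultClass) is the driver API used above, and the stages are applied server-side before events reach the cursor:

import java.util.List;

import org.bson.BsonDocument;
import org.bson.conversions.Bson;

import com.mongodb.client.MongoClients;
import com.mongodb.client.model.Aggregates;
import com.mongodb.client.model.Filters;

// Standalone sketch; requires a replica set, as change streams do
public class WatchSketch {
    public static void main(String[] args) {
        try (var client = MongoClients.create("mongodb://localhost:27017")) {  // assumed endpoint
            List<Bson> stages = List.of(Aggregates.match(Filters.eq("operationType", "insert")));
            client.watch(stages, BsonDocument.class)
                    .forEach(event -> System.out.println(event));  // blocks, printing matching events
        }
    }
}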


@@ -10,8 +10,10 @@
import java.io.IOException;
import java.util.EnumSet;
import java.util.List;
import org.bson.BsonTimestamp;
import org.bson.conversions.Bson;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.InjectMocks;
@@ -43,6 +45,8 @@ public void testCreate() {
.willReturn(EnumSet.of(Envelope.Operation.TRUNCATE)); // The default
given(filterConfig.getCollectionIncludeList())
.willReturn("dbit.*");
given(filterConfig.getUserPipeline())
.willReturn(new ChangeStreamPipeline("[{\"$match\": { \"$and\": [{\"operationType\": \"insert\"}, {\"fullDocument.eventId\": 1404 }] } }]"));
given(rsOffsetContext.lastResumeToken())
.willReturn(null);
given(rsOffsetContext.lastOffsetTimestamp())
@@ -52,21 +56,16 @@ public void testCreate() {
var pipeline = sut.create();
// Then:
assertThat(pipeline)
.hasSize(3);
assertThat(pipeline)
.element(0)
.satisfies((stage) -> assertJsonEquals(stage.toBsonDocument().toJson(), "" +
assertPipelineStagesEquals(pipeline.getStages(),
"" +
"{\n" +
" \"$addFields\" : {\n" +
" \"namespace\" : {\n" +
" \"$concat\" : [ \"$ns.db\", \".\", \"$ns.coll\" ]\n" +
" }\n" +
" }\n" +
"}"));
assertThat(pipeline)
.element(1)
.satisfies((stage) -> assertJsonEquals(stage.toBsonDocument().toJson(), "" +
"}",
"" +
"{\n" +
" \"$match\" : {\n" +
" \"$and\" : [ {\n" +
@@ -91,15 +90,35 @@ public void testCreate() {
" }\n" +
" } ]\n" +
" }\n" +
"}"));
assertThat(pipeline)
.element(2)
.satisfies((stage) -> assertJsonEquals(stage.toBsonDocument().toJson(), "" +
"}",
"" +
"{\n" +
" \"$addFields\" : {\n" +
" \"namespace\" : \"$$REMOVE\"\n" +
" }\n" +
"}"));
"}",
"" +
"{\n" +
" \"$match\" : {\n" +
" \"$and\" : [ {\n" +
" \"operationType\" : \"insert\"\n" +
" }, {\n" +
" \"fullDocument.eventId\" : 1404\n" +
" } ]\n" +
" }\n" +
"}");
}
private static void assertPipelineStagesEquals(List<? extends Bson> stages, String... expectedStageJsons) {
assertThat(stages)
.hasSameSizeAs(expectedStageJsons);
for (int i = 0; i < stages.size(); i++) {
var expectedStageJson = expectedStageJsons[i];
assertThat(stages)
.element(i)
.satisfies((stage) -> assertJsonEquals(stage.toBsonDocument().toJson(), expectedStageJson));
}
}
private static void assertJsonEquals(String actual, String expected) {


@@ -5,6 +5,12 @@
*/
package io.debezium.connector.mongodb;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.BDDMockito.given;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.mock;
import java.util.Optional;
import org.apache.kafka.connect.data.Schema;
@@ -12,6 +18,10 @@
import org.apache.kafka.connect.data.Struct;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.ArgumentCaptor;
import io.debezium.config.Configuration;
import io.debezium.config.Field;
public class MongoDbConnectorConfigTest {
@@ -33,4 +43,52 @@ public void parseSignallingMessage() {
Assert.assertEquals("execute-snapshot", result[1]);
Assert.assertEquals("{\"data-collections\": [\"database.collection\"], \"type\": \"incremental\"}", result[2]);
}
@Test
public void parseCursorPipeline() {
verifyCursorPipelineValidateError("This is not valid JSON pipeline",
"Change stream pipeline JSON is invalid: JSON reader was expecting a value but found 'This'.");
verifyCursorPipelineValidateError("{$match: {}}", "Change stream pipeline JSON is invalid: Cannot cast org.bson.Document to java.util.List");
verifyCursorPipelineValidateSuccess(null);
verifyCursorPipelineValidateSuccess("");
verifyCursorPipelineValidateSuccess("[]");
verifyCursorPipelineValidateSuccess("[{$match: {}}]");
verifyCursorPipelineValidateSuccess("[{\"$match\": { \"$and\": [{\"operationType\": \"insert\"}, {\"fullDocument.eventId\": 1404 }] } }]\n");
}
private static void verifyCursorPipelineValidateError(String value, String expectedError) {
verifyCursorPipelineValidate(value, expectedError, false);
}
private static void verifyCursorPipelineValidateSuccess(String value) {
verifyCursorPipelineValidate(value, null, true);
}
private static void verifyCursorPipelineValidate(String value, String expectedError, boolean success) {
// Given:
var config = mock(Configuration.class);
var output = mock(Field.ValidationOutput.class);
var errorMessage = ArgumentCaptor.forClass(String.class);
var field = MongoDbConnectorConfig.CURSOR_PIPELINE;
given(config.getString(field)).willReturn(value);
doNothing().when(output).accept(eq(field), eq(value), errorMessage.capture());
// When:
field.validate(config, output);
// Then:
if (success) {
assertThat(errorMessage.getAllValues())
.isEmpty();
}
else {
assertThat(errorMessage.getAllValues())
.hasSize(1)
.element(0)
.isEqualTo(expectedError);
}
}
}


@@ -1888,6 +1888,36 @@ public void shouldGenerateRecordsWithCorrectlySerializedId() throws Exception {
}
}
@Test
public void shouldSkipNonPipelineRecords() throws Exception {
config = TestHelper.getConfiguration(mongo).edit()
.with(MongoDbConnectorConfig.COLLECTION_INCLUDE_LIST, "dbit.*")
.with(MongoDbConnectorConfig.CURSOR_PIPELINE, "[{$match:{'fullDocument.name':'Dennis'}}]")
.with(CommonConnectorConfig.TOPIC_PREFIX, "mongo")
.build();
context = new MongoDbTaskContext(config);
TestHelper.cleanDatabase(mongo, "dbit");
start(MongoDbConnector.class, config);
waitForStreamingRunning("mongodb", "mongo");
var coll = "c1";
insertDocuments("dbit", coll,
new Document().append("_id", 1).append("name", "Albert"),
new Document().append("_id", 2).append("name", "Bobby"),
new Document().append("_id", 3).append("name", "Clyde"),
new Document().append("_id", 4).append("name", "Dennis"));
var records = consumeRecordsByTopic(1);
assertThat(records.recordsForTopic("mongo.dbit" + "." + coll))
.hasSize(1)
.element(0)
.satisfies(record -> assertThat(Document.parse(((Struct) record.value()).getString(Envelope.FieldName.AFTER)))
.isEqualTo(Document.parse("{_id:4,name:'Dennis'}")));
}
private static void assertSourceRecordKeyFieldIsEqualTo(SourceRecord record, String fieldName, String expected) {
final Struct struct = (Struct) record.key();
assertThat(struct.get(fieldName)).isEqualTo(expected);


@@ -1669,6 +1669,12 @@ A value of `0` disables this behavior.
|30000 (30 seconds)
|The number of milliseconds the driver will wait to select a server before it times out and throws an error.
|[[mongodb-property-cursor-pipeline]]<<mongodb-property-cursor-pipeline, `+cursor.pipeline+`>>
|No default
|When streaming changes, this setting applies processing to change stream events as part of the standard MongoDB aggregation stream pipeline. A pipeline is a MongoDB aggregation pipeline composed of instructions to the database to filter or transform data. This can be used to customize the data that the connector consumes.
The value of this property must be an array of permitted https://www.mongodb.com/docs/manual/changeStreams/#modify-change-stream-output[aggregation pipeline stages] in JSON format.
Note that this is appended after the internal pipeline used to support the connector (e.g. filtering operation types, database names, collection names, etc.).
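For example, to emit only insert events, set the value to `[{"$match": {"operationType": "insert"}}]`.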
|[[mongodb-property-cursor-max-await-time-ms]]<<mongodb-property-cursor-max-await-time-ms, `+cursor.max.await.time.ms+`>>
|`0`
|Specifies the maximum number of milliseconds the oplog/change stream cursor will wait for the server to produce a result before causing an execution timeout exception.