From 3efe8c0e51942dd5c47471e7bb7557c587e59637 Mon Sep 17 00:00:00 2001
From: Jakub Cechacek
Date: Wed, 6 Sep 2023 14:55:57 +0200
Subject: [PATCH] DBZ-6872 Ability to switch the order of user-defined and
 internal aggregation pipeline used by MongoDB connector when streaming
 changes

---
 .../mongodb/ChangeStreamPipelineFactory.java  |  20 ++-
 .../mongodb/MongoDbConnectorConfig.java       |  86 +++++++++++-
 .../ChangeStreamPipelineFactoryTest.java      | 127 +++++++++++-------
 3 files changed, 180 insertions(+), 53 deletions(-)

diff --git a/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/ChangeStreamPipelineFactory.java b/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/ChangeStreamPipelineFactory.java
index ef66caacf..1d8e81568 100644
--- a/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/ChangeStreamPipelineFactory.java
+++ b/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/ChangeStreamPipelineFactory.java
@@ -22,6 +22,7 @@
 import com.mongodb.client.model.Filters;
 import com.mongodb.client.model.changestream.OperationType;
 
+import io.debezium.DebeziumException;
 import io.debezium.connector.mongodb.Filters.FilterConfig;
 import io.debezium.data.Envelope;
 
@@ -42,14 +43,27 @@ class ChangeStreamPipelineFactory {
 
     ChangeStreamPipeline create() {
         // Resolve and combine internal and user pipelines serially
-        var internalPipeline = createInternalPipeline();
-        var userPipeline = createUserPipeline();
-        var effectivePipeline = internalPipeline.then(userPipeline);
+        var effectivePipeline = mergeUserAndInternalPipeline();
 
         LOGGER.info("Effective change stream pipeline: {}", effectivePipeline);
         return effectivePipeline;
     }
 
+    private ChangeStreamPipeline mergeUserAndInternalPipeline() {
+        var internalPipeline = createInternalPipeline();
+        var userPipeline = createUserPipeline();
+
+        switch (connectorConfig.getCursorPipelineOrder()) {
+            case INTERNAL_FIRST:
+                return internalPipeline.then(userPipeline);
+            case USER_FIRST:
+                return userPipeline.then(internalPipeline);
+            default:
+                // not reachable unless a new CursorPipelineOrder constant is added without a case here
+                throw new DebeziumException("Unknown aggregation pipeline order: " + connectorConfig.getCursorPipelineOrder());
+        }
+    }
+
     private ChangeStreamPipeline createInternalPipeline() {
        // Resolve the leaf filters
        var filters = Stream
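
The ordering itself is delegated to ChangeStreamPipeline.then(). For illustration, here is a minimal standalone sketch of how the two options compose, assuming (as the usage above suggests; the class itself is untouched by this patch) that then() simply appends the argument's stages:

import java.util.ArrayList;
import java.util.List;

public class PipelineOrderSketch {

    // Simplified stand-in for io.debezium.connector.mongodb.ChangeStreamPipeline,
    // assuming then() concatenates stage lists in call order.
    record Pipeline(List<String> stages) {
        Pipeline then(Pipeline next) {
            List<String> combined = new ArrayList<>(stages);
            combined.addAll(next.stages());
            return new Pipeline(combined);
        }
    }

    public static void main(String[] args) {
        Pipeline internal = new Pipeline(List.of("$replaceRoot (wrap)", "$match (internal)", "$replaceRoot (unwrap)"));
        Pipeline user = new Pipeline(List.of("$match (user)"));

        // internal_first (the default): user stages only see events that survived the connector's filters
        System.out.println(internal.then(user).stages());
        // user_first: user stages run first, before any internal filtering
        System.out.println(user.then(internal).stages());
    }
}

With user_first, the user-defined stages see every raw change stream event and can, for example, discard events before the connector's own $match runs; with internal_first they only see events that already passed the connector's database, collection, and operation-type filters.
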
diff --git a/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/MongoDbConnectorConfig.java b/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/MongoDbConnectorConfig.java
index 2041c20b5..8ce5bbf35 100644
--- a/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/MongoDbConnectorConfig.java
+++ b/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/MongoDbConnectorConfig.java
@@ -361,6 +361,70 @@ public static ConnectionMode parse(String value, String defaultValue) {
         }
     }
 
+    /**
+     * The set of predefined CursorPipelineOrder options or aliases.
+     */
+    public enum CursorPipelineOrder implements EnumeratedValue {
+        /**
+         * Apply the connector's internal pipeline stages first, then the user-defined stages
+         */
+        INTERNAL_FIRST("internal_first"),
+
+        /**
+         * Apply the user-defined pipeline stages first, then the connector's internal stages
+         */
+        USER_FIRST("user_first");
+
+        private final String value;
+
+        CursorPipelineOrder(String value) {
+            this.value = value;
+        }
+
+        @Override
+        public String getValue() {
+            return value;
+        }
+
+        /**
+         * Determine if the supplied value is one of the predefined options.
+         *
+         * @param value the configuration property value; may not be null
+         * @return the matching option, or null if no match is found
+         */
+        public static CursorPipelineOrder parse(String value) {
+            if (value == null) {
+                return null;
+            }
+            value = value.trim();
+
+            for (CursorPipelineOrder option : CursorPipelineOrder.values()) {
+                if (option.getValue().equalsIgnoreCase(value)) {
+                    return option;
+                }
+            }
+
+            return null;
+        }
+
+        /**
+         * Determine if the supplied value is one of the predefined options.
+         *
+         * @param value the configuration property value; may not be null
+         * @param defaultValue the default value; may be null
+         * @return the matching option, or null if no match is found and the non-null default is invalid
+         */
+        public static CursorPipelineOrder parse(String value, String defaultValue) {
+            CursorPipelineOrder mode = parse(value);
+
+            if (mode == null && defaultValue != null) {
+                mode = parse(defaultValue);
+            }
+
+            return mode;
+        }
+    }
+
     protected static final int DEFAULT_SNAPSHOT_FETCH_SIZE = 0;
 
     /**
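
Both parse() overloads trim their input and match the wire value case-insensitively, falling back to the supplied default; a small usage sketch (the class name is illustrative):

import io.debezium.connector.mongodb.MongoDbConnectorConfig.CursorPipelineOrder;

public class CursorPipelineOrderParseExample {
    public static void main(String[] args) {
        // Matching is case-insensitive and surrounding whitespace is ignored
        System.out.println(CursorPipelineOrder.parse(" User_First ")); // USER_FIRST
        // An unknown value falls back to the supplied default
        System.out.println(CursorPipelineOrder.parse("bogus", "internal_first")); // INTERNAL_FIRST
        // Null input with no usable default yields null, which callers treat as "not configured"
        System.out.println(CursorPipelineOrder.parse(null, null)); // null
    }
}
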
@@ -652,15 +716,27 @@ public static ConnectionMode parse(String value, String defaultValue) {
             .withDescription("The maximum processing time in milliseconds to wait for the oplog cursor to process a single poll request");
 
     public static final Field CURSOR_PIPELINE = Field.create("cursor.pipeline")
-            .withDisplayName("Pipeline expression apply to the change stream cursor")
+            .withDisplayName("Pipeline stages applied to the change stream cursor")
             .withType(Type.STRING)
+            .withGroup(Field.createGroupEntry(Field.Group.CONNECTOR_ADVANCED, 4))
             .withWidth(Width.SHORT)
             .withImportance(Importance.LOW)
             .withValidation(MongoDbConnectorConfig::validateChangeStreamPipeline)
             .withDescription("The MongoDB aggregation pipeline generating the change stream. " +
                     "This can be used customize the data that the connector consumes. " +
                     "Note that this comes after the internal pipelines used to support the connector (e.g. filtering database and collection names).");
 
+    public static final Field CURSOR_PIPELINE_ORDER = Field.create("cursor.pipeline.order")
+            .withDisplayName("Change stream cursor pipeline order")
+            .withEnum(CursorPipelineOrder.class, CursorPipelineOrder.INTERNAL_FIRST)
+            .withGroup(Field.createGroupEntry(Field.Group.CONNECTOR_ADVANCED, 5))
+            .withWidth(Width.SHORT)
+            .withImportance(Importance.MEDIUM)
+            .withDescription("The order in which the user-defined and internal stages of the effective MongoDB aggregation pipeline are applied. " +
+                    "Options include: " +
+                    "'internal_first' (the default): internal stages defined by the connector are applied first; " +
+                    "'user_first': stages defined by the 'cursor.pipeline' property are applied first.");
+
     public static final Field TOPIC_NAMING_STRATEGY = Field.create("topic.naming.strategy")
             .withDisplayName("Topic naming strategy class")
             .withType(Type.CLASS)
@@ -756,6 +832,7 @@ public static ConfigDef configDef() {
     private final int snapshotMaxThreads;
     private final int cursorMaxAwaitTimeMs;
     private final ReplicaSets replicaSets;
+    private final CursorPipelineOrder cursorPipelineOrder;
 
     public MongoDbConnectorConfig(Configuration config) {
         super(config, DEFAULT_SNAPSHOT_FETCH_SIZE);
@@ -774,6 +851,9 @@ public MongoDbConnectorConfig(Configuration config) {
         this.captureScope = CaptureScope.parse(captureScopeValue, MongoDbConnectorConfig.CAPTURE_SCOPE.defaultValueAsString());
         this.captureTarget = config.getString(MongoDbConnectorConfig.CAPTURE_TARGET);
 
+        String cursorPipelineOrderValue = config.getString(MongoDbConnectorConfig.CURSOR_PIPELINE_ORDER);
+        this.cursorPipelineOrder = CursorPipelineOrder.parse(cursorPipelineOrderValue, MongoDbConnectorConfig.CURSOR_PIPELINE_ORDER.defaultValueAsString());
+
         this.snapshotMaxThreads = resolveSnapshotMaxThreads(config);
         this.cursorMaxAwaitTimeMs = config.getInteger(MongoDbConnectorConfig.CURSOR_MAX_AWAIT_TIME_MS, 0);
 
@@ -944,6 +1024,10 @@ public int getCursorMaxAwaitTime() {
         return cursorMaxAwaitTimeMs;
     }
 
+    public CursorPipelineOrder getCursorPipelineOrder() {
+        return cursorPipelineOrder;
+    }
+
     @Override
     public int getSnapshotMaxThreads() {
         return snapshotMaxThreads;
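
Taken together, the new option is configured alongside the existing cursor.pipeline property. A hedged sketch of programmatic configuration using Debezium's Configuration builder; the connection values are placeholders rather than part of this patch:

import io.debezium.config.Configuration;
import io.debezium.connector.mongodb.MongoDbConnectorConfig;

public class CursorPipelineOrderConfigExample {
    public static void main(String[] args) {
        // Run the user-defined $match before the connector's internal stages
        Configuration config = Configuration.create()
                .with("connector.class", "io.debezium.connector.mongodb.MongoDbConnector")
                .with("mongodb.connection.string", "mongodb://localhost:27017/?replicaSet=rs0") // placeholder
                .with("topic.prefix", "example") // placeholder
                .with("cursor.pipeline", "[{\"$match\": {\"operationType\": \"insert\"}}]")
                .with("cursor.pipeline.order", "user_first")
                .build();

        MongoDbConnectorConfig connectorConfig = new MongoDbConnectorConfig(config);
        System.out.println(connectorConfig.getCursorPipelineOrder()); // USER_FIRST
    }
}
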
"" + + "{\n" + + " \"$match\" : {\n" + + " \"$and\" : [ {\n" + + " \"namespace\" : {\n" + + " \"$regularExpression\" : {\n" + + " \"pattern\" : \"dbit.*\",\n" + + " \"options\" : \"i\"\n" + + " }\n" + + " }\n" + + " }, {\n" + + " \"event.operationType\" : {\n" + + " \"$in\" : [ \"insert\", \"update\", \"replace\", \"delete\" ]\n" + + " }\n" + + " } ]\n" + + " }\n" + + "}", + "" + + "{\n" + + " \"$replaceRoot\" : {\n" + + " \"newRoot\" : \"$event\"\n" + + " }\n" + + "}"); + + private static final List USER_PIPELINE = List.of( + "{\n" + + " \"$match\" : {\n" + + " \"$and\" : [ {\n" + + " \"operationType\" : \"insert\"\n" + + " }, {\n" + + " \"fullDocument.eventId\" : 1404\n" + + " } ]\n" + + " }\n" + + "}"); + @InjectMocks private ChangeStreamPipelineFactory sut; @@ -36,72 +87,50 @@ public class ChangeStreamPipelineFactoryTest { private FilterConfig filterConfig; @Test - public void testCreate() { + public void testCreateWithInternalFirst() { + testCreate(CursorPipelineOrder.INTERNAL_FIRST, mergeStages(INTERNAL_PIPELINE, USER_PIPELINE)); + } + + @Test + public void testCreateWithUserFirst() { + testCreate(CursorPipelineOrder.USER_FIRST, mergeStages(USER_PIPELINE, INTERNAL_PIPELINE)); + } + + @SafeVarargs + private List mergeStages(List... stages) { + return Stream.of(stages) + .flatMap(Collection::stream) + .collect(Collectors.toList()); + } + + private String asJsonArray(List stages) { + return stages.stream().collect(Collectors.joining(",", "[", "]")); + } + + public void testCreate(CursorPipelineOrder pipelineOrder, List expectedStageJsons) { // Given: + given(connectorConfig.getCursorPipelineOrder()) + .willReturn(pipelineOrder); given(connectorConfig.getSkippedOperations()) .willReturn(EnumSet.of(Envelope.Operation.TRUNCATE)); // The default given(filterConfig.getCollectionIncludeList()) .willReturn("dbit.*"); given(filterConfig.getUserPipeline()) - .willReturn(new ChangeStreamPipeline("[{\"$match\": { \"$and\": [{\"operationType\": \"insert\"}, {\"fullDocument.eventId\": 1404 }] } }]")); + .willReturn(new ChangeStreamPipeline(asJsonArray(USER_PIPELINE))); // When: var pipeline = sut.create(); // Then: - assertPipelineStagesEquals(pipeline.getStages(), - "" + - "{\n" + - " \"$replaceRoot\" : {\n" + - " \"newRoot\" : {\n" + - " \"event\" : \"$$ROOT\",\n" + - " \"namespace\" : {\n" + - " \"$concat\" : [ \"$ns.db\", \".\", \"$ns.coll\" ]\n" + - " }\n" + - " }\n" + - " }\n" + - "}", - "" + - "{\n" + - " \"$match\" : {\n" + - " \"$and\" : [ {\n" + - " \"namespace\" : {\n" + - " \"$regularExpression\" : {\n" + - " \"pattern\" : \"dbit.*\",\n" + - " \"options\" : \"i\"\n" + - " }\n" + - " }\n" + - " }, {\n" + - " \"event.operationType\" : {\n" + - " \"$in\" : [ \"insert\", \"update\", \"replace\", \"delete\" ]\n" + - " }\n" + - " } ]\n" + - " }\n" + - "}", - "" + - "{\n" + - " \"$replaceRoot\" : {\n" + - " \"newRoot\" : \"$event\"\n" + - " }\n" + - "}", - "" + - "{\n" + - " \"$match\" : {\n" + - " \"$and\" : [ {\n" + - " \"operationType\" : \"insert\"\n" + - " }, {\n" + - " \"fullDocument.eventId\" : 1404\n" + - " } ]\n" + - " }\n" + - "}"); + assertPipelineStagesEquals(pipeline.getStages(), expectedStageJsons); } - private static void assertPipelineStagesEquals(List stages, String... 
@@ -36,72 +87,50 @@ public class ChangeStreamPipelineFactoryTest {
     private FilterConfig filterConfig;
 
     @Test
-    public void testCreate() {
+    public void testCreateWithInternalFirst() {
+        testCreate(CursorPipelineOrder.INTERNAL_FIRST, mergeStages(INTERNAL_PIPELINE, USER_PIPELINE));
+    }
+
+    @Test
+    public void testCreateWithUserFirst() {
+        testCreate(CursorPipelineOrder.USER_FIRST, mergeStages(USER_PIPELINE, INTERNAL_PIPELINE));
+    }
+
+    @SafeVarargs
+    private List<String> mergeStages(List<String>... stages) {
+        return Stream.of(stages)
+                .flatMap(Collection::stream)
+                .collect(Collectors.toList());
+    }
+
+    private String asJsonArray(List<String> stages) {
+        return stages.stream().collect(Collectors.joining(",", "[", "]"));
+    }
+
+    private void testCreate(CursorPipelineOrder pipelineOrder, List<String> expectedStageJsons) {
         // Given:
+        given(connectorConfig.getCursorPipelineOrder())
+                .willReturn(pipelineOrder);
         given(connectorConfig.getSkippedOperations())
                 .willReturn(EnumSet.of(Envelope.Operation.TRUNCATE)); // The default
         given(filterConfig.getCollectionIncludeList())
                 .willReturn("dbit.*");
         given(filterConfig.getUserPipeline())
-                .willReturn(new ChangeStreamPipeline("[{\"$match\": { \"$and\": [{\"operationType\": \"insert\"}, {\"fullDocument.eventId\": 1404 }] } }]"));
+                .willReturn(new ChangeStreamPipeline(asJsonArray(USER_PIPELINE)));
 
         // When:
         var pipeline = sut.create();
 
         // Then:
-        assertPipelineStagesEquals(pipeline.getStages(),
-                "" +
-                        "{\n" +
-                        "  \"$replaceRoot\" : {\n" +
-                        "    \"newRoot\" : {\n" +
-                        "      \"event\" : \"$$ROOT\",\n" +
-                        "      \"namespace\" : {\n" +
-                        "        \"$concat\" : [ \"$ns.db\", \".\", \"$ns.coll\" ]\n" +
-                        "      }\n" +
-                        "    }\n" +
-                        "  }\n" +
-                        "}",
-                "" +
-                        "{\n" +
-                        "  \"$match\" : {\n" +
-                        "    \"$and\" : [ {\n" +
-                        "      \"namespace\" : {\n" +
-                        "        \"$regularExpression\" : {\n" +
-                        "          \"pattern\" : \"dbit.*\",\n" +
-                        "          \"options\" : \"i\"\n" +
-                        "        }\n" +
-                        "      }\n" +
-                        "    }, {\n" +
-                        "      \"event.operationType\" : {\n" +
-                        "        \"$in\" : [ \"insert\", \"update\", \"replace\", \"delete\" ]\n" +
-                        "      }\n" +
-                        "    } ]\n" +
-                        "  }\n" +
-                        "}",
-                "" +
-                        "{\n" +
-                        "  \"$replaceRoot\" : {\n" +
-                        "    \"newRoot\" : \"$event\"\n" +
-                        "  }\n" +
-                        "}",
-                "" +
-                        "{\n" +
-                        "  \"$match\" : {\n" +
-                        "    \"$and\" : [ {\n" +
-                        "      \"operationType\" : \"insert\"\n" +
-                        "    }, {\n" +
-                        "      \"fullDocument.eventId\" : 1404\n" +
-                        "    } ]\n" +
-                        "  }\n" +
-                        "}");
+        assertPipelineStagesEquals(pipeline.getStages(), expectedStageJsons);
     }
 
-    private static void assertPipelineStagesEquals(List<Bson> stages, String... expectedStageJsons) {
+    private static void assertPipelineStagesEquals(List<Bson> stages, List<String> expectedStageJsons) {
         assertThat(stages)
                 .hasSameSizeAs(expectedStageJsons);
 
         for (int i = 0; i < stages.size(); i++) {
-            var expectedStageJson = expectedStageJsons[i];
+            var expectedStageJson = expectedStageJsons.get(i);
             assertThat(stages)
                     .element(i)
                     .satisfies((stage) -> assertJsonEquals(stage.toBsonDocument().toJson(), expectedStageJson));
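
For reference, asJsonArray() joins the per-stage documents into the single JSON array literal accepted by ChangeStreamPipeline's string constructor, as stubbed via filterConfig.getUserPipeline() above; in isolation (class name illustrative):

import java.util.List;
import java.util.stream.Collectors;

public class AsJsonArrayExample {
    public static void main(String[] args) {
        List<String> stages = List.of(
                "{\"$match\": {\"operationType\": \"insert\"}}",
                "{\"$project\": {\"fullDocument\": 1}}");
        // Same joining strategy as the test helper: stage documents become one JSON array literal
        String json = stages.stream().collect(Collectors.joining(",", "[", "]"));
        System.out.println(json);
        // [{"$match": {"operationType": "insert"}},{"$project": {"fullDocument": 1}}]
    }
}
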