DBZ-6872 Ability to switch the order of user defined and internal aggregation pipeline used by MongoDB connector when streaming changes

This commit is contained in:
Jakub Cechacek 2023-09-06 14:55:57 +02:00 committed by Jiri Pechanec
parent feaedd72a9
commit 3efe8c0e51
3 changed files with 180 additions and 53 deletions

View File

@ -22,6 +22,7 @@
import com.mongodb.client.model.Filters;
import com.mongodb.client.model.changestream.OperationType;
import io.debezium.DebeziumException;
import io.debezium.connector.mongodb.Filters.FilterConfig;
import io.debezium.data.Envelope;
@ -42,14 +43,27 @@ class ChangeStreamPipelineFactory {
/**
 * Builds the effective change stream pipeline by combining the connector's
 * internal stages with the user-defined stages, in the configured order.
 *
 * @return the effective pipeline applied to the MongoDB change stream cursor
 */
ChangeStreamPipeline create() {
    // Resolve and combine internal and user pipelines serially
    var effectivePipeline = mergeUserAndInternalPipeline();
    // Logged at INFO so operators can see the exact aggregation stages in use
    LOGGER.info("Effective change stream pipeline: {}", effectivePipeline);
    return effectivePipeline;
}
/**
 * Combines the internal (connector-defined) pipeline with the user-supplied
 * pipeline, honouring the configured {@code cursor.pipeline.order}.
 *
 * @return the merged pipeline with stages in the configured order
 */
private ChangeStreamPipeline mergeUserAndInternalPipeline() {
    var internal = createInternalPipeline();
    var user = createUserPipeline();
    var order = connectorConfig.getCursorPipelineOrder();
    switch (order) {
        case USER_FIRST:
            return user.then(internal);
        case INTERNAL_FIRST:
            return internal.then(user);
        default:
            // Defensive guard: every enum value is covered above
            throw new DebeziumException("Unknown aggregation pipeline order");
    }
}
private ChangeStreamPipeline createInternalPipeline() {
// Resolve the leaf filters
var filters = Stream

View File

@ -361,6 +361,70 @@ public static ConnectionMode parse(String value, String defaultValue) {
}
}
/**
 * The set of predefined CursorPipelineOrder options or aliases.
 */
public enum CursorPipelineOrder implements EnumeratedValue {
    /**
     * Apply the connector's internal pipeline stages first, followed by the
     * user-defined stages from the 'cursor.pipeline' property (the default).
     */
    INTERNAL_FIRST("internal_first"),

    /**
     * Apply the user-defined stages from the 'cursor.pipeline' property first,
     * followed by the connector's internal pipeline stages.
     */
    USER_FIRST("user_first");

    // Externalized configuration value identifying this option
    private final String value;

    CursorPipelineOrder(String value) {
        this.value = value;
    }

    @Override
    public String getValue() {
        return value;
    }

    /**
     * Determine if the supplied value is one of the predefined options.
     *
     * @param value the configuration property value; may be null
     * @return the matching option, or null if no match is found
     */
    public static CursorPipelineOrder parse(String value) {
        if (value == null) {
            return null;
        }
        value = value.trim();
        for (CursorPipelineOrder option : CursorPipelineOrder.values()) {
            if (option.getValue().equalsIgnoreCase(value)) {
                return option;
            }
        }
        return null;
    }

    /**
     * Determine if the supplied value is one of the predefined options.
     *
     * @param value the configuration property value; may be null
     * @param defaultValue the default value; may be null
     * @return the matching option, or null if no match is found and the non-null default is invalid
     */
    public static CursorPipelineOrder parse(String value, String defaultValue) {
        CursorPipelineOrder mode = parse(value);
        if (mode == null && defaultValue != null) {
            mode = parse(defaultValue);
        }
        return mode;
    }
}
protected static final int DEFAULT_SNAPSHOT_FETCH_SIZE = 0;
/**
@ -652,8 +716,9 @@ public static ConnectionMode parse(String value, String defaultValue) {
.withDescription("The maximum processing time in milliseconds to wait for the oplog cursor to process a single poll request");
public static final Field CURSOR_PIPELINE = Field.create("cursor.pipeline")
.withDisplayName("Pipeline expression apply to the change stream cursor")
.withDisplayName("Pipeline stages applied to the change stream cursor")
.withType(Type.STRING)
.withGroup(Field.createGroupEntry(Field.Group.CONNECTOR_ADVANCED, 4))
.withWidth(Width.SHORT)
.withImportance(Importance.LOW)
.withValidation(MongoDbConnectorConfig::validateChangeStreamPipeline)
@ -662,6 +727,17 @@ public static ConnectionMode parse(String value, String defaultValue) {
"This can be used to customize the data that the connector consumes. " +
"Note that this comes after the internal pipelines used to support the connector (e.g. filtering database and collection names).");
// Controls whether internal or user-defined aggregation stages run first on the cursor
public static final Field CURSOR_PIPELINE_ORDER = Field.create("cursor.pipeline.order")
        .withDisplayName("Change stream cursor pipeline order")
        .withEnum(CursorPipelineOrder.class, CursorPipelineOrder.INTERNAL_FIRST)
        .withGroup(Field.createGroupEntry(Field.Group.CONNECTOR_ADVANCED, 5))
        .withWidth(Width.SHORT)
        .withImportance(Importance.MEDIUM)
        .withDescription("The order used to construct the effective MongoDB aggregation stream pipeline. "
                + "Options include: "
                + "'internal_first' (the default) Internal stages defined by the connector are applied first; "
                + "'user_first' Stages defined by the 'cursor.pipeline' property are applied first.");
public static final Field TOPIC_NAMING_STRATEGY = Field.create("topic.naming.strategy")
.withDisplayName("Topic naming strategy class")
.withType(Type.CLASS)
@ -756,6 +832,7 @@ public static ConfigDef configDef() {
private final int snapshotMaxThreads;
private final int cursorMaxAwaitTimeMs;
private final ReplicaSets replicaSets;
private final CursorPipelineOrder cursorPipelineOrder;
public MongoDbConnectorConfig(Configuration config) {
super(config, DEFAULT_SNAPSHOT_FETCH_SIZE);
@ -774,6 +851,9 @@ public MongoDbConnectorConfig(Configuration config) {
this.captureScope = CaptureScope.parse(captureScopeValue, MongoDbConnectorConfig.CAPTURE_SCOPE.defaultValueAsString());
this.captureTarget = config.getString(MongoDbConnectorConfig.CAPTURE_TARGET);
String cursorPipelineOrderValue = config.getString(MongoDbConnectorConfig.CURSOR_PIPELINE_ORDER);
this.cursorPipelineOrder = CursorPipelineOrder.parse(cursorPipelineOrderValue, MongoDbConnectorConfig.CURSOR_PIPELINE_ORDER.defaultValueAsString());
this.snapshotMaxThreads = resolveSnapshotMaxThreads(config);
this.cursorMaxAwaitTimeMs = config.getInteger(MongoDbConnectorConfig.CURSOR_MAX_AWAIT_TIME_MS, 0);
@ -944,6 +1024,10 @@ public int getCursorMaxAwaitTime() {
return cursorMaxAwaitTimeMs;
}
/**
 * @return the configured ordering of internal vs. user-defined change stream pipeline stages
 */
public CursorPipelineOrder getCursorPipelineOrder() {
return cursorPipelineOrder;
}
@Override
public int getSnapshotMaxThreads() {
return snapshotMaxThreads;

View File

@ -9,8 +9,11 @@
import static org.mockito.BDDMockito.given;
import java.io.IOException;
import java.util.Collection;
import java.util.EnumSet;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.bson.conversions.Bson;
import org.junit.Test;
@ -22,11 +25,59 @@
import com.fasterxml.jackson.databind.ObjectMapper;
import io.debezium.connector.mongodb.Filters.FilterConfig;
import io.debezium.connector.mongodb.MongoDbConnectorConfig.CursorPipelineOrder;
import io.debezium.data.Envelope;
@RunWith(MockitoJUnitRunner.class)
public class ChangeStreamPipelineFactoryTest {
private static final List<String> INTERNAL_PIPELINE = List.of(
"" +
"{\n" +
" \"$replaceRoot\" : {\n" +
" \"newRoot\" : {\n" +
" \"event\" : \"$$ROOT\",\n" +
" \"namespace\" : {\n" +
" \"$concat\" : [ \"$ns.db\", \".\", \"$ns.coll\" ]\n" +
" }\n" +
" }\n" +
" }\n" +
"}",
"" +
"{\n" +
" \"$match\" : {\n" +
" \"$and\" : [ {\n" +
" \"namespace\" : {\n" +
" \"$regularExpression\" : {\n" +
" \"pattern\" : \"dbit.*\",\n" +
" \"options\" : \"i\"\n" +
" }\n" +
" }\n" +
" }, {\n" +
" \"event.operationType\" : {\n" +
" \"$in\" : [ \"insert\", \"update\", \"replace\", \"delete\" ]\n" +
" }\n" +
" } ]\n" +
" }\n" +
"}",
"" +
"{\n" +
" \"$replaceRoot\" : {\n" +
" \"newRoot\" : \"$event\"\n" +
" }\n" +
"}");
private static final List<String> USER_PIPELINE = List.of(
"{\n" +
" \"$match\" : {\n" +
" \"$and\" : [ {\n" +
" \"operationType\" : \"insert\"\n" +
" }, {\n" +
" \"fullDocument.eventId\" : 1404\n" +
" } ]\n" +
" }\n" +
"}");
@InjectMocks
private ChangeStreamPipelineFactory sut;
@ -36,72 +87,50 @@ public class ChangeStreamPipelineFactoryTest {
private FilterConfig filterConfig;
@Test
public void testCreate() {
public void testCreateWithInternalFirst() {
testCreate(CursorPipelineOrder.INTERNAL_FIRST, mergeStages(INTERNAL_PIPELINE, USER_PIPELINE));
}
@Test
public void testCreateWithUserFirst() {
testCreate(CursorPipelineOrder.USER_FIRST, mergeStages(USER_PIPELINE, INTERNAL_PIPELINE));
}
/**
 * Concatenates the given stage lists into one ordered list of stage JSON strings.
 */
@SafeVarargs
private List<String> mergeStages(List<String>... pipelines) {
    return Stream.of(pipelines)
            .flatMap(List::stream)
            .collect(Collectors.toList());
}
/**
 * Renders the given stages as a single JSON array literal.
 */
private String asJsonArray(List<String> stages) {
    return "[" + String.join(",", stages) + "]";
}
public void testCreate(CursorPipelineOrder pipelineOrder, List<String> expectedStageJsons) {
// Given:
given(connectorConfig.getCursorPipelineOrder())
.willReturn(pipelineOrder);
given(connectorConfig.getSkippedOperations())
.willReturn(EnumSet.of(Envelope.Operation.TRUNCATE)); // The default
given(filterConfig.getCollectionIncludeList())
.willReturn("dbit.*");
given(filterConfig.getUserPipeline())
.willReturn(new ChangeStreamPipeline("[{\"$match\": { \"$and\": [{\"operationType\": \"insert\"}, {\"fullDocument.eventId\": 1404 }] } }]"));
.willReturn(new ChangeStreamPipeline(asJsonArray(USER_PIPELINE)));
// When:
var pipeline = sut.create();
// Then:
assertPipelineStagesEquals(pipeline.getStages(),
"" +
"{\n" +
" \"$replaceRoot\" : {\n" +
" \"newRoot\" : {\n" +
" \"event\" : \"$$ROOT\",\n" +
" \"namespace\" : {\n" +
" \"$concat\" : [ \"$ns.db\", \".\", \"$ns.coll\" ]\n" +
" }\n" +
" }\n" +
" }\n" +
"}",
"" +
"{\n" +
" \"$match\" : {\n" +
" \"$and\" : [ {\n" +
" \"namespace\" : {\n" +
" \"$regularExpression\" : {\n" +
" \"pattern\" : \"dbit.*\",\n" +
" \"options\" : \"i\"\n" +
" }\n" +
" }\n" +
" }, {\n" +
" \"event.operationType\" : {\n" +
" \"$in\" : [ \"insert\", \"update\", \"replace\", \"delete\" ]\n" +
" }\n" +
" } ]\n" +
" }\n" +
"}",
"" +
"{\n" +
" \"$replaceRoot\" : {\n" +
" \"newRoot\" : \"$event\"\n" +
" }\n" +
"}",
"" +
"{\n" +
" \"$match\" : {\n" +
" \"$and\" : [ {\n" +
" \"operationType\" : \"insert\"\n" +
" }, {\n" +
" \"fullDocument.eventId\" : 1404\n" +
" } ]\n" +
" }\n" +
"}");
assertPipelineStagesEquals(pipeline.getStages(), expectedStageJsons);
}
private static void assertPipelineStagesEquals(List<? extends Bson> stages, String... expectedStageJsons) {
private static void assertPipelineStagesEquals(List<? extends Bson> stages, List<String> expectedStageJsons) {
assertThat(stages)
.hasSameSizeAs(expectedStageJsons);
for (int i = 0; i < stages.size(); i++) {
var expectedStageJson = expectedStageJsons[i];
var expectedStageJson = expectedStageJsons.get(i);
assertThat(stages)
.element(i)
.satisfies((stage) -> assertJsonEquals(stage.toBsonDocument().toJson(), expectedStageJson));