DBZ-6421 Streaming pipeline is now compatible with DocumentDB

jcechace 2023-05-09 17:31:06 +02:00 committed by Jiri Pechanec
parent dad7d0ce14
commit 5dfe52dbc4
2 changed files with 21 additions and 23 deletions
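
For context, the commit replaces the `$addFields` / `$$REMOVE` trick with a pair of `$replaceRoot` stages, because AWS DocumentDB offers no way to unset a materialized helper field. Below is a minimal, self-contained sketch of the resulting three-stage pipeline shape using the MongoDB Java driver. It is not the Debezium class touched by this commit; the class name, the "inventory" include pattern, and the hard-coded operation list are illustrative assumptions only.

import java.util.List;
import java.util.Map;

import org.bson.Document;
import org.bson.conversions.Bson;

import com.mongodb.BasicDBObject;
import com.mongodb.MongoClientSettings;
import com.mongodb.client.model.Aggregates;
import com.mongodb.client.model.Filters;

// Illustrative sketch only -- not the connector's pipeline factory class.
public class DocumentDbCompatiblePipelineSketch {

    static List<Bson> buildPipeline() {
        // "$concat" returns null when "$ns" is absent, mirroring the comment in the diff
        Bson namespace = new BasicDBObject("$concat", List.of("$ns.db", ".", "$ns.coll"));

        // Stage 1: wrap the raw event and materialize the namespace via $replaceRoot,
        // because DocumentDB has no way to unset an $addFields/$set helper field later
        Bson materialize = Aggregates.replaceRoot(new BasicDBObject(Map.of(
                "namespace", namespace,
                "event", "$$ROOT")));

        // Stage 2: filters now address the wrapped document, hence the "event." prefix
        // ("inventory" and the operation list are example values, not connector defaults)
        Bson match = Aggregates.match(Filters.and(
                Filters.regex("event.ns.db", "inventory", "i"),
                Filters.in("event.operationType", List.of("insert", "update", "replace", "delete"))));

        // Stage 3: drop the wrapper so the driver can still decode ChangeStreamDocument
        Bson restore = Aggregates.replaceRoot("$event");

        return List.of(materialize, match, restore);
    }

    public static void main(String[] args) {
        var registry = MongoClientSettings.getDefaultCodecRegistry();
        buildPipeline().forEach(stage -> System.out.println(
                stage.toBsonDocument(Document.class, registry).toJson()));
    }
}

Because the first stage wraps every change event under `event`, the match filters in this sketch and in the diff below address fields as `event.ns.db` and `event.operationType`, and the final `$replaceRoot` restores the original document so the driver can still decode it as a `ChangeStreamDocument`.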


@@ -9,6 +9,7 @@
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Map;
 import java.util.Optional;
 import java.util.stream.Stream;
@@ -72,17 +73,16 @@ private ChangeStreamPipeline createInternalPipeline() {
         return new ChangeStreamPipeline(
                 // Materialize a "namespace" field so that we can do qualified collection name matching per
                 // the configuration requirements
+                // We can't use $addFields nor $set as there is no way to unset the field for AWS DocumentDB
                 // Note that per the docs, if `$ns` doesn't exist, `$concat` will return `null`
-                addFields("namespace", concat("$ns.db", ".", "$ns.coll")),
+                Aggregates.replaceRoot(new BasicDBObject(Map.of(
+                        "namespace", concat("$ns.db", ".", "$ns.coll"),
+                        "event", "$$ROOT"))),
 
                 // Filter the documents
                 matchFilter,
 
                 // This is required to prevent driver `ChangeStreamDocument` deserialization issues:
-                // > Caused by: org.bson.codecs.configuration.CodecConfigurationException:
-                // > Failed to decode 'ChangeStreamDocument'. Decoding 'namespace' errored with:
-                // > readStartDocument can only be called when CurrentBSONType is DOCUMENT, not when CurrentBSONType is STRING.
-                addFields("namespace", "$$REMOVE"));
+                Aggregates.replaceRoot("$event"));
     }
 
     private ChangeStreamPipeline createUserPipeline() {
@@ -96,10 +96,10 @@ private static Optional<Bson> createCollectionFilter(FilterConfig filterConfig)
         // https://www.mongodb.com/docs/manual/changeStreams/#watch-a-collection--database--or-deployment
         var dbFilters = Optional.<Bson> empty();
         if (filterConfig.getDbIncludeList() != null) {
-            dbFilters = Optional.of(Filters.regex("ns.db", filterConfig.getDbIncludeList().replaceAll(",", "|"), "i"));
+            dbFilters = Optional.of(Filters.regex("event.ns.db", filterConfig.getDbIncludeList().replaceAll(",", "|"), "i"));
         }
         else if (filterConfig.getDbExcludeList() != null) {
-            dbFilters = Optional.of(Filters.regex("ns.db", "(?!" + filterConfig.getDbExcludeList().replaceAll(",", "|") + ")", "i"));
+            dbFilters = Optional.of(Filters.regex("event.ns.db", "(?!" + filterConfig.getDbExcludeList().replaceAll(",", "|") + ")", "i"));
         }
 
         // Collection filters
@@ -158,7 +158,7 @@ private static Optional<Bson> createOperationTypeFilter(MongoDbConnectorConfig c
             includedOperations.remove(OperationType.DELETE);
         }
 
-        return Optional.of(Filters.in("operationType", includedOperations.stream()
+        return Optional.of(Filters.in("event.operationType", includedOperations.stream()
                 .map(OperationType::getValue)
                 .collect(toList())));
     }
@@ -201,9 +201,4 @@ private static List<Bson> resolveFilters(Optional<Bson>... filters) {
     private static Bson concat(Object... expressions) {
         return new BasicDBObject("$concat", List.of(expressions));
     }
-
-    private static Bson addFields(String name, Object expression) {
-        return new BasicDBObject("$addFields", new BasicDBObject(name, expression));
-    }
-
 }


@@ -52,11 +52,14 @@ public void testCreate() {
         assertPipelineStagesEquals(pipeline.getStages(),
                 "" +
                         "{\n" +
-                        "  \"$addFields\" : {\n" +
+                        "  \"$replaceRoot\" : {\n" +
+                        "    \"newRoot\" : {\n" +
+                        "      \"event\" : \"$$ROOT\",\n" +
                         "    \"namespace\" : {\n" +
                         "      \"$concat\" : [ \"$ns.db\", \".\", \"$ns.coll\" ]\n" +
                         "    }\n" +
                         "  }\n" +
+                        "  }\n" +
                         "}",
                 "" +
                         "{\n" +
@@ -69,7 +72,7 @@ public void testCreate() {
                         "      }\n" +
                         "    }\n" +
                         "  }, {\n" +
-                        "    \"operationType\" : {\n" +
+                        "    \"event.operationType\" : {\n" +
                         "      \"$in\" : [ \"insert\", \"update\", \"replace\", \"delete\" ]\n" +
                         "    }\n" +
                         "  } ]\n" +
@@ -77,8 +80,8 @@ public void testCreate() {
                         "}",
                 "" +
                         "{\n" +
-                        "  \"$addFields\" : {\n" +
-                        "    \"namespace\" : \"$$REMOVE\"\n" +
+                        "  \"$replaceRoot\" : {\n" +
+                        "    \"newRoot\" : \"$event\"\n" +
                         "  }\n" +
                         "}",
                 "" +
@@ -108,9 +111,9 @@ private static void assertPipelineStagesEquals(List<? extends Bson> stages, Stri
     private static void assertJsonEquals(String actual, String expected) {
         try {
             var mapper = new ObjectMapper();
-            actual = mapper.readTree(actual).toPrettyString();
-            expected = mapper.readTree(expected).toPrettyString();
-            assertThat(actual).isEqualTo(expected);
+            var actualNode = mapper.readTree(actual);
+            var expectedNode = mapper.readTree(expected);
+            assertThat(actualNode).isEqualTo(expectedNode);
         }
         catch (IOException e) {
             throw new RuntimeException(e);
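
The test helper above now compares parsed Jackson trees instead of pretty-printed strings, which makes the assertion insensitive to whitespace and object key order in the expected JSON literals. A minimal standalone sketch of that comparison style follows; the class name and the sample documents in `main` are illustrative, not taken from the test.

import java.io.IOException;

import com.fasterxml.jackson.databind.ObjectMapper;

import static org.assertj.core.api.Assertions.assertThat;

// Illustrative sketch of the JsonNode-based comparison; not the actual test class.
public class JsonTreeAssertSketch {

    static void assertJsonEquals(String actual, String expected) {
        try {
            var mapper = new ObjectMapper();
            // JsonNode equality compares structure and values, ignoring formatting
            // and object key order (array element order still matters)
            assertThat(mapper.readTree(actual)).isEqualTo(mapper.readTree(expected));
        }
        catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    public static void main(String[] args) {
        assertJsonEquals("{\"$replaceRoot\":{\"newRoot\":\"$event\"}}",
                "{ \"$replaceRoot\" : { \"newRoot\" : \"$event\" } }");
    }
}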