From 5dfe52dbc4fe1056a43155728fcf04cafbd5a3eb Mon Sep 17 00:00:00 2001 From: jcechace Date: Tue, 9 May 2023 17:31:06 +0200 Subject: [PATCH] DBZ-6421 Streaming pipeline is now compatible with DocumentDB --- .../mongodb/ChangeStreamPipelineFactory.java | 23 ++++++++----------- .../ChangeStreamPipelineFactoryTest.java | 21 +++++++++-------- 2 files changed, 21 insertions(+), 23 deletions(-) diff --git a/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/ChangeStreamPipelineFactory.java b/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/ChangeStreamPipelineFactory.java index 98d813b7e..7ded0a952 100644 --- a/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/ChangeStreamPipelineFactory.java +++ b/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/ChangeStreamPipelineFactory.java @@ -9,6 +9,7 @@ import java.util.ArrayList; import java.util.List; +import java.util.Map; import java.util.Optional; import java.util.stream.Stream; @@ -72,17 +73,16 @@ private ChangeStreamPipeline createInternalPipeline() { return new ChangeStreamPipeline( // Materialize a "namespace" field so that we can do qualified collection name matching per // the configuration requirements + // We can't use $addFields nor $set as there is no way to unset the filed for AWS DocumentDB // Note that per the docs, if `$ns` doesn't exist, `$concat` will return `null` - addFields("namespace", concat("$ns.db", ".", "$ns.coll")), - + Aggregates.replaceRoot(new BasicDBObject(Map.of( + "namespace", concat("$ns.db", ".", "$ns.coll"), + "event", "$$ROOT"))), // Filter the documents matchFilter, // This is required to prevent driver `ChangeStreamDocument` deserialization issues: - // > Caused by: org.bson.codecs.configuration.CodecConfigurationException: - // > Failed to decode 'ChangeStreamDocument'. Decoding 'namespace' errored with: - // > readStartDocument can only be called when CurrentBSONType is DOCUMENT, not when CurrentBSONType is STRING. - addFields("namespace", "$$REMOVE")); + Aggregates.replaceRoot("$event")); } private ChangeStreamPipeline createUserPipeline() { @@ -96,10 +96,10 @@ private static Optional createCollectionFilter(FilterConfig filterConfig) // https://www.mongodb.com/docs/manual/changeStreams/#watch-a-collection--database--or-deployment var dbFilters = Optional. empty(); if (filterConfig.getDbIncludeList() != null) { - dbFilters = Optional.of(Filters.regex("ns.db", filterConfig.getDbIncludeList().replaceAll(",", "|"), "i")); + dbFilters = Optional.of(Filters.regex("event.ns.db", filterConfig.getDbIncludeList().replaceAll(",", "|"), "i")); } else if (filterConfig.getDbExcludeList() != null) { - dbFilters = Optional.of(Filters.regex("ns.db", "(?!" + filterConfig.getDbExcludeList().replaceAll(",", "|") + ")", "i")); + dbFilters = Optional.of(Filters.regex("event.ns.db", "(?!" + filterConfig.getDbExcludeList().replaceAll(",", "|") + ")", "i")); } // Collection filters @@ -158,7 +158,7 @@ private static Optional createOperationTypeFilter(MongoDbConnectorConfig c includedOperations.remove(OperationType.DELETE); } - return Optional.of(Filters.in("operationType", includedOperations.stream() + return Optional.of(Filters.in("event.operationType", includedOperations.stream() .map(OperationType::getValue) .collect(toList()))); } @@ -201,9 +201,4 @@ private static List resolveFilters(Optional... filters) { private static Bson concat(Object... expressions) { return new BasicDBObject("$concat", List.of(expressions)); } - - private static Bson addFields(String name, Object expression) { - return new BasicDBObject("$addFields", new BasicDBObject(name, expression)); - } - } diff --git a/debezium-connector-mongodb/src/test/java/io/debezium/connector/mongodb/ChangeStreamPipelineFactoryTest.java b/debezium-connector-mongodb/src/test/java/io/debezium/connector/mongodb/ChangeStreamPipelineFactoryTest.java index a982c4523..3291f35de 100644 --- a/debezium-connector-mongodb/src/test/java/io/debezium/connector/mongodb/ChangeStreamPipelineFactoryTest.java +++ b/debezium-connector-mongodb/src/test/java/io/debezium/connector/mongodb/ChangeStreamPipelineFactoryTest.java @@ -52,9 +52,12 @@ public void testCreate() { assertPipelineStagesEquals(pipeline.getStages(), "" + "{\n" + - " \"$addFields\" : {\n" + - " \"namespace\" : {\n" + - " \"$concat\" : [ \"$ns.db\", \".\", \"$ns.coll\" ]\n" + + " \"$replaceRoot\" : {\n" + + " \"newRoot\" : {\n" + + " \"event\" : \"$$ROOT\",\n" + + " \"namespace\" : {\n" + + " \"$concat\" : [ \"$ns.db\", \".\", \"$ns.coll\" ]\n" + + " }\n" + " }\n" + " }\n" + "}", @@ -69,7 +72,7 @@ public void testCreate() { " }\n" + " }\n" + " }, {\n" + - " \"operationType\" : {\n" + + " \"event.operationType\" : {\n" + " \"$in\" : [ \"insert\", \"update\", \"replace\", \"delete\" ]\n" + " }\n" + " } ]\n" + @@ -77,8 +80,8 @@ public void testCreate() { "}", "" + "{\n" + - " \"$addFields\" : {\n" + - " \"namespace\" : \"$$REMOVE\"\n" + + " \"$replaceRoot\" : {\n" + + " \"newRoot\" : \"$event\"\n" + " }\n" + "}", "" + @@ -108,9 +111,9 @@ private static void assertPipelineStagesEquals(List stages, Stri private static void assertJsonEquals(String actual, String expected) { try { var mapper = new ObjectMapper(); - actual = mapper.readTree(actual).toPrettyString(); - expected = mapper.readTree(expected).toPrettyString(); - assertThat(actual).isEqualTo(expected); + var actualNode = mapper.readTree(actual); + var expectedNode = mapper.readTree(expected); + assertThat(actualNode).isEqualTo(expectedNode); } catch (IOException e) { throw new RuntimeException(e);