diff --git a/COPYRIGHT.txt b/COPYRIGHT.txt index f59e33bcd..7e926f5cd 100644 --- a/COPYRIGHT.txt +++ b/COPYRIGHT.txt @@ -140,6 +140,7 @@ Sanjay Kr Singh Sanne Grinovero Satyajit Vegesna Saulius Valatka +Sayed Mohammad Hossein Torabi Scofield Xu Sean Rooney Sherafudheen PM diff --git a/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/MongoDbStreamingChangeEventSource.java b/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/MongoDbStreamingChangeEventSource.java index abb565eae..04ee2fb3b 100644 --- a/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/MongoDbStreamingChangeEventSource.java +++ b/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/MongoDbStreamingChangeEventSource.java @@ -12,6 +12,7 @@ import java.util.Map; import java.util.Objects; import java.util.OptionalLong; +import java.util.Set; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.atomic.AtomicReference; @@ -32,6 +33,7 @@ import com.mongodb.client.model.Filters; import io.debezium.connector.mongodb.ConnectionContext.MongoPrimary; +import io.debezium.data.Envelope.Operation; import io.debezium.pipeline.ErrorHandler; import io.debezium.pipeline.EventDispatcher; import io.debezium.pipeline.source.spi.StreamingChangeEventSource; @@ -167,19 +169,7 @@ private void readOplog(MongoClient primary, MongoPrimary primaryClient, ReplicaS oplogContext.setIncompleteTxOrder(txOrder.getAsLong()); } - Bson operationFilter = null; - String operations = connectionContext.config.getString(MongoDbConnectorConfig.SKIPPED_OPERATIONS); - if (operations != null) { - for (String operation : operations.trim().split(",")) { - operation = operation.trim(); - - if (operationFilter == null) { - operationFilter = Filters.ne("op", operation); - } - operationFilter = Filters.or(operationFilter, Filters.ne("op", operation)); - } - } - + Bson operationFilter = getSkippedOperationsFilter(); if (operationFilter != null) { filter = Filters.and(filter, operationFilter); } @@ -225,6 +215,29 @@ private void readOplog(MongoClient primary, MongoPrimary primaryClient, ReplicaS } } + private Bson getSkippedOperationsFilter() { + Set skippedOperations = taskContext.getConnectorConfig().getSkippedOps(); + + if (skippedOperations.isEmpty()) { + return null; + } + + Bson skippedOperationsFilter = null; + + for (Operation operation : skippedOperations) { + Bson skippedOperationFilter = Filters.ne("op", operation.code()); + + if (skippedOperationsFilter == null) { + skippedOperationsFilter = skippedOperationFilter; + } + else { + skippedOperationsFilter = Filters.or(skippedOperationsFilter, skippedOperationFilter); + } + } + + return skippedOperationsFilter; + } + private boolean handleOplogEvent(ServerAddress primaryAddress, Document event, Document masterEvent, long txOrder, ReplicaSetOplogContext oplogContext) { String ns = event.getString("ns"); Document object = event.get(OBJECT_FIELD, Document.class); diff --git a/debezium-connector-mongodb/src/test/java/io/debezium/connector/mongodb/MongoDbConnectorIT.java b/debezium-connector-mongodb/src/test/java/io/debezium/connector/mongodb/MongoDbConnectorIT.java index 95af9609a..dfba8d781 100644 --- a/debezium-connector-mongodb/src/test/java/io/debezium/connector/mongodb/MongoDbConnectorIT.java +++ b/debezium-connector-mongodb/src/test/java/io/debezium/connector/mongodb/MongoDbConnectorIT.java @@ -360,6 +360,7 @@ public void shouldConsumeAllEventsFromDatabase() throws InterruptedException, IO } @Test + @FixFor("DBZ-1831") public void shouldConsumeAllEventsFromDatabaseWithSkippedOperations() throws InterruptedException, IOException { // Use the DB configuration to define the connector's configuration ... config = TestHelper.getConfiguration().edit() @@ -400,6 +401,9 @@ public void shouldConsumeAllEventsFromDatabaseWithSkippedOperations() throws Int Testing.debug("Document ID: " + id.get()); }); + SourceRecords insert = consumeRecordsByTopic(1); + assertThat(insert.recordsForTopic("mongo.dbit.arbitrary")).hasSize(1); + primary().execute("update", mongo -> { MongoDatabase db1 = mongo.getDatabase("dbit"); MongoCollection coll = db1.getCollection("arbitrary"); @@ -415,11 +419,28 @@ public void shouldConsumeAllEventsFromDatabaseWithSkippedOperations() throws Int Testing.debug("Document: " + doc); }); - // Wait until we can consume the only 1 insert - SourceRecords insertAndUpdate = consumeRecordsByTopic(1); - assertThat(insertAndUpdate.recordsForTopic("mongo.dbit.arbitrary").size()).isEqualTo(1); - assertThat(insertAndUpdate.topics().size()).isEqualTo(1); + primary().execute("delete", mongo -> { + MongoDatabase db1 = mongo.getDatabase("dbit"); + MongoCollection coll = db1.getCollection("arbitrary"); + // Find the document ... + Document doc = coll.find().first(); + Testing.debug("Document: " + doc); + Document filter = Document.parse("{\"a\": 1}"); + + // delete + coll.deleteOne(filter); + + doc = coll.find().first(); + Testing.debug("Document: " + doc); + }); + + // Next should be the delete but not the skipped update + SourceRecords delete = consumeRecordsByTopic(1); + assertThat(delete.recordsForTopic("mongo.dbit.arbitrary")).hasSize(1); + SourceRecord deleteRecord = delete.allRecordsInOrder().get(0); + validate(deleteRecord); + verifyDeleteOperation(deleteRecord); } @Test diff --git a/debezium-core/src/main/java/io/debezium/config/CommonConnectorConfig.java b/debezium-core/src/main/java/io/debezium/config/CommonConnectorConfig.java index 2073a39ee..261fee6da 100644 --- a/debezium-core/src/main/java/io/debezium/config/CommonConnectorConfig.java +++ b/debezium-core/src/main/java/io/debezium/config/CommonConnectorConfig.java @@ -7,8 +7,11 @@ import java.time.Duration; import java.time.temporal.ChronoUnit; +import java.util.Arrays; +import java.util.Collections; import java.util.List; import java.util.Objects; +import java.util.Set; import java.util.stream.Collectors; import org.apache.kafka.common.config.ConfigDef.Importance; @@ -19,6 +22,8 @@ import io.debezium.config.Field.ValidationOutput; import io.debezium.connector.AbstractSourceInfo; import io.debezium.connector.SourceInfoStructMaker; +import io.debezium.data.Envelope; +import io.debezium.data.Envelope.Operation; import io.debezium.heartbeat.Heartbeat; import io.debezium.relational.CustomConverterRegistry; import io.debezium.relational.history.KafkaDatabaseHistory; @@ -265,7 +270,7 @@ public static EventProcessingFailureHandlingMode parse(String value) { .withImportance(Importance.LOW) .withValidation(CommonConnectorConfig::validateSkippedOperation) .withDescription("The comma-separated list of operations to skip during streaming, defined as: 'i' for inserts; 'u' for updates; 'd' for deletes. " - + "By default, no operations will be skipped."); + + "By default, no operations will be skipped."); private final Configuration config; private final boolean emitTombstoneOnDelete; @@ -376,6 +381,20 @@ public boolean getSanitizeFieldNames() { return sanitizeFieldNames; } + public Set getSkippedOps() { + String operations = config.getString(SKIPPED_OPERATIONS); + + if (operations != null) { + return Arrays.stream(operations.split(",")) + .map(String::trim) + .map(Operation::forCode) + .collect(Collectors.toSet()); + } + else { + return Collections.emptySet(); + } + } + private static int validateMaxQueueSize(Configuration config, Field field, Field.ValidationOutput problems) { int maxQueueSize = config.getInteger(field); int maxBatchSize = config.getInteger(MAX_BATCH_SIZE); @@ -398,8 +417,9 @@ private static int validateSkippedOperation(Configuration config, Field field, V return 0; } - for (String operation : operations.trim().split(",")) { + for (String operation : operations.split(",")) { switch (operation.trim()) { + case "r": case "i": case "u": case "d":