DBZ-1831 Pulling up connector option retrieval to CommonConnectorConfig;

* Expanding test
* Adding Sayed to COPYRIGHT.txt
Gunnar Morling 2020-03-11 11:55:26 +01:00 committed by Jiri Pechanec
parent 63a462dbc0
commit 6318cf0f2b
4 changed files with 74 additions and 19 deletions
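
For illustration only (not part of this commit's diff): the refactoring moves the parsing of the skipped-operations option out of the MongoDB streaming source and into CommonConnectorConfig, so any connector can resolve the configured codes to Envelope.Operation values through one shared accessor. Below is a minimal standalone sketch of that parsing, assuming only that debezium-core's io.debezium.data.Envelope.Operation (with the forCode method used in the diff) is on the classpath; the class name and main method are purely illustrative.

import java.util.Arrays;
import java.util.Collections;
import java.util.Set;
import java.util.stream.Collectors;

import io.debezium.data.Envelope.Operation;

public class SkippedOpsParsingSketch {

    // Mirrors the shape of the getSkippedOps() accessor introduced in CommonConnectorConfig below:
    // split the comma-separated option value, trim each token, and resolve it to an Operation.
    static Set<Operation> parseSkippedOps(String operations) {
        if (operations == null) {
            return Collections.emptySet();
        }
        return Arrays.stream(operations.split(","))
                .map(String::trim)
                .map(Operation::forCode)
                .collect(Collectors.toSet());
    }

    public static void main(String[] args) {
        // "u,d" would skip update and delete events during streaming
        System.out.println(parseSkippedOps("u,d"));
    }
}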


@@ -140,6 +140,7 @@ Sanjay Kr Singh
Sanne Grinovero
Satyajit Vegesna
Saulius Valatka
Sayed Mohammad Hossein Torabi
Scofield Xu
Sean Rooney
Sherafudheen PM


@@ -12,6 +12,7 @@
import java.util.Map;
import java.util.Objects;
import java.util.OptionalLong;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicReference;
@@ -32,6 +33,7 @@
import com.mongodb.client.model.Filters;
import io.debezium.connector.mongodb.ConnectionContext.MongoPrimary;
import io.debezium.data.Envelope.Operation;
import io.debezium.pipeline.ErrorHandler;
import io.debezium.pipeline.EventDispatcher;
import io.debezium.pipeline.source.spi.StreamingChangeEventSource;
@@ -167,19 +169,7 @@ private void readOplog(MongoClient primary, MongoPrimary primaryClient, ReplicaS
oplogContext.setIncompleteTxOrder(txOrder.getAsLong());
}
Bson operationFilter = null;
String operations = connectionContext.config.getString(MongoDbConnectorConfig.SKIPPED_OPERATIONS);
if (operations != null) {
for (String operation : operations.trim().split(",")) {
operation = operation.trim();
if (operationFilter == null) {
operationFilter = Filters.ne("op", operation);
}
operationFilter = Filters.or(operationFilter, Filters.ne("op", operation));
}
}
Bson operationFilter = getSkippedOperationsFilter();
if (operationFilter != null) {
filter = Filters.and(filter, operationFilter);
}
@@ -225,6 +215,29 @@ private void readOplog(MongoClient primary, MongoPrimary primaryClient, ReplicaS
}
}
private Bson getSkippedOperationsFilter() {
Set<Operation> skippedOperations = taskContext.getConnectorConfig().getSkippedOps();
if (skippedOperations.isEmpty()) {
return null;
}
Bson skippedOperationsFilter = null;
for (Operation operation : skippedOperations) {
Bson skippedOperationFilter = Filters.ne("op", operation.code());
if (skippedOperationsFilter == null) {
skippedOperationsFilter = skippedOperationFilter;
}
else {
skippedOperationsFilter = Filters.or(skippedOperationsFilter, skippedOperationFilter);
}
}
return skippedOperationsFilter;
}
private boolean handleOplogEvent(ServerAddress primaryAddress, Document event, Document masterEvent, long txOrder, ReplicaSetOplogContext oplogContext) {
String ns = event.getString("ns");
Document object = event.get(OBJECT_FIELD, Document.class);
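
For illustration only (not part of this commit's diff): the streaming source above ANDs a per-operation exclusion into the oplog query it tails. The sketch below shows the shape of such a query for a couple of skipped operations. It expresses the exclusion with Filters.nin ("op" not in the skipped codes) rather than the OR of ne clauses that getSkippedOperationsFilter() builds, and it assumes the MongoDB Java driver's Filters and MongoClientSettings plus debezium-core's Envelope.Operation are available; the namespace value is made up for the example.

import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import org.bson.BsonDocument;
import org.bson.conversions.Bson;

import com.mongodb.MongoClientSettings;
import com.mongodb.client.model.Filters;

import io.debezium.data.Envelope.Operation;

public class OplogSkipFilterSketch {

    public static void main(String[] args) {
        // Hypothetical skipped operations: updates and deletes.
        List<String> skippedCodes = Stream.of(Operation.UPDATE, Operation.DELETE)
                .map(Operation::code)
                .collect(Collectors.toList());

        // Base oplog filter (namespace only), standing in for the real query built in readOplog.
        Bson baseFilter = Filters.eq("ns", "dbit.arbitrary");

        // "op" must not be one of the skipped codes -- the intent behind the skip feature.
        Bson skipFilter = Filters.nin("op", skippedCodes);

        Bson combined = Filters.and(baseFilter, skipFilter);

        // Render the filter to BSON so the resulting query document is visible.
        System.out.println(combined.toBsonDocument(BsonDocument.class,
                MongoClientSettings.getDefaultCodecRegistry()));
    }
}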


@@ -360,6 +360,7 @@ public void shouldConsumeAllEventsFromDatabase() throws InterruptedException, IO
}
@Test
@FixFor("DBZ-1831")
public void shouldConsumeAllEventsFromDatabaseWithSkippedOperations() throws InterruptedException, IOException {
// Use the DB configuration to define the connector's configuration ...
config = TestHelper.getConfiguration().edit()
@@ -400,6 +401,9 @@ public void shouldConsumeAllEventsFromDatabaseWithSkippedOperations() throws Int
Testing.debug("Document ID: " + id.get());
});
SourceRecords insert = consumeRecordsByTopic(1);
assertThat(insert.recordsForTopic("mongo.dbit.arbitrary")).hasSize(1);
primary().execute("update", mongo -> {
MongoDatabase db1 = mongo.getDatabase("dbit");
MongoCollection<Document> coll = db1.getCollection("arbitrary");
@@ -415,11 +419,28 @@ public void shouldConsumeAllEventsFromDatabaseWithSkippedOperations() throws Int
Testing.debug("Document: " + doc);
});
// Wait until we can consume only the 1 insert
SourceRecords insertAndUpdate = consumeRecordsByTopic(1);
assertThat(insertAndUpdate.recordsForTopic("mongo.dbit.arbitrary").size()).isEqualTo(1);
assertThat(insertAndUpdate.topics().size()).isEqualTo(1);
primary().execute("delete", mongo -> {
MongoDatabase db1 = mongo.getDatabase("dbit");
MongoCollection<Document> coll = db1.getCollection("arbitrary");
// Find the document ...
Document doc = coll.find().first();
Testing.debug("Document: " + doc);
Document filter = Document.parse("{\"a\": 1}");
// delete
coll.deleteOne(filter);
doc = coll.find().first();
Testing.debug("Document: " + doc);
});
// Next should be the delete but not the skipped update
SourceRecords delete = consumeRecordsByTopic(1);
assertThat(delete.recordsForTopic("mongo.dbit.arbitrary")).hasSize(1);
SourceRecord deleteRecord = delete.allRecordsInOrder().get(0);
validate(deleteRecord);
verifyDeleteOperation(deleteRecord);
}
@Test


@@ -7,8 +7,11 @@
import java.time.Duration;
import java.time.temporal.ChronoUnit;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.kafka.common.config.ConfigDef.Importance;
@@ -19,6 +22,8 @@
import io.debezium.config.Field.ValidationOutput;
import io.debezium.connector.AbstractSourceInfo;
import io.debezium.connector.SourceInfoStructMaker;
import io.debezium.data.Envelope;
import io.debezium.data.Envelope.Operation;
import io.debezium.heartbeat.Heartbeat;
import io.debezium.relational.CustomConverterRegistry;
import io.debezium.relational.history.KafkaDatabaseHistory;
@@ -265,7 +270,7 @@ public static EventProcessingFailureHandlingMode parse(String value) {
.withImportance(Importance.LOW)
.withValidation(CommonConnectorConfig::validateSkippedOperation)
.withDescription("The comma-separated list of operations to skip during streaming, defined as: 'i' for inserts; 'u' for updates; 'd' for deletes. "
+ "By default, no operations will be skipped.");
+ "By default, no operations will be skipped.");
private final Configuration config;
private final boolean emitTombstoneOnDelete;
@@ -376,6 +381,20 @@ public boolean getSanitizeFieldNames() {
return sanitizeFieldNames;
}
public Set<Envelope.Operation> getSkippedOps() {
String operations = config.getString(SKIPPED_OPERATIONS);
if (operations != null) {
return Arrays.stream(operations.split(","))
.map(String::trim)
.map(Operation::forCode)
.collect(Collectors.toSet());
}
else {
return Collections.emptySet();
}
}
private static int validateMaxQueueSize(Configuration config, Field field, Field.ValidationOutput problems) {
int maxQueueSize = config.getInteger(field);
int maxBatchSize = config.getInteger(MAX_BATCH_SIZE);
@@ -398,8 +417,9 @@ private static int validateSkippedOperation(Configuration config, Field field, V
return 0;
}
for (String operation : operations.trim().split(",")) {
for (String operation : operations.split(",")) {
switch (operation.trim()) {
case "r":
case "i":
case "u":
case "d":