DBZ-5052 Send tombstones only when consumer is able to consume them

Change event consumer may specify if is able to consume tombstones or
not. If the it's not, don't send them. However, connector configuration
takes precence and thus cunsumer capability is taken into account only
when `tombstones.on.delete` is not explicitely configured for the
connector.

Also fix debezium server - setting notifier has to be done once embedded
config already exists, not before it.
This commit is contained in:
Vojtech Juranek 2022-05-25 18:20:19 +02:00 committed by Jiri Pechanec
parent d6447a91c9
commit f8d4307155
3 changed files with 59 additions and 5 deletions

View File

@ -20,6 +20,7 @@
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
@ -45,6 +46,7 @@
import io.debezium.doc.FixFor;
import io.debezium.embedded.AbstractConnectorTest;
import io.debezium.embedded.EmbeddedEngine.CompletionResult;
import io.debezium.engine.DebeziumEngine;
import io.debezium.jdbc.JdbcConnection;
import io.debezium.jdbc.TemporalPrecisionMode;
import io.debezium.junit.SkipWhenDatabaseVersion;
@ -2481,4 +2483,55 @@ public void testDmlInChangeEvents() throws Exception {
stopConnector();
}
@Test
@FixFor("DBZ-5052")
public void shouldNotSendTombstonesWhenNotSupportedByHandler() throws Exception {
config = DATABASE.defaultConfig()
.with(MySqlConnectorConfig.INCLUDE_SCHEMA_CHANGES, false)
.with(MySqlConnectorConfig.SNAPSHOT_MODE, MySqlConnectorConfig.SnapshotMode.SCHEMA_ONLY)
.with(MySqlConnectorConfig.SKIPPED_OPERATIONS, "c")
.build();
start(MySqlConnector.class, config, new NoTombStonesHandler(consumedLines));
waitForSnapshotToBeCompleted("mysql", DATABASE.getServerName());
try (MySqlTestConnection db = MySqlTestConnection.forTestDatabase(DATABASE.getDatabaseName());) {
try (JdbcConnection connection = db.connect()) {
connection.execute("INSERT INTO products VALUES (201,'rubberduck','Rubber Duck',2.12);");
connection.execute("DELETE FROM products WHERE name = 'rubberduck'");
connection.execute("INSERT INTO products VALUES (201,'rubberduck','Rubber Duck',2.12);");
connection.execute("DELETE FROM products WHERE name = 'rubberduck'");
}
}
// INSERT is excluded, DELETE generates delete and tombstone event, but tombstones shouldn't be produced, thus 2 events overall.
SourceRecords records = consumeRecordsByTopic(2);
List<SourceRecord> changeEvents = records.recordsForTopic(DATABASE.topicForTable("products"));
assertDelete(changeEvents.get(0), "id", 201);
assertDelete(changeEvents.get(1), "id", 201);
assertThat(changeEvents.size()).isEqualTo(2);
stopConnector();
}
private static class NoTombStonesHandler implements DebeziumEngine.ChangeConsumer<SourceRecord> {
protected BlockingQueue<SourceRecord> recordQueue;
public NoTombStonesHandler(BlockingQueue<SourceRecord> recordQueue) {
this.recordQueue = recordQueue;
}
public void handleBatch(List<SourceRecord> records, DebeziumEngine.RecordCommitter<SourceRecord> committer) throws InterruptedException {
for (SourceRecord r : records) {
recordQueue.offer(r);
committer.markProcessed(r);
}
}
public boolean supportsTombstoneEvents() {
return false;
}
}
}

View File

@ -50,6 +50,7 @@
import org.slf4j.LoggerFactory;
import io.debezium.annotation.ThreadSafe;
import io.debezium.config.CommonConnectorConfig;
import io.debezium.config.Configuration;
import io.debezium.config.Field;
import io.debezium.config.Instantiator;
@ -275,6 +276,10 @@ public Builder notifying(Consumer<SourceRecord> consumer) {
@Override
public Builder notifying(DebeziumEngine.ChangeConsumer<SourceRecord> handler) {
this.handler = handler;
if (!config.hasKey(CommonConnectorConfig.TOMBSTONES_ON_DELETE.name()) && !handler.supportsTombstoneEvents()) {
LOGGER.info("Consumer doesn't support tombstone events, setting '{}' to false.", CommonConnectorConfig.TOMBSTONES_ON_DELETE.name());
config = config.edit().with(CommonConnectorConfig.TOMBSTONES_ON_DELETE, false).build();
}
return this;
}

View File

@ -29,7 +29,6 @@
import org.slf4j.LoggerFactory;
import io.debezium.DebeziumException;
import io.debezium.config.CommonConnectorConfig;
import io.debezium.engine.ChangeEvent;
import io.debezium.engine.DebeziumEngine;
import io.debezium.engine.DebeziumEngine.ChangeConsumer;
@ -132,17 +131,14 @@ else if (beans.size() > 1) {
props.setProperty("transforms", transforms.get());
configToProperties(config, props, PROP_TRANSFORMS_PREFIX, "transforms.");
}
if (!consumer.supportsTombstoneEvents()) {
props.setProperty(CommonConnectorConfig.TOMBSTONES_ON_DELETE.name(), Boolean.FALSE.toString());
}
props.setProperty("name", name);
LOGGER.debug("Configuration for DebeziumEngine: {}", props);
engine = DebeziumEngine.create(keyFormat, valueFormat)
.notifying(consumer)
.using(props)
.using((DebeziumEngine.ConnectorCallback) health)
.using((DebeziumEngine.CompletionCallback) health)
.notifying(consumer)
.build();
executor.execute(() -> {