From f8d4307155a208aeb151c46f6faa1cfe1e8ab151 Mon Sep 17 00:00:00 2001 From: Vojtech Juranek Date: Wed, 25 May 2022 18:20:19 +0200 Subject: [PATCH] DBZ-5052 Send tombstones only when consumer is able to consume them Change event consumer may specify if is able to consume tombstones or not. If the it's not, don't send them. However, connector configuration takes precence and thus cunsumer capability is taken into account only when `tombstones.on.delete` is not explicitely configured for the connector. Also fix debezium server - setting notifier has to be done once embedded config already exists, not before it. --- .../connector/mysql/MySqlConnectorIT.java | 53 +++++++++++++++++++ .../io/debezium/embedded/EmbeddedEngine.java | 5 ++ .../io/debezium/server/DebeziumServer.java | 6 +-- 3 files changed, 59 insertions(+), 5 deletions(-) diff --git a/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/MySqlConnectorIT.java b/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/MySqlConnectorIT.java index 31d345958..b819db557 100644 --- a/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/MySqlConnectorIT.java +++ b/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/MySqlConnectorIT.java @@ -20,6 +20,7 @@ import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.concurrent.BlockingQueue; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; import java.util.stream.StreamSupport; @@ -45,6 +46,7 @@ import io.debezium.doc.FixFor; import io.debezium.embedded.AbstractConnectorTest; import io.debezium.embedded.EmbeddedEngine.CompletionResult; +import io.debezium.engine.DebeziumEngine; import io.debezium.jdbc.JdbcConnection; import io.debezium.jdbc.TemporalPrecisionMode; import io.debezium.junit.SkipWhenDatabaseVersion; @@ -2481,4 +2483,55 @@ public void testDmlInChangeEvents() throws Exception { stopConnector(); } + + @Test + @FixFor("DBZ-5052") + public void shouldNotSendTombstonesWhenNotSupportedByHandler() throws Exception { + config = DATABASE.defaultConfig() + .with(MySqlConnectorConfig.INCLUDE_SCHEMA_CHANGES, false) + .with(MySqlConnectorConfig.SNAPSHOT_MODE, MySqlConnectorConfig.SnapshotMode.SCHEMA_ONLY) + .with(MySqlConnectorConfig.SKIPPED_OPERATIONS, "c") + .build(); + + start(MySqlConnector.class, config, new NoTombStonesHandler(consumedLines)); + waitForSnapshotToBeCompleted("mysql", DATABASE.getServerName()); + + try (MySqlTestConnection db = MySqlTestConnection.forTestDatabase(DATABASE.getDatabaseName());) { + try (JdbcConnection connection = db.connect()) { + connection.execute("INSERT INTO products VALUES (201,'rubberduck','Rubber Duck',2.12);"); + connection.execute("DELETE FROM products WHERE name = 'rubberduck'"); + connection.execute("INSERT INTO products VALUES (201,'rubberduck','Rubber Duck',2.12);"); + connection.execute("DELETE FROM products WHERE name = 'rubberduck'"); + } + } + + // INSERT is excluded, DELETE generates delete and tombstone event, but tombstones shouldn't be produced, thus 2 events overall. + SourceRecords records = consumeRecordsByTopic(2); + List changeEvents = records.recordsForTopic(DATABASE.topicForTable("products")); + + assertDelete(changeEvents.get(0), "id", 201); + assertDelete(changeEvents.get(1), "id", 201); + assertThat(changeEvents.size()).isEqualTo(2); + + stopConnector(); + } + + private static class NoTombStonesHandler implements DebeziumEngine.ChangeConsumer { + protected BlockingQueue recordQueue; + + public NoTombStonesHandler(BlockingQueue recordQueue) { + this.recordQueue = recordQueue; + } + + public void handleBatch(List records, DebeziumEngine.RecordCommitter committer) throws InterruptedException { + for (SourceRecord r : records) { + recordQueue.offer(r); + committer.markProcessed(r); + } + } + + public boolean supportsTombstoneEvents() { + return false; + } + } } diff --git a/debezium-embedded/src/main/java/io/debezium/embedded/EmbeddedEngine.java b/debezium-embedded/src/main/java/io/debezium/embedded/EmbeddedEngine.java index 2eb0c91f2..d71f9985b 100644 --- a/debezium-embedded/src/main/java/io/debezium/embedded/EmbeddedEngine.java +++ b/debezium-embedded/src/main/java/io/debezium/embedded/EmbeddedEngine.java @@ -50,6 +50,7 @@ import org.slf4j.LoggerFactory; import io.debezium.annotation.ThreadSafe; +import io.debezium.config.CommonConnectorConfig; import io.debezium.config.Configuration; import io.debezium.config.Field; import io.debezium.config.Instantiator; @@ -275,6 +276,10 @@ public Builder notifying(Consumer consumer) { @Override public Builder notifying(DebeziumEngine.ChangeConsumer handler) { this.handler = handler; + if (!config.hasKey(CommonConnectorConfig.TOMBSTONES_ON_DELETE.name()) && !handler.supportsTombstoneEvents()) { + LOGGER.info("Consumer doesn't support tombstone events, setting '{}' to false.", CommonConnectorConfig.TOMBSTONES_ON_DELETE.name()); + config = config.edit().with(CommonConnectorConfig.TOMBSTONES_ON_DELETE, false).build(); + } return this; } diff --git a/debezium-server/debezium-server-core/src/main/java/io/debezium/server/DebeziumServer.java b/debezium-server/debezium-server-core/src/main/java/io/debezium/server/DebeziumServer.java index ae3577089..caefef91b 100644 --- a/debezium-server/debezium-server-core/src/main/java/io/debezium/server/DebeziumServer.java +++ b/debezium-server/debezium-server-core/src/main/java/io/debezium/server/DebeziumServer.java @@ -29,7 +29,6 @@ import org.slf4j.LoggerFactory; import io.debezium.DebeziumException; -import io.debezium.config.CommonConnectorConfig; import io.debezium.engine.ChangeEvent; import io.debezium.engine.DebeziumEngine; import io.debezium.engine.DebeziumEngine.ChangeConsumer; @@ -132,17 +131,14 @@ else if (beans.size() > 1) { props.setProperty("transforms", transforms.get()); configToProperties(config, props, PROP_TRANSFORMS_PREFIX, "transforms."); } - if (!consumer.supportsTombstoneEvents()) { - props.setProperty(CommonConnectorConfig.TOMBSTONES_ON_DELETE.name(), Boolean.FALSE.toString()); - } props.setProperty("name", name); LOGGER.debug("Configuration for DebeziumEngine: {}", props); engine = DebeziumEngine.create(keyFormat, valueFormat) - .notifying(consumer) .using(props) .using((DebeziumEngine.ConnectorCallback) health) .using((DebeziumEngine.CompletionCallback) health) + .notifying(consumer) .build(); executor.execute(() -> {