From 2c2b87b846951e35c724707a63ebef427af0a19c Mon Sep 17 00:00:00 2001 From: Ronak Jain Date: Sun, 16 Apr 2023 00:22:30 +0530 Subject: [PATCH] DBZ-2979 If column.include.list/column.exclude.list are used and the target table receives an update for the excluded (or not included) column - such events should be ignored * [Unstable][Untested] Add config in Oracle Connector Addresses/Closes DBZ-2979 --- .../oracle/BaseChangeRecordEmitter.java | 5 + ...acleSkipMessagesWithoutChangeConfigIT.java | 182 ++++++++++++++++++ .../modules/ROOT/pages/connectors/oracle.adoc | 4 + 3 files changed, 191 insertions(+) create mode 100644 debezium-connector-oracle/src/test/java/io/debezium/connector/oracle/OracleSkipMessagesWithoutChangeConfigIT.java diff --git a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/BaseChangeRecordEmitter.java b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/BaseChangeRecordEmitter.java index fb6ff911b..7a8670c30 100644 --- a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/BaseChangeRecordEmitter.java +++ b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/BaseChangeRecordEmitter.java @@ -53,6 +53,11 @@ protected BaseChangeRecordEmitter(OracleConnectorConfig connectorConfig, Partiti this.table = table; } + @Override + protected boolean skipMessagesWithoutChange() { + return connectorConfig.skipMessagesWithoutChange(); + } + @Override protected Object[] getOldColumnValues() { return oldColumnValues; diff --git a/debezium-connector-oracle/src/test/java/io/debezium/connector/oracle/OracleSkipMessagesWithoutChangeConfigIT.java b/debezium-connector-oracle/src/test/java/io/debezium/connector/oracle/OracleSkipMessagesWithoutChangeConfigIT.java new file mode 100644 index 000000000..8bfb05bc6 --- /dev/null +++ b/debezium-connector-oracle/src/test/java/io/debezium/connector/oracle/OracleSkipMessagesWithoutChangeConfigIT.java @@ -0,0 +1,182 @@ +/* + * Copyright Debezium Authors. + * + * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 + */ +package io.debezium.connector.oracle; + +import static org.assertj.core.api.Assertions.assertThat; + +import java.util.List; +import java.util.concurrent.TimeUnit; + +import org.apache.kafka.connect.data.Struct; +import org.apache.kafka.connect.source.SourceRecord; +import org.junit.After; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TestRule; + +import io.debezium.config.Configuration; +import io.debezium.connector.oracle.junit.SkipTestDependingOnAdapterNameRule; +import io.debezium.connector.oracle.util.TestHelper; +import io.debezium.data.Envelope; +import io.debezium.doc.FixFor; +import io.debezium.embedded.AbstractConnectorTest; + +/** + * Integration tests for config skip.messages.without.change + * + * @author Ronak Jain + */ +public class OracleSkipMessagesWithoutChangeConfigIT extends AbstractConnectorTest { + + @Rule + public final TestRule skipAdapterRule = new SkipTestDependingOnAdapterNameRule(); + + private OracleConnection connection; + + @Before + public void before() { + connection = TestHelper.testConnection(); + TestHelper.dropTable(connection, "debezium.test"); + setConsumeTimeout(TestHelper.defaultMessageConsumerPollTimeout(), TimeUnit.SECONDS); + initializeConnectorTestFramework(); + Files.delete(TestHelper.SCHEMA_HISTORY_PATH); + } + + @After + public void after() throws Exception { + if (connection != null) { + TestHelper.dropTable(connection, "debezium.test"); + connection.close(); + } + } + + @Test + @FixFor("DBZ-2979") + public void shouldSkipEventsWithNoChangeInWhitelistedColumnsWhenSkipEnabled() throws Exception { + String ddl = "CREATE TABLE debezium.test (" + + " id INT NOT NULL, white INT, black INT PRIMARY KEY (id))"; + + connection.execute(ddl); + + Configuration config = TestHelper.defaultConfig() + .with(OracleConnectorConfig.TABLE_INCLUDE_LIST, "debezium\\.test") + .with(OracleConnectorConfig.SKIP_MESSAGES_WITHOUT_CHANGE, true) + .with(OracleConnectorConfig.COLUMN_INCLUDE_LIST, "debezium\\.test\\.id, debezium\\.test\\.white") + .with(OracleConnectorConfig.SNAPSHOT_MODE, OracleConnectorConfig.SnapshotMode.SCHEMA_ONLY) + .build(); + + start(OracleConnector.class, config); + waitForStreamingRunning(TestHelper.CONNECTOR_NAME, TestHelper.SERVER_NAME); + + connection.execute("INSERT INTO debezium.test VALUES (1, 1, 1)"); + connection.execute("UPDATE debezium.test SET black=2 where id = 1"); + connection.execute("UPDATE debezium.test SET white=2 where id = 1"); + connection.execute("UPDATE debezium.test SET white=3, black=3 where id = 1"); + + /* + * Total Events + * 1,1,1 (I) + * 1,1,2 (U) (Skipped) + * 1,2,2 (U) + * 1,3,3 (U) + */ + SourceRecords records = consumeRecordsByTopic(5); + List recordsForTopic = records.recordsForTopic(topicName("test")); + assertThat(recordsForTopic).hasSize(3); + + Struct secondMessage = ((Struct) recordsForTopic.get(1).value()).getStruct(Envelope.FieldName.AFTER); + assertThat(secondMessage.get("white")).isEqualTo(2); + Struct thirdMessage = ((Struct) recordsForTopic.get(2).value()).getStruct(Envelope.FieldName.AFTER); + assertThat(thirdMessage.get("white")).isEqualTo(3); + } + + @Test + @FixFor("DBZ-2979") + public void shouldSkipEventsWithNoChangeInWhitelistedColumnsWhenSkipEnabledWithExcludeConfig() throws Exception { + String ddl = "CREATE TABLE debezium.test (" + + " id INT NOT NULL, white INT, black INT PRIMARY KEY (id))"; + + connection.execute(ddl); + + Configuration config = TestHelper.defaultConfig() + .with(OracleConnectorConfig.TABLE_INCLUDE_LIST, "debezium\\.test") + .with(OracleConnectorConfig.COLUMN_EXCLUDE_LIST, "debezium\\.test\\.black") + .with(OracleConnectorConfig.SKIP_MESSAGES_WITHOUT_CHANGE, true) + .with(OracleConnectorConfig.SNAPSHOT_MODE, OracleConnectorConfig.SnapshotMode.SCHEMA_ONLY) + .build(); + + start(OracleConnector.class, config); + waitForStreamingRunning(TestHelper.CONNECTOR_NAME, TestHelper.SERVER_NAME); + + connection.execute("INSERT INTO debezium.test VALUES (1, 1, 1)"); + connection.execute("UPDATE debezium.test SET black=2 where id = 1"); + connection.execute("UPDATE debezium.test SET white=2 where id = 1"); + connection.execute("UPDATE debezium.test SET white=3, black=3 where id = 1"); + + /* + * Total Events + * 1,1,1 (I) + * 1,1,2 (U) (Skipped) + * 1,2,2 (U) + * 1,3,3 (U) + */ + SourceRecords records = consumeRecordsByTopic(5); + List recordsForTopic = records.recordsForTopic(topicName("test")); + assertThat(recordsForTopic).hasSize(3); + + Struct secondMessage = ((Struct) recordsForTopic.get(1).value()).getStruct(Envelope.FieldName.AFTER); + assertThat(secondMessage.get("white")).isEqualTo(2); + Struct thirdMessage = ((Struct) recordsForTopic.get(2).value()).getStruct(Envelope.FieldName.AFTER); + assertThat(thirdMessage.get("white")).isEqualTo(3); + } + + @Test + @FixFor("DBZ-2979") + public void shouldNotSkipEventsWithNoChangeInWhitelistedColumnsWhenSkipDisabled() throws Exception { + String ddl = "CREATE TABLE debezium.test (" + + " id INT NOT NULL, white INT, black INT PRIMARY KEY (id))"; + + connection.execute(ddl); + + Configuration config = TestHelper.defaultConfig() + .with(OracleConnectorConfig.TABLE_INCLUDE_LIST, "debezium\\.test") + .with(OracleConnectorConfig.SKIP_MESSAGES_WITHOUT_CHANGE, false) + .with(OracleConnectorConfig.COLUMN_INCLUDE_LIST, "debezium\\.test\\.id, debezium\\.test\\.white") + .with(OracleConnectorConfig.SNAPSHOT_MODE, OracleConnectorConfig.SnapshotMode.SCHEMA_ONLY) + .build(); + + start(OracleConnector.class, config); + waitForStreamingRunning(TestHelper.CONNECTOR_NAME, TestHelper.SERVER_NAME); + + connection.execute("INSERT INTO debezium.test VALUES (1, 1, 1)"); + connection.execute("UPDATE debezium.test SET black=2 where id = 1"); + connection.execute("UPDATE debezium.test SET white=2 where id = 1"); + connection.execute("UPDATE debezium.test SET white=3, black=3 where id = 1"); + + /* + * Total Events + * 1,1,1 (I) + * 1,1,2 (U) + * 1,2,2 (U) + * 1,3,3 (U) + */ + SourceRecords records = consumeRecordsByTopic(5); + List recordsForTopic = records.recordsForTopic(topicName("test")); + assertThat(recordsForTopic).hasSize(3); + + Struct secondMessage = ((Struct) recordsForTopic.get(1).value()).getStruct(Envelope.FieldName.AFTER); + assertThat(secondMessage.get("white")).isEqualTo(1); + Struct thirdMessage = ((Struct) recordsForTopic.get(2).value()).getStruct(Envelope.FieldName.AFTER); + assertThat(thirdMessage.get("white")).isEqualTo(2); + Struct forthMessage = ((Struct) recordsForTopic.get(3).value()).getStruct(Envelope.FieldName.AFTER); + assertThat(forthMessage.get("white")).isEqualTo(3); + } + + private static String topicName(String tableName) { + return TestHelper.SERVER_NAME + ".DEBEZIUM." + tableName; + } +} diff --git a/documentation/modules/ROOT/pages/connectors/oracle.adoc b/documentation/modules/ROOT/pages/connectors/oracle.adoc index 466ebde0a..2b9134a54 100644 --- a/documentation/modules/ROOT/pages/connectors/oracle.adoc +++ b/documentation/modules/ROOT/pages/connectors/oracle.adoc @@ -2846,6 +2846,10 @@ To match the name of a column, {prodname} applies the regular expression that yo That is, the specified expression is matched against the entire name string of the column it does not match substrings that might be present in a column name. + If you include this property in the configuration, do not set the `column.include.list` property. +|[[oracle-property-skip-messages-without-change]]<> +|`false` +| Specifies whether to skip publishing messages when there is no change in whitelisted columns. This would essentially filter messages if there is no change in columns included as per `column.include.list` or `column.exclude.list` properties. + |[[oracle-property-column-mask-hash]]<>; [[oracle-property-column-mask-hash-v2]]<> |_n/a_