DBZ-2979 If column.include.list/column.exclude.list are used and the target table receives an update for the excluded (or not included) column - such events should be ignored

* [Unstable][Untested] Add config in Oracle Connector

Addresses/Closes DBZ-2979
This commit is contained in:
Ronak Jain 2023-04-16 00:22:30 +05:30 committed by Jiri Pechanec
parent 465b355dd0
commit 2c2b87b846
3 changed files with 191 additions and 0 deletions

View File

@ -53,6 +53,11 @@ protected BaseChangeRecordEmitter(OracleConnectorConfig connectorConfig, Partiti
this.table = table; this.table = table;
} }
@Override
protected boolean skipMessagesWithoutChange() {
return connectorConfig.skipMessagesWithoutChange();
}
@Override @Override
protected Object[] getOldColumnValues() { protected Object[] getOldColumnValues() {
return oldColumnValues; return oldColumnValues;

View File

@ -0,0 +1,182 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.connector.oracle;
import static org.assertj.core.api.Assertions.assertThat;
import java.util.List;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;
import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TestRule;
import io.debezium.config.Configuration;
import io.debezium.connector.oracle.junit.SkipTestDependingOnAdapterNameRule;
import io.debezium.connector.oracle.util.TestHelper;
import io.debezium.data.Envelope;
import io.debezium.doc.FixFor;
import io.debezium.embedded.AbstractConnectorTest;
/**
* Integration tests for config skip.messages.without.change
*
* @author Ronak Jain
*/
public class OracleSkipMessagesWithoutChangeConfigIT extends AbstractConnectorTest {
@Rule
public final TestRule skipAdapterRule = new SkipTestDependingOnAdapterNameRule();
private OracleConnection connection;
@Before
public void before() {
connection = TestHelper.testConnection();
TestHelper.dropTable(connection, "debezium.test");
setConsumeTimeout(TestHelper.defaultMessageConsumerPollTimeout(), TimeUnit.SECONDS);
initializeConnectorTestFramework();
Files.delete(TestHelper.SCHEMA_HISTORY_PATH);
}
@After
public void after() throws Exception {
if (connection != null) {
TestHelper.dropTable(connection, "debezium.test");
connection.close();
}
}
@Test
@FixFor("DBZ-2979")
public void shouldSkipEventsWithNoChangeInWhitelistedColumnsWhenSkipEnabled() throws Exception {
String ddl = "CREATE TABLE debezium.test (" +
" id INT NOT NULL, white INT, black INT PRIMARY KEY (id))";
connection.execute(ddl);
Configuration config = TestHelper.defaultConfig()
.with(OracleConnectorConfig.TABLE_INCLUDE_LIST, "debezium\\.test")
.with(OracleConnectorConfig.SKIP_MESSAGES_WITHOUT_CHANGE, true)
.with(OracleConnectorConfig.COLUMN_INCLUDE_LIST, "debezium\\.test\\.id, debezium\\.test\\.white")
.with(OracleConnectorConfig.SNAPSHOT_MODE, OracleConnectorConfig.SnapshotMode.SCHEMA_ONLY)
.build();
start(OracleConnector.class, config);
waitForStreamingRunning(TestHelper.CONNECTOR_NAME, TestHelper.SERVER_NAME);
connection.execute("INSERT INTO debezium.test VALUES (1, 1, 1)");
connection.execute("UPDATE debezium.test SET black=2 where id = 1");
connection.execute("UPDATE debezium.test SET white=2 where id = 1");
connection.execute("UPDATE debezium.test SET white=3, black=3 where id = 1");
/*
* Total Events
* 1,1,1 (I)
* 1,1,2 (U) (Skipped)
* 1,2,2 (U)
* 1,3,3 (U)
*/
SourceRecords records = consumeRecordsByTopic(5);
List<SourceRecord> recordsForTopic = records.recordsForTopic(topicName("test"));
assertThat(recordsForTopic).hasSize(3);
Struct secondMessage = ((Struct) recordsForTopic.get(1).value()).getStruct(Envelope.FieldName.AFTER);
assertThat(secondMessage.get("white")).isEqualTo(2);
Struct thirdMessage = ((Struct) recordsForTopic.get(2).value()).getStruct(Envelope.FieldName.AFTER);
assertThat(thirdMessage.get("white")).isEqualTo(3);
}
@Test
@FixFor("DBZ-2979")
public void shouldSkipEventsWithNoChangeInWhitelistedColumnsWhenSkipEnabledWithExcludeConfig() throws Exception {
String ddl = "CREATE TABLE debezium.test (" +
" id INT NOT NULL, white INT, black INT PRIMARY KEY (id))";
connection.execute(ddl);
Configuration config = TestHelper.defaultConfig()
.with(OracleConnectorConfig.TABLE_INCLUDE_LIST, "debezium\\.test")
.with(OracleConnectorConfig.COLUMN_EXCLUDE_LIST, "debezium\\.test\\.black")
.with(OracleConnectorConfig.SKIP_MESSAGES_WITHOUT_CHANGE, true)
.with(OracleConnectorConfig.SNAPSHOT_MODE, OracleConnectorConfig.SnapshotMode.SCHEMA_ONLY)
.build();
start(OracleConnector.class, config);
waitForStreamingRunning(TestHelper.CONNECTOR_NAME, TestHelper.SERVER_NAME);
connection.execute("INSERT INTO debezium.test VALUES (1, 1, 1)");
connection.execute("UPDATE debezium.test SET black=2 where id = 1");
connection.execute("UPDATE debezium.test SET white=2 where id = 1");
connection.execute("UPDATE debezium.test SET white=3, black=3 where id = 1");
/*
* Total Events
* 1,1,1 (I)
* 1,1,2 (U) (Skipped)
* 1,2,2 (U)
* 1,3,3 (U)
*/
SourceRecords records = consumeRecordsByTopic(5);
List<SourceRecord> recordsForTopic = records.recordsForTopic(topicName("test"));
assertThat(recordsForTopic).hasSize(3);
Struct secondMessage = ((Struct) recordsForTopic.get(1).value()).getStruct(Envelope.FieldName.AFTER);
assertThat(secondMessage.get("white")).isEqualTo(2);
Struct thirdMessage = ((Struct) recordsForTopic.get(2).value()).getStruct(Envelope.FieldName.AFTER);
assertThat(thirdMessage.get("white")).isEqualTo(3);
}
@Test
@FixFor("DBZ-2979")
public void shouldNotSkipEventsWithNoChangeInWhitelistedColumnsWhenSkipDisabled() throws Exception {
String ddl = "CREATE TABLE debezium.test (" +
" id INT NOT NULL, white INT, black INT PRIMARY KEY (id))";
connection.execute(ddl);
Configuration config = TestHelper.defaultConfig()
.with(OracleConnectorConfig.TABLE_INCLUDE_LIST, "debezium\\.test")
.with(OracleConnectorConfig.SKIP_MESSAGES_WITHOUT_CHANGE, false)
.with(OracleConnectorConfig.COLUMN_INCLUDE_LIST, "debezium\\.test\\.id, debezium\\.test\\.white")
.with(OracleConnectorConfig.SNAPSHOT_MODE, OracleConnectorConfig.SnapshotMode.SCHEMA_ONLY)
.build();
start(OracleConnector.class, config);
waitForStreamingRunning(TestHelper.CONNECTOR_NAME, TestHelper.SERVER_NAME);
connection.execute("INSERT INTO debezium.test VALUES (1, 1, 1)");
connection.execute("UPDATE debezium.test SET black=2 where id = 1");
connection.execute("UPDATE debezium.test SET white=2 where id = 1");
connection.execute("UPDATE debezium.test SET white=3, black=3 where id = 1");
/*
* Total Events
* 1,1,1 (I)
* 1,1,2 (U)
* 1,2,2 (U)
* 1,3,3 (U)
*/
SourceRecords records = consumeRecordsByTopic(5);
List<SourceRecord> recordsForTopic = records.recordsForTopic(topicName("test"));
assertThat(recordsForTopic).hasSize(3);
Struct secondMessage = ((Struct) recordsForTopic.get(1).value()).getStruct(Envelope.FieldName.AFTER);
assertThat(secondMessage.get("white")).isEqualTo(1);
Struct thirdMessage = ((Struct) recordsForTopic.get(2).value()).getStruct(Envelope.FieldName.AFTER);
assertThat(thirdMessage.get("white")).isEqualTo(2);
Struct forthMessage = ((Struct) recordsForTopic.get(3).value()).getStruct(Envelope.FieldName.AFTER);
assertThat(forthMessage.get("white")).isEqualTo(3);
}
private static String topicName(String tableName) {
return TestHelper.SERVER_NAME + ".DEBEZIUM." + tableName;
}
}

View File

@ -2846,6 +2846,10 @@ To match the name of a column, {prodname} applies the regular expression that yo
That is, the specified expression is matched against the entire name string of the column it does not match substrings that might be present in a column name. + That is, the specified expression is matched against the entire name string of the column it does not match substrings that might be present in a column name. +
If you include this property in the configuration, do not set the `column.include.list` property. If you include this property in the configuration, do not set the `column.include.list` property.
|[[oracle-property-skip-messages-without-change]]<<oracle-property-skip-messages-without-change, `+skip.messages.without.change+`>>
|`false`
| Specifies whether to skip publishing messages when there is no change in whitelisted columns. This would essentially filter messages if there is no change in columns included as per `column.include.list` or `column.exclude.list` properties.
|[[oracle-property-column-mask-hash]]<<oracle-property-column-mask-hash, `column.mask.hash._hashAlgorithm_.with.salt._salt_`>>; |[[oracle-property-column-mask-hash]]<<oracle-property-column-mask-hash, `column.mask.hash._hashAlgorithm_.with.salt._salt_`>>;
[[oracle-property-column-mask-hash-v2]]<<oracle-property-column-mask-hash-v2, `column.mask.hash.v2._hashAlgorithm_.with.salt._salt_`>> [[oracle-property-column-mask-hash-v2]]<<oracle-property-column-mask-hash-v2, `column.mask.hash.v2._hashAlgorithm_.with.salt._salt_`>>
|_n/a_ |_n/a_