DBZ-3219 Fix column include/exclude support for Oracle
This commit is contained in:
parent
c4f4b4fa80
commit
8a7276cebc
@ -40,7 +40,7 @@
|
||||
import io.debezium.relational.HistorizedRelationalDatabaseConnectorConfig;
|
||||
import io.debezium.relational.RelationalDatabaseConnectorConfig;
|
||||
import io.debezium.relational.TableId;
|
||||
import io.debezium.relational.Tables;
|
||||
import io.debezium.relational.Tables.ColumnNameFilter;
|
||||
import io.debezium.relational.Tables.TableFilter;
|
||||
import io.debezium.relational.history.HistoryRecordComparator;
|
||||
import io.debezium.util.Strings;
|
||||
@ -344,6 +344,7 @@ public static ConfigDef configDef() {
|
||||
private final String pdbName;
|
||||
private final String xoutServerName;
|
||||
private final SnapshotMode snapshotMode;
|
||||
private final ColumnNameFilter columnFilter;
|
||||
|
||||
private final Boolean tablenameCaseInsensitive;
|
||||
private final OracleVersion oracleVersion;
|
||||
@ -382,6 +383,13 @@ public OracleConnectorConfig(Configuration config) {
|
||||
this.jdbcConfig = config.subset(DATABASE_CONFIG_PREFIX, true);
|
||||
this.snapshotEnhancementToken = config.getString(SNAPSHOT_ENHANCEMENT_TOKEN);
|
||||
|
||||
if (columnIncludeList() != null) {
|
||||
this.columnFilter = getColumnIncludeNameFilter(columnIncludeList());
|
||||
}
|
||||
else {
|
||||
this.columnFilter = getColumnExcludeNameFilter(columnExcludeList());
|
||||
}
|
||||
|
||||
// LogMiner
|
||||
this.connectorAdapter = ConnectorAdapter.parse(config.getString(CONNECTOR_ADAPTER));
|
||||
this.logMiningStrategy = LogMiningStrategy.parse(config.getString(LOG_MINING_STRATEGY));
|
||||
@ -412,15 +420,28 @@ private static HistoryRecorder resolveLogMiningHistoryRecorder(Configuration con
|
||||
return config.getInstance(LOG_MINING_HISTORY_RECORDER_CLASS, HistoryRecorder.class);
|
||||
}
|
||||
|
||||
protected Tables.ColumnNameFilter getColumnNameFilter(String excludedColumnPatterns) {
|
||||
return new Tables.ColumnNameFilter() {
|
||||
private static ColumnNameFilter getColumnExcludeNameFilter(String excludedColumnPatterns) {
|
||||
return new ColumnNameFilter() {
|
||||
|
||||
Predicate<ColumnId> delegate = Predicates.excludes(excludedColumnPatterns, ColumnId::toString);
|
||||
|
||||
@Override
|
||||
public boolean matches(String catalogName, String schemaName, String tableName, String columnName) {
|
||||
// ignore database name and schema name, we are supposed to capture from one database and one schema
|
||||
return delegate.test(new ColumnId(new TableId(null, null, tableName), columnName));
|
||||
// ignore database name as it's not relevant here
|
||||
return delegate.test(new ColumnId(new TableId(null, schemaName, tableName), columnName));
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
private static ColumnNameFilter getColumnIncludeNameFilter(String includedColumnPatterns) {
|
||||
return new ColumnNameFilter() {
|
||||
|
||||
Predicate<ColumnId> delegate = Predicates.includes(includedColumnPatterns, ColumnId::toString);
|
||||
|
||||
@Override
|
||||
public boolean matches(String catalogName, String schemaName, String tableName, String columnName) {
|
||||
// ignore database name as it's not relevant here
|
||||
return delegate.test(new ColumnId(new TableId(null, schemaName, tableName), columnName));
|
||||
}
|
||||
};
|
||||
}
|
||||
@ -460,6 +481,10 @@ public OracleVersion getOracleVersion() {
|
||||
return oracleVersion;
|
||||
}
|
||||
|
||||
public ColumnNameFilter getColumnFilter() {
|
||||
return columnFilter;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected HistoryRecordComparator getHistoryRecordComparator() {
|
||||
return new HistoryRecordComparator() {
|
||||
|
@ -32,7 +32,7 @@ public class OracleDatabaseSchema extends HistorizedRelationalDatabaseSchema {
|
||||
|
||||
public OracleDatabaseSchema(OracleConnectorConfig connectorConfig, SchemaNameAdjuster schemaNameAdjuster, TopicSelector<TableId> topicSelector,
|
||||
OracleConnection connection) {
|
||||
super(connectorConfig, topicSelector, connectorConfig.getTableFilters().dataCollectionFilter(), null,
|
||||
super(connectorConfig, topicSelector, connectorConfig.getTableFilters().dataCollectionFilter(), connectorConfig.getColumnFilter(),
|
||||
new TableSchemaBuilder(
|
||||
new OracleValueConverters(connectorConfig, connection),
|
||||
schemaNameAdjuster,
|
||||
|
@ -169,7 +169,7 @@ public void shouldApplySchemaAndTableExcludeListConfiguration() throws Exception
|
||||
}
|
||||
|
||||
@Test
|
||||
@FixFor("DBZ-3167")
|
||||
@FixFor({ "DBZ-3167", "DBZ-3219" })
|
||||
public void shouldApplyColumnIncludeListConfiguration() throws Exception {
|
||||
TestHelper.dropTable(connection, "table4");
|
||||
try {
|
||||
@ -208,7 +208,7 @@ public void shouldApplyColumnIncludeListConfiguration() throws Exception {
|
||||
Struct after = (Struct) ((Struct) testTableRecords.get(0).value()).get("after");
|
||||
assertThat(after.get("ID")).isEqualTo(1);
|
||||
assertThat(after.get("NAME")).isEqualTo("Text-1");
|
||||
assertThat(testTableRecords.get(0).valueSchema().field("BIRTH_DATE")).isNull();
|
||||
assertThat(after.schema().field("BIRTH_DATE")).isNull();
|
||||
|
||||
// Start streaming & wait for it
|
||||
waitForStreamingRunning(TestHelper.CONNECTOR_NAME, TestHelper.SERVER_NAME);
|
||||
@ -225,7 +225,7 @@ public void shouldApplyColumnIncludeListConfiguration() throws Exception {
|
||||
after = (Struct) ((Struct) testTableRecords.get(0).value()).get("after");
|
||||
assertThat(after.get("ID")).isEqualTo(2);
|
||||
assertThat(after.get("NAME")).isEqualTo("Text-2");
|
||||
assertThat(testTableRecords.get(0).valueSchema().field("BIRTH_DATE")).isNull();
|
||||
assertThat(after.schema().field("BIRTH_DATE")).isNull();
|
||||
}
|
||||
finally {
|
||||
TestHelper.dropTable(connection, "table4");
|
||||
@ -233,7 +233,7 @@ public void shouldApplyColumnIncludeListConfiguration() throws Exception {
|
||||
}
|
||||
|
||||
@Test
|
||||
@FixFor("DBZ-3167")
|
||||
@FixFor({ "DBZ-3167", "DBZ-3219" })
|
||||
public void shouldApplyColumnExcludeListConfiguration() throws Exception {
|
||||
TestHelper.dropTable(connection, "table4");
|
||||
try {
|
||||
@ -272,7 +272,7 @@ public void shouldApplyColumnExcludeListConfiguration() throws Exception {
|
||||
Struct after = (Struct) ((Struct) testTableRecords.get(0).value()).get("after");
|
||||
assertThat(after.get("ID")).isEqualTo(1);
|
||||
assertThat(after.get("NAME")).isEqualTo("Text-1");
|
||||
assertThat(testTableRecords.get(0).valueSchema().field("BIRTH_DATE")).isNull();
|
||||
assertThat(after.schema().field("BIRTH_DATE")).isNull();
|
||||
|
||||
// Start streaming & wait for it
|
||||
waitForStreamingRunning(TestHelper.CONNECTOR_NAME, TestHelper.SERVER_NAME);
|
||||
@ -289,7 +289,7 @@ public void shouldApplyColumnExcludeListConfiguration() throws Exception {
|
||||
after = (Struct) ((Struct) testTableRecords.get(0).value()).get("after");
|
||||
assertThat(after.get("ID")).isEqualTo(2);
|
||||
assertThat(after.get("NAME")).isEqualTo("Text-2");
|
||||
assertThat(testTableRecords.get(0).valueSchema().field("BIRTH_DATE")).isNull();
|
||||
assertThat(after.schema().field("BIRTH_DATE")).isNull();
|
||||
}
|
||||
finally {
|
||||
TestHelper.dropTable(connection, "table4");
|
||||
|
Loading…
Reference in New Issue
Block a user