From 8a7276cebca7909ed99d4c67f32f8460e1508e1d Mon Sep 17 00:00:00 2001 From: Chris Cranford Date: Thu, 4 Mar 2021 11:43:35 -0500 Subject: [PATCH] DBZ-3219 Fix column include/exclude support for Oracle --- .../oracle/OracleConnectorConfig.java | 35 ++++++++++++++++--- .../oracle/OracleDatabaseSchema.java | 2 +- .../oracle/OracleConnectorFilterIT.java | 12 +++---- 3 files changed, 37 insertions(+), 12 deletions(-) diff --git a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/OracleConnectorConfig.java b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/OracleConnectorConfig.java index 9148f35e9..c63140d76 100644 --- a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/OracleConnectorConfig.java +++ b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/OracleConnectorConfig.java @@ -40,7 +40,7 @@ import io.debezium.relational.HistorizedRelationalDatabaseConnectorConfig; import io.debezium.relational.RelationalDatabaseConnectorConfig; import io.debezium.relational.TableId; -import io.debezium.relational.Tables; +import io.debezium.relational.Tables.ColumnNameFilter; import io.debezium.relational.Tables.TableFilter; import io.debezium.relational.history.HistoryRecordComparator; import io.debezium.util.Strings; @@ -344,6 +344,7 @@ public static ConfigDef configDef() { private final String pdbName; private final String xoutServerName; private final SnapshotMode snapshotMode; + private final ColumnNameFilter columnFilter; private final Boolean tablenameCaseInsensitive; private final OracleVersion oracleVersion; @@ -382,6 +383,13 @@ public OracleConnectorConfig(Configuration config) { this.jdbcConfig = config.subset(DATABASE_CONFIG_PREFIX, true); this.snapshotEnhancementToken = config.getString(SNAPSHOT_ENHANCEMENT_TOKEN); + if (columnIncludeList() != null) { + this.columnFilter = getColumnIncludeNameFilter(columnIncludeList()); + } + else { + this.columnFilter = getColumnExcludeNameFilter(columnExcludeList()); + } + // LogMiner this.connectorAdapter = ConnectorAdapter.parse(config.getString(CONNECTOR_ADAPTER)); this.logMiningStrategy = LogMiningStrategy.parse(config.getString(LOG_MINING_STRATEGY)); @@ -412,15 +420,28 @@ private static HistoryRecorder resolveLogMiningHistoryRecorder(Configuration con return config.getInstance(LOG_MINING_HISTORY_RECORDER_CLASS, HistoryRecorder.class); } - protected Tables.ColumnNameFilter getColumnNameFilter(String excludedColumnPatterns) { - return new Tables.ColumnNameFilter() { + private static ColumnNameFilter getColumnExcludeNameFilter(String excludedColumnPatterns) { + return new ColumnNameFilter() { Predicate delegate = Predicates.excludes(excludedColumnPatterns, ColumnId::toString); @Override public boolean matches(String catalogName, String schemaName, String tableName, String columnName) { - // ignore database name and schema name, we are supposed to capture from one database and one schema - return delegate.test(new ColumnId(new TableId(null, null, tableName), columnName)); + // ignore database name as it's not relevant here + return delegate.test(new ColumnId(new TableId(null, schemaName, tableName), columnName)); + } + }; + } + + private static ColumnNameFilter getColumnIncludeNameFilter(String includedColumnPatterns) { + return new ColumnNameFilter() { + + Predicate delegate = Predicates.includes(includedColumnPatterns, ColumnId::toString); + + @Override + public boolean matches(String catalogName, String schemaName, String tableName, String columnName) { + // ignore database name as it's not relevant here + return delegate.test(new ColumnId(new TableId(null, schemaName, tableName), columnName)); } }; } @@ -460,6 +481,10 @@ public OracleVersion getOracleVersion() { return oracleVersion; } + public ColumnNameFilter getColumnFilter() { + return columnFilter; + } + @Override protected HistoryRecordComparator getHistoryRecordComparator() { return new HistoryRecordComparator() { diff --git a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/OracleDatabaseSchema.java b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/OracleDatabaseSchema.java index 8f4c43cae..9b295177b 100644 --- a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/OracleDatabaseSchema.java +++ b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/OracleDatabaseSchema.java @@ -32,7 +32,7 @@ public class OracleDatabaseSchema extends HistorizedRelationalDatabaseSchema { public OracleDatabaseSchema(OracleConnectorConfig connectorConfig, SchemaNameAdjuster schemaNameAdjuster, TopicSelector topicSelector, OracleConnection connection) { - super(connectorConfig, topicSelector, connectorConfig.getTableFilters().dataCollectionFilter(), null, + super(connectorConfig, topicSelector, connectorConfig.getTableFilters().dataCollectionFilter(), connectorConfig.getColumnFilter(), new TableSchemaBuilder( new OracleValueConverters(connectorConfig, connection), schemaNameAdjuster, diff --git a/debezium-connector-oracle/src/test/java/io/debezium/connector/oracle/OracleConnectorFilterIT.java b/debezium-connector-oracle/src/test/java/io/debezium/connector/oracle/OracleConnectorFilterIT.java index d56f2031d..63dc7fb92 100644 --- a/debezium-connector-oracle/src/test/java/io/debezium/connector/oracle/OracleConnectorFilterIT.java +++ b/debezium-connector-oracle/src/test/java/io/debezium/connector/oracle/OracleConnectorFilterIT.java @@ -169,7 +169,7 @@ public void shouldApplySchemaAndTableExcludeListConfiguration() throws Exception } @Test - @FixFor("DBZ-3167") + @FixFor({ "DBZ-3167", "DBZ-3219" }) public void shouldApplyColumnIncludeListConfiguration() throws Exception { TestHelper.dropTable(connection, "table4"); try { @@ -208,7 +208,7 @@ public void shouldApplyColumnIncludeListConfiguration() throws Exception { Struct after = (Struct) ((Struct) testTableRecords.get(0).value()).get("after"); assertThat(after.get("ID")).isEqualTo(1); assertThat(after.get("NAME")).isEqualTo("Text-1"); - assertThat(testTableRecords.get(0).valueSchema().field("BIRTH_DATE")).isNull(); + assertThat(after.schema().field("BIRTH_DATE")).isNull(); // Start streaming & wait for it waitForStreamingRunning(TestHelper.CONNECTOR_NAME, TestHelper.SERVER_NAME); @@ -225,7 +225,7 @@ public void shouldApplyColumnIncludeListConfiguration() throws Exception { after = (Struct) ((Struct) testTableRecords.get(0).value()).get("after"); assertThat(after.get("ID")).isEqualTo(2); assertThat(after.get("NAME")).isEqualTo("Text-2"); - assertThat(testTableRecords.get(0).valueSchema().field("BIRTH_DATE")).isNull(); + assertThat(after.schema().field("BIRTH_DATE")).isNull(); } finally { TestHelper.dropTable(connection, "table4"); @@ -233,7 +233,7 @@ public void shouldApplyColumnIncludeListConfiguration() throws Exception { } @Test - @FixFor("DBZ-3167") + @FixFor({ "DBZ-3167", "DBZ-3219" }) public void shouldApplyColumnExcludeListConfiguration() throws Exception { TestHelper.dropTable(connection, "table4"); try { @@ -272,7 +272,7 @@ public void shouldApplyColumnExcludeListConfiguration() throws Exception { Struct after = (Struct) ((Struct) testTableRecords.get(0).value()).get("after"); assertThat(after.get("ID")).isEqualTo(1); assertThat(after.get("NAME")).isEqualTo("Text-1"); - assertThat(testTableRecords.get(0).valueSchema().field("BIRTH_DATE")).isNull(); + assertThat(after.schema().field("BIRTH_DATE")).isNull(); // Start streaming & wait for it waitForStreamingRunning(TestHelper.CONNECTOR_NAME, TestHelper.SERVER_NAME); @@ -289,7 +289,7 @@ public void shouldApplyColumnExcludeListConfiguration() throws Exception { after = (Struct) ((Struct) testTableRecords.get(0).value()).get("after"); assertThat(after.get("ID")).isEqualTo(2); assertThat(after.get("NAME")).isEqualTo("Text-2"); - assertThat(testTableRecords.get(0).valueSchema().field("BIRTH_DATE")).isNull(); + assertThat(after.schema().field("BIRTH_DATE")).isNull(); } finally { TestHelper.dropTable(connection, "table4");