diff --git a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/OracleConnection.java b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/OracleConnection.java index 10855e38d..abd898b51 100644 --- a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/OracleConnection.java +++ b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/OracleConnection.java @@ -55,6 +55,11 @@ public class OracleConnection extends JdbcConnection { */ private static final Pattern ADT_INDEX_NAMES_PATTERN = Pattern.compile("^\".*\"\\.\".*\".*"); + /** + * Pattern to identify a hidden column based on redefining a table with the {@code ROWID} option. + */ + private static final Pattern MROW_PATTERN = Pattern.compile("^M_ROW\\$\\$"); + /** * A field for the raw jdbc url. This field has no default value. */ @@ -228,7 +233,9 @@ public List readTableUniqueIndices(DatabaseMetaData metadata, TableId id @Override protected boolean isTableUniqueIndexIncluded(String indexName, String columnName) { if (columnName != null) { - return !SYS_NC_PATTERN.matcher(columnName).matches() && !ADT_INDEX_NAMES_PATTERN.matcher(columnName).matches(); + return !SYS_NC_PATTERN.matcher(columnName).matches() + && !ADT_INDEX_NAMES_PATTERN.matcher(columnName).matches() + && !MROW_PATTERN.matcher(columnName).matches(); } return false; } diff --git a/debezium-connector-oracle/src/test/java/io/debezium/connector/oracle/OracleConnectorIT.java b/debezium-connector-oracle/src/test/java/io/debezium/connector/oracle/OracleConnectorIT.java index 44da393c3..a9c304d7e 100644 --- a/debezium-connector-oracle/src/test/java/io/debezium/connector/oracle/OracleConnectorIT.java +++ b/debezium-connector-oracle/src/test/java/io/debezium/connector/oracle/OracleConnectorIT.java @@ -4176,6 +4176,30 @@ public void shouldNotDiscardTransactionWhenNoEventThresholdSet() throws Exceptio } } + @Test + @FixFor("DBZ-5356") + public void shouldUniqueIndexWhenAtLeastOneColumnIsExcluded() throws Exception { + TestHelper.dropTable(connection, "dbz5356"); + try { + connection.execute("CREATE TABLE dbz5356 (id numeric(9,0), data varchar2(50))"); + connection.execute("CREATE UNIQUE INDEX dbz5356_idx ON dbz5356 (upper(data), id)"); + TestHelper.streamTable(connection, "dbz5356"); + + Configuration config = TestHelper.defaultConfig() + .with(OracleConnectorConfig.TABLE_INCLUDE_LIST, "DEBEZIUM\\.DBZ5356") + .build(); + + start(OracleConnector.class, config); + assertConnectorIsRunning(); + + waitForStreamingRunning(TestHelper.CONNECTOR_NAME, TestHelper.SERVER_NAME); + stopConnector(); + } + finally { + TestHelper.dropTable(connection, "dbz5356"); + } + } + private void testTableWithForwardSlashes(String tableName, String topicTableName) throws Exception { final String quotedTableName = "\"" + tableName + "\""; TestHelper.dropTable(connection, quotedTableName); diff --git a/debezium-core/src/main/java/io/debezium/jdbc/JdbcConnection.java b/debezium-core/src/main/java/io/debezium/jdbc/JdbcConnection.java index 189407c0e..97e813933 100644 --- a/debezium-core/src/main/java/io/debezium/jdbc/JdbcConnection.java +++ b/debezium-core/src/main/java/io/debezium/jdbc/JdbcConnection.java @@ -1326,6 +1326,7 @@ public List readPrimaryKeyNames(DatabaseMetaData metadata, TableId id) t public List readTableUniqueIndices(DatabaseMetaData metadata, TableId id) throws SQLException { final List uniqueIndexColumnNames = new ArrayList<>(); + final List excludedIndexNames = new ArrayList<>(); try (ResultSet rs = metadata.getIndexInfo(id.catalog(), id.schema(), id.table(), true, true)) { String firstIndexName = null; while (rs.next()) { @@ -1335,10 +1336,20 @@ public List readTableUniqueIndices(DatabaseMetaData metadata, TableId id if (firstIndexName == null) { firstIndexName = indexName; } - if (!isTableUniqueIndexIncluded(indexName, columnName)) { - continue; + if (indexName != null) { + boolean indexIncluded = isTableUniqueIndexIncluded(indexName, columnName); + if (!indexIncluded && !excludedIndexNames.contains(indexName)) { + excludedIndexNames.add(indexName); + } + if (excludedIndexNames.contains(indexName)) { + // index has been excluded, skip further processing + if (!uniqueIndexColumnNames.isEmpty()) { + uniqueIndexColumnNames.clear(); + } + continue; + } } - // Only first unique index is taken into consideration + // Only first non-excluded unique index is taken into consideration if (indexName != null && !indexName.equals(firstIndexName)) { return uniqueIndexColumnNames; }