DBZ-5356 Avoid NPE during index evaluation
This commit is contained in:
parent
ddbd682d8d
commit
deaa34d71e
@ -55,6 +55,11 @@ public class OracleConnection extends JdbcConnection {
|
||||
*/
|
||||
private static final Pattern ADT_INDEX_NAMES_PATTERN = Pattern.compile("^\".*\"\\.\".*\".*");
|
||||
|
||||
/**
|
||||
* Pattern to identify a hidden column based on redefining a table with the {@code ROWID} option.
|
||||
*/
|
||||
private static final Pattern MROW_PATTERN = Pattern.compile("^M_ROW\\$\\$");
|
||||
|
||||
/**
|
||||
* A field for the raw jdbc url. This field has no default value.
|
||||
*/
|
||||
@ -228,7 +233,9 @@ public List<String> readTableUniqueIndices(DatabaseMetaData metadata, TableId id
|
||||
@Override
|
||||
protected boolean isTableUniqueIndexIncluded(String indexName, String columnName) {
|
||||
if (columnName != null) {
|
||||
return !SYS_NC_PATTERN.matcher(columnName).matches() && !ADT_INDEX_NAMES_PATTERN.matcher(columnName).matches();
|
||||
return !SYS_NC_PATTERN.matcher(columnName).matches()
|
||||
&& !ADT_INDEX_NAMES_PATTERN.matcher(columnName).matches()
|
||||
&& !MROW_PATTERN.matcher(columnName).matches();
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
@ -4176,6 +4176,30 @@ public void shouldNotDiscardTransactionWhenNoEventThresholdSet() throws Exceptio
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
@FixFor("DBZ-5356")
|
||||
public void shouldUniqueIndexWhenAtLeastOneColumnIsExcluded() throws Exception {
|
||||
TestHelper.dropTable(connection, "dbz5356");
|
||||
try {
|
||||
connection.execute("CREATE TABLE dbz5356 (id numeric(9,0), data varchar2(50))");
|
||||
connection.execute("CREATE UNIQUE INDEX dbz5356_idx ON dbz5356 (upper(data), id)");
|
||||
TestHelper.streamTable(connection, "dbz5356");
|
||||
|
||||
Configuration config = TestHelper.defaultConfig()
|
||||
.with(OracleConnectorConfig.TABLE_INCLUDE_LIST, "DEBEZIUM\\.DBZ5356")
|
||||
.build();
|
||||
|
||||
start(OracleConnector.class, config);
|
||||
assertConnectorIsRunning();
|
||||
|
||||
waitForStreamingRunning(TestHelper.CONNECTOR_NAME, TestHelper.SERVER_NAME);
|
||||
stopConnector();
|
||||
}
|
||||
finally {
|
||||
TestHelper.dropTable(connection, "dbz5356");
|
||||
}
|
||||
}
|
||||
|
||||
private void testTableWithForwardSlashes(String tableName, String topicTableName) throws Exception {
|
||||
final String quotedTableName = "\"" + tableName + "\"";
|
||||
TestHelper.dropTable(connection, quotedTableName);
|
||||
|
@ -1326,6 +1326,7 @@ public List<String> readPrimaryKeyNames(DatabaseMetaData metadata, TableId id) t
|
||||
|
||||
public List<String> readTableUniqueIndices(DatabaseMetaData metadata, TableId id) throws SQLException {
|
||||
final List<String> uniqueIndexColumnNames = new ArrayList<>();
|
||||
final List<String> excludedIndexNames = new ArrayList<>();
|
||||
try (ResultSet rs = metadata.getIndexInfo(id.catalog(), id.schema(), id.table(), true, true)) {
|
||||
String firstIndexName = null;
|
||||
while (rs.next()) {
|
||||
@ -1335,10 +1336,20 @@ public List<String> readTableUniqueIndices(DatabaseMetaData metadata, TableId id
|
||||
if (firstIndexName == null) {
|
||||
firstIndexName = indexName;
|
||||
}
|
||||
if (!isTableUniqueIndexIncluded(indexName, columnName)) {
|
||||
if (indexName != null) {
|
||||
boolean indexIncluded = isTableUniqueIndexIncluded(indexName, columnName);
|
||||
if (!indexIncluded && !excludedIndexNames.contains(indexName)) {
|
||||
excludedIndexNames.add(indexName);
|
||||
}
|
||||
if (excludedIndexNames.contains(indexName)) {
|
||||
// index has been excluded, skip further processing
|
||||
if (!uniqueIndexColumnNames.isEmpty()) {
|
||||
uniqueIndexColumnNames.clear();
|
||||
}
|
||||
continue;
|
||||
}
|
||||
// Only first unique index is taken into consideration
|
||||
}
|
||||
// Only first non-excluded unique index is taken into consideration
|
||||
if (indexName != null && !indexName.equals(firstIndexName)) {
|
||||
return uniqueIndexColumnNames;
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user