DBZ-3036 Detect and skip Oracle IOT special tables

This commit is contained in:
Chris Cranford 2021-02-25 14:45:38 -05:00 committed by Gunnar Morling
parent ddb1498d1d
commit 4d1d9bafee
4 changed files with 111 additions and 36 deletions

View File

@ -36,7 +36,6 @@
import io.debezium.relational.Tables;
import io.debezium.relational.Tables.ColumnNameFilter;
import io.debezium.relational.Tables.TableFilter;
import io.debezium.util.Strings;
import oracle.jdbc.OracleTypes;
@ -177,44 +176,32 @@ public Set<TableId> readTableNames(String databaseCatalog, String schemaNamePatt
.collect(Collectors.toSet());
}
protected Set<TableId> getAllTableIds(String catalogName, String schemaNamePattern, boolean isView) throws SQLException {
String query;
boolean filterBySchema = !Strings.isNullOrEmpty(schemaNamePattern);
if (!isView) {
query = "select table_name, owner from all_tables where table_name NOT LIKE 'MDRT_%' AND table_name not LIKE 'MDXT_%' ";
if (filterBySchema) {
query += " and owner like ?";
}
}
else {
query = "select view_name, owner from all_views";
if (filterBySchema) {
query += " where owner like ?";
}
}
/**
* Retrieves all {@code TableId} in a given database catalog, filtering certain ids that
* should be omitted from the returned set such as special spatial tables and index-organized
* tables.
*
* @param catalogName the catalog/database name
* @return set of all table ids for existing table objects
* @throws SQLException if a database exception occurred
*/
protected Set<TableId> getAllTableIds(String catalogName) throws SQLException {
final String query = "select owner, table_name from all_tables " +
// filter special spatial tables
"where table_name NOT LIKE 'MDRT_%' " +
"and table_name NOT LIKE 'MDRS_%' " +
"and table_name NOT LIKE 'MDXT_%' " +
// filter index-organized-tables
"and (table_name NOT LIKE 'SYS_IOT_OVER_%' and IOT_NAME IS NULL) ";
Set<TableId> tableIds = new HashSet<>();
try (PreparedStatement statement = connection().prepareStatement(query)) {
if (filterBySchema) {
statement.setString(1, '%' + schemaNamePattern.toUpperCase() + '%');
query(query, (rs) -> {
while (rs.next()) {
tableIds.add(new TableId(catalogName, rs.getString(1), rs.getString(2)));
}
try (ResultSet result = statement.executeQuery()) {
while (result.next()) {
String tableName = result.getString(1);
final String schemaName = result.getString(2);
TableId tableId = new TableId(catalogName, schemaName, tableName);
tableIds.add(tableId);
}
}
}
finally {
LOGGER.trace("TableIds are: {}", tableIds);
}
});
return tableIds;
}

View File

@ -79,7 +79,7 @@ protected SnapshotContext prepare(ChangeEventSourceContext context) throws Excep
@Override
protected Set<TableId> getAllTableIds(RelationalSnapshotContext ctx) throws Exception {
return jdbcConnection.getAllTableIds(ctx.catalogName, null, false);
return jdbcConnection.getAllTableIds(ctx.catalogName);
// this very slow approach(commented out), it took 30 minutes on an instance with 600 tables
// return jdbcConnection.readTableNames(ctx.catalogName, null, null, new String[] {"TABLE"} );
}

View File

@ -8,6 +8,7 @@
import static io.debezium.connector.oracle.util.TestHelper.TYPE_LENGTH_PARAMETER_KEY;
import static io.debezium.connector.oracle.util.TestHelper.TYPE_NAME_PARAMETER_KEY;
import static io.debezium.connector.oracle.util.TestHelper.TYPE_SCALE_PARAMETER_KEY;
import static io.debezium.connector.oracle.util.TestHelper.defaultConfig;
import static io.debezium.data.Envelope.FieldName.AFTER;
import static junit.framework.TestCase.assertEquals;
import static org.fest.assertions.Assertions.assertThat;
@ -42,6 +43,7 @@
import io.debezium.connector.oracle.junit.SkipWhenAdapterNameIs;
import io.debezium.connector.oracle.junit.SkipWhenAdapterNameIsNot;
import io.debezium.connector.oracle.util.TestHelper;
import io.debezium.data.Envelope.FieldName;
import io.debezium.data.SchemaAndValueField;
import io.debezium.data.VariableScaleDecimal;
import io.debezium.data.VerifyRecord;
@ -1486,6 +1488,64 @@ public void shouldResumeStreamingAtCorrectScnOffset() throws Exception {
}
}
@Test
@FixFor("DBZ-3036")
public void shouldHandleParentChildIndexOrganizedTables() throws Exception {
TestHelper.dropTable(connection, "test_iot");
try {
String ddl = "CREATE TABLE test_iot (" +
"id numeric(9,0), " +
"description varchar2(50) not null, " +
"primary key(id)) " +
"ORGANIZATION INDEX " +
"INCLUDING description " +
"OVERFLOW";
connection.execute(ddl);
TestHelper.streamTable(connection, "debezium.test_iot");
// Insert data for snapshot
connection.executeWithoutCommitting("INSERT INTO debezium.test_iot VALUES ('1', 'Hello World')");
connection.execute("COMMIT");
Configuration config = defaultConfig()
.with(OracleConnectorConfig.SCHEMA_INCLUDE_LIST, "DEBEZIUM")
.with(OracleConnectorConfig.TABLE_INCLUDE_LIST, "(.)*IOT(.)*")
.build();
start(OracleConnector.class, config);
assertNoRecordsToConsume();
waitForSnapshotToBeCompleted(TestHelper.CONNECTOR_NAME, TestHelper.SERVER_NAME);
SourceRecords records = consumeRecordsByTopic(1);
assertThat(records.recordsForTopic("server1.DEBEZIUM.TEST_IOT")).hasSize(1);
SourceRecord record = records.recordsForTopic("server1.DEBEZIUM.TEST_IOT").get(0);
Struct after = (Struct) ((Struct) record.value()).get(FieldName.AFTER);
VerifyRecord.isValidRead(record, "ID", 1);
assertThat(after.get("DESCRIPTION")).isEqualTo("Hello World");
waitForStreamingRunning(TestHelper.CONNECTOR_NAME, TestHelper.SERVER_NAME);
// Insert data for streaming
connection.executeWithoutCommitting("INSERT INTO debezium.test_iot VALUES ('2', 'Goodbye')");
connection.execute("COMMIT");
records = consumeRecordsByTopic(1);
assertThat(records.recordsForTopic("server1.DEBEZIUM.TEST_IOT")).hasSize(1);
record = records.recordsForTopic("server1.DEBEZIUM.TEST_IOT").get(0);
after = (Struct) ((Struct) record.value()).get(FieldName.AFTER);
VerifyRecord.isValidInsert(record, "ID", 2);
assertThat(after.get("DESCRIPTION")).isEqualTo("Goodbye");
}
finally {
TestHelper.dropTable(connection, "test_iot");
// This makes sure all index-organized tables are cleared after dropping parent table
TestHelper.purgeRecycleBin(connection);
}
}
private String generateAlphaNumericStringColumn(int size) {
final String alphaNumericString = "ABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789abcdefghijklmnopqrstuvwxyz";
final StringBuilder sb = new StringBuilder(size);

View File

@ -251,6 +251,34 @@ public static void dropTable(OracleConnection connection, String table) {
}
}
/**
* Enables a given table to be streamed by Oracle.
*
* @param connection the oracle connection
* @param table the table name in {@code schema.table} format.
* @throws SQLException if an exception occurred
*/
public static void streamTable(OracleConnection connection, String table) throws SQLException {
connection.execute(String.format("GRANT SELECT ON %s TO %s", table, getConnectorUserName()));
connection.execute(String.format("ALTER TABLE %s ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS", table));
}
/**
* Clear the recycle bin, removing all objects from the bin and release all space associated
* with objects in the recycle bin. This also clears any system-generated objects that are
* associated with a table that may have been recently dropped, such as index-organized tables.
*
* @param connection the oracle connection
*/
public static void purgeRecycleBin(OracleConnection connection) {
try {
connection.execute("PURGE RECYCLEBIN");
}
catch (SQLException e) {
throw new RuntimeException("Failed to clear user recyclebin", e);
}
}
public static int defaultMessageConsumerPollTimeout() {
return 120;
}