DBZ-5441 Gracefully skip Oracle non-relational tables

This commit is contained in:
Chris Cranford 2022-07-28 23:22:53 -04:00 committed by Chris Cranford
parent d39e98fc9f
commit 41a9324044
6 changed files with 137 additions and 16 deletions

View File

@ -262,9 +262,20 @@ public Scn getCurrentScn() throws SQLException {
* @param tableId table identifier, should never be {@code null} * @param tableId table identifier, should never be {@code null}
* @return generated DDL * @return generated DDL
* @throws SQLException if an exception occurred obtaining the DDL metadata * @throws SQLException if an exception occurred obtaining the DDL metadata
* @throws NonRelationalTableException the table is not a relational table
*/ */
public String getTableMetadataDdl(TableId tableId) throws SQLException { public String getTableMetadataDdl(TableId tableId) throws SQLException, NonRelationalTableException {
try { try {
// This table contains all available objects that are considered relational & object based.
// By querying for TABLE_TYPE is null, we are explicitly confirming what if an entry exists
// that the table is in-fact a relational table and if the result set is empty, the object
// is another type, likely an object-based table, in which case we cannot generate DDL.
final String tableType = "SELECT COUNT(1) FROM ALL_ALL_TABLES WHERE OWNER='" + tableId.schema()
+ "' AND TABLE_NAME='" + tableId.table() + "' AND TABLE_TYPE IS NULL";
if (queryAndMap(tableType, rs -> rs.next() ? rs.getInt(1) : 0) == 0) {
throw new NonRelationalTableException("Table " + tableId + " is not a relational table");
}
// The storage and segment attributes aren't necessary // The storage and segment attributes aren't necessary
executeWithoutCommitting("begin dbms_metadata.set_transform_param(DBMS_METADATA.SESSION_TRANSFORM, 'STORAGE', false); end;"); executeWithoutCommitting("begin dbms_metadata.set_transform_param(DBMS_METADATA.SESSION_TRANSFORM, 'STORAGE', false); end;");
executeWithoutCommitting("begin dbms_metadata.set_transform_param(DBMS_METADATA.SESSION_TRANSFORM, 'SEGMENT_ATTRIBUTES', false); end;"); executeWithoutCommitting("begin dbms_metadata.set_transform_param(DBMS_METADATA.SESSION_TRANSFORM, 'SEGMENT_ATTRIBUTES', false); end;");
@ -468,4 +479,13 @@ protected Map<TableId, List<Column>> getColumnsDetails(String databaseCatalog,
} }
return super.getColumnsDetails(databaseCatalog, schemaNamePattern, tableName, tableFilter, columnFilter, metadata, viewIds); return super.getColumnsDetails(databaseCatalog, schemaNamePattern, tableName, tableFilter, columnFilter, metadata, viewIds);
} }
/**
* An exception that indicates the operation failed because the table is not a relational table.
*/
public static class NonRelationalTableException extends DebeziumException {
public NonRelationalTableException(String message) {
super(message);
}
}
} }

View File

@ -44,7 +44,7 @@ public class CreateTableParserListener extends BaseParserListener {
@Override @Override
public void enterCreate_table(PlSqlParser.Create_tableContext ctx) { public void enterCreate_table(PlSqlParser.Create_tableContext ctx) {
if (ctx.relational_table() == null) { if (ctx.relational_table() == null) {
throw new IllegalArgumentException("Only relational tables are supported"); throw new ParsingException(null, "Only relational tables are supported");
} }
TableId tableId = new TableId(catalogName, schemaName, getTableName(ctx.tableview_name())); TableId tableId = new TableId(catalogName, schemaName, getTableName(ctx.tableview_name()));
if (parser.getTableFilter().isIncluded(tableId)) { if (parser.getTableFilter().isIncluded(tableId)) {

View File

@ -23,6 +23,7 @@
import io.debezium.DebeziumException; import io.debezium.DebeziumException;
import io.debezium.connector.oracle.OracleConnection; import io.debezium.connector.oracle.OracleConnection;
import io.debezium.connector.oracle.OracleConnection.NonRelationalTableException;
import io.debezium.connector.oracle.OracleConnectorConfig; import io.debezium.connector.oracle.OracleConnectorConfig;
import io.debezium.connector.oracle.OracleDatabaseSchema; import io.debezium.connector.oracle.OracleDatabaseSchema;
import io.debezium.connector.oracle.OracleOffsetContext; import io.debezium.connector.oracle.OracleOffsetContext;
@ -908,6 +909,17 @@ private Table dispatchSchemaChangeEventAndGetTableForNewCapturedTable(TableId ta
OracleOffsetContext offsetContext, OracleOffsetContext offsetContext,
EventDispatcher<OraclePartition, TableId> dispatcher) EventDispatcher<OraclePartition, TableId> dispatcher)
throws SQLException, InterruptedException { throws SQLException, InterruptedException {
final String tableDdl;
try {
tableDdl = getTableMetadataDdl(tableId);
}
catch (NonRelationalTableException e) {
LOGGER.warn("Table {} is not a relational table and will be skipped.", tableId);
metrics.incrementWarningCount();
return null;
}
LOGGER.info("Table '{}' is new and will now be captured.", tableId); LOGGER.info("Table '{}' is new and will now be captured.", tableId);
offsetContext.event(tableId, Instant.now()); offsetContext.event(tableId, Instant.now());
dispatcher.dispatchSchemaChangeEvent(partition, dispatcher.dispatchSchemaChangeEvent(partition,
@ -918,7 +930,7 @@ private Table dispatchSchemaChangeEventAndGetTableForNewCapturedTable(TableId ta
tableId, tableId,
tableId.catalog(), tableId.catalog(),
tableId.schema(), tableId.schema(),
getTableMetadataDdl(tableId), tableDdl,
getSchema(), getSchema(),
Instant.now(), Instant.now(),
metrics, metrics,
@ -933,13 +945,14 @@ private Table dispatchSchemaChangeEventAndGetTableForNewCapturedTable(TableId ta
* @param tableId the table identifier, must not be {@code null} * @param tableId the table identifier, must not be {@code null}
* @return the table's create DDL statement, never {@code null} * @return the table's create DDL statement, never {@code null}
* @throws SQLException if an exception occurred obtaining the DDL statement * @throws SQLException if an exception occurred obtaining the DDL statement
* @throws NonRelationalTableException if the table is not a relational table
*/ */
private String getTableMetadataDdl(TableId tableId) throws SQLException { private String getTableMetadataDdl(TableId tableId) throws SQLException, NonRelationalTableException {
counters.tableMetadataCount++; counters.tableMetadataCount++;
LOGGER.info("Getting database metadata for table '{}'", tableId); LOGGER.info("Getting database metadata for table '{}'", tableId);
// A separate connection must be used for this out-of-bands query while processing LogMiner results. // A separate connection must be used for this out-of-bands query while processing LogMiner results.
// This should have negligible overhead since this use case should happen rarely. // This should have negligible overhead since this use case should happen rarely.
try (OracleConnection connection = new OracleConnection(connectorConfig.getJdbcConfig(), () -> getClass().getClassLoader())) { try (OracleConnection connection = new OracleConnection(connectorConfig.getJdbcConfig(), () -> getClass().getClassLoader(), false)) {
connection.setAutoCommit(false); connection.setAutoCommit(false);
final String pdbName = getConfig().getPdbName(); final String pdbName = getConfig().getPdbName();
if (pdbName != null) { if (pdbName != null) {

View File

@ -16,6 +16,7 @@
import io.debezium.DebeziumException; import io.debezium.DebeziumException;
import io.debezium.connector.oracle.OracleConnection; import io.debezium.connector.oracle.OracleConnection;
import io.debezium.connector.oracle.OracleConnection.NonRelationalTableException;
import io.debezium.connector.oracle.OracleConnectorConfig; import io.debezium.connector.oracle.OracleConnectorConfig;
import io.debezium.connector.oracle.OracleDatabaseSchema; import io.debezium.connector.oracle.OracleDatabaseSchema;
import io.debezium.connector.oracle.OracleOffsetContext; import io.debezium.connector.oracle.OracleOffsetContext;
@ -30,6 +31,7 @@
import io.debezium.relational.Table; import io.debezium.relational.Table;
import io.debezium.relational.TableId; import io.debezium.relational.TableId;
import io.debezium.util.Clock; import io.debezium.util.Clock;
import io.debezium.util.Strings;
import oracle.streams.ChunkColumnValue; import oracle.streams.ChunkColumnValue;
import oracle.streams.DDLLCR; import oracle.streams.DDLLCR;
@ -165,7 +167,18 @@ private void dispatchDataChangeEvent(RowLCR lcr, Map<String, Object> chunkValues
LOGGER.trace("Table {} is new but excluded, schema change skipped.", tableId); LOGGER.trace("Table {} is new but excluded, schema change skipped.", tableId);
return; return;
} }
LOGGER.info("Table {} is new and will be captured.", tableId);
final String tableDdl;
try {
tableDdl = getTableMetadataDdl(tableId);
}
catch (NonRelationalTableException e) {
LOGGER.warn("Table {} is not a relational table and will be skipped.", tableId);
streamingMetrics.incrementWarningCount();
return;
}
LOGGER.info("Table {} will be captured.", tableId);
dispatcher.dispatchSchemaChangeEvent( dispatcher.dispatchSchemaChangeEvent(
partition, partition,
tableId, tableId,
@ -176,13 +189,16 @@ private void dispatchDataChangeEvent(RowLCR lcr, Map<String, Object> chunkValues
tableId, tableId,
tableId.catalog(), tableId.catalog(),
tableId.schema(), tableId.schema(),
getTableMetadataDdl(tableId), tableDdl,
schema, schema,
Instant.now(), Instant.now(),
streamingMetrics, streamingMetrics,
null)); null));
table = schema.tableFor(tableId); table = schema.tableFor(tableId);
if (table == null) {
return;
}
} }
// Xstream does not provide any before state for LOB columns and so this map will be // Xstream does not provide any before state for LOB columns and so this map will be
@ -282,12 +298,13 @@ private TableId getTableId(LCR lcr) {
} }
} }
private String getTableMetadataDdl(TableId tableId) { private String getTableMetadataDdl(TableId tableId) throws NonRelationalTableException {
LOGGER.info("Getting database metadata for table '{}'", tableId);
final String pdbName = connectorConfig.getPdbName(); final String pdbName = connectorConfig.getPdbName();
// A separate connection must be used for this out-of-bands query while processing the Xstream callback. // A separate connection must be used for this out-of-bands query while processing the Xstream callback.
// This should have negligible overhead as this should happen rarely. // This should have negligible overhead as this should happen rarely.
try (OracleConnection connection = new OracleConnection(connectorConfig.getJdbcConfig(), () -> getClass().getClassLoader())) { try (OracleConnection connection = new OracleConnection(connectorConfig.getJdbcConfig(), () -> getClass().getClassLoader(), false)) {
if (pdbName != null) { if (!Strings.isNullOrBlank(pdbName)) {
connection.setSessionToPdb(pdbName); connection.setSessionToPdb(pdbName);
} }
connection.setAutoCommit(false); connection.setAutoCommit(false);

View File

@ -62,6 +62,7 @@
import io.debezium.DebeziumException; import io.debezium.DebeziumException;
import io.debezium.config.Configuration; import io.debezium.config.Configuration;
import io.debezium.connector.oracle.OracleConnectorConfig.ConnectorAdapter;
import io.debezium.connector.oracle.OracleConnectorConfig.LogMiningStrategy; import io.debezium.connector.oracle.OracleConnectorConfig.LogMiningStrategy;
import io.debezium.connector.oracle.OracleConnectorConfig.SnapshotMode; import io.debezium.connector.oracle.OracleConnectorConfig.SnapshotMode;
import io.debezium.connector.oracle.OracleConnectorConfig.TransactionSnapshotBoundaryMode; import io.debezium.connector.oracle.OracleConnectorConfig.TransactionSnapshotBoundaryMode;
@ -86,6 +87,7 @@
import io.debezium.heartbeat.Heartbeat; import io.debezium.heartbeat.Heartbeat;
import io.debezium.junit.logging.LogInterceptor; import io.debezium.junit.logging.LogInterceptor;
import io.debezium.relational.RelationalDatabaseConnectorConfig; import io.debezium.relational.RelationalDatabaseConnectorConfig;
import io.debezium.relational.RelationalSnapshotChangeEventSource;
import io.debezium.relational.history.MemoryDatabaseHistory; import io.debezium.relational.history.MemoryDatabaseHistory;
import io.debezium.storage.file.history.FileDatabaseHistory; import io.debezium.storage.file.history.FileDatabaseHistory;
import io.debezium.util.Testing; import io.debezium.util.Testing;
@ -453,7 +455,7 @@ private void assertTxBatch(Configuration config, int expectedRecordCount, int of
records = consumeRecordsByTopic(expectedRecordCount); records = consumeRecordsByTopic(expectedRecordCount);
testTableRecords = records.recordsForTopic("server1.DEBEZIUM.CUSTOMER"); testTableRecords = records.recordsForTopic("server1.DEBEZIUM.CUSTOMER");
assertThat(testTableRecords).hasSize(expectedRecordCount); assertThat(testTableRecords).hasSize(expectedRecordCount);
final String adapter = config.getString(OracleConnectorConfig.CONNECTOR_ADAPTER); final ConnectorAdapter adapter = TestHelper.getAdapter(config);
for (int i = 0; i < expectedRecordCount; i++) { for (int i = 0; i < expectedRecordCount; i++) {
SourceRecord record3 = testTableRecords.get(i); SourceRecord record3 = testTableRecords.get(i);
@ -464,7 +466,7 @@ private void assertTxBatch(Configuration config, int expectedRecordCount, int of
assertThat(record3.sourceOffset().containsKey(SourceInfo.SNAPSHOT_KEY)).isFalse(); assertThat(record3.sourceOffset().containsKey(SourceInfo.SNAPSHOT_KEY)).isFalse();
assertThat(record3.sourceOffset().containsKey(SNAPSHOT_COMPLETED_KEY)).isFalse(); assertThat(record3.sourceOffset().containsKey(SNAPSHOT_COMPLETED_KEY)).isFalse();
if (!"LogMiner".equalsIgnoreCase(adapter)) { if (ConnectorAdapter.LOG_MINER != adapter) {
assertThat(record3.sourceOffset().containsKey(SourceInfo.LCR_POSITION_KEY)).isTrue(); assertThat(record3.sourceOffset().containsKey(SourceInfo.LCR_POSITION_KEY)).isTrue();
assertThat(record3.sourceOffset().containsKey(SourceInfo.SCN_KEY)).isFalse(); assertThat(record3.sourceOffset().containsKey(SourceInfo.SCN_KEY)).isFalse();
} }
@ -472,7 +474,7 @@ private void assertTxBatch(Configuration config, int expectedRecordCount, int of
source = (Struct) ((Struct) record3.value()).get("source"); source = (Struct) ((Struct) record3.value()).get("source");
assertThat(source.get(SourceInfo.SNAPSHOT_KEY)).isEqualTo("false"); assertThat(source.get(SourceInfo.SNAPSHOT_KEY)).isEqualTo("false");
assertThat(source.get(SourceInfo.SCN_KEY)).isNotNull(); assertThat(source.get(SourceInfo.SCN_KEY)).isNotNull();
if (!"LogMiner".equalsIgnoreCase(adapter)) { if (ConnectorAdapter.LOG_MINER != adapter) {
assertThat(source.get(SourceInfo.LCR_POSITION_KEY)).isNotNull(); assertThat(source.get(SourceInfo.LCR_POSITION_KEY)).isNotNull();
} }
@ -4261,6 +4263,63 @@ private void testTableWithForwardSlashes(String tableName, String topicTableName
} }
} }
@Test
@FixFor("DBZ-5441")
public void shouldGracefullySkipObjectBasedTables() throws Exception {
TestHelper.dropTable(connection, "dbz5441");
try {
// This grant isn't given by default, but it is needed to create types in this test case.
TestHelper.grantRole("CREATE ANY TYPE");
// Sets up all the log interceptors needed
final LogInterceptor logInterceptor = new LogInterceptor(RelationalSnapshotChangeEventSource.class);
// Setup object type and object table
connection.execute("CREATE TYPE DEBEZIUM.DBZ5441_TYPE AS OBJECT (id number, lvl number)");
connection.execute("CREATE TABLE DEBEZIUM.DBZ5441 of DEBEZIUM.DBZ5441_TYPE (primary key(id))");
TestHelper.streamTable(connection, "DBZ5441");
Configuration config = TestHelper.defaultConfig()
.with(OracleConnectorConfig.TABLE_INCLUDE_LIST, "DEBEZIUM\\.DBZ5441")
.build();
final LogInterceptor streamInterceptor;
switch (TestHelper.getAdapter(config)) {
case XSTREAM:
streamInterceptor = new LogInterceptor("io.debezium.connector.oracle.xstream.LcrEventHandler");
break;
default:
streamInterceptor = new LogInterceptor(AbstractLogMinerEventProcessor.class);
}
start(OracleConnector.class, config);
assertConnectorIsRunning();
waitForStreamingRunning(TestHelper.CONNECTOR_NAME, TestHelper.SERVER_NAME);
assertNoRecordsToConsume();
// Simply indicates we did not find the table to lock and capture
// In other words, the snapshot performs no action on this table
assertThat(logInterceptor.containsMessage("Locking captured tables []")).isTrue();
connection.execute("INSERT INTO DEBEZIUM.DBZ5441 (id,lvl) values (1,1)");
Awaitility.await()
.atMost(120, TimeUnit.SECONDS)
.until(() -> streamInterceptor.containsMessage("is not a relational table and will be skipped"));
assertNoRecordsToConsume();
stopConnector();
}
finally {
// Drop table and type
TestHelper.dropTable(connection, "dbz5441");
connection.execute("DROP TYPE DEBEZIUM.DBZ5441_TYPE");
// Revoke special role granted for this test case
TestHelper.revokeRole("CREATE ANY TYPE");
}
}
private void waitForCurrentScnToHaveBeenSeenByConnector() throws SQLException { private void waitForCurrentScnToHaveBeenSeenByConnector() throws SQLException {
try (OracleConnection admin = TestHelper.adminConnection(true)) { try (OracleConnection admin = TestHelper.adminConnection(true)) {
final Scn scn = admin.getCurrentScn(); final Scn scn = admin.getCurrentScn();

View File

@ -21,6 +21,7 @@
import io.debezium.config.Field; import io.debezium.config.Field;
import io.debezium.connector.oracle.OracleConnection; import io.debezium.connector.oracle.OracleConnection;
import io.debezium.connector.oracle.OracleConnectorConfig; import io.debezium.connector.oracle.OracleConnectorConfig;
import io.debezium.connector.oracle.OracleConnectorConfig.ConnectorAdapter;
import io.debezium.connector.oracle.OracleConnectorConfig.LogMiningBufferType; import io.debezium.connector.oracle.OracleConnectorConfig.LogMiningBufferType;
import io.debezium.connector.oracle.logminer.processor.infinispan.CacheProvider; import io.debezium.connector.oracle.logminer.processor.infinispan.CacheProvider;
import io.debezium.jdbc.JdbcConfiguration; import io.debezium.jdbc.JdbcConfiguration;
@ -130,7 +131,7 @@ public static Configuration.Builder defaultConfig() {
jdbcConfiguration.forEach( jdbcConfiguration.forEach(
(field, value) -> builder.with(OracleConnectorConfig.DATABASE_CONFIG_PREFIX + field, value)); (field, value) -> builder.with(OracleConnectorConfig.DATABASE_CONFIG_PREFIX + field, value));
if (adapter().equals(OracleConnectorConfig.ConnectorAdapter.XSTREAM)) { if (adapter().equals(ConnectorAdapter.XSTREAM)) {
builder.withDefault(OracleConnectorConfig.XSTREAM_SERVER_NAME, "dbzxout"); builder.withDefault(OracleConnectorConfig.XSTREAM_SERVER_NAME, "dbzxout");
} }
else { else {
@ -500,9 +501,9 @@ public static int defaultMessageConsumerPollTimeout() {
return 120; return 120;
} }
public static OracleConnectorConfig.ConnectorAdapter adapter() { public static ConnectorAdapter adapter() {
final String s = System.getProperty(OracleConnectorConfig.CONNECTOR_ADAPTER.name()); final String s = System.getProperty(OracleConnectorConfig.CONNECTOR_ADAPTER.name());
return (s == null || s.length() == 0) ? OracleConnectorConfig.ConnectorAdapter.LOG_MINER : OracleConnectorConfig.ConnectorAdapter.parse(s); return (s == null || s.length() == 0) ? ConnectorAdapter.LOG_MINER : ConnectorAdapter.parse(s);
} }
/** /**
@ -610,4 +611,15 @@ public static boolean isUsingPdb() {
// if the property is not specified, we default to using PDB mode. // if the property is not specified, we default to using PDB mode.
return Strings.isNullOrEmpty(properties.get(PDB_NAME)); return Strings.isNullOrEmpty(properties.get(PDB_NAME));
} }
/**
* Returns the connector adapter from the provided configuration.
*
* @param config the connector configuration, must not be {@code null}
* @return the connector adapter being used.
*/
public static ConnectorAdapter getAdapter(Configuration config) {
return ConnectorAdapter.parse(config.getString(OracleConnectorConfig.CONNECTOR_ADAPTER));
}
} }