diff --git a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/OracleConnection.java b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/OracleConnection.java index abd898b51..5235fe2c4 100644 --- a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/OracleConnection.java +++ b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/OracleConnection.java @@ -262,9 +262,20 @@ public Scn getCurrentScn() throws SQLException { * @param tableId table identifier, should never be {@code null} * @return generated DDL * @throws SQLException if an exception occurred obtaining the DDL metadata + * @throws NonRelationalTableException the table is not a relational table */ - public String getTableMetadataDdl(TableId tableId) throws SQLException { + public String getTableMetadataDdl(TableId tableId) throws SQLException, NonRelationalTableException { try { + // This table contains all available objects that are considered relational & object based. + // By querying for TABLE_TYPE is null, we are explicitly confirming what if an entry exists + // that the table is in-fact a relational table and if the result set is empty, the object + // is another type, likely an object-based table, in which case we cannot generate DDL. + final String tableType = "SELECT COUNT(1) FROM ALL_ALL_TABLES WHERE OWNER='" + tableId.schema() + + "' AND TABLE_NAME='" + tableId.table() + "' AND TABLE_TYPE IS NULL"; + if (queryAndMap(tableType, rs -> rs.next() ? rs.getInt(1) : 0) == 0) { + throw new NonRelationalTableException("Table " + tableId + " is not a relational table"); + } + // The storage and segment attributes aren't necessary executeWithoutCommitting("begin dbms_metadata.set_transform_param(DBMS_METADATA.SESSION_TRANSFORM, 'STORAGE', false); end;"); executeWithoutCommitting("begin dbms_metadata.set_transform_param(DBMS_METADATA.SESSION_TRANSFORM, 'SEGMENT_ATTRIBUTES', false); end;"); @@ -468,4 +479,13 @@ protected Map> getColumnsDetails(String databaseCatalog, } return super.getColumnsDetails(databaseCatalog, schemaNamePattern, tableName, tableFilter, columnFilter, metadata, viewIds); } + + /** + * An exception that indicates the operation failed because the table is not a relational table. + */ + public static class NonRelationalTableException extends DebeziumException { + public NonRelationalTableException(String message) { + super(message); + } + } } diff --git a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/antlr/listener/CreateTableParserListener.java b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/antlr/listener/CreateTableParserListener.java index 618cd97e2..14c322872 100644 --- a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/antlr/listener/CreateTableParserListener.java +++ b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/antlr/listener/CreateTableParserListener.java @@ -44,7 +44,7 @@ public class CreateTableParserListener extends BaseParserListener { @Override public void enterCreate_table(PlSqlParser.Create_tableContext ctx) { if (ctx.relational_table() == null) { - throw new IllegalArgumentException("Only relational tables are supported"); + throw new ParsingException(null, "Only relational tables are supported"); } TableId tableId = new TableId(catalogName, schemaName, getTableName(ctx.tableview_name())); if (parser.getTableFilter().isIncluded(tableId)) { diff --git a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/processor/AbstractLogMinerEventProcessor.java b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/processor/AbstractLogMinerEventProcessor.java index 598e98b9a..87336cd33 100644 --- a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/processor/AbstractLogMinerEventProcessor.java +++ b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/processor/AbstractLogMinerEventProcessor.java @@ -23,6 +23,7 @@ import io.debezium.DebeziumException; import io.debezium.connector.oracle.OracleConnection; +import io.debezium.connector.oracle.OracleConnection.NonRelationalTableException; import io.debezium.connector.oracle.OracleConnectorConfig; import io.debezium.connector.oracle.OracleDatabaseSchema; import io.debezium.connector.oracle.OracleOffsetContext; @@ -908,6 +909,17 @@ private Table dispatchSchemaChangeEventAndGetTableForNewCapturedTable(TableId ta OracleOffsetContext offsetContext, EventDispatcher dispatcher) throws SQLException, InterruptedException { + + final String tableDdl; + try { + tableDdl = getTableMetadataDdl(tableId); + } + catch (NonRelationalTableException e) { + LOGGER.warn("Table {} is not a relational table and will be skipped.", tableId); + metrics.incrementWarningCount(); + return null; + } + LOGGER.info("Table '{}' is new and will now be captured.", tableId); offsetContext.event(tableId, Instant.now()); dispatcher.dispatchSchemaChangeEvent(partition, @@ -918,7 +930,7 @@ private Table dispatchSchemaChangeEventAndGetTableForNewCapturedTable(TableId ta tableId, tableId.catalog(), tableId.schema(), - getTableMetadataDdl(tableId), + tableDdl, getSchema(), Instant.now(), metrics, @@ -933,13 +945,14 @@ private Table dispatchSchemaChangeEventAndGetTableForNewCapturedTable(TableId ta * @param tableId the table identifier, must not be {@code null} * @return the table's create DDL statement, never {@code null} * @throws SQLException if an exception occurred obtaining the DDL statement + * @throws NonRelationalTableException if the table is not a relational table */ - private String getTableMetadataDdl(TableId tableId) throws SQLException { + private String getTableMetadataDdl(TableId tableId) throws SQLException, NonRelationalTableException { counters.tableMetadataCount++; LOGGER.info("Getting database metadata for table '{}'", tableId); // A separate connection must be used for this out-of-bands query while processing LogMiner results. // This should have negligible overhead since this use case should happen rarely. - try (OracleConnection connection = new OracleConnection(connectorConfig.getJdbcConfig(), () -> getClass().getClassLoader())) { + try (OracleConnection connection = new OracleConnection(connectorConfig.getJdbcConfig(), () -> getClass().getClassLoader(), false)) { connection.setAutoCommit(false); final String pdbName = getConfig().getPdbName(); if (pdbName != null) { diff --git a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/xstream/LcrEventHandler.java b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/xstream/LcrEventHandler.java index 5e6189996..7910f9f27 100644 --- a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/xstream/LcrEventHandler.java +++ b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/xstream/LcrEventHandler.java @@ -16,6 +16,7 @@ import io.debezium.DebeziumException; import io.debezium.connector.oracle.OracleConnection; +import io.debezium.connector.oracle.OracleConnection.NonRelationalTableException; import io.debezium.connector.oracle.OracleConnectorConfig; import io.debezium.connector.oracle.OracleDatabaseSchema; import io.debezium.connector.oracle.OracleOffsetContext; @@ -30,6 +31,7 @@ import io.debezium.relational.Table; import io.debezium.relational.TableId; import io.debezium.util.Clock; +import io.debezium.util.Strings; import oracle.streams.ChunkColumnValue; import oracle.streams.DDLLCR; @@ -165,7 +167,18 @@ private void dispatchDataChangeEvent(RowLCR lcr, Map chunkValues LOGGER.trace("Table {} is new but excluded, schema change skipped.", tableId); return; } - LOGGER.info("Table {} is new and will be captured.", tableId); + + final String tableDdl; + try { + tableDdl = getTableMetadataDdl(tableId); + } + catch (NonRelationalTableException e) { + LOGGER.warn("Table {} is not a relational table and will be skipped.", tableId); + streamingMetrics.incrementWarningCount(); + return; + } + + LOGGER.info("Table {} will be captured.", tableId); dispatcher.dispatchSchemaChangeEvent( partition, tableId, @@ -176,13 +189,16 @@ private void dispatchDataChangeEvent(RowLCR lcr, Map chunkValues tableId, tableId.catalog(), tableId.schema(), - getTableMetadataDdl(tableId), + tableDdl, schema, Instant.now(), streamingMetrics, null)); table = schema.tableFor(tableId); + if (table == null) { + return; + } } // Xstream does not provide any before state for LOB columns and so this map will be @@ -282,12 +298,13 @@ private TableId getTableId(LCR lcr) { } } - private String getTableMetadataDdl(TableId tableId) { + private String getTableMetadataDdl(TableId tableId) throws NonRelationalTableException { + LOGGER.info("Getting database metadata for table '{}'", tableId); final String pdbName = connectorConfig.getPdbName(); // A separate connection must be used for this out-of-bands query while processing the Xstream callback. // This should have negligible overhead as this should happen rarely. - try (OracleConnection connection = new OracleConnection(connectorConfig.getJdbcConfig(), () -> getClass().getClassLoader())) { - if (pdbName != null) { + try (OracleConnection connection = new OracleConnection(connectorConfig.getJdbcConfig(), () -> getClass().getClassLoader(), false)) { + if (!Strings.isNullOrBlank(pdbName)) { connection.setSessionToPdb(pdbName); } connection.setAutoCommit(false); diff --git a/debezium-connector-oracle/src/test/java/io/debezium/connector/oracle/OracleConnectorIT.java b/debezium-connector-oracle/src/test/java/io/debezium/connector/oracle/OracleConnectorIT.java index 47bb8380b..00adfccae 100644 --- a/debezium-connector-oracle/src/test/java/io/debezium/connector/oracle/OracleConnectorIT.java +++ b/debezium-connector-oracle/src/test/java/io/debezium/connector/oracle/OracleConnectorIT.java @@ -62,6 +62,7 @@ import io.debezium.DebeziumException; import io.debezium.config.Configuration; +import io.debezium.connector.oracle.OracleConnectorConfig.ConnectorAdapter; import io.debezium.connector.oracle.OracleConnectorConfig.LogMiningStrategy; import io.debezium.connector.oracle.OracleConnectorConfig.SnapshotMode; import io.debezium.connector.oracle.OracleConnectorConfig.TransactionSnapshotBoundaryMode; @@ -86,6 +87,7 @@ import io.debezium.heartbeat.Heartbeat; import io.debezium.junit.logging.LogInterceptor; import io.debezium.relational.RelationalDatabaseConnectorConfig; +import io.debezium.relational.RelationalSnapshotChangeEventSource; import io.debezium.relational.history.MemoryDatabaseHistory; import io.debezium.storage.file.history.FileDatabaseHistory; import io.debezium.util.Testing; @@ -453,7 +455,7 @@ private void assertTxBatch(Configuration config, int expectedRecordCount, int of records = consumeRecordsByTopic(expectedRecordCount); testTableRecords = records.recordsForTopic("server1.DEBEZIUM.CUSTOMER"); assertThat(testTableRecords).hasSize(expectedRecordCount); - final String adapter = config.getString(OracleConnectorConfig.CONNECTOR_ADAPTER); + final ConnectorAdapter adapter = TestHelper.getAdapter(config); for (int i = 0; i < expectedRecordCount; i++) { SourceRecord record3 = testTableRecords.get(i); @@ -464,7 +466,7 @@ private void assertTxBatch(Configuration config, int expectedRecordCount, int of assertThat(record3.sourceOffset().containsKey(SourceInfo.SNAPSHOT_KEY)).isFalse(); assertThat(record3.sourceOffset().containsKey(SNAPSHOT_COMPLETED_KEY)).isFalse(); - if (!"LogMiner".equalsIgnoreCase(adapter)) { + if (ConnectorAdapter.LOG_MINER != adapter) { assertThat(record3.sourceOffset().containsKey(SourceInfo.LCR_POSITION_KEY)).isTrue(); assertThat(record3.sourceOffset().containsKey(SourceInfo.SCN_KEY)).isFalse(); } @@ -472,7 +474,7 @@ private void assertTxBatch(Configuration config, int expectedRecordCount, int of source = (Struct) ((Struct) record3.value()).get("source"); assertThat(source.get(SourceInfo.SNAPSHOT_KEY)).isEqualTo("false"); assertThat(source.get(SourceInfo.SCN_KEY)).isNotNull(); - if (!"LogMiner".equalsIgnoreCase(adapter)) { + if (ConnectorAdapter.LOG_MINER != adapter) { assertThat(source.get(SourceInfo.LCR_POSITION_KEY)).isNotNull(); } @@ -4261,6 +4263,63 @@ private void testTableWithForwardSlashes(String tableName, String topicTableName } } + @Test + @FixFor("DBZ-5441") + public void shouldGracefullySkipObjectBasedTables() throws Exception { + TestHelper.dropTable(connection, "dbz5441"); + try { + // This grant isn't given by default, but it is needed to create types in this test case. + TestHelper.grantRole("CREATE ANY TYPE"); + + // Sets up all the log interceptors needed + final LogInterceptor logInterceptor = new LogInterceptor(RelationalSnapshotChangeEventSource.class); + + // Setup object type and object table + connection.execute("CREATE TYPE DEBEZIUM.DBZ5441_TYPE AS OBJECT (id number, lvl number)"); + connection.execute("CREATE TABLE DEBEZIUM.DBZ5441 of DEBEZIUM.DBZ5441_TYPE (primary key(id))"); + TestHelper.streamTable(connection, "DBZ5441"); + + Configuration config = TestHelper.defaultConfig() + .with(OracleConnectorConfig.TABLE_INCLUDE_LIST, "DEBEZIUM\\.DBZ5441") + .build(); + + final LogInterceptor streamInterceptor; + switch (TestHelper.getAdapter(config)) { + case XSTREAM: + streamInterceptor = new LogInterceptor("io.debezium.connector.oracle.xstream.LcrEventHandler"); + break; + default: + streamInterceptor = new LogInterceptor(AbstractLogMinerEventProcessor.class); + } + + start(OracleConnector.class, config); + assertConnectorIsRunning(); + + waitForStreamingRunning(TestHelper.CONNECTOR_NAME, TestHelper.SERVER_NAME); + assertNoRecordsToConsume(); + + // Simply indicates we did not find the table to lock and capture + // In other words, the snapshot performs no action on this table + assertThat(logInterceptor.containsMessage("Locking captured tables []")).isTrue(); + + connection.execute("INSERT INTO DEBEZIUM.DBZ5441 (id,lvl) values (1,1)"); + + Awaitility.await() + .atMost(120, TimeUnit.SECONDS) + .until(() -> streamInterceptor.containsMessage("is not a relational table and will be skipped")); + + assertNoRecordsToConsume(); + stopConnector(); + } + finally { + // Drop table and type + TestHelper.dropTable(connection, "dbz5441"); + connection.execute("DROP TYPE DEBEZIUM.DBZ5441_TYPE"); + // Revoke special role granted for this test case + TestHelper.revokeRole("CREATE ANY TYPE"); + } + } + private void waitForCurrentScnToHaveBeenSeenByConnector() throws SQLException { try (OracleConnection admin = TestHelper.adminConnection(true)) { final Scn scn = admin.getCurrentScn(); diff --git a/debezium-connector-oracle/src/test/java/io/debezium/connector/oracle/util/TestHelper.java b/debezium-connector-oracle/src/test/java/io/debezium/connector/oracle/util/TestHelper.java index 17b21991e..d831291ba 100644 --- a/debezium-connector-oracle/src/test/java/io/debezium/connector/oracle/util/TestHelper.java +++ b/debezium-connector-oracle/src/test/java/io/debezium/connector/oracle/util/TestHelper.java @@ -21,6 +21,7 @@ import io.debezium.config.Field; import io.debezium.connector.oracle.OracleConnection; import io.debezium.connector.oracle.OracleConnectorConfig; +import io.debezium.connector.oracle.OracleConnectorConfig.ConnectorAdapter; import io.debezium.connector.oracle.OracleConnectorConfig.LogMiningBufferType; import io.debezium.connector.oracle.logminer.processor.infinispan.CacheProvider; import io.debezium.jdbc.JdbcConfiguration; @@ -130,7 +131,7 @@ public static Configuration.Builder defaultConfig() { jdbcConfiguration.forEach( (field, value) -> builder.with(OracleConnectorConfig.DATABASE_CONFIG_PREFIX + field, value)); - if (adapter().equals(OracleConnectorConfig.ConnectorAdapter.XSTREAM)) { + if (adapter().equals(ConnectorAdapter.XSTREAM)) { builder.withDefault(OracleConnectorConfig.XSTREAM_SERVER_NAME, "dbzxout"); } else { @@ -500,9 +501,9 @@ public static int defaultMessageConsumerPollTimeout() { return 120; } - public static OracleConnectorConfig.ConnectorAdapter adapter() { + public static ConnectorAdapter adapter() { final String s = System.getProperty(OracleConnectorConfig.CONNECTOR_ADAPTER.name()); - return (s == null || s.length() == 0) ? OracleConnectorConfig.ConnectorAdapter.LOG_MINER : OracleConnectorConfig.ConnectorAdapter.parse(s); + return (s == null || s.length() == 0) ? ConnectorAdapter.LOG_MINER : ConnectorAdapter.parse(s); } /** @@ -610,4 +611,15 @@ public static boolean isUsingPdb() { // if the property is not specified, we default to using PDB mode. return Strings.isNullOrEmpty(properties.get(PDB_NAME)); } + + /** + * Returns the connector adapter from the provided configuration. + * + * @param config the connector configuration, must not be {@code null} + * @return the connector adapter being used. + */ + public static ConnectorAdapter getAdapter(Configuration config) { + return ConnectorAdapter.parse(config.getString(OracleConnectorConfig.CONNECTOR_ADAPTER)); + } + }