From e1579f8738214c10a90507f7223cb87ed04965aa Mon Sep 17 00:00:00 2001 From: Chris Cranford Date: Wed, 11 May 2022 13:23:33 -0400 Subject: [PATCH] DBZ-4782 Flush SCN values upon schema changes --- .../AbstractLogMinerEventProcessor.java | 28 +++ .../oracle/OracleSchemaMigrationIT.java | 189 ++++++++++++++++++ 2 files changed, 217 insertions(+) diff --git a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/processor/AbstractLogMinerEventProcessor.java b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/processor/AbstractLogMinerEventProcessor.java index dd4bb5d5f..06b803287 100644 --- a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/processor/AbstractLogMinerEventProcessor.java +++ b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/processor/AbstractLogMinerEventProcessor.java @@ -557,10 +557,38 @@ protected void handleSchemaChange(LogMinerEventRow row) throws InterruptedExcept return; } + final Scn commitScn = offsetContext.getCommitScn(); + if (commitScn != null && commitScn.compareTo(row.getScn()) >= 0) { + LOGGER.trace("DDL: SQL '{}' skipped with {} (SCN) <= {} (commit SCN)", row.getRedoSql(), row.getScn(), commitScn); + return; + } + LOGGER.trace("DDL: '{}' {}", row.getRedoSql(), row); if (row.getTableName() != null) { counters.ddlCount++; final TableId tableId = row.getTableId(); + + final int activeTransactions = getTransactionCache().size(); + if (activeTransactions <= 1) { + boolean advanceLowerScnBoundary = true; + if (activeTransactions == 1) { + final String transactionId = getTransactionCache().keySet().iterator().next(); + if (!transactionId.equals(row.getTransactionId())) { + // The row's transaction is not the current only active transaction. + // We should not advance the SCN boundaries. + advanceLowerScnBoundary = false; + } + } + if (advanceLowerScnBoundary) { + LOGGER.debug("Schema change advanced offset SCN to {}", row.getScn()); + offsetContext.setScn(row.getScn()); + } + } + + // Should always advance the commit SCN point with schema changes + LOGGER.debug("Schema change advanced offset commit SCN to {}", row.getScn()); + offsetContext.setCommitScn(row.getScn()); + dispatcher.dispatchSchemaChangeEvent(partition, tableId, new OracleSchemaChangeEventEmitter( diff --git a/debezium-connector-oracle/src/test/java/io/debezium/connector/oracle/OracleSchemaMigrationIT.java b/debezium-connector-oracle/src/test/java/io/debezium/connector/oracle/OracleSchemaMigrationIT.java index 02cc81748..791d64162 100644 --- a/debezium-connector-oracle/src/test/java/io/debezium/connector/oracle/OracleSchemaMigrationIT.java +++ b/debezium-connector-oracle/src/test/java/io/debezium/connector/oracle/OracleSchemaMigrationIT.java @@ -1197,6 +1197,195 @@ public void shouldParseSchemaChangeOnTableWithRawDataType() throws Exception { } } + @Test + @FixFor("DBZ-4782") + public void shouldNotResendSchemaChangeIfLastEventReadBeforeRestart() throws Exception { + TestHelper.dropTable(connection, "dbz4782"); + try { + connection.execute("CREATE TABLE dbz4782 (id numeric(9,0) primary key, data varchar2(50))"); + TestHelper.streamTable(connection, "dbz4782"); + + Configuration config = TestHelper.defaultConfig() + .with(OracleConnectorConfig.TABLE_INCLUDE_LIST, "DEBEZIUM\\.DBZ4782") + .with(OracleConnectorConfig.INCLUDE_SCHEMA_CHANGES, true) + .build(); + + start(OracleConnector.class, config); + assertConnectorIsRunning(); + + waitForStreamingRunning(TestHelper.CONNECTOR_NAME, TestHelper.SERVER_NAME); + + connection.execute("ALTER TABLE dbz4782 add data2 varchar2(50)"); + + // CREATE, ALTER + SourceRecords sourceRecords = consumeRecordsByTopic(2); + List records = sourceRecords.recordsForTopic(TestHelper.SERVER_NAME); + assertThat(records).hasSize(2); + + assertSnapshotSchemaChange(records.get(0)); + List tableChanges = ((Struct) records.get(0).value()).getArray("tableChanges"); + assertThat(tableChanges).hasSize(1); + assertTableChange(tableChanges.get(0), "CREATE", "DEBEZIUM", "DBZ4782"); + + assertStreamingSchemaChange(records.get(1)); + tableChanges = ((Struct) records.get(1).value()).getArray("tableChanges"); + assertThat(tableChanges).hasSize(1); + assertTableChange(tableChanges.get(0), "ALTER", "DEBEZIUM", "DBZ4782"); + + // Stop the connector + stopConnector(); + + // Restart connector and verify that we do not re-emit the ALTER table + start(OracleConnector.class, config); + assertConnectorIsRunning(); + waitForStreamingRunning(TestHelper.CONNECTOR_NAME, TestHelper.SERVER_NAME); + + // Wait for 20 seconds and assert that there are no available records + waitForAvailableRecords(20, TimeUnit.SECONDS); + assertNoRecordsToConsume(); + } + finally { + TestHelper.dropTable(connection, "dbz4782"); + } + } + + @Test + @FixFor("DBZ-4782") + public void shouldNotResendSchemaChangeIfLastEventReadBeforeRestartWithFollowupDml() throws Exception { + TestHelper.dropTable(connection, "dbz4782"); + try { + createTable("dbz4782", "CREATE TABLE dbz4782 (id numeric(9,0) primary key, data varchar2(50))"); + + Configuration config = TestHelper.defaultConfig() + .with(OracleConnectorConfig.TABLE_INCLUDE_LIST, "DEBEZIUM\\.DBZ4782") + .with(OracleConnectorConfig.INCLUDE_SCHEMA_CHANGES, true) + .build(); + + start(OracleConnector.class, config); + assertConnectorIsRunning(); + + waitForStreamingRunning(TestHelper.CONNECTOR_NAME, TestHelper.SERVER_NAME); + + connection.execute("ALTER TABLE dbz4782 add data2 varchar2(50)"); + + // CREATE, ALTER + SourceRecords sourceRecords = consumeRecordsByTopic(2); + List records = sourceRecords.recordsForTopic(TestHelper.SERVER_NAME); + assertThat(records).hasSize(2); + + assertSnapshotSchemaChange(records.get(0)); + List tableChanges = ((Struct) records.get(0).value()).getArray("tableChanges"); + assertThat(tableChanges).hasSize(1); + assertTableChange(tableChanges.get(0), "CREATE", "DEBEZIUM", "DBZ4782"); + + assertStreamingSchemaChange(records.get(1)); + tableChanges = ((Struct) records.get(1).value()).getArray("tableChanges"); + assertThat(tableChanges).hasSize(1); + assertTableChange(tableChanges.get(0), "ALTER", "DEBEZIUM", "DBZ4782"); + + // Stop the connector + stopConnector(); + + // Restart connector and verify that we do not re-emit the ALTER table + start(OracleConnector.class, config); + assertConnectorIsRunning(); + waitForStreamingRunning(TestHelper.CONNECTOR_NAME, TestHelper.SERVER_NAME); + + connection.execute("INSERT INTO dbz4782 values (1, 'data1', 'data2')"); + + sourceRecords = consumeRecordsByTopic(1); + records = sourceRecords.recordsForTopic(topicName("DEBEZIUM", "DBZ4782")); + assertThat(records).hasSize(1); + VerifyRecord.isValidInsert(records.get(0), "ID", 1); + + // There should be no other records to consume + assertNoRecordsToConsume(); + } + finally { + TestHelper.dropTable(connection, "dbz4782"); + } + } + + @Test + @FixFor("DBZ-4782") + public void shouldNotResendSchemaChangeWithInprogressTransactionOnSecondTable() throws Exception { + TestHelper.dropTable(connection, "dbz4782a"); + TestHelper.dropTable(connection, "dbz4782b"); + try { + createTable("dbz4782a", "CREATE TABLE dbz4782a (id numeric(9,0) primary key, data varchar2(50))"); + createTable("dbz4782b", "CREATE TABLE dbz4782b (id numeric(9,0) primary key, data varchar2(50))"); + + Configuration config = TestHelper.defaultConfig() + .with(OracleConnectorConfig.TABLE_INCLUDE_LIST, "DEBEZIUM\\.DBZ4782[A|B]") + .with(OracleConnectorConfig.INCLUDE_SCHEMA_CHANGES, true) + .build(); + + start(OracleConnector.class, config); + assertConnectorIsRunning(); + + waitForStreamingRunning(TestHelper.CONNECTOR_NAME, TestHelper.SERVER_NAME); + + // Start in-progress transaction for dbz4728b + try (OracleConnection connection2 = TestHelper.testConnection()) { + + // Perform in-progress operation on second connection & alter the other table in primary connection + connection2.executeWithoutCommitting("INSERT INTO dbz4782b values (2, 'connection2')"); + connection.execute("ALTER TABLE dbz4782a add data2 varchar2(50)"); + + // CREATEx2, ALTER (INSERT isn't here yet, its in progress) + SourceRecords sourceRecords = consumeRecordsByTopic(3); + List records = sourceRecords.recordsForTopic(TestHelper.SERVER_NAME); + assertThat(records).hasSize(3); + + assertSnapshotSchemaChange(records.get(0)); + List tableChanges = ((Struct) records.get(0).value()).getArray("tableChanges"); + assertThat(tableChanges).hasSize(1); + assertTableChange(tableChanges.get(0), "CREATE", "DEBEZIUM", "DBZ4782A"); + + assertSnapshotSchemaChange(records.get(1)); + tableChanges = ((Struct) records.get(1).value()).getArray("tableChanges"); + assertThat(tableChanges).hasSize(1); + assertTableChange(tableChanges.get(0), "CREATE", "DEBEZIUM", "DBZ4782B"); + + assertStreamingSchemaChange(records.get(2)); + tableChanges = ((Struct) records.get(2).value()).getArray("tableChanges"); + assertThat(tableChanges).hasSize(1); + assertTableChange(tableChanges.get(0), "ALTER", "DEBEZIUM", "DBZ4782A"); + + // Stop the connector + stopConnector(); + + // Now commit the in-progress transaction while connector is down + connection2.commit(); + + // Restart the connector and verify we don't re-emit the ALTER table; however that we do + // capture the in-progress transaction correctly when it is committed. + start(OracleConnector.class, config); + assertConnectorIsRunning(); + waitForStreamingRunning(TestHelper.CONNECTOR_NAME, TestHelper.SERVER_NAME); + + connection.execute("INSERT INTO dbz4782a values (1, 'data1', 'data2')"); + + sourceRecords = consumeRecordsByTopic(2); + sourceRecords.allRecordsInOrder().forEach(System.out::println); + records = sourceRecords.recordsForTopic(topicName("DEBEZIUM", "DBZ4782A")); + assertThat(records).hasSize(1); + VerifyRecord.isValidInsert(records.get(0), "ID", 1); + + records = sourceRecords.recordsForTopic(topicName("DEBEZIUM", "DBZ4782B")); + assertThat(records).hasSize(1); + VerifyRecord.isValidInsert(records.get(0), "ID", 2); + } + + // There should be no other records to consume + assertNoRecordsToConsume(); + } + finally { + TestHelper.dropTable(connection, "dbz4782a"); + TestHelper.dropTable(connection, "dbz4782b"); + } + } + private static String getTableIdString(String schemaName, String tableName) { return new TableId(TestHelper.getDatabaseName(), schemaName, tableName).toDoubleQuotedString(); }