From e85b6e1780df5cd86767bf846ac669a7b1b7fba8 Mon Sep 17 00:00:00 2001 From: Chris Cranford Date: Mon, 27 Sep 2021 09:12:15 -0400 Subject: [PATCH] DBZ-3712 Support start/change archive.log.only.mode gracefully --- .../connector/oracle/OracleConnection.java | 36 +++++ .../LogMinerStreamingChangeEventSource.java | 65 ++++++++- .../connector/oracle/OracleConnectorIT.java | 128 ++++++++++++++++++ .../connector/oracle/util/TestHelper.java | 15 ++ 4 files changed, 243 insertions(+), 1 deletion(-) diff --git a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/OracleConnection.java b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/OracleConnection.java index 6bd271ad3..124f38e7d 100644 --- a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/OracleConnection.java +++ b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/OracleConnection.java @@ -38,6 +38,7 @@ import io.debezium.relational.Tables; import io.debezium.relational.Tables.ColumnNameFilter; import io.debezium.relational.Tables.TableFilter; +import io.debezium.util.Strings; import oracle.jdbc.OracleTypes; @@ -344,6 +345,41 @@ public Scn getCurrentScn() throws SQLException { }); } + /** + * Get the maximum system change number in the archive logs. + * + * @param archiveLogDestinationName the archive log destination name to be queried, can be {@code null}. 
+ * @return the maximum system change number in the archive logs + * @throws SQLException if a database exception occurred + * @throws DebeziumException if the maximum archive log system change number could not be found + */ + public Scn getMaxArchiveLogScn(String archiveLogDestinationName) throws SQLException { + String query = "SELECT MAX(NEXT_CHANGE#) FROM V$ARCHIVED_LOG " + + "WHERE NAME IS NOT NULL " + + "AND ARCHIVED = 'YES' " + + "AND STATUS = 'A' " + + "AND DEST_ID IN (" + + "SELECT DEST_ID FROM V$ARCHIVE_DEST_STATUS " + + "WHERE STATUS = 'VALID' " + + "AND TYPE = 'LOCAL' "; + + if (Strings.isNullOrEmpty(archiveLogDestinationName)) { + query += "AND ROWNUM = 1"; + } + else { + query += "AND UPPER(DEST_NAME) = '" + archiveLogDestinationName + "'"; + } + + query += ")"; + + return queryAndMap(query, (rs) -> { + if (rs.next()) { + return Scn.valueOf(rs.getString(1)).subtract(Scn.valueOf(1)); + } + throw new DebeziumException("Could not obtain maximum archive log scn."); + }); + } + /** * Generate a given table's DDL metadata. 
* diff --git a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/LogMinerStreamingChangeEventSource.java b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/LogMinerStreamingChangeEventSource.java index a8336f914..122d86d14 100644 --- a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/LogMinerStreamingChangeEventSource.java +++ b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/LogMinerStreamingChangeEventSource.java @@ -114,6 +114,11 @@ public void execute(ChangeEventSourceContext context, OraclePartition partition, checkSupplementalLogging(jdbcConnection, connectorConfig.getPdbName(), schema); try (LogMinerEventProcessor processor = createProcessor(context, partition, offsetContext)) { + + if (archiveLogOnlyMode && !waitForStartScnInArchiveLogs(context, startScn)) { + return; + } + currentRedoLogSequences = getCurrentRedoLogSequences(); initializeRedoLogsForMining(jdbcConnection, false, startScn); @@ -121,8 +126,21 @@ public void execute(ChangeEventSourceContext context, OraclePartition partition, // Calculate time difference before each mining session to detect time zone offset changes (e.g. DST) on database server streamingMetrics.calculateTimeDifference(getDatabaseSystemTime(jdbcConnection)); + if (archiveLogOnlyMode && !waitForStartScnInArchiveLogs(context, startScn)) { + break; + } + Instant start = Instant.now(); endScn = calculateEndScn(jdbcConnection, startScn, endScn); + + // There is a small window where, when archive log only mode has completely caught up to the last + // record in the archive logs, both the start and end values are identical. In this use + // case we want to pause and restart the loop waiting for a new archive log before proceeding. 
+ if (archiveLogOnlyMode && startScn.equals(endScn)) { + pauseBetweenMiningSessions(); + continue; + } + flushStrategy.flush(jdbcConnection.getCurrentScn()); if (hasLogSwitchOccurred()) { @@ -427,7 +445,9 @@ public void endMiningSession(OracleConnection connection, OracleOffsetContext of * @throws SQLException if the current max system change number cannot be obtained from the database */ private Scn calculateEndScn(OracleConnection connection, Scn startScn, Scn prevEndScn) throws SQLException { - Scn currentScn = connection.getCurrentScn(); + Scn currentScn = archiveLogOnlyMode + ? connection.getMaxArchiveLogScn(archiveDestinationName) + : connection.getCurrentScn(); streamingMetrics.setCurrentScn(currentScn); // Add the current batch size to the starting system change number @@ -575,6 +595,49 @@ private LogWriterFlushStrategy resolveFlushStrategy() { return new CommitLogWriterFlushStrategy(jdbcConnection); } + /** + * Waits for the starting system change number to exist in the archive logs before returning. + * + * @param context the change event source context + * @param startScn the starting system change number + * @return true if the code should continue, false if the code should end. 
+ * @throws SQLException if a database exception occurred + * @throws InterruptedException if the pause between checks is interrupted + */ + private boolean waitForStartScnInArchiveLogs(ChangeEventSourceContext context, Scn startScn) throws SQLException, InterruptedException { + boolean showStartScnNotInArchiveLogs = true; + while (context.isRunning() && !isStartScnInArchiveLogs(startScn)) { + if (showStartScnNotInArchiveLogs) { + LOGGER.warn("Starting SCN {} is not yet in archive logs, waiting for archive log switch.", startScn); + showStartScnNotInArchiveLogs = false; + pauseBetweenMiningSessions(); + continue; + } + } + + if (!context.isRunning()) { + return false; + } + + if (!showStartScnNotInArchiveLogs) { + LOGGER.info("Starting SCN {} is now available in archive logs, log mining unpaused.", startScn); + } + return true; + } + + /** + * Returns whether the starting system change number is in the archive logs. + * + * @param startScn the starting system change number + * @return true if the starting system change number is in the archive logs; false otherwise. 
+ * @throws SQLException if a database exception occurred + */ + private boolean isStartScnInArchiveLogs(Scn startScn) throws SQLException { + List logs = LogMinerHelper.getLogFilesForOffsetScn(jdbcConnection, startScn, archiveLogRetention, archiveLogOnlyMode, archiveDestinationName); + return logs.stream() + .anyMatch(l -> l.getFirstScn().compareTo(startScn) <= 0 && l.getNextScn().compareTo(startScn) > 0 && l.getType().equals(LogFile.Type.ARCHIVE)); + } + @Override public void commitOffset(Map offset) { // nothing to do diff --git a/debezium-connector-oracle/src/test/java/io/debezium/connector/oracle/OracleConnectorIT.java b/debezium-connector-oracle/src/test/java/io/debezium/connector/oracle/OracleConnectorIT.java index c1a32eed5..ac4475968 100644 --- a/debezium-connector-oracle/src/test/java/io/debezium/connector/oracle/OracleConnectorIT.java +++ b/debezium-connector-oracle/src/test/java/io/debezium/connector/oracle/OracleConnectorIT.java @@ -15,10 +15,12 @@ import static org.fest.assertions.MapAssert.entry; import java.math.BigDecimal; +import java.math.BigInteger; import java.sql.SQLException; import java.time.Duration; import java.time.LocalDateTime; import java.time.ZoneOffset; +import java.time.temporal.ChronoUnit; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; @@ -33,6 +35,7 @@ import org.apache.kafka.connect.source.SourceRecord; import org.awaitility.Awaitility; import org.awaitility.Durations; +import org.awaitility.core.ConditionTimeoutException; import org.junit.AfterClass; import org.junit.Before; import org.junit.BeforeClass; @@ -2116,6 +2119,131 @@ public void shouldIgnoreAllTablesInExcludedSchemas() throws Exception { } } + @Test + @FixFor("DBZ-3712") + public void shouldStartWithArchiveLogOnlyModeAndStreamWhenRecordsBecomeAvailable() throws Exception { + TestHelper.dropTable(connection, "dbz3712"); + try { + connection.execute("CREATE TABLE dbz3712 (id number(9,0), data varchar2(50))"); + 
TestHelper.streamTable(connection, "dbz3712"); + + Configuration config = TestHelper.defaultConfig() + .with(OracleConnectorConfig.LOG_MINING_ARCHIVE_LOG_ONLY_MODE, true) + .with(OracleConnectorConfig.TABLE_INCLUDE_LIST, "DEBEZIUM\\.DBZ3712") + .build(); + + start(OracleConnector.class, config); + assertConnectorIsRunning(); + waitForStreamingRunning(TestHelper.CONNECTOR_NAME, TestHelper.SERVER_NAME); + + // At this point the connector is new and should not emit any records as the SCN offset + // obtained from the snapshot is in the redo logs. + waitForAvailableRecords(waitTimeForRecords(), TimeUnit.SECONDS); + assertNoRecordsToConsume(); + + // We will insert a new record but this record won't be emitted right away and will + // require that a log switch happen so that it can be emitted. + connection.execute("INSERT INTO dbz3712 (id,data) values (1, 'Test')"); + waitForLogSwitchOrForceOneAfterTimeout(); + + // We should now be able to consume a record + SourceRecords records = consumeRecordsByTopic(1); + assertThat(records.recordsForTopic("server1.DEBEZIUM.DBZ3712")).hasSize(1); + } + finally { + TestHelper.dropTable(connection, "dbz3712"); + } + } + + @Test + @FixFor("DBZ-3712") + public void shouldPermitChangingToArchiveLogOnlyModeOnExistingConnector() throws Exception { + TestHelper.dropTable(connection, "dbz3712"); + try { + connection.execute("CREATE TABLE dbz3712 (id number(9,0), data varchar2(50))"); + TestHelper.streamTable(connection, "dbz3712"); + + Configuration config = TestHelper.defaultConfig() + .with(OracleConnectorConfig.TABLE_INCLUDE_LIST, "DEBEZIUM\\.DBZ3712") + .build(); + + start(OracleConnector.class, config); + assertConnectorIsRunning(); + waitForStreamingRunning(TestHelper.CONNECTOR_NAME, TestHelper.SERVER_NAME); + + // The connector was started with archive.log.only.mode disabled so this record should + be emitted immediately once it's written to the redo logs. 
+ connection.execute("INSERT INTO dbz3712 (id,data) values (1, 'Test1')"); + + // We should now be able to consume a record + SourceRecords records = consumeRecordsByTopic(1); + assertThat(records.recordsForTopic("server1.DEBEZIUM.DBZ3712")).hasSize(1); + + // Restart connector using the same offsets but with archive log only mode + stopConnector(); + + config = TestHelper.defaultConfig() + .with(OracleConnectorConfig.LOG_MINING_ARCHIVE_LOG_ONLY_MODE, true) + .with(OracleConnectorConfig.TABLE_INCLUDE_LIST, "DEBEZIUM\\.DBZ3712") + .build(); + + start(OracleConnector.class, config); + assertConnectorIsRunning(); + waitForStreamingRunning(TestHelper.CONNECTOR_NAME, TestHelper.SERVER_NAME); + + // At this point the connector was restarted with archive log only mode. The SCN offset + // was previously in the redo logs and may likely not be in the archive logs on start so + // we'll give the connector a moment and verify it has no records to consume. + waitForAvailableRecords(waitTimeForRecords(), TimeUnit.SECONDS); + assertNoRecordsToConsume(); + + // Insert a new record + // This should not be picked up until after a log switch + connection.execute("INSERT INTO dbz3712 (id,data) values (2, 'Test2')"); + waitForLogSwitchOrForceOneAfterTimeout(); + + // We should now be able to consume a record + records = consumeRecordsByTopic(1); + assertThat(records.recordsForTopic("server1.DEBEZIUM.DBZ3712")).hasSize(1); + + // Insert a new record + // This should not be picked up until after a log switch + connection.execute("INSERT INTO dbz3712 (id,data) values (3, 'Test2')"); + waitForLogSwitchOrForceOneAfterTimeout(); + + // We should now be able to consume a record + records = consumeRecordsByTopic(1); + assertThat(records.recordsForTopic("server1.DEBEZIUM.DBZ3712")).hasSize(1); + } + finally { + TestHelper.dropTable(connection, "dbz3712"); + } + } + + private void waitForLogSwitchOrForceOneAfterTimeout() throws SQLException { + List sequences = 
TestHelper.getCurrentRedoLogSequences(); + try { + Awaitility.await() + .pollInterval(Duration.of(5, ChronoUnit.SECONDS)) + .atMost(Duration.of(20, ChronoUnit.SECONDS)) + .until(() -> { + if (TestHelper.getCurrentRedoLogSequences().equals(sequences)) { + assertNoRecordsToConsume(); + return false; + } + // Oracle triggered its own log switch + return true; + }); + + // In this use case Oracle triggered its own log switch + // We don't need to trigger one on our own. + } + catch (ConditionTimeoutException e) { + // expected if Oracle doesn't trigger its own log switch + TestHelper.forceLogfileSwitch(); + } + } + private String generateAlphaNumericStringColumn(int size) { final String alphaNumericString = "ABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789abcdefghijklmnopqrstuvwxyz"; final StringBuilder sb = new StringBuilder(size); diff --git a/debezium-connector-oracle/src/test/java/io/debezium/connector/oracle/util/TestHelper.java b/debezium-connector-oracle/src/test/java/io/debezium/connector/oracle/util/TestHelper.java index 146766860..3d6af27d3 100644 --- a/debezium-connector-oracle/src/test/java/io/debezium/connector/oracle/util/TestHelper.java +++ b/debezium-connector-oracle/src/test/java/io/debezium/connector/oracle/util/TestHelper.java @@ -5,8 +5,11 @@ */ package io.debezium.connector.oracle.util; +import java.math.BigInteger; import java.nio.file.Path; import java.sql.SQLException; +import java.util.ArrayList; +import java.util.List; import java.util.Map; import io.debezium.config.Configuration; @@ -415,4 +418,16 @@ public static void dropAllTables() { throw new RuntimeException("Failed to clean database"); } } + + public static List getCurrentRedoLogSequences() throws SQLException { + try (OracleConnection connection = adminConnection()) { + return connection.queryAndMap("SELECT SEQUENCE# FROM V$LOG WHERE STATUS = 'CURRENT'", rs -> { + List sequences = new ArrayList<>(); + while (rs.next()) { + sequences.add(new BigInteger(rs.getString(1))); + } + return sequences; + 
}); + } + } }