DBZ-3712 Support start/change archive.log.only.mode gracefully

Chris Cranford 2021-09-27 09:12:15 -04:00 committed by Gunnar Morling
parent faf2d241b0
commit e85b6e1780
4 changed files with 243 additions and 1 deletion


@@ -38,6 +38,7 @@
import io.debezium.relational.Tables;
import io.debezium.relational.Tables.ColumnNameFilter;
import io.debezium.relational.Tables.TableFilter;
import io.debezium.util.Strings;
import oracle.jdbc.OracleTypes;
@@ -344,6 +345,41 @@ public Scn getCurrentScn() throws SQLException {
        });
    }

    /**
     * Get the maximum system change number in the archive logs.
     *
     * @param archiveLogDestinationName the archive log destination name to be queried, can be {@code null}
     * @return the maximum system change number in the archive logs
     * @throws SQLException if a database exception occurred
     * @throws DebeziumException if the maximum archive log system change number could not be found
     */
    public Scn getMaxArchiveLogScn(String archiveLogDestinationName) throws SQLException {
        String query = "SELECT MAX(NEXT_CHANGE#) FROM V$ARCHIVED_LOG " +
                "WHERE NAME IS NOT NULL " +
                "AND ARCHIVED = 'YES' " +
                "AND STATUS = 'A' " +
                "AND DEST_ID IN (" +
                "SELECT DEST_ID FROM V$ARCHIVE_DEST_STATUS " +
                "WHERE STATUS = 'VALID' " +
                "AND TYPE = 'LOCAL' ";

        if (Strings.isNullOrEmpty(archiveLogDestinationName)) {
            query += "AND ROWNUM = 1";
        }
        else {
            query += "AND UPPER(DEST_NAME) = '" + archiveLogDestinationName + "'";
        }

        query += ")";

        return queryAndMap(query, (rs) -> {
            if (rs.next()) {
                // NEXT_CHANGE# is the first SCN of the next log, so subtract one to get
                // the last SCN actually covered by the archive logs.
                return Scn.valueOf(rs.getString(1)).subtract(Scn.valueOf(1));
            }
            throw new DebeziumException("Could not obtain maximum archive log scn.");
        });
    }
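
For illustration, a minimal usage sketch; the connection variable and the destination name below are assumptions, not part of this change. Because NEXT_CHANGE# is the first SCN of the following log (an exclusive bound), the method returns MAX(NEXT_CHANGE#) - 1 as the last SCN actually covered by the archives:

    // Hypothetical caller (connection and destination name are illustrative):
    // bound an archive-log-only mining session by what the archives contain.
    Scn archiveUpperBound = connection.getMaxArchiveLogScn("LOG_ARCHIVE_DEST_2");
    // Passing null instead selects a single valid local destination (the ROWNUM = 1 branch).
    Scn anyLocalUpperBound = connection.getMaxArchiveLogScn(null);
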
/**
* Generate a given table's DDL metadata.
*


@@ -114,6 +114,11 @@ public void execute(ChangeEventSourceContext context, OraclePartition partition,
            checkSupplementalLogging(jdbcConnection, connectorConfig.getPdbName(), schema);

            try (LogMinerEventProcessor processor = createProcessor(context, partition, offsetContext)) {
                if (archiveLogOnlyMode && !waitForStartScnInArchiveLogs(context, startScn)) {
                    return;
                }

                currentRedoLogSequences = getCurrentRedoLogSequences();
                initializeRedoLogsForMining(jdbcConnection, false, startScn);
@@ -121,8 +126,21 @@ public void execute(ChangeEventSourceContext context, OraclePartition partition,
                    // Calculate the time difference before each mining session to detect time zone
                    // offset changes (e.g. DST) on the database server
                    streamingMetrics.calculateTimeDifference(getDatabaseSystemTime(jdbcConnection));

                    if (archiveLogOnlyMode && !waitForStartScnInArchiveLogs(context, startScn)) {
                        break;
                    }

                    Instant start = Instant.now();
                    endScn = calculateEndScn(jdbcConnection, startScn, endScn);

                    // There is a small window in archive-log-only mode where, once the connector has
                    // completely caught up to the last record in the archive logs, the start and end
                    // SCN values are identical. In that case, pause and restart the loop, waiting for
                    // a new archive log before proceeding.
                    if (archiveLogOnlyMode && startScn.equals(endScn)) {
                        pauseBetweenMiningSessions();
                        continue;
                    }

                    flushStrategy.flush(jdbcConnection.getCurrentScn());

                    if (hasLogSwitchOccurred()) {
@@ -427,7 +445,9 @@ public void endMiningSession(OracleConnection connection, OracleOffsetContext of
     * @throws SQLException if the current max system change number cannot be obtained from the database
     */
    private Scn calculateEndScn(OracleConnection connection, Scn startScn, Scn prevEndScn) throws SQLException {
-       Scn currentScn = connection.getCurrentScn();
        Scn currentScn = archiveLogOnlyMode
                ? connection.getMaxArchiveLogScn(archiveDestinationName)
                : connection.getCurrentScn();
        streamingMetrics.setCurrentScn(currentScn);

        // Add the current batch size to the starting system change number
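
To make the new upper bound concrete, a worked example with hypothetical SCN values:

    // Suppose the newest archive log has MAX(NEXT_CHANGE#) = 5001, so the archives
    // cover changes up to SCN 5000, while the database's current SCN is 5230 and
    // still only in the online redo logs.
    //   archiveLogOnlyMode = true  -> currentScn = 5000 (mine archived changes only)
    //   archiveLogOnlyMode = false -> currentScn = 5230 (mine through the online redo)
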
@@ -575,6 +595,49 @@ private LogWriterFlushStrategy resolveFlushStrategy() {
        return new CommitLogWriterFlushStrategy(jdbcConnection);
    }

    /**
     * Waits for the starting system change number to exist in the archive logs before returning.
     *
     * @param context the change event source context
     * @param startScn the starting system change number
     * @return true if the caller should continue, false if the connector is shutting down
     * @throws SQLException if a database exception occurred
     * @throws InterruptedException if the pause between checks is interrupted
     */
    private boolean waitForStartScnInArchiveLogs(ChangeEventSourceContext context, Scn startScn) throws SQLException, InterruptedException {
        boolean showStartScnNotInArchiveLogs = true;
        while (context.isRunning() && !isStartScnInArchiveLogs(startScn)) {
            if (showStartScnNotInArchiveLogs) {
                LOGGER.warn("Starting SCN {} is not yet in archive logs, waiting for archive log switch.", startScn);
                showStartScnNotInArchiveLogs = false;
            }
            // Pause on every iteration, not just the first, so the loop does not
            // busy-spin while waiting for the next archive log switch.
            pauseBetweenMiningSessions();
        }

        if (!context.isRunning()) {
            return false;
        }

        if (!showStartScnNotInArchiveLogs) {
            LOGGER.info("Starting SCN {} is now available in archive logs, log mining unpaused.", startScn);
        }
        return true;
    }
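
For context, the two call sites shown earlier react differently to a false result; a condensed recap of that control flow (names match the diff above):

    // Before the mining loop: false means the connector stopped while waiting,
    // so streaming is abandoned entirely.
    if (archiveLogOnlyMode && !waitForStartScnInArchiveLogs(context, startScn)) {
        return;
    }
    // Inside the mining loop: false likewise means shutdown, so stop iterating.
    if (archiveLogOnlyMode && !waitForStartScnInArchiveLogs(context, startScn)) {
        break;
    }
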
    /**
     * Returns whether the starting system change number is in the archive logs.
     *
     * @param startScn the starting system change number
     * @return true if the starting system change number is in the archive logs; false otherwise
     * @throws SQLException if a database exception occurred
     */
    private boolean isStartScnInArchiveLogs(Scn startScn) throws SQLException {
        List<LogFile> logs = LogMinerHelper.getLogFilesForOffsetScn(jdbcConnection, startScn, archiveLogRetention, archiveLogOnlyMode, archiveDestinationName);
        return logs.stream()
                .anyMatch(l -> l.getFirstScn().compareTo(startScn) <= 0
                        && l.getNextScn().compareTo(startScn) > 0
                        && l.getType().equals(LogFile.Type.ARCHIVE));
    }
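
The check treats each log as a half-open SCN interval: a log covers startScn when FIRST_CHANGE# <= startScn < NEXT_CHANGE#. A worked example with hypothetical values:

    // Archive log with getFirstScn() = 100 and getNextScn() = 200:
    //   startScn = 100 -> covered (the first boundary is inclusive)
    //   startScn = 199 -> covered
    //   startScn = 200 -> not covered; SCN 200 belongs to the next log, so the
    //                     connector keeps waiting for another log switch
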
    @Override
    public void commitOffset(Map<String, ?> offset) {
        // nothing to do


@@ -15,10 +15,12 @@
import static org.fest.assertions.MapAssert.entry;
import java.math.BigDecimal;
import java.math.BigInteger;
import java.sql.SQLException;
import java.time.Duration;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.time.temporal.ChronoUnit;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
@@ -33,6 +35,7 @@
import org.apache.kafka.connect.source.SourceRecord;
import org.awaitility.Awaitility;
import org.awaitility.Durations;
import org.awaitility.core.ConditionTimeoutException;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
@@ -2116,6 +2119,131 @@ public void shouldIgnoreAllTablesInExcludedSchemas() throws Exception {
        }
    }

    @Test
    @FixFor("DBZ-3712")
    public void shouldStartWithArchiveLogOnlyModeAndStreamWhenRecordsBecomeAvailable() throws Exception {
        TestHelper.dropTable(connection, "dbz3712");
        try {
            connection.execute("CREATE TABLE dbz3712 (id number(9,0), data varchar2(50))");
            TestHelper.streamTable(connection, "dbz3712");

            Configuration config = TestHelper.defaultConfig()
                    .with(OracleConnectorConfig.LOG_MINING_ARCHIVE_LOG_ONLY_MODE, true)
                    .with(OracleConnectorConfig.TABLE_INCLUDE_LIST, "DEBEZIUM\\.DBZ3712")
                    .build();

            start(OracleConnector.class, config);
            assertConnectorIsRunning();
            waitForStreamingRunning(TestHelper.CONNECTOR_NAME, TestHelper.SERVER_NAME);

            // At this point the connector is new and should not emit any records because the
            // SCN offset obtained from the snapshot is still in the redo logs.
            waitForAvailableRecords(waitTimeForRecords(), TimeUnit.SECONDS);
            assertNoRecordsToConsume();

            // Insert a new record. It won't be emitted right away; a log switch must happen
            // first so that the change reaches the archive logs.
            connection.execute("INSERT INTO dbz3712 (id,data) values (1, 'Test')");
            waitForLogSwitchOrForceOneAfterTimeout();

            // We should now be able to consume a record
            SourceRecords records = consumeRecordsByTopic(1);
            assertThat(records.recordsForTopic("server1.DEBEZIUM.DBZ3712")).hasSize(1);
        }
        finally {
            TestHelper.dropTable(connection, "dbz3712");
        }
    }
    @Test
    @FixFor("DBZ-3712")
    public void shouldPermitChangingToArchiveLogOnlyModeOnExistingConnector() throws Exception {
        TestHelper.dropTable(connection, "dbz3712");
        try {
            connection.execute("CREATE TABLE dbz3712 (id number(9,0), data varchar2(50))");
            TestHelper.streamTable(connection, "dbz3712");

            Configuration config = TestHelper.defaultConfig()
                    .with(OracleConnectorConfig.TABLE_INCLUDE_LIST, "DEBEZIUM\\.DBZ3712")
                    .build();

            start(OracleConnector.class, config);
            assertConnectorIsRunning();
            waitForStreamingRunning(TestHelper.CONNECTOR_NAME, TestHelper.SERVER_NAME);

            // The connector was started with archive.log.only.mode disabled, so this record
            // should be emitted immediately once it's written to the redo logs.
            connection.execute("INSERT INTO dbz3712 (id,data) values (1, 'Test1')");

            // We should now be able to consume a record
            SourceRecords records = consumeRecordsByTopic(1);
            assertThat(records.recordsForTopic("server1.DEBEZIUM.DBZ3712")).hasSize(1);

            // Restart the connector using the same offsets but with archive log only mode
            stopConnector();

            config = TestHelper.defaultConfig()
                    .with(OracleConnectorConfig.LOG_MINING_ARCHIVE_LOG_ONLY_MODE, true)
                    .with(OracleConnectorConfig.TABLE_INCLUDE_LIST, "DEBEZIUM\\.DBZ3712")
                    .build();

            start(OracleConnector.class, config);
            assertConnectorIsRunning();
            waitForStreamingRunning(TestHelper.CONNECTOR_NAME, TestHelper.SERVER_NAME);

            // At this point the connector was restarted with archive log only mode. The SCN
            // offset was previously in the redo logs and will likely not yet be in the archive
            // logs on start, so give the connector a moment and verify it has no records to consume.
            waitForAvailableRecords(waitTimeForRecords(), TimeUnit.SECONDS);
            assertNoRecordsToConsume();

            // Insert a new record; it should not be picked up until after a log switch
            connection.execute("INSERT INTO dbz3712 (id,data) values (2, 'Test2')");
            waitForLogSwitchOrForceOneAfterTimeout();

            // We should now be able to consume a record
            records = consumeRecordsByTopic(1);
            assertThat(records.recordsForTopic("server1.DEBEZIUM.DBZ3712")).hasSize(1);

            // Insert another record; it should likewise not be picked up until after a log switch
            connection.execute("INSERT INTO dbz3712 (id,data) values (3, 'Test2')");
            waitForLogSwitchOrForceOneAfterTimeout();

            // We should now be able to consume a record
            records = consumeRecordsByTopic(1);
            assertThat(records.recordsForTopic("server1.DEBEZIUM.DBZ3712")).hasSize(1);
        }
        finally {
            TestHelper.dropTable(connection, "dbz3712");
        }
    }
    private void waitForLogSwitchOrForceOneAfterTimeout() throws SQLException {
        List<BigInteger> sequences = TestHelper.getCurrentRedoLogSequences();
        try {
            Awaitility.await()
                    .pollInterval(Duration.of(5, ChronoUnit.SECONDS))
                    .atMost(Duration.of(20, ChronoUnit.SECONDS))
                    .until(() -> {
                        if (TestHelper.getCurrentRedoLogSequences().equals(sequences)) {
                            assertNoRecordsToConsume();
                            return false;
                        }
                        // Oracle triggered its own log switch
                        return true;
                    });

            // In this case Oracle triggered its own log switch, so there is no need
            // to force one.
        }
        catch (ConditionTimeoutException e) {
            // Expected when Oracle doesn't trigger its own log switch; force one instead.
            TestHelper.forceLogfileSwitch();
        }
    }
    private String generateAlphaNumericStringColumn(int size) {
        final String alphaNumericString = "ABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789abcdefghijklmnopqrstuvwxyz";
        final StringBuilder sb = new StringBuilder(size);


@@ -5,8 +5,11 @@
*/
package io.debezium.connector.oracle.util;
import java.math.BigInteger;
import java.nio.file.Path;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import io.debezium.config.Configuration;
@@ -415,4 +418,16 @@ public static void dropAllTables() {
            throw new RuntimeException("Failed to clean database");
        }
    }

    public static List<BigInteger> getCurrentRedoLogSequences() throws SQLException {
        try (OracleConnection connection = adminConnection()) {
            return connection.queryAndMap("SELECT SEQUENCE# FROM V$LOG WHERE STATUS = 'CURRENT'", rs -> {
                List<BigInteger> sequences = new ArrayList<>();
                while (rs.next()) {
                    sequences.add(new BigInteger(rs.getString(1)));
                }
                return sequences;
            });
        }
    }
}
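
A minimal sketch of how the helper detects a log switch, mirroring waitForLogSwitchOrForceOneAfterTimeout in the test above:

    // Snapshot the current sequence numbers, do some work, then compare; any
    // difference means at least one redo log switch has occurred.
    List<BigInteger> before = TestHelper.getCurrentRedoLogSequences();
    // ... perform inserts, wait, etc. ...
    boolean switched = !TestHelper.getCurrentRedoLogSequences().equals(before);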