diff --git a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/LogFileCollector.java b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/LogFileCollector.java index cf4c0307a..de95d6804 100644 --- a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/LogFileCollector.java +++ b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/LogFileCollector.java @@ -265,9 +265,12 @@ private boolean isOpenThreadConsistent(RedoThread thread, Scn startScn, List log.isScnInLogFileRange(startScn))) { - logException(String.format("Redo Thread %d is inconsistent; does not have a log that contains scn %s", threadId, startScn)); - return false; + if (startScn.compareTo(thread.getCheckpointScn()) >= 0) { + // Read position is after last checkpoint for open redo-thread, we should have a log with SCN + if (threadLogs.stream().noneMatch(log -> log.isScnInLogFileRange(startScn))) { + logException(String.format("Redo Thread %d is inconsistent; does not have a log that contains scn %s", threadId, startScn)); + return false; + } } final Optional missingSequence = getFirstLogMissingSequence(threadLogs); diff --git a/debezium-connector-oracle/src/test/java/io/debezium/connector/oracle/logminer/LogFileCollectorTest.java b/debezium-connector-oracle/src/test/java/io/debezium/connector/oracle/logminer/LogFileCollectorTest.java index 42446f9b6..bde8b1c25 100644 --- a/debezium-connector-oracle/src/test/java/io/debezium/connector/oracle/logminer/LogFileCollectorTest.java +++ b/debezium-connector-oracle/src/test/java/io/debezium/connector/oracle/logminer/LogFileCollectorTest.java @@ -1787,6 +1787,7 @@ public void testOpenRedoThreadSwitchDoesNotMissArchiveLogIfNotYetAvailable() thr .enabled("PUBLIC") .enabledScn(Scn.valueOf(12345)) .status("OPEN") + .checkpointScn(Scn.valueOf(1234567240)) .lastRedoScn(Scn.valueOf(1234567890)) .lastRedoSequenceNumber(1065L) .build() @@ -1810,6 +1811,104 @@ public void testOpenRedoThreadSwitchDoesNotMissArchiveLogIfNotYetAvailable() thr assertThat(collector.isLogFileListConsistent(Scn.valueOf(1234567250), files, redoThreadState)).isTrue(); } + @Test + @FixFor("DBZ-8162") + public void testOpenRedoThreadAfterBeingDoesNotBecomeInconsistent() throws Exception { + // In this scenario, a node in the RAC cluster is taken down for a period of time duration a + // maintenance window. The connector and environment continue onward for some time and the + // DBA brings the node back online, at which point the open log check for SCN should not fail. + RedoThreadState redoThreadState = RedoThreadState.builder() + .thread() + .threadId(1) + .status("OPEN") + .enabled("PUBLIC") + .enabledScn(Scn.valueOf(1918873)) + .checkpointScn(Scn.valueOf(14338976116101L)) + .lastRedoScn(Scn.valueOf(14339038344586L)) + .lastRedoSequenceNumber(164492L) + .build() + .thread() + .threadId(2) + .status("OPEN") + .enabled("PUBLIC") + .enabledScn(Scn.valueOf(2028329)) + .checkpointScn(Scn.valueOf(14338976116789L)) + .lastRedoScn(Scn.valueOf(14339038344484L)) + .lastRedoSequenceNumber(138697L) + .build() + .build(); + + final List files = new ArrayList<>(); + files.add(createArchiveLog("thread_2_seq_138697.5633.1177608211", 14338976116789L, 14339038344652L, 138697, 2)); + files.add(createRedoLog("group_5.273.1086113793", 14339038344652L, 138698, 2)); + files.add(createRedoLog("group_2.270.1086113793", 14339038343718L, 164492, 1)); + + final Configuration config = getDefaultConfig().build(); + final OracleConnection connection = getOracleConnectionMock(redoThreadState); + + // Logs should be consistent + LogFileCollector collector = setCollectorLogFiles(getLogFileCollector(config, connection), files); + assertThat(collector.isLogFileListConsistent(Scn.valueOf(14339038344480L), files, redoThreadState)).isTrue(); + + // Transition Node 2 to CLOSED + redoThreadState = RedoThreadState.builder() + .thread() + .threadId(1) + .status("OPEN") + .enabled("PUBLIC") + .enabledScn(Scn.valueOf(1918873)) + .checkpointScn(Scn.valueOf(14339038343718L)) + .lastRedoScn(Scn.valueOf(14339070179636L)) + .lastRedoSequenceNumber(164492L) + .build() + .thread() + .threadId(2) + .status("CLOSED") + .enabled("PUBLIC") + .enabledScn(Scn.valueOf(2028329)) + .checkpointScn(Scn.valueOf(14339070179724L)) + .lastRedoScn(Scn.valueOf(14339070179614L)) + .lastRedoSequenceNumber(138698L) + .build() + .build(); + + files.clear(); + files.add(createArchiveLog("thread_2_seq_138698.6821.1177608599", 14339038344652L, 14339070179726L, 138698, 2)); + files.add(createRedoLog("group_2.270.1086113793", 14339038343718L, 164492, 1)); + + collector = setCollectorLogFiles(getLogFileCollector(config, connection), files); + assertThat(collector.isLogFileListConsistent(Scn.valueOf(14339070176922L), files, redoThreadState)).isTrue(); + + // Re-open node 2 + redoThreadState = RedoThreadState.builder() + .thread() + .threadId(1) + .status("OPEN") + .enabled("PUBLIC") + .enabledScn(Scn.valueOf(1918873)) + .checkpointScn(Scn.valueOf(14339038343718L)) + .lastRedoScn(Scn.valueOf(14339074283009L)) + .lastRedoSequenceNumber(164492L) + .build() + .thread() + .threadId(2) + .status("OPEN") + .enabled("PUBLIC") + .enabledScn(Scn.valueOf(2028329)) + .checkpointScn(Scn.valueOf(14339074283022L)) + .lastRedoScn(Scn.valueOf(14339070179614L)) + .lastRedoSequenceNumber(138698L) + .build() + .build(); + + files.clear(); + files.add(createRedoLog("group_6.274.1086113795", 14339074283016L, 138699, 2)); + files.add(createRedoLog("group_2.270.1086113793", 14339038343718L, 164492, 1)); + + collector = setCollectorLogFiles(getLogFileCollector(config, connection), files); + assertThat(collector.isLogFileListConsistent(Scn.valueOf(14339074282971L), files, redoThreadState)).isTrue(); + } + private static LogFile createRedoLog(String name, long startScn, int sequence, int threadId) { return createRedoLog(name, startScn, Long.MAX_VALUE, sequence, threadId); }