diff --git a/debezium-connector-oracle/src/test/java/io/debezium/connector/oracle/logminer/LogFileCollectorTest.java b/debezium-connector-oracle/src/test/java/io/debezium/connector/oracle/logminer/LogFileCollectorTest.java index 9107e09b4..42446f9b6 100644 --- a/debezium-connector-oracle/src/test/java/io/debezium/connector/oracle/logminer/LogFileCollectorTest.java +++ b/debezium-connector-oracle/src/test/java/io/debezium/connector/oracle/logminer/LogFileCollectorTest.java @@ -201,14 +201,21 @@ public void testOracleRacWithRealDataRedoThreadsWithIndependentSequenceRanges() files.add(createRedoLog("group_303.8411.1106167689", 10403072997229L, 56770, 3)); files.add(createArchiveLog("thread_4_seq_62925.29497.1152789389", 10403069930249L, 10403071899909L, 62925, 4)); files.add(createRedoLog("group_401.8418.1106167699", 10403071899909L, 62926, 4)); - files.add(createRedoLog("group_201.8400.1106167673", 10326919867728L, 10326919867733L, 78034, 2, false)); - files.add(createRedoLog("group_202.8401.1106167675", 10326919867733L, 10326919910394L, 78035, 2, false)); - files.add(createRedoLog("group_203.8402.1106167677", 10326919910394L, 10326919923665L, 78036, 2, false)); - files.add(createRedoLog("group_101.8391.1106167659", 10326919867747L, 10326919867788L, 83112, 1, false)); - files.add(createRedoLog("group_102.8392.1106167661", 10326919867788L, 10326919910390L, 83113, 1, false)); - files.add(createRedoLog("group_103.8393.1106167663", 10326919910390L, 10326919923663L, 83114, 1, false)); + // These logs would have not been pulled since they don't contain the read position or come after read position + // files.add(createRedoLog("group_201.8400.1106167673", 10326919867728L, 10326919867733L, 78034, 2, false)); + // files.add(createRedoLog("group_202.8401.1106167675", 10326919867733L, 10326919910394L, 78035, 2, false)); + // files.add(createRedoLog("group_203.8402.1106167677", 10326919910394L, 10326919923665L, 78036, 2, false)); + // files.add(createRedoLog("group_101.8391.1106167659", 10326919867747L, 10326919867788L, 83112, 1, false)); + // files.add(createRedoLog("group_102.8392.1106167661", 10326919867788L, 10326919910390L, 83113, 1, false)); + // files.add(createRedoLog("group_103.8393.1106167663", 10326919910390L, 10326919923663L, 83114, 1, false)); - final RedoThreadState state = getFourThreadOpenState(Scn.valueOf(10326919867700L), Scn.valueOf(10326919867700L)); + RedoThreadState.Builder builder = RedoThreadState.builder(); + builder = makeClosedRedoThreadState(builder.thread(), 1, Scn.valueOf(10326919860000L), Scn.valueOf(10326919923663L)).build(); + builder = makeClosedRedoThreadState(builder.thread(), 2, Scn.valueOf(10326919860000L), Scn.valueOf(10326919923665L)).build(); + builder = makeOpenRedoThreadState(builder.thread(), 3, Scn.valueOf(10326919867700L), Scn.valueOf(10326919867700L)).build(); + builder = makeOpenRedoThreadState(builder.thread(), 4, Scn.valueOf(10326919867700L), Scn.valueOf(10326919867700L)).build(); + + final RedoThreadState state = builder.build(); assertThat(getLogFileCollector(state).isLogFileListConsistent(Scn.valueOf("10403071210665"), files, state)).isTrue(); } @@ -1462,7 +1469,7 @@ public void testLogsWithRegularScns() throws Exception { final OracleConnection connection = getOracleConnectionMock(redoThreadState, files); final LogFileCollector collector = getLogFileCollector(config, connection); - final List result = collector.getLogs(Scn.valueOf(10)); + final List result = collector.getLogs(Scn.valueOf(103401)); assertThat(result).hasSize(2); assertThat(getLogFileWithName(result, "logfile1").getNextScn()).isEqualTo(Scn.valueOf(103700)); assertThat(getLogFileWithName(result, "logfile2").getNextScn()).isEqualTo(Scn.MAX); @@ -1501,7 +1508,7 @@ public void testNullsHandledAsMaxScn() throws Exception { final OracleConnection connection = getOracleConnectionMock(redoThreadState, files); final LogFileCollector collector = getLogFileCollector(config, connection); - final List result = collector.getLogs(Scn.valueOf(600)); + final List result = collector.getLogs(Scn.valueOf(103301)); assertThat(result).hasSize(3); assertThat(getLogFileWithName(result, "logfile2").getNextScn()).isEqualTo(Scn.MAX); } @@ -1520,7 +1527,7 @@ public void testCanHandleMaxScn() throws Exception { final OracleConnection connection = getOracleConnectionMock(redoThreadState, files); final LogFileCollector collector = getLogFileCollector(config, connection); - final List result = collector.getLogs(Scn.valueOf(600)); + final List result = collector.getLogs(Scn.valueOf(103301)); assertThat(result).hasSize(3); assertThat(getLogFileWithName(result, "logfile2").getNextScn()).isEqualTo(Scn.MAX); } @@ -1541,7 +1548,7 @@ public void testCanHandleVeryLargeScnValuesInNonCurrentRedoLog() throws Exceptio final OracleConnection connection = getOracleConnectionMock(redoThreadState, files); final LogFileCollector collector = getLogFileCollector(config, connection); - final List result = collector.getLogs(Scn.valueOf(600)); + final List result = collector.getLogs(Scn.valueOf(103301)); assertThat(result).hasSize(3); assertThat(getLogFileWithName(result, "logfile2").getNextScn()).isEqualTo(Scn.valueOf(largeScnValue)); } @@ -1956,6 +1963,29 @@ private RedoThread.Builder makeOpenRedoThreadState(RedoThread.Builder builder, .conId(0L); } + private RedoThread.Builder makeClosedRedoThreadState(RedoThread.Builder builder, + int threadId, Scn enabledScn, Scn lastFlushedScn) { + return builder.threadId(threadId) + .status("CLOSED") + .enabled("PUBLIC") + .instanceName("ORCLCDB") + .logGroups(2L) + .openTime(Instant.now().minus(10, ChronoUnit.MINUTES)) + .checkpointTime(Instant.now().minus(5, ChronoUnit.MINUTES)) + .checkpointScn(lastFlushedScn.add(Scn.ONE)) + .currentGroupNumber(1L) + .currentSequenceNumber(1L) + .enabledScn(enabledScn) + .enabledTime(Instant.now().minus(10, ChronoUnit.MINUTES)) + .disabledScn(Scn.valueOf(0)) + .disabledTime(null) + .lastRedoScn(lastFlushedScn) + .lastRedoBlock(1234L) + .lastRedoSequenceNumber(2L) + .lastRedoTime(Instant.now().minus(3, ChronoUnit.SECONDS)) + .conId(0L); + } + // Node 1 - OPEN Sequence 1, started at SCN 500, last SCN 1100, recent checkpoint SCN 1000 // Node 2 - OPEN Sequence 1, started at SCN 500, last SCN 1100, recent checkpoint SCN 1000 private RedoThreadState initialMultiNodeRedoThreadState() {