DBZ-4737 Fix error when rewinding to oldest archive log SCN

In LogMinerStreamingChangeEventSource, the `startScn` variable is used
as an exclusive lower bound for log mining sessions. The meaning of this
variable was a little muddled, sometimes (incorrectly) being considered
an inclusive lower bound. This PR aims to rectify that and avoid future
confusion on the matter.
This commit is contained in:
Dominique Chanet 2022-02-11 15:42:10 +01:00 committed by Chris Cranford
parent 943618227d
commit 26b53bbf22
2 changed files with 15 additions and 7 deletions

View File

@ -25,6 +25,11 @@ public class Scn implements Comparable<Scn> {
*/ */
public static final Scn NULL = new Scn(null); public static final Scn NULL = new Scn(null);
/**
* Represents an Scn with value 1, useful for playing with inclusive/exclusive query boundaries.
*/
public static final Scn ONE = new Scn(BigInteger.valueOf(1));
private final BigInteger scn; private final BigInteger scn;
public Scn(BigInteger scn) { public Scn(BigInteger scn) {

View File

@ -75,7 +75,7 @@ public class LogMinerStreamingChangeEventSource implements StreamingChangeEventS
private final String archiveDestinationName; private final String archiveDestinationName;
private final int logFileQueryMaxRetries; private final int logFileQueryMaxRetries;
private Scn startScn; private Scn startScn; // startScn is the **exclusive** lower bound for mining
private Scn endScn; private Scn endScn;
private Scn snapshotScn; private Scn snapshotScn;
private List<BigInteger> currentRedoLogSequences; private List<BigInteger> currentRedoLogSequences;
@ -128,7 +128,8 @@ public void execute(ChangeEventSourceContext context, OraclePartition partition,
} }
try (LogWriterFlushStrategy flushStrategy = resolveFlushStrategy()) { try (LogWriterFlushStrategy flushStrategy = resolveFlushStrategy()) {
if (!isContinuousMining && startScn.compareTo(firstScn) < 0) { if (!isContinuousMining && startScn.compareTo(firstScn.subtract(Scn.ONE)) < 0) {
// startScn is the exclusive lower bound, so must be >= (firstScn - 1)
throw new DebeziumException( throw new DebeziumException(
"Online REDO LOG files or archive log files do not contain the offset scn " + startScn + ". Please perform a new snapshot."); "Online REDO LOG files or archive log files do not contain the offset scn " + startScn + ". Please perform a new snapshot.");
} }
@ -248,14 +249,14 @@ else if (scn.compareTo(minScn) < 0) {
Scn originalCommitScn = offsetContext.getCommitScn(); Scn originalCommitScn = offsetContext.getCommitScn();
if (originalCommitScn == null || originalCommitScn.compareTo(snapshotScn) < 0) { if (originalCommitScn == null || originalCommitScn.compareTo(snapshotScn) < 0) {
LOGGER.info("Setting commit SCN to {} (snapshot SCN - 1) to ensure we don't double-emit events from pre-snapshot transactions.", LOGGER.info("Setting commit SCN to {} (snapshot SCN - 1) to ensure we don't double-emit events from pre-snapshot transactions.",
snapshotScn.subtract(Scn.valueOf(1))); snapshotScn.subtract(Scn.ONE));
offsetContext.setCommitScn(snapshotScn.subtract(Scn.valueOf(1))); offsetContext.setCommitScn(snapshotScn.subtract(Scn.ONE));
} }
// set start SCN to minScn // set start SCN to minScn
if (minScn.compareTo(startScn) < 0) { if (minScn.compareTo(startScn) < 0) {
LOGGER.info("Resetting start SCN from {} (snapshot SCN) to {} (start of oldest complete pending transaction)", startScn, minScn); LOGGER.info("Resetting start SCN from {} (snapshot SCN) to {} (start of oldest complete pending transaction)", startScn, minScn);
startScn = minScn.subtract(Scn.valueOf(1)); startScn = minScn.subtract(Scn.ONE);
} }
} }
offsetContext.setScn(startScn); offsetContext.setScn(startScn);
@ -470,7 +471,7 @@ private OffsetDateTime getDatabaseSystemTime(OracleConnection connection) throws
* this call to prepare DDL tracking state for the upcoming LogMiner view query. * this call to prepare DDL tracking state for the upcoming LogMiner view query.
* *
* @param connection database connection, should not be {@code null} * @param connection database connection, should not be {@code null}
* @param startScn mining session's starting system change number (inclusive), should not be {@code null} * @param startScn mining session's starting system change number (exclusive), should not be {@code null}
* @param endScn mining session's ending system change number (inclusive), can be {@code null} * @param endScn mining session's ending system change number (inclusive), can be {@code null}
* @throws SQLException if mining session failed to start * @throws SQLException if mining session failed to start
*/ */
@ -479,7 +480,9 @@ public void startMiningSession(OracleConnection connection, Scn startScn, Scn en
startScn, endScn, strategy, isContinuousMining); startScn, endScn, strategy, isContinuousMining);
try { try {
Instant start = Instant.now(); Instant start = Instant.now();
connection.executeWithoutCommitting(SqlUtils.startLogMinerStatement(startScn, endScn, strategy, isContinuousMining)); // NOTE: we treat startSCN as the _exclusive_ lower bound for mining,
// whereas START_LOGMNR takes an _inclusive_ lower bound. Hence the increment.
connection.executeWithoutCommitting(SqlUtils.startLogMinerStatement(startScn.add(Scn.ONE), endScn, strategy, isContinuousMining));
streamingMetrics.addCurrentMiningSessionStart(Duration.between(start, Instant.now())); streamingMetrics.addCurrentMiningSessionStart(Duration.between(start, Instant.now()));
} }
catch (SQLException e) { catch (SQLException e) {