DBZ-8055 Consolidate code & fix case where start equals end
This commit is contained in:
parent
8df2677289
commit
45e28dd169
@ -189,7 +189,13 @@ public void execute(ChangeEventSourceContext context, OraclePartition partition,
|
|||||||
}
|
}
|
||||||
|
|
||||||
Instant start = Instant.now();
|
Instant start = Instant.now();
|
||||||
endScn = calculateEndScn(jdbcConnection, startScn, endScn);
|
|
||||||
|
endScn = calculateUpperBounds(startScn, endScn);
|
||||||
|
if (endScn.isNull()) {
|
||||||
|
LOGGER.debug("Requested delay of mining by one iteration");
|
||||||
|
pauseBetweenMiningSessions();
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
// This is a small window where when archive log only mode has completely caught up to the last
|
// This is a small window where when archive log only mode has completely caught up to the last
|
||||||
// record in the archive logs that both the start and end values are identical. In this use
|
// record in the archive logs that both the start and end values are identical. In this use
|
||||||
@ -199,39 +205,6 @@ public void execute(ChangeEventSourceContext context, OraclePartition partition,
|
|||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
final Duration deviation = connectorConfig.getLogMiningMaxScnDeviation();
|
|
||||||
if (!deviation.isZero()) {
|
|
||||||
Optional<Scn> deviatedScn = calculateDeviatedEndScn(startScn, endScn, deviation);
|
|
||||||
if (deviatedScn.isEmpty()) {
|
|
||||||
pauseBetweenMiningSessions();
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
endScn = deviatedScn.get();
|
|
||||||
}
|
|
||||||
|
|
||||||
final Scn minOpenRedoThreadLastScn = jdbcConnection.getRedoThreadState()
|
|
||||||
.getThreads()
|
|
||||||
.stream()
|
|
||||||
.filter(RedoThread::isOpen)
|
|
||||||
.map(RedoThread::getLastRedoScn)
|
|
||||||
.min(Scn::compareTo)
|
|
||||||
.orElse(Scn.NULL);
|
|
||||||
|
|
||||||
if (!minOpenRedoThreadLastScn.isNull()) {
|
|
||||||
if (minOpenRedoThreadLastScn.compareTo(endScn) < 0) {
|
|
||||||
// There are situations where on first start-up that the startScn may be higher
|
|
||||||
// than the last flushed redo thread SCN, in which case we should delay by one
|
|
||||||
// iteration until the startScn is before the minOpenRedoTheadLastScn
|
|
||||||
if (minOpenRedoThreadLastScn.compareTo(startScn) < 0) {
|
|
||||||
pauseBetweenMiningSessions();
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
LOGGER.info("Minimum Redo Log Flush SCN is {}, using range {} to {} instead of {} to {}.", minOpenRedoThreadLastScn, startScn,
|
|
||||||
minOpenRedoThreadLastScn, startScn, endScn);
|
|
||||||
endScn = minOpenRedoThreadLastScn;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
flushStrategy.flush(jdbcConnection.getCurrentScn());
|
flushStrategy.flush(jdbcConnection.getCurrentScn());
|
||||||
|
|
||||||
boolean restartRequired = false;
|
boolean restartRequired = false;
|
||||||
@ -718,85 +691,126 @@ else if (currentSleepTime > connectorConfig.getLogMiningSleepTimeMin().toMillis(
|
|||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Calculates the mining session's end system change number.
|
* Calculates the upper mining range SCN boundary
|
||||||
*
|
*
|
||||||
* This calculation is based upon a sliding window algorithm to where if the connector is falling behind,
|
* @param startScn the starting position for the SCN mining range
|
||||||
* the mining session's end point will be calculated based on the batch size and either be increased up
|
* @param prevEndScn the previous iteration's end position for the SCN mining range
|
||||||
* to the maximum batch size or reduced to as low as the minimum batch size.
|
* @return the newly calculated upper boundary, or {@link Scn#NULL} if the loop should pause.
|
||||||
*
|
|
||||||
* Additionally, this method calculates and maintains a sliding algorithm for the sleep time between the
|
|
||||||
* mining sessions, increasing the pause up to the maximum sleep time if the connector is not behind or
|
|
||||||
* is mining too quick and reducing the pause down to the mimum sleep time if the connector has fallen
|
|
||||||
* behind and needs to catch-up faster.
|
|
||||||
*
|
|
||||||
* @param connection database connection, should not be {@code null}
|
|
||||||
* @param startScn upcoming mining session's starting change number, should not be {@code null}
|
|
||||||
* @param prevEndScn last mining session's ending system change number, can be {@code null}
|
|
||||||
* @return the ending system change number to be used for the upcoming mining session, never {@code null}
|
|
||||||
* @throws SQLException if the current max system change number cannot be obtained from the database
|
* @throws SQLException if the current max system change number cannot be obtained from the database
|
||||||
*/
|
*/
|
||||||
private Scn calculateEndScn(OracleConnection connection, Scn startScn, Scn prevEndScn) throws SQLException {
|
private Scn calculateUpperBounds(Scn startScn, Scn prevEndScn) throws SQLException {
|
||||||
Scn currentScn = archiveLogOnlyMode
|
Scn upperBoundsScn = archiveLogOnlyMode ? getMaxArchiveLogScn(currentLogFiles) : jdbcConnection.getCurrentScn();
|
||||||
? getMaxArchiveLogScn(currentLogFiles)
|
streamingMetrics.setCurrentScn(jdbcConnection.getCurrentScn());
|
||||||
: connection.getCurrentScn();
|
|
||||||
streamingMetrics.setCurrentScn(currentScn);
|
|
||||||
|
|
||||||
// Add the current batch size to the starting system change number
|
final Scn batchUpperBoundsScn = startScn.add(Scn.valueOf(streamingMetrics.getBatchSize()));
|
||||||
final Scn currentBatchSizeScn = Scn.valueOf(streamingMetrics.getBatchSize());
|
final Scn defaultBatchSize = Scn.valueOf(connectorConfig.getLogMiningBatchSizeDefault());
|
||||||
Scn topScnToMine = startScn.add(currentBatchSizeScn);
|
|
||||||
|
|
||||||
// Control adjusting batch size
|
// Initially set the upper bounds based on batch size
|
||||||
boolean topMiningScnInFarFuture = false;
|
// The following logic will alter this value as needed based on specific rules
|
||||||
final Scn defaultBatchScn = Scn.valueOf(connectorConfig.getLogMiningBatchSizeDefault());
|
Scn result = batchUpperBoundsScn;
|
||||||
if (topScnToMine.subtract(currentScn).compareTo(defaultBatchScn) > 0) {
|
|
||||||
|
// Check if the batch upper bounds is greater than the current upper bounds
|
||||||
|
// If it isn't, there is no need to update the batch size
|
||||||
|
boolean batchUpperBoundsScnAfterCurrentScn = false;
|
||||||
|
if (batchUpperBoundsScn.subtract(upperBoundsScn).compareTo(defaultBatchSize) > 0) {
|
||||||
|
// Don't update the batch size, batch upper bounds currently large enough
|
||||||
updateBatchSize(false);
|
updateBatchSize(false);
|
||||||
topMiningScnInFarFuture = true;
|
batchUpperBoundsScnAfterCurrentScn = true;
|
||||||
}
|
}
|
||||||
if (currentScn.subtract(topScnToMine).compareTo(defaultBatchScn) > 0) {
|
|
||||||
|
if (upperBoundsScn.subtract(batchUpperBoundsScn).compareTo(defaultBatchSize) > 0) {
|
||||||
|
// Update batch size because the database upper position is greater than the batch size
|
||||||
updateBatchSize(true);
|
updateBatchSize(true);
|
||||||
}
|
}
|
||||||
|
|
||||||
// Control sleep time to reduce database impact
|
if (upperBoundsScn.compareTo(batchUpperBoundsScn) < 0) {
|
||||||
if (currentScn.compareTo(topScnToMine) < 0) {
|
if (!batchUpperBoundsScnAfterCurrentScn) {
|
||||||
if (!topMiningScnInFarFuture) {
|
|
||||||
updateSleepTime(true);
|
updateSleepTime(true);
|
||||||
}
|
}
|
||||||
LOGGER.debug("Using current SCN {} as end SCN.", currentScn);
|
// Batch upperbounds greater than database max possible read position.
|
||||||
return currentScn;
|
// Cap it at the max possible database read position
|
||||||
|
LOGGER.debug("Batch upper bounds {} exceeds maximum read position, capping to {}.", batchUpperBoundsScn, upperBoundsScn);
|
||||||
|
result = upperBoundsScn;
|
||||||
}
|
}
|
||||||
else {
|
else {
|
||||||
if (prevEndScn != null && topScnToMine.compareTo(prevEndScn) <= 0) {
|
if (prevEndScn != null && batchUpperBoundsScn.compareTo(prevEndScn) <= 0) {
|
||||||
LOGGER.debug("Max batch size too small, using current SCN {} as end SCN.", currentScn);
|
// Batch size is too small, make a large leap and use current SCN
|
||||||
return currentScn;
|
LOGGER.debug("Batch size upper bounds {} too small, using maximum read position {} instead.", batchUpperBoundsScn, upperBoundsScn);
|
||||||
|
result = upperBoundsScn;
|
||||||
}
|
}
|
||||||
|
else {
|
||||||
updateSleepTime(false);
|
updateSleepTime(false);
|
||||||
if (topScnToMine.compareTo(startScn) < 0) {
|
if (batchUpperBoundsScn.compareTo(startScn) < 0) {
|
||||||
LOGGER.debug("Top SCN calculation resulted in end before start SCN, using current SCN {} as end SCN.", currentScn);
|
// Batch SCN calculation resulted in a value before start SCN, fallback to max read position
|
||||||
return currentScn;
|
LOGGER.debug("Batch upper bounds {} is before start SCN {}, fallback to maximum read position {}.", batchUpperBoundsScn, startScn, upperBoundsScn);
|
||||||
|
result = upperBoundsScn;
|
||||||
}
|
}
|
||||||
|
else if (prevEndScn != null) {
|
||||||
if (prevEndScn != null) {
|
final Scn deltaScn = upperBoundsScn.subtract(prevEndScn);
|
||||||
final Scn deltaScn = currentScn.subtract(prevEndScn);
|
|
||||||
if (deltaScn.compareTo(Scn.valueOf(connectorConfig.getLogMiningScnGapDetectionGapSizeMin())) > 0) {
|
if (deltaScn.compareTo(Scn.valueOf(connectorConfig.getLogMiningScnGapDetectionGapSizeMin())) > 0) {
|
||||||
Optional<Instant> prevEndScnTimestamp = connection.getScnToTimestamp(prevEndScn);
|
Optional<Instant> prevEndScnTimestamp = jdbcConnection.getScnToTimestamp(prevEndScn);
|
||||||
if (prevEndScnTimestamp.isPresent()) {
|
if (prevEndScnTimestamp.isPresent()) {
|
||||||
Optional<Instant> currentScnTimestamp = connection.getScnToTimestamp(currentScn);
|
Optional<Instant> upperBoundsScnTimestamp = jdbcConnection.getScnToTimestamp(upperBoundsScn);
|
||||||
if (currentScnTimestamp.isPresent()) {
|
if (upperBoundsScnTimestamp.isPresent()) {
|
||||||
long timeDeltaMs = ChronoUnit.MILLIS.between(prevEndScnTimestamp.get(), currentScnTimestamp.get());
|
long deltaTime = ChronoUnit.MILLIS.between(prevEndScnTimestamp.get(), upperBoundsScnTimestamp.get());
|
||||||
if (timeDeltaMs < connectorConfig.getLogMiningScnGapDetectionTimeIntervalMaxMs()) {
|
if (deltaTime < connectorConfig.getLogMiningScnGapDetectionTimeIntervalMaxMs()) {
|
||||||
LOGGER.warn("Detected possible SCN gap, using current SCN, startSCN {}, prevEndScn {} timestamp {}, current SCN {} timestamp {}.",
|
LOGGER.warn(
|
||||||
startScn,
|
"Detected possible SCN gap, using upperBounds SCN, startSCN {}, prevEndSCN {}, timestamp {}, upperBounds SCN {} timestamp {}.",
|
||||||
prevEndScn, prevEndScnTimestamp.get(), currentScn, currentScnTimestamp.get());
|
startScn, prevEndScn, prevEndScnTimestamp.get(), upperBoundsScn, upperBoundsScnTimestamp.get());
|
||||||
return currentScn;
|
result = upperBoundsScn;
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
LOGGER.debug("Using Top SCN calculation {} as end SCN. currentScn {}, startScn {}", topScnToMine, currentScn, startScn);
|
// If the connector is configured with maximum SCN deviation, apply the deviation time.
|
||||||
return topScnToMine;
|
// This rolls the current maximum read SCN position back based on the deviation duration.
|
||||||
|
final Duration deviation = connectorConfig.getLogMiningMaxScnDeviation();
|
||||||
|
if (!deviation.isZero()) {
|
||||||
|
Optional<Scn> deviatedScn = calculateDeviatedEndScn(startScn, result, deviation);
|
||||||
|
if (deviatedScn.isEmpty()) {
|
||||||
|
return Scn.NULL;
|
||||||
}
|
}
|
||||||
|
LOGGER.debug("Adjusted upper bounds {} based on deviation to {}.", result, deviatedScn.get());
|
||||||
|
result = deviatedScn.get();
|
||||||
|
}
|
||||||
|
|
||||||
|
// Retrieve the redo thread state and get the minimum flushed SCN across all open redo threads
|
||||||
|
final Scn minOpenRedoThreadLastScn = jdbcConnection.getRedoThreadState()
|
||||||
|
.getThreads()
|
||||||
|
.stream()
|
||||||
|
.filter(RedoThread::isOpen)
|
||||||
|
.map(RedoThread::getLastRedoScn)
|
||||||
|
.min(Scn::compareTo)
|
||||||
|
.orElse(Scn.NULL);
|
||||||
|
|
||||||
|
// If there is a minimum flushed SCN across Open redo threads, and it is before the currently
|
||||||
|
// assigned maximum read position, we should attempt to cap the maximum read position based
|
||||||
|
// on the redo thread data.
|
||||||
|
if (!minOpenRedoThreadLastScn.isNull()) {
|
||||||
|
if (minOpenRedoThreadLastScn.compareTo(result) < 0) {
|
||||||
|
// There are situations where on first start-up that the startScn may be higher
|
||||||
|
// than the last flushed redo thread SCN, in which case we should delay by one
|
||||||
|
// iteration until the startScn is before the minOpenRedoThreadLastScn
|
||||||
|
if (minOpenRedoThreadLastScn.compareTo(startScn) < 0) {
|
||||||
|
return Scn.NULL;
|
||||||
|
}
|
||||||
|
LOGGER.debug("Adjusting upper bounds {} to minimum read thread flush SCN {}.", result, minOpenRedoThreadLastScn);
|
||||||
|
result = minOpenRedoThreadLastScn;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (result.compareTo(startScn) <= 0) {
|
||||||
|
// Final sanity check to prevent ORA-01281: SCN range specified is invalid
|
||||||
|
LOGGER.debug("Final upper bounds {} matches start read position, delay required.", result);
|
||||||
|
return Scn.NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
LOGGER.debug("Final upper bounds range is {}.", result);
|
||||||
|
return result;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -477,7 +477,9 @@ protected void handleCommit(OraclePartition partition, LogMinerEventRow row) thr
|
|||||||
|
|
||||||
final T transaction = getAndRemoveTransactionFromCache(transactionId);
|
final T transaction = getAndRemoveTransactionFromCache(transactionId);
|
||||||
if (transaction == null) {
|
if (transaction == null) {
|
||||||
LOGGER.debug("Transaction {} not found in cache, no events to commit.", transactionId);
|
if (!offsetContext.getCommitScn().hasCommitAlreadyBeenHandled(row)) {
|
||||||
|
LOGGER.debug("Transaction {} not found in cache with SCN {}, no events to commit.", transactionId, row.getScn());
|
||||||
|
}
|
||||||
handleCommitNotFoundInBuffer(row);
|
handleCommitNotFoundInBuffer(row);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user