DBZ-6355 Change to the last offset

This commit is contained in:
Sergey Eizner 2023-04-19 10:18:51 -05:00 committed by Chris Cranford
parent baa72f3f51
commit cac5485176
4 changed files with 41 additions and 12 deletions

View File

@ -174,13 +174,14 @@ public class OracleConnectorConfig extends HistorizedRelationalDatabaseConnector
public static final Field LOG_MINING_TRANSACTION_RETENTION = Field.create("log.mining.transaction.retention.hours")
.withDisplayName("Log Mining long running transaction retention")
.withType(Type.LONG)
.withType(Type.DOUBLE)
.withWidth(Width.SHORT)
.withImportance(Importance.MEDIUM)
.withDefault(0)
.withValidation(Field::isNonNegativeInteger)
.withGroup(Field.createGroupEntry(Field.Group.CONNECTION_ADVANCED, 18))
.withDescription("Hours to keep long running transactions in transaction buffer between log mining sessions. By default, all transactions are retained.");
.withDescription("Hours to keep long running transactions in transaction buffer between log mining " +
"sessions. By default, all transactions are retained.");
public static final Field RAC_NODES = Field.create("rac.nodes")
.withDisplayName("Oracle RAC nodes")
@ -632,7 +633,7 @@ public OracleConnectorConfig(Configuration config) {
this.logMiningSleepTimeMax = Duration.ofMillis(config.getInteger(LOG_MINING_SLEEP_TIME_MAX_MS));
this.logMiningSleepTimeDefault = Duration.ofMillis(config.getInteger(LOG_MINING_SLEEP_TIME_DEFAULT_MS));
this.logMiningSleepTimeIncrement = Duration.ofMillis(config.getInteger(LOG_MINING_SLEEP_TIME_INCREMENT_MS));
this.logMiningTransactionRetention = Duration.ofHours(config.getInteger(LOG_MINING_TRANSACTION_RETENTION));
this.logMiningTransactionRetention = Duration.ofMinutes((long) (config.getDouble(LOG_MINING_TRANSACTION_RETENTION) * 60));
this.archiveLogOnlyMode = config.getBoolean(LOG_MINING_ARCHIVE_LOG_ONLY_MODE);
this.logMiningUsernameExcludes = Strings.setOf(config.getString(LOG_MINING_USERNAME_EXCLUDE_LIST), String::new);
this.logMiningArchiveDestinationName = config.getString(LOG_MINING_ARCHIVE_DESTINATION_NAME);

View File

@ -225,6 +225,15 @@ static String deleteLogFileStatement(String fileName) {
return "BEGIN SYS.DBMS_LOGMNR.REMOVE_LOGFILE(LOGFILENAME => '" + fileName + "');END;";
}
public static String getScnByTimeDeltaQuery(Scn scn, Duration duration) {
if (scn == null) {
return null;
}
return "select timestamp_to_scn(CAST(scn_to_timestamp(" + scn.toString() + ") as date)" +
" - INTERVAL '" + duration.toMinutes() + "' MINUTE) from dual";
}
/**
* This method return query which converts given SCN in days and deduct from the current day
*/

View File

@ -5,6 +5,7 @@
*/
package io.debezium.connector.oracle.logminer.processor.memory;
import java.math.BigInteger;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
@ -138,7 +139,7 @@ public void close() throws Exception {
public void abandonTransactions(Duration retention) throws InterruptedException {
if (!Duration.ZERO.equals(retention)) {
final Scn offsetScn = offsetContext.getScn();
Optional<Scn> lastScnToAbandonTransactions = getLastScnToAbandon(jdbcConnection, offsetScn, retention);
Optional<Scn> lastScnToAbandonTransactions = getLastScnToAbandon(jdbcConnection, retention);
if (lastScnToAbandonTransactions.isPresent()) {
Scn thresholdScn = lastScnToAbandonTransactions.get();
LOGGER.warn("All transactions with SCN <= {} will be abandoned.", thresholdScn);
@ -320,22 +321,23 @@ protected Scn calculateNewStartScn(Scn endScn, Scn maxCommittedScn) throws Inter
* The criteria is do not let the offset SCN expire from archives older the specified retention hours.
*
* @param connection database connection, should not be {@code null}
* @param offsetScn offset system change number, should not be {@code null}
* @param retention duration to tolerate long running transactions before being abandoned, must not be {@code null}
* @return an optional system change number as the watermark for transaction buffer abandonment
*/
protected Optional<Scn> getLastScnToAbandon(OracleConnection connection, Scn offsetScn, Duration retention) {
protected Optional<Scn> getLastScnToAbandon(OracleConnection connection, Duration retention) {
try {
Float diffInDays = connection.singleOptionalValue(SqlUtils.diffInDaysQuery(offsetScn), rs -> rs.getFloat(1));
if (diffInDays != null && (diffInDays * 24) > retention.toHours()) {
return Optional.of(offsetScn);
}
if (getLastProcessedScn().isNull()) {
return Optional.empty();
}
BigInteger scnToAbandon = connection.singleOptionalValue(
SqlUtils.getScnByTimeDeltaQuery(getLastProcessedScn(), retention),
rs -> rs.getBigDecimal(1).toBigInteger());
return Optional.of(new Scn(scnToAbandon));
}
catch (SQLException e) {
LOGGER.error("Cannot calculate days difference for transaction abandonment", e);
LOGGER.error("Cannot fetch SCN by given duration to calculate SCN to abandon", e);
metrics.incrementErrorCount();
return Optional.of(offsetScn);
return Optional.empty();
}
}

View File

@ -26,6 +26,7 @@
import java.util.function.BiFunction;
import java.util.function.BooleanSupplier;
import java.util.function.Consumer;
import java.util.function.DoubleSupplier;
import java.util.function.Function;
import java.util.function.IntSupplier;
import java.util.function.LongSupplier;
@ -1242,6 +1243,18 @@ default Integer getInteger(String key, IntSupplier defaultValueSupplier) {
return defaultValueSupplier != null ? defaultValueSupplier.getAsInt() : null;
}
default Double getDouble(String key, DoubleSupplier defaultValueSupplier) {
String value = getString(key);
if (value != null) {
try {
return Double.valueOf(value);
}
catch (NumberFormatException e) {
}
}
return defaultValueSupplier != null ? defaultValueSupplier.getAsDouble() : null;
}
/**
* Get the long value associated with the given key, using the given supplier to obtain a default value if there is no such
* key-value pair.
@ -1316,6 +1329,10 @@ default int getInteger(Field field) {
return getInteger(field.name(), () -> Integer.valueOf(field.defaultValueAsString())).intValue();
}
default Double getDouble(Field field) {
return getDouble(field.name(), () -> Double.valueOf(field.defaultValueAsString())).doubleValue();
}
/**
* Get the long value associated with the given field, returning the field's default value if there is no such
* key-value pair.