DBZ-6660 Introduce internal.log.mining.max.scn.deviation.ms

We hypothesize that there could be a situation where we may be mining precisely
around the CURRENT_SCN and this may lead to situations where LGWR may not have
flushed all records for the same SCN before being mined by the connector.
This commit is contained in:
Chris Cranford 2023-07-14 12:41:28 -04:00 committed by Jiri Pechanec
parent 877a19e278
commit de8cf36b2c
5 changed files with 222 additions and 1 deletions

View File

@ -10,6 +10,7 @@
import java.sql.SQLException;
import java.sql.SQLRecoverableException;
import java.sql.Statement;
import java.time.Duration;
import java.time.Instant;
import java.util.HashSet;
import java.util.List;
@ -484,6 +485,26 @@ public Optional<Instant> getScnToTimestamp(Scn scn) throws SQLException {
}
}
public Scn getScnAdjustedByTime(Scn scn, Duration adjustment) throws SQLException {
try {
final String result = prepareQueryAndMap(
"SELECT timestamp_to_scn(scn_to_timestamp(?) - (? / 86400000)) FROM DUAL",
st -> {
st.setString(1, scn.toString());
st.setLong(2, adjustment.toMillis());
},
singleResultMapper(rs -> rs.getString(1), "Failed to get adjusted SCN from: " + scn));
return Scn.valueOf(result);
}
catch (SQLException e) {
if (e.getErrorCode() == 8181 || e.getErrorCode() == 8180) {
// This happens when the SCN provided is outside the flashback/undo area
return Scn.NULL;
}
throw e;
}
}
@Override
protected ColumnEditor overrideColumn(ColumnEditor column) {
// This allows the column state to be overridden before default-value resolution so that the

View File

@ -535,6 +535,21 @@ public class OracleConnectorConfig extends HistorizedRelationalDatabaseConnector
"The maximum number of records that should be loaded into memory while streaming. A value of '0' uses the default JDBC fetch size, defaults to '2000'.")
.withDefault(DEFAULT_QUERY_FETCH_SIZE);
public static final Field LOG_MINING_MAX_SCN_DEVIATION_MS = Field.createInternal("log.mining.max.scn.deviation.ms")
.withDisplayName("Allows applying a time-based deviation to the max mining scn")
.withType(Type.LONG)
.withWidth(Width.MEDIUM)
.withImportance(Importance.LOW)
.withDefault(0)
.withValidation(Field::isNonNegativeLong)
.withDescription("By default, LogMiner will apply no deviation, meaning that the connector can mine up to the CURRENT_SCN. " +
"There are situations where this could be problematic if perhaps when asynchronous IO operations are at play. " +
"By applying a time-based deviation, for example 3000, the connector will only mine up the SCN that is a result of " +
"the formula of TIMESTAMP_TO_SCN(SCN_TO_TIMESTAMP(CURRENT_SCN)-(3000/86400000)). If this SCN is not available, the " +
"connector will log a warning and proceed to use the CURRENT_SCN or previously calculated upper SCN regardless. " +
"NOTE: This option is internal and should not be used for general use. Using this option will create a net latency " +
"on change events increased by the deviation value specified.");
private static final ConfigDefinition CONFIG_DEFINITION = HistorizedRelationalDatabaseConnectorConfig.CONFIG_DEFINITION.edit()
.name("Oracle")
.excluding(
@ -596,7 +611,8 @@ public class OracleConnectorConfig extends HistorizedRelationalDatabaseConnector
LOG_MINING_READ_ONLY,
LOG_MINING_FLUSH_TABLE_NAME,
LOG_MINING_QUERY_FILTER_MODE,
LOG_MINING_RESTART_CONNECTION)
LOG_MINING_RESTART_CONNECTION,
LOG_MINING_MAX_SCN_DEVIATION_MS)
.events(SOURCE_INFO_STRUCT_MAKER)
.create();
@ -660,6 +676,7 @@ public static ConfigDef configDef() {
private final String logMiningFlushTableName;
private final LogMiningQueryFilterMode logMiningQueryFilterMode;
private final Boolean logMiningRestartConnection;
private final Duration logMiningMaxScnDeviation;
public OracleConnectorConfig(Configuration config) {
super(
@ -719,6 +736,7 @@ public OracleConnectorConfig(Configuration config) {
this.logMiningFlushTableName = config.getString(LOG_MINING_FLUSH_TABLE_NAME);
this.logMiningQueryFilterMode = LogMiningQueryFilterMode.parse(config.getString(LOG_MINING_QUERY_FILTER_MODE));
this.logMiningRestartConnection = config.getBoolean(LOG_MINING_RESTART_CONNECTION);
this.logMiningMaxScnDeviation = Duration.ofMillis(config.getLong(LOG_MINING_MAX_SCN_DEVIATION_MS));
}
private static String toUpperCase(String property) {
@ -1672,6 +1690,16 @@ public boolean isLogMiningRestartConnection() {
return logMiningRestartConnection;
}
/**
* Returns the deviation in milliseconds that should be applied to the end SCN calculation.
* If this is {@code 0}, then there is no deviation applied.
*
* @return the deviation duration.
*/
public Duration getLogMiningMaxScnDeviation() {
return logMiningMaxScnDeviation;
}
@Override
public String getConnectorName() {
return Module.name();

View File

@ -193,6 +193,16 @@ public void execute(ChangeEventSourceContext context, OraclePartition partition,
continue;
}
final Duration deviation = connectorConfig.getLogMiningMaxScnDeviation();
if (!deviation.isZero()) {
Optional<Scn> deviatedScn = calculateDeviatedEndScn(startScn, endScn, deviation);
if (deviatedScn.isEmpty()) {
pauseBetweenMiningSessions();
continue;
}
endScn = deviatedScn.get();
}
flushStrategy.flush(jdbcConnection.getCurrentScn());
boolean restartRequired = false;
@ -736,6 +746,53 @@ private Scn calculateEndScn(OracleConnection connection, Scn startScn, Scn prevE
}
}
/**
* Calculates the deviated end scn based on the scn range and deviation.
*
* @param lowerboundsScn the mining range's lower bounds
* @param upperboundsScn the mining range's upper bounds
* @param deviation the time deviation
* @return an optional that contains the deviated scn or empty if the operation should be performed again
*/
private Optional<Scn> calculateDeviatedEndScn(Scn lowerboundsScn, Scn upperboundsScn, Duration deviation) {
final Optional<Scn> calculatedDeviatedEndScn = getDeviatedMaxScn(upperboundsScn, deviation);
if (calculatedDeviatedEndScn.isEmpty() || calculatedDeviatedEndScn.get().isNull()) {
// This happens only if the deviation calculation is outside the flashback/undo area or an exception was thrown.
// In this case we have no choice but to use the upper bounds as a fallback.
LOGGER.warn("Mining session end SCN deviation calculation is outside undo space, using upperbounds {}. If this continues, " +
"consider lowering the value of the '{}' configuration property.", upperboundsScn,
OracleConnectorConfig.LOG_MINING_MAX_SCN_DEVIATION_MS.name());
return Optional.of(upperboundsScn);
}
else if (calculatedDeviatedEndScn.get().compareTo(lowerboundsScn) <= 0) {
// This should also force the outer loop to recall this method again.
LOGGER.debug("Mining session end SCN deviation as {}, outside of mining range, recalculating.", calculatedDeviatedEndScn.get());
return Optional.empty();
}
else {
// Calculated SCN is after lower bounds and within flashback/undo area, safe to return.
return calculatedDeviatedEndScn;
}
}
/**
* Uses the provided Upperbound SCN and deviation to calculate an SCN that happened in the past at a
* time based on Oracle's {@code TIMESTAMP_TO_SCN} and {@code SCN_TO_TIMESTAMP} functions.
*
* @param upperboundsScn the upper bound system change number, should not be {@code null}
* @param deviation the time deviation to be applied, should not be {@code null}
* @return the newly calculated Scn
*/
private Optional<Scn> getDeviatedMaxScn(Scn upperboundsScn, Duration deviation) {
try {
return Optional.of(jdbcConnection.getScnAdjustedByTime(upperboundsScn, deviation));
}
catch (SQLException e) {
LOGGER.warn("Failed to calculate deviated max SCN value from {}.", upperboundsScn);
return Optional.empty();
}
}
/**
* Checks and validates the database's supplemental logging configuration as well as the lengths of the
* table and column names that are part of the database schema.

View File

@ -35,7 +35,10 @@
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import javax.management.JMException;
@ -5452,6 +5455,110 @@ public void shouldNotThrowConcurrentModificationExceptionWhenDispatchingSchemaCh
}
}
@Test
@FixFor("DBZ-6660")
@SkipWhenAdapterNameIsNot(SkipWhenAdapterNameIsNot.AdapterName.LOGMINER)
public void shouldPauseAndWaitForDeviationCalculationIfBeforeMiningRange() throws Exception {
try {
TestHelper.dropTable(connection, "dbz6660");
connection.execute("CREATE TABLE dbz6660 (id number(9,0), data varchar2(50), primary key(id))");
TestHelper.streamTable(connection, "dbz6660");
final Long deviationMs = 10000L;
Configuration config = TestHelper.defaultConfig()
.with(OracleConnectorConfig.TABLE_INCLUDE_LIST, "DEBEZIUM\\.DBZ6660")
.with(OracleConnectorConfig.LOG_MINING_MAX_SCN_DEVIATION_MS, deviationMs.toString())
.build();
final LogInterceptor sourceLogging = new LogInterceptor(LogMinerStreamingChangeEventSource.class);
sourceLogging.setLoggerLevel(LogMinerStreamingChangeEventSource.class, Level.DEBUG);
final LogInterceptor processorLogging = new LogInterceptor(AbstractLogMinerEventProcessor.class);
processorLogging.setLoggerLevel(AbstractLogMinerEventProcessor.class, Level.DEBUG);
start(OracleConnector.class, config);
assertConnectorIsRunning();
waitForStreamingRunning(TestHelper.CONNECTOR_NAME, TestHelper.SERVER_NAME);
// This should be printed at the start of the streaming session while the back-fill is prepared.
// Based on the time given, this could be printed several times.
Awaitility.await().atMost(Duration.ofSeconds(60))
.until(() -> sourceLogging.containsMessage("outside of mining range, recalculating."));
// Assert that every lag log entry is at least 10s behind due to deviation.
try {
final Pattern pattern = Pattern.compile("Lag: ([0-9]+)");
final AtomicInteger id = new AtomicInteger(1);
Awaitility.await()
.pollInterval(Duration.ofSeconds(1))
.atMost(Duration.ofSeconds(60)).until(() -> {
// Provide some dummy captured data periodically
connection.execute("INSERT INTO dbz6660 values (" + id.getAndIncrement() + ", 'data')");
final List<String> entries = processorLogging.getLogEntriesThatContainsMessage("Processed in ");
for (String entry : entries) {
final Matcher matcher = pattern.matcher(entry);
if (matcher.matches()) {
assertThat(Long.valueOf(matcher.group(1))).isGreaterThan(deviationMs);
}
}
return false;
});
}
catch (ConditionTimeoutException e) {
// ignored
}
// Just concerned that every iteration has lag greater than deviation.
stopConnector();
}
finally {
TestHelper.dropTable(connection, "dbz6660");
}
}
@Test
@FixFor("DBZ-6660")
@SkipWhenAdapterNameIsNot(SkipWhenAdapterNameIsNot.AdapterName.LOGMINER)
public void shouldUseEndScnIfDeviationProducesScnOutsideOfUndoRetention() throws Exception {
try {
TestHelper.dropTable(connection, "dbz6660");
connection.execute("CREATE TABLE dbz6660 (id number(9,0), data varchar2(50), primary key(id))");
TestHelper.streamTable(connection, "dbz6660");
// This is effectively 166666.667 minutes = 115.74 days
// No Oracle instance should have undo space this large :)
final Long deviationMs = 10000000000L;
Configuration config = TestHelper.defaultConfig()
.with(OracleConnectorConfig.TABLE_INCLUDE_LIST, "DEBEZIUM\\.DBZ6660")
.with(OracleConnectorConfig.LOG_MINING_MAX_SCN_DEVIATION_MS, deviationMs.toString())
.build();
final LogInterceptor sourceLogging = new LogInterceptor(LogMinerStreamingChangeEventSource.class);
sourceLogging.setLoggerLevel(LogMinerStreamingChangeEventSource.class, Level.DEBUG);
start(OracleConnector.class, config);
assertConnectorIsRunning();
waitForStreamingRunning(TestHelper.CONNECTOR_NAME, TestHelper.SERVER_NAME);
// This should be printed at the start of the streaming session while the back-fill is prepared.
// Based on the time given, this could be printed several times.
Awaitility.await().atMost(Duration.ofSeconds(60))
.until(() -> sourceLogging.containsMessage("outside undo space, using upperbounds"));
// Just concerned that every iteration has lag greater than deviation.
stopConnector();
}
finally {
TestHelper.dropTable(connection, "dbz6660");
}
}
private void waitForCurrentScnToHaveBeenSeenByConnector() throws SQLException {
try (OracleConnection admin = TestHelper.adminConnection(true)) {
final Scn scn = admin.getCurrentScn();

View File

@ -9,6 +9,7 @@
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.stream.Collectors;
import org.slf4j.LoggerFactory;
@ -74,6 +75,13 @@ public void setLoggerLevel(Class<?> loggerClass, Level level) {
logger.setLevel(level);
}
public List<String> getLogEntriesThatContainsMessage(String text) {
return events.stream()
.filter(e -> e.getFormattedMessage().toString().contains(text))
.map(e -> e.getFormattedMessage().toString())
.collect(Collectors.toList());
}
public long countOccurrences(String text) {
return events.stream().filter(e -> e.getMessage().toString().contains(text)).count();
}