diff --git a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/OracleConnection.java b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/OracleConnection.java index 7d00015e7..0bd2a037d 100644 --- a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/OracleConnection.java +++ b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/OracleConnection.java @@ -10,6 +10,7 @@ import java.sql.SQLException; import java.sql.SQLRecoverableException; import java.sql.Statement; +import java.time.Duration; import java.time.Instant; import java.util.HashSet; import java.util.List; @@ -484,6 +485,26 @@ public Optional getScnToTimestamp(Scn scn) throws SQLException { } } + public Scn getScnAdjustedByTime(Scn scn, Duration adjustment) throws SQLException { + try { + final String result = prepareQueryAndMap( + "SELECT timestamp_to_scn(scn_to_timestamp(?) - (? / 86400000)) FROM DUAL", + st -> { + st.setString(1, scn.toString()); + st.setLong(2, adjustment.toMillis()); + }, + singleResultMapper(rs -> rs.getString(1), "Failed to get adjusted SCN from: " + scn)); + return Scn.valueOf(result); + } + catch (SQLException e) { + if (e.getErrorCode() == 8181 || e.getErrorCode() == 8180) { + // This happens when the SCN provided is outside the flashback/undo area + return Scn.NULL; + } + throw e; + } + } + @Override protected ColumnEditor overrideColumn(ColumnEditor column) { // This allows the column state to be overridden before default-value resolution so that the diff --git a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/OracleConnectorConfig.java b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/OracleConnectorConfig.java index 54118ad91..02a0165be 100644 --- a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/OracleConnectorConfig.java +++ b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/OracleConnectorConfig.java @@ -535,6 +535,21 @@ public class OracleConnectorConfig extends HistorizedRelationalDatabaseConnector "The maximum number of records that should be loaded into memory while streaming. A value of '0' uses the default JDBC fetch size, defaults to '2000'.") .withDefault(DEFAULT_QUERY_FETCH_SIZE); + public static final Field LOG_MINING_MAX_SCN_DEVIATION_MS = Field.createInternal("log.mining.max.scn.deviation.ms") + .withDisplayName("Allows applying a time-based deviation to the max mining scn") + .withType(Type.LONG) + .withWidth(Width.MEDIUM) + .withImportance(Importance.LOW) + .withDefault(0) + .withValidation(Field::isNonNegativeLong) + .withDescription("By default, LogMiner will apply no deviation, meaning that the connector can mine up to the CURRENT_SCN. " + + "There are situations where this could be problematic if perhaps when asynchronous IO operations are at play. " + + "By applying a time-based deviation, for example 3000, the connector will only mine up the SCN that is a result of " + + "the formula of TIMESTAMP_TO_SCN(SCN_TO_TIMESTAMP(CURRENT_SCN)-(3000/86400000)). If this SCN is not available, the " + + "connector will log a warning and proceed to use the CURRENT_SCN or previously calculated upper SCN regardless. " + + "NOTE: This option is internal and should not be used for general use. Using this option will create a net latency " + + "on change events increased by the deviation value specified."); + private static final ConfigDefinition CONFIG_DEFINITION = HistorizedRelationalDatabaseConnectorConfig.CONFIG_DEFINITION.edit() .name("Oracle") .excluding( @@ -596,7 +611,8 @@ public class OracleConnectorConfig extends HistorizedRelationalDatabaseConnector LOG_MINING_READ_ONLY, LOG_MINING_FLUSH_TABLE_NAME, LOG_MINING_QUERY_FILTER_MODE, - LOG_MINING_RESTART_CONNECTION) + LOG_MINING_RESTART_CONNECTION, + LOG_MINING_MAX_SCN_DEVIATION_MS) .events(SOURCE_INFO_STRUCT_MAKER) .create(); @@ -660,6 +676,7 @@ public static ConfigDef configDef() { private final String logMiningFlushTableName; private final LogMiningQueryFilterMode logMiningQueryFilterMode; private final Boolean logMiningRestartConnection; + private final Duration logMiningMaxScnDeviation; public OracleConnectorConfig(Configuration config) { super( @@ -719,6 +736,7 @@ public OracleConnectorConfig(Configuration config) { this.logMiningFlushTableName = config.getString(LOG_MINING_FLUSH_TABLE_NAME); this.logMiningQueryFilterMode = LogMiningQueryFilterMode.parse(config.getString(LOG_MINING_QUERY_FILTER_MODE)); this.logMiningRestartConnection = config.getBoolean(LOG_MINING_RESTART_CONNECTION); + this.logMiningMaxScnDeviation = Duration.ofMillis(config.getLong(LOG_MINING_MAX_SCN_DEVIATION_MS)); } private static String toUpperCase(String property) { @@ -1672,6 +1690,16 @@ public boolean isLogMiningRestartConnection() { return logMiningRestartConnection; } + /** + * Returns the deviation in milliseconds that should be applied to the end SCN calculation. + * If this is {@code 0}, then there is no deviation applied. + * + * @return the deviation duration. + */ + public Duration getLogMiningMaxScnDeviation() { + return logMiningMaxScnDeviation; + } + @Override public String getConnectorName() { return Module.name(); diff --git a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/LogMinerStreamingChangeEventSource.java b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/LogMinerStreamingChangeEventSource.java index e57783574..42c1c1d30 100644 --- a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/LogMinerStreamingChangeEventSource.java +++ b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/LogMinerStreamingChangeEventSource.java @@ -193,6 +193,16 @@ public void execute(ChangeEventSourceContext context, OraclePartition partition, continue; } + final Duration deviation = connectorConfig.getLogMiningMaxScnDeviation(); + if (!deviation.isZero()) { + Optional deviatedScn = calculateDeviatedEndScn(startScn, endScn, deviation); + if (deviatedScn.isEmpty()) { + pauseBetweenMiningSessions(); + continue; + } + endScn = deviatedScn.get(); + } + flushStrategy.flush(jdbcConnection.getCurrentScn()); boolean restartRequired = false; @@ -736,6 +746,53 @@ private Scn calculateEndScn(OracleConnection connection, Scn startScn, Scn prevE } } + /** + * Calculates the deviated end scn based on the scn range and deviation. + * + * @param lowerboundsScn the mining range's lower bounds + * @param upperboundsScn the mining range's upper bounds + * @param deviation the time deviation + * @return an optional that contains the deviated scn or empty if the operation should be performed again + */ + private Optional calculateDeviatedEndScn(Scn lowerboundsScn, Scn upperboundsScn, Duration deviation) { + final Optional calculatedDeviatedEndScn = getDeviatedMaxScn(upperboundsScn, deviation); + if (calculatedDeviatedEndScn.isEmpty() || calculatedDeviatedEndScn.get().isNull()) { + // This happens only if the deviation calculation is outside the flashback/undo area or an exception was thrown. + // In this case we have no choice but to use the upper bounds as a fallback. + LOGGER.warn("Mining session end SCN deviation calculation is outside undo space, using upperbounds {}. If this continues, " + + "consider lowering the value of the '{}' configuration property.", upperboundsScn, + OracleConnectorConfig.LOG_MINING_MAX_SCN_DEVIATION_MS.name()); + return Optional.of(upperboundsScn); + } + else if (calculatedDeviatedEndScn.get().compareTo(lowerboundsScn) <= 0) { + // This should also force the outer loop to recall this method again. + LOGGER.debug("Mining session end SCN deviation as {}, outside of mining range, recalculating.", calculatedDeviatedEndScn.get()); + return Optional.empty(); + } + else { + // Calculated SCN is after lower bounds and within flashback/undo area, safe to return. + return calculatedDeviatedEndScn; + } + } + + /** + * Uses the provided Upperbound SCN and deviation to calculate an SCN that happened in the past at a + * time based on Oracle's {@code TIMESTAMP_TO_SCN} and {@code SCN_TO_TIMESTAMP} functions. + * + * @param upperboundsScn the upper bound system change number, should not be {@code null} + * @param deviation the time deviation to be applied, should not be {@code null} + * @return the newly calculated Scn + */ + private Optional getDeviatedMaxScn(Scn upperboundsScn, Duration deviation) { + try { + return Optional.of(jdbcConnection.getScnAdjustedByTime(upperboundsScn, deviation)); + } + catch (SQLException e) { + LOGGER.warn("Failed to calculate deviated max SCN value from {}.", upperboundsScn); + return Optional.empty(); + } + } + /** * Checks and validates the database's supplemental logging configuration as well as the lengths of the * table and column names that are part of the database schema. diff --git a/debezium-connector-oracle/src/test/java/io/debezium/connector/oracle/OracleConnectorIT.java b/debezium-connector-oracle/src/test/java/io/debezium/connector/oracle/OracleConnectorIT.java index 809c55952..c5355a334 100644 --- a/debezium-connector-oracle/src/test/java/io/debezium/connector/oracle/OracleConnectorIT.java +++ b/debezium-connector-oracle/src/test/java/io/debezium/connector/oracle/OracleConnectorIT.java @@ -35,7 +35,10 @@ import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; +import java.util.regex.Matcher; +import java.util.regex.Pattern; import java.util.stream.Collectors; import javax.management.JMException; @@ -5452,6 +5455,110 @@ public void shouldNotThrowConcurrentModificationExceptionWhenDispatchingSchemaCh } } + @Test + @FixFor("DBZ-6660") + @SkipWhenAdapterNameIsNot(SkipWhenAdapterNameIsNot.AdapterName.LOGMINER) + public void shouldPauseAndWaitForDeviationCalculationIfBeforeMiningRange() throws Exception { + try { + TestHelper.dropTable(connection, "dbz6660"); + + connection.execute("CREATE TABLE dbz6660 (id number(9,0), data varchar2(50), primary key(id))"); + TestHelper.streamTable(connection, "dbz6660"); + + final Long deviationMs = 10000L; + + Configuration config = TestHelper.defaultConfig() + .with(OracleConnectorConfig.TABLE_INCLUDE_LIST, "DEBEZIUM\\.DBZ6660") + .with(OracleConnectorConfig.LOG_MINING_MAX_SCN_DEVIATION_MS, deviationMs.toString()) + .build(); + + final LogInterceptor sourceLogging = new LogInterceptor(LogMinerStreamingChangeEventSource.class); + sourceLogging.setLoggerLevel(LogMinerStreamingChangeEventSource.class, Level.DEBUG); + + final LogInterceptor processorLogging = new LogInterceptor(AbstractLogMinerEventProcessor.class); + processorLogging.setLoggerLevel(AbstractLogMinerEventProcessor.class, Level.DEBUG); + + start(OracleConnector.class, config); + assertConnectorIsRunning(); + + waitForStreamingRunning(TestHelper.CONNECTOR_NAME, TestHelper.SERVER_NAME); + + // This should be printed at the start of the streaming session while the back-fill is prepared. + // Based on the time given, this could be printed several times. + Awaitility.await().atMost(Duration.ofSeconds(60)) + .until(() -> sourceLogging.containsMessage("outside of mining range, recalculating.")); + + // Assert that every lag log entry is at least 10s behind due to deviation. + try { + final Pattern pattern = Pattern.compile("Lag: ([0-9]+)"); + final AtomicInteger id = new AtomicInteger(1); + Awaitility.await() + .pollInterval(Duration.ofSeconds(1)) + .atMost(Duration.ofSeconds(60)).until(() -> { + // Provide some dummy captured data periodically + connection.execute("INSERT INTO dbz6660 values (" + id.getAndIncrement() + ", 'data')"); + final List entries = processorLogging.getLogEntriesThatContainsMessage("Processed in "); + for (String entry : entries) { + final Matcher matcher = pattern.matcher(entry); + if (matcher.matches()) { + assertThat(Long.valueOf(matcher.group(1))).isGreaterThan(deviationMs); + } + } + return false; + }); + } + catch (ConditionTimeoutException e) { + // ignored + } + + // Just concerned that every iteration has lag greater than deviation. + stopConnector(); + } + finally { + TestHelper.dropTable(connection, "dbz6660"); + } + } + + @Test + @FixFor("DBZ-6660") + @SkipWhenAdapterNameIsNot(SkipWhenAdapterNameIsNot.AdapterName.LOGMINER) + public void shouldUseEndScnIfDeviationProducesScnOutsideOfUndoRetention() throws Exception { + try { + TestHelper.dropTable(connection, "dbz6660"); + + connection.execute("CREATE TABLE dbz6660 (id number(9,0), data varchar2(50), primary key(id))"); + TestHelper.streamTable(connection, "dbz6660"); + + // This is effectively 166666.667 minutes = 115.74 days + // No Oracle instance should have undo space this large :) + final Long deviationMs = 10000000000L; + + Configuration config = TestHelper.defaultConfig() + .with(OracleConnectorConfig.TABLE_INCLUDE_LIST, "DEBEZIUM\\.DBZ6660") + .with(OracleConnectorConfig.LOG_MINING_MAX_SCN_DEVIATION_MS, deviationMs.toString()) + .build(); + + final LogInterceptor sourceLogging = new LogInterceptor(LogMinerStreamingChangeEventSource.class); + sourceLogging.setLoggerLevel(LogMinerStreamingChangeEventSource.class, Level.DEBUG); + + start(OracleConnector.class, config); + assertConnectorIsRunning(); + + waitForStreamingRunning(TestHelper.CONNECTOR_NAME, TestHelper.SERVER_NAME); + + // This should be printed at the start of the streaming session while the back-fill is prepared. + // Based on the time given, this could be printed several times. + Awaitility.await().atMost(Duration.ofSeconds(60)) + .until(() -> sourceLogging.containsMessage("outside undo space, using upperbounds")); + + // Just concerned that every iteration has lag greater than deviation. + stopConnector(); + } + finally { + TestHelper.dropTable(connection, "dbz6660"); + } + } + private void waitForCurrentScnToHaveBeenSeenByConnector() throws SQLException { try (OracleConnection admin = TestHelper.adminConnection(true)) { final Scn scn = admin.getCurrentScn(); diff --git a/debezium-core/src/test/java/io/debezium/junit/logging/LogInterceptor.java b/debezium-core/src/test/java/io/debezium/junit/logging/LogInterceptor.java index faaedb69a..009be2f3e 100644 --- a/debezium-core/src/test/java/io/debezium/junit/logging/LogInterceptor.java +++ b/debezium-core/src/test/java/io/debezium/junit/logging/LogInterceptor.java @@ -9,6 +9,7 @@ import java.util.List; import java.util.concurrent.CopyOnWriteArrayList; +import java.util.stream.Collectors; import org.slf4j.LoggerFactory; @@ -74,6 +75,13 @@ public void setLoggerLevel(Class loggerClass, Level level) { logger.setLevel(level); } + public List getLogEntriesThatContainsMessage(String text) { + return events.stream() + .filter(e -> e.getFormattedMessage().toString().contains(text)) + .map(e -> e.getFormattedMessage().toString()) + .collect(Collectors.toList()); + } + public long countOccurrences(String text) { return events.stream().filter(e -> e.getMessage().toString().contains(text)).count(); }