DBZ-4963 Use Optional & Stopwatch, disable lengthy test by default

This commit is contained in:
Chris Cranford 2022-04-20 11:52:38 -04:00 committed by Jiri Pechanec
parent c804509673
commit 5f9b2fa822
3 changed files with 17 additions and 7 deletions

View File

@ -9,6 +9,7 @@
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
@ -1403,8 +1404,8 @@ public Duration getLogMiningMaxDelay() {
/**
* @return the maximum duration for a LogMiner session
*/
public Duration getLogMiningMaximumSession() {
return logMiningMaximumSession;
public Optional<Duration> getLogMiningMaximumSession() {
return logMiningMaximumSession.toMillis() == 0L ? Optional.empty() : Optional.of(logMiningMaximumSession);
}
@Override

View File

@ -51,6 +51,7 @@
import io.debezium.relational.TableId;
import io.debezium.util.Clock;
import io.debezium.util.Metronome;
import io.debezium.util.Stopwatch;
/**
* A {@link StreamingChangeEventSource} based on Oracle's LogMiner utility.
@ -151,8 +152,8 @@ public void execute(ChangeEventSourceContext context, OraclePartition partition,
}
initializeRedoLogsForMining(jdbcConnection, false, startScn);
long startTimeMillis = System.currentTimeMillis();
Stopwatch sw = Stopwatch.accumulating().start();
while (context.isRunning()) {
// Calculate time difference before each mining session to detect time zone offset changes (e.g. DST) on database server
streamingMetrics.calculateTimeDifference(getDatabaseSystemTime(jdbcConnection));
@ -175,12 +176,16 @@ public void execute(ChangeEventSourceContext context, OraclePartition partition,
flushStrategy.flush(jdbcConnection.getCurrentScn());
boolean restartRequired = false;
if (!connectorConfig.getLogMiningMaximumSession().isZero()) {
long timeSinceStart = System.currentTimeMillis() - startTimeMillis;
if (timeSinceStart >= connectorConfig.getLogMiningMaximumSession().toMillis()) {
if (connectorConfig.getLogMiningMaximumSession().isPresent()) {
final Duration totalDuration = sw.stop().durations().statistics().getTotal();
if (totalDuration.toMillis() >= connectorConfig.getLogMiningMaximumSession().get().toMillis()) {
LOGGER.info("LogMiner session has exceeded maximum session time of '{}', forcing restart.", connectorConfig.getLogMiningMaximumSession());
restartRequired = true;
}
else {
// resume the existing stop watch, we haven't met the criteria yet
sw.start();
}
}
if (restartRequired || hasLogSwitchOccurred()) {
@ -189,7 +194,9 @@ public void execute(ChangeEventSourceContext context, OraclePartition partition,
// At this point we use a new mining session
endMiningSession(jdbcConnection, offsetContext);
initializeRedoLogsForMining(jdbcConnection, true, startScn);
startTimeMillis = System.currentTimeMillis();
// log switch or restart required, re-create a new stop watch
sw = Stopwatch.accumulating().start();
}
if (context.isRunning()) {

View File

@ -49,6 +49,7 @@
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Ignore;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TestRule;
@ -3520,6 +3521,7 @@ public void shouldRestartLogMiningSessionAfterMaxSessionElapses() throws Excepti
@Test
@FixFor("DBZ-4963")
@Ignore("Waits 60 seconds by default, so disabled by default")
public void shouldNotRestartLogMiningSessionWithMaxSessionZero() throws Exception {
TestHelper.dropTable(connection, "dbz4963");
try {