DBZ-6499 Support restarting JDBC connections

Introduces a new configuration option, `log.mining.restart.connection`,
which closes and re-opens the JDBC connection when an Oracle log switch
occurs or when the optionally configured maximum log mining session
lifetime is reached.
This commit is contained in:
Chris Cranford 2023-06-09 11:50:05 -04:00 committed by Jiri Pechanec
parent 08ed847095
commit e025230d44
4 changed files with 129 additions and 4 deletions

View File

@ -478,6 +478,17 @@ public class OracleConnectorConfig extends HistorizedRelationalDatabaseConnector
.withDescription( .withDescription(
"The maximum number of milliseconds that a LogMiner session lives for before being restarted. Defaults to 0 (indefinite until a log switch occurs)"); "The maximum number of milliseconds that a LogMiner session lives for before being restarted. Defaults to 0 (indefinite until a log switch occurs)");
/**
 * Controls whether the connector closes and re-opens its JDBC connection after each
 * detected log switch or when {@code log.mining.session.max.ms} is reached.
 * Defaults to {@code false} (the connection stays open for the whole streaming phase).
 */
public static final Field LOG_MINING_RESTART_CONNECTION = Field.create("log.mining.restart.connection")
        .withDisplayName("Restarts Oracle database connection when reaching maximum session time or database log switch")
        .withType(Type.BOOLEAN)
        .withWidth(Width.SHORT)
        .withImportance(Importance.LOW)
        .withDefault(false)
        .withDescription("Debezium opens a database connection and keeps that connection open throughout the entire streaming phase. " +
                "In some situations, this can lead to excessive SGA memory usage. " +
                "By setting this option to 'true' (the default is 'false'), the connector will close and re-open a database connection " +
                "after every detected log switch or if the log.mining.session.max.ms has been reached.");
public static final Field LOG_MINING_TRANSACTION_SNAPSHOT_BOUNDARY_MODE = Field.createInternal("log.mining.transaction.snapshot.boundary.mode") public static final Field LOG_MINING_TRANSACTION_SNAPSHOT_BOUNDARY_MODE = Field.createInternal("log.mining.transaction.snapshot.boundary.mode")
.withEnum(TransactionSnapshotBoundaryMode.class, TransactionSnapshotBoundaryMode.SKIP) .withEnum(TransactionSnapshotBoundaryMode.class, TransactionSnapshotBoundaryMode.SKIP)
.withWidth(Width.SHORT) .withWidth(Width.SHORT)
@ -581,7 +592,8 @@ public class OracleConnectorConfig extends HistorizedRelationalDatabaseConnector
LOG_MINING_TRANSACTION_SNAPSHOT_BOUNDARY_MODE, LOG_MINING_TRANSACTION_SNAPSHOT_BOUNDARY_MODE,
LOG_MINING_READ_ONLY, LOG_MINING_READ_ONLY,
LOG_MINING_FLUSH_TABLE_NAME, LOG_MINING_FLUSH_TABLE_NAME,
LOG_MINING_QUERY_FILTER_MODE) LOG_MINING_QUERY_FILTER_MODE,
LOG_MINING_RESTART_CONNECTION)
.create(); .create();
/** /**
@ -643,6 +655,7 @@ public static ConfigDef configDef() {
private final Boolean logMiningReadOnly; private final Boolean logMiningReadOnly;
private final String logMiningFlushTableName; private final String logMiningFlushTableName;
private final LogMiningQueryFilterMode logMiningQueryFilterMode; private final LogMiningQueryFilterMode logMiningQueryFilterMode;
private final Boolean logMiningRestartConnection;
public OracleConnectorConfig(Configuration config) { public OracleConnectorConfig(Configuration config) {
super( super(
@ -701,6 +714,7 @@ public OracleConnectorConfig(Configuration config) {
this.logMiningReadOnly = config.getBoolean(LOG_MINING_READ_ONLY); this.logMiningReadOnly = config.getBoolean(LOG_MINING_READ_ONLY);
this.logMiningFlushTableName = config.getString(LOG_MINING_FLUSH_TABLE_NAME); this.logMiningFlushTableName = config.getString(LOG_MINING_FLUSH_TABLE_NAME);
this.logMiningQueryFilterMode = LogMiningQueryFilterMode.parse(config.getString(LOG_MINING_QUERY_FILTER_MODE)); this.logMiningQueryFilterMode = LogMiningQueryFilterMode.parse(config.getString(LOG_MINING_QUERY_FILTER_MODE));
this.logMiningRestartConnection = config.getBoolean(LOG_MINING_RESTART_CONNECTION);
} }
private static String toUpperCase(String property) { private static String toUpperCase(String property) {
@ -1647,6 +1661,13 @@ public LogMiningQueryFilterMode getLogMiningQueryFilterMode() {
return logMiningQueryFilterMode; return logMiningQueryFilterMode;
} }
/**
 * Indicates whether the connector is configured to close and re-open its JDBC
 * connection when a log switch is detected or when the configured maximum mining
 * session lifetime has been reached.
 *
 * @return {@code true} if the JDBC connection should be restarted, {@code false} otherwise
 */
public boolean isLogMiningRestartConnection() {
    return logMiningRestartConnection;
}
@Override @Override
public String getConnectorName() { public String getConnectorName() {
return Module.name(); return Module.name();

View File

@ -138,8 +138,8 @@ public void execute(ChangeEventSourceContext context, OraclePartition partition,
return; return;
} }
try { try {
// We explicitly expect auto-commit to be disabled
jdbcConnection.setAutoCommit(false); prepareConnection(jdbcConnection, false);
this.effectiveOffset = offsetContext; this.effectiveOffset = offsetContext;
startScn = offsetContext.getScn(); startScn = offsetContext.getScn();
@ -160,7 +160,6 @@ public void execute(ChangeEventSourceContext context, OraclePartition partition,
"Online REDO LOG files or archive log files do not contain the offset scn " + startScn + ". Please perform a new snapshot."); "Online REDO LOG files or archive log files do not contain the offset scn " + startScn + ". Please perform a new snapshot.");
} }
setNlsSessionParameters(jdbcConnection);
checkDatabaseAndTableState(jdbcConnection, connectorConfig.getPdbName(), schema); checkDatabaseAndTableState(jdbcConnection, connectorConfig.getPdbName(), schema);
logOnlineRedoLogSizes(connectorConfig); logOnlineRedoLogSizes(connectorConfig);
@ -214,6 +213,9 @@ public void execute(ChangeEventSourceContext context, OraclePartition partition,
// With one mining session, it grows and maybe there is another way to flush PGA. // With one mining session, it grows and maybe there is another way to flush PGA.
// At this point we use a new mining session // At this point we use a new mining session
endMiningSession(jdbcConnection, offsetContext); endMiningSession(jdbcConnection, offsetContext);
if (connectorConfig.isLogMiningRestartConnection()) {
prepareConnection(jdbcConnection, true);
}
initializeRedoLogsForMining(jdbcConnection, true, startScn); initializeRedoLogsForMining(jdbcConnection, true, startScn);
// log switch or restart required, re-create a new stop watch // log switch or restart required, re-create a new stop watch
@ -248,6 +250,18 @@ public void execute(ChangeEventSourceContext context, OraclePartition partition,
} }
} }
/**
 * Prepares the JDBC connection for streaming: optionally restarts the physical
 * connection, then applies the session settings the mining loop requires.
 *
 * @param connection the connector's Oracle connection
 * @param closeAndReconnect {@code true} to close the existing physical connection so a
 *            fresh one is established before the session settings are re-applied
 * @throws SQLException if a database access error occurs
 */
private void prepareConnection(OracleConnection connection, boolean closeAndReconnect) throws SQLException {
    if (closeAndReconnect) {
        // Close and reconnect
        LOGGER.debug("Log switch or maximum session threshold detected, restarting Oracle JDBC connection.");
        connection.close();
    }
    // We explicitly expect auto-commit to be disabled
    // NOTE(review): after close(), the calls below rely on the connection wrapper lazily
    // re-establishing the physical connection on next use — confirm OracleConnection
    // provides that behavior.
    connection.setAutoCommit(false);
    setNlsSessionParameters(connection);
}
private void logOnlineRedoLogSizes(OracleConnectorConfig config) throws SQLException { private void logOnlineRedoLogSizes(OracleConnectorConfig config) throws SQLException {
jdbcConnection.query("SELECT GROUP#, BYTES FROM V$LOG ORDER BY 1", rs -> { jdbcConnection.query("SELECT GROUP#, BYTES FROM V$LOG ORDER BY 1", rs -> {
LOGGER.info("Redo Log Group Sizes:"); LOGGER.info("Redo Log Group Sizes:");

View File

@ -34,6 +34,7 @@
import java.util.concurrent.Executors; import java.util.concurrent.Executors;
import java.util.concurrent.Future; import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors; import java.util.stream.Collectors;
@ -5228,6 +5229,88 @@ public void shouldGetTableMetadataOnlyForCapturedTables() throws Exception {
assertThat(logInterceptor.containsMessage("1 table(s) will be scanned")).isTrue(); assertThat(logInterceptor.containsMessage("1 table(s) will be scanned")).isTrue();
} }
@Test
@FixFor("DBZ-6499")
@SkipWhenAdapterNameIsNot(value = SkipWhenAdapterNameIsNot.AdapterName.LOGMINER, reason = "Applies only to LogMiner")
public void shouldRestartOracleJdbcConnectionAtMaxSessionThreshold() throws Exception {
    // Force a log switch up front so that none occurs during the test itself and the
    // only restart trigger observed is the maximum session lifetime.
    TestHelper.forceLogfileSwitch();

    final Configuration configuration = TestHelper.defaultConfig()
            .with(OracleConnectorConfig.SNAPSHOT_MODE, SnapshotMode.INITIAL)
            .with(OracleConnectorConfig.TABLE_INCLUDE_LIST, "DEBEZIUM\\.CUSTOMER")
            .with(OracleConnectorConfig.LOG_MINING_SESSION_MAX_MS, "30000")
            .with(OracleConnectorConfig.LOG_MINING_RESTART_CONNECTION, "true")
            .build();

    final LogInterceptor interceptor = new LogInterceptor(LogMinerStreamingChangeEventSource.class);
    interceptor.setLoggerLevel(LogMinerStreamingChangeEventSource.class, Level.DEBUG);

    start(OracleConnector.class, configuration);
    assertConnectorIsRunning();

    waitForStreamingRunning(TestHelper.CONNECTOR_NAME, TestHelper.SERVER_NAME);

    // The 30s session cap should trigger a connection restart; once the restart log
    // entry has been written it is safe to stop the connector.
    Awaitility.await()
            .atMost(Duration.ofMinutes(2))
            .until(() -> interceptor.containsMessage("restarting Oracle JDBC connection"));

    // Allow the mining loop a few seconds to spin back up before stopping.
    Thread.sleep(5000);

    stopConnector();
}
@Test
@FixFor("DBZ-6499")
@SkipWhenAdapterNameIsNot(value = SkipWhenAdapterNameIsNot.AdapterName.LOGMINER, reason = "Applies only to LogMiner")
public void shouldRestartOracleJdbcConnectionUponLogSwitch() throws Exception {
    final Configuration configuration = TestHelper.defaultConfig()
            .with(OracleConnectorConfig.SNAPSHOT_MODE, SnapshotMode.INITIAL)
            .with(OracleConnectorConfig.TABLE_INCLUDE_LIST, "DEBEZIUM\\.CUSTOMER")
            .with(OracleConnectorConfig.LOG_MINING_RESTART_CONNECTION, "true")
            .build();

    final LogInterceptor interceptor = new LogInterceptor(LogMinerStreamingChangeEventSource.class);
    interceptor.setLoggerLevel(LogMinerStreamingChangeEventSource.class, Level.DEBUG);

    start(OracleConnector.class, configuration);
    assertConnectorIsRunning();

    waitForStreamingRunning(TestHelper.CONNECTOR_NAME, TestHelper.SERVER_NAME);

    // Wait cycle:
    //   1. Let one mining loop complete, then force a transaction log switch.
    //   2. Wait for the connector to detect the switch and log the restart entry.
    final AtomicBoolean miningLoopSeen = new AtomicBoolean();
    final AtomicBoolean switchTriggered = new AtomicBoolean();
    Awaitility.await()
            .atMost(Duration.ofMinutes(2))
            .until(() -> {
                if (!miningLoopSeen.get()) {
                    if (!interceptor.containsMessage("Oracle Session UGA")) {
                        return false;
                    }
                    miningLoopSeen.set(true);
                }
                if (switchTriggered.compareAndSet(false, true)) {
                    TestHelper.forceLogfileSwitch();
                    return false;
                }
                return interceptor.containsMessage("restarting Oracle JDBC connection");
            });

    // Allow the mining loop a few seconds to spin back up before stopping.
    Thread.sleep(5000);

    stopConnector();
}
private void waitForCurrentScnToHaveBeenSeenByConnector() throws SQLException { private void waitForCurrentScnToHaveBeenSeenByConnector() throws SQLException {
try (OracleConnection admin = TestHelper.adminConnection(true)) { try (OracleConnection admin = TestHelper.adminConnection(true)) {
final Scn scn = admin.getCurrentScn(); final Scn scn = admin.getCurrentScn();

View File

@ -3203,6 +3203,13 @@ For low volume systems, a LogMiner session may consume too much PGA memory when
The default behavior is to only use a new LogMiner session when a log switch is detected. The default behavior is to only use a new LogMiner session when a log switch is detected.
By setting this value to something greater than `0`, this specifies the maximum number of milliseconds a LogMiner session can be active before it gets stopped and started to deallocate and reallocate PGA memory. By setting this value to something greater than `0`, this specifies the maximum number of milliseconds a LogMiner session can be active before it gets stopped and started to deallocate and reallocate PGA memory.
|[[oracle-property-log-mining-restart-connection]]<<oracle-property-log-mining-restart-connection, `+log.mining.restart.connection+`>>
|`false`
|Specifies whether the JDBC connection is closed and re-opened when a log switch occurs or when the mining session has reached its maximum lifetime threshold. +
+
By default, the JDBC connection is not closed across log switches or maximum session lifetimes. +
This should be enabled if you experience excessive Oracle SGA growth with LogMiner.
|[[oracle-property-log-mining-batch-size-min]]<<oracle-property-log-mining-batch-size-min, `+log.mining.batch.size.min+`>> |[[oracle-property-log-mining-batch-size-min]]<<oracle-property-log-mining-batch-size-min, `+log.mining.batch.size.min+`>>
|`1000` |`1000`
|The minimum SCN interval size that this connector attempts to read from redo/archive logs. Active batch size is also increased/decreased by this amount for tuning connector throughput when needed. |The minimum SCN interval size that this connector attempts to read from redo/archive logs. Active batch size is also increased/decreased by this amount for tuning connector throughput when needed.