From e025230d44ff0b656fb981110c056fa5ed770f95 Mon Sep 17 00:00:00 2001
From: Chris Cranford
Date: Fri, 9 Jun 2023 11:50:05 -0400
Subject: [PATCH] DBZ-6499 Support restarting JDBC connections

Introduces a new configuration option, `log.mining.restart.connection`, which
closes and re-opens the JDBC connection when an Oracle Log switch occurs or
when the optionally configured log mining session max lifetime is reached.
---
 .../oracle/OracleConnectorConfig.java          | 23 ++++-
 .../LogMinerStreamingChangeEventSource.java    | 20 ++++-
 .../connector/oracle/OracleConnectorIT.java    | 83 +++++++++++++++++++
 .../modules/ROOT/pages/connectors/oracle.adoc  |  7 ++
 4 files changed, 129 insertions(+), 4 deletions(-)

diff --git a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/OracleConnectorConfig.java b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/OracleConnectorConfig.java
index 3bc150e93..8e9518325 100644
--- a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/OracleConnectorConfig.java
+++ b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/OracleConnectorConfig.java
@@ -478,6 +478,17 @@ public class OracleConnectorConfig extends HistorizedRelationalDatabaseConnector
             .withDescription(
                     "The maximum number of milliseconds that a LogMiner session lives for before being restarted. Defaults to 0 (indefinite until a log switch occurs)");
 
+    public static final Field LOG_MINING_RESTART_CONNECTION = Field.create("log.mining.restart.connection")
+            .withDisplayName("Restarts Oracle database connection when reaching maximum session time or database log switch")
+            .withType(Type.BOOLEAN)
+            .withWidth(Width.SHORT)
+            .withImportance(Importance.LOW)
+            .withDefault(false)
+            .withDescription("Debezium opens a database connection and keeps that connection open throughout the entire streaming phase. " +
+                    "In some situations, this can lead to excessive SGA memory usage. " +
" + + "By setting this option to 'true' (the default is 'false'), the connector will close and re-open a database connection " + + "after every detected log switch or if the log.mining.session.max.ms has been reached."); + public static final Field LOG_MINING_TRANSACTION_SNAPSHOT_BOUNDARY_MODE = Field.createInternal("log.mining.transaction.snapshot.boundary.mode") .withEnum(TransactionSnapshotBoundaryMode.class, TransactionSnapshotBoundaryMode.SKIP) .withWidth(Width.SHORT) @@ -581,7 +592,8 @@ public class OracleConnectorConfig extends HistorizedRelationalDatabaseConnector LOG_MINING_TRANSACTION_SNAPSHOT_BOUNDARY_MODE, LOG_MINING_READ_ONLY, LOG_MINING_FLUSH_TABLE_NAME, - LOG_MINING_QUERY_FILTER_MODE) + LOG_MINING_QUERY_FILTER_MODE, + LOG_MINING_RESTART_CONNECTION) .create(); /** @@ -643,6 +655,7 @@ public static ConfigDef configDef() { private final Boolean logMiningReadOnly; private final String logMiningFlushTableName; private final LogMiningQueryFilterMode logMiningQueryFilterMode; + private final Boolean logMiningRestartConnection; public OracleConnectorConfig(Configuration config) { super( @@ -701,6 +714,7 @@ public OracleConnectorConfig(Configuration config) { this.logMiningReadOnly = config.getBoolean(LOG_MINING_READ_ONLY); this.logMiningFlushTableName = config.getString(LOG_MINING_FLUSH_TABLE_NAME); this.logMiningQueryFilterMode = LogMiningQueryFilterMode.parse(config.getString(LOG_MINING_QUERY_FILTER_MODE)); + this.logMiningRestartConnection = config.getBoolean(LOG_MINING_RESTART_CONNECTION); } private static String toUpperCase(String property) { @@ -1647,6 +1661,13 @@ public LogMiningQueryFilterMode getLogMiningQueryFilterMode() { return logMiningQueryFilterMode; } + /** + * @return whether the connector should restart the JDBC connection after log switches or maximum session windows. + */ + public boolean isLogMiningRestartConnection() { + return logMiningRestartConnection; + } + @Override public String getConnectorName() { return Module.name(); diff --git a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/LogMinerStreamingChangeEventSource.java b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/LogMinerStreamingChangeEventSource.java index ac4615f10..9598dbde5 100644 --- a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/LogMinerStreamingChangeEventSource.java +++ b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/LogMinerStreamingChangeEventSource.java @@ -138,8 +138,8 @@ public void execute(ChangeEventSourceContext context, OraclePartition partition, return; } try { - // We explicitly expect auto-commit to be disabled - jdbcConnection.setAutoCommit(false); + + prepareConnection(jdbcConnection, false); this.effectiveOffset = offsetContext; startScn = offsetContext.getScn(); @@ -160,7 +160,6 @@ public void execute(ChangeEventSourceContext context, OraclePartition partition, "Online REDO LOG files or archive log files do not contain the offset scn " + startScn + ". Please perform a new snapshot."); } - setNlsSessionParameters(jdbcConnection); checkDatabaseAndTableState(jdbcConnection, connectorConfig.getPdbName(), schema); logOnlineRedoLogSizes(connectorConfig); @@ -214,6 +213,9 @@ public void execute(ChangeEventSourceContext context, OraclePartition partition, // With one mining session, it grows and maybe there is another way to flush PGA. 
                             // At this point we use a new mining session
                             endMiningSession(jdbcConnection, offsetContext);
+                            if (connectorConfig.isLogMiningRestartConnection()) {
+                                prepareConnection(jdbcConnection, true);
+                            }
                             initializeRedoLogsForMining(jdbcConnection, true, startScn);
 
                             // log switch or restart required, re-create a new stop watch
@@ -248,6 +250,18 @@ public void execute(ChangeEventSourceContext context, OraclePartition partition,
         }
     }
 
+    private void prepareConnection(OracleConnection connection, boolean closeAndReconnect) throws SQLException {
+        if (closeAndReconnect) {
+            // Close and reconnect
+            LOGGER.debug("Log switch or maximum session threshold detected, restarting Oracle JDBC connection.");
+            connection.close();
+        }
+
+        // We explicitly expect auto-commit to be disabled
+        connection.setAutoCommit(false);
+        setNlsSessionParameters(connection);
+    }
+
     private void logOnlineRedoLogSizes(OracleConnectorConfig config) throws SQLException {
         jdbcConnection.query("SELECT GROUP#, BYTES FROM V$LOG ORDER BY 1", rs -> {
             LOGGER.info("Redo Log Group Sizes:");
diff --git a/debezium-connector-oracle/src/test/java/io/debezium/connector/oracle/OracleConnectorIT.java b/debezium-connector-oracle/src/test/java/io/debezium/connector/oracle/OracleConnectorIT.java
index c4af58274..bc35d24ff 100644
--- a/debezium-connector-oracle/src/test/java/io/debezium/connector/oracle/OracleConnectorIT.java
+++ b/debezium-connector-oracle/src/test/java/io/debezium/connector/oracle/OracleConnectorIT.java
@@ -34,6 +34,7 @@
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.stream.Collectors;
 
@@ -5228,6 +5229,88 @@ public void shouldGetTableMetadataOnlyForCapturedTables() throws Exception {
         assertThat(logInterceptor.containsMessage("1 table(s) will be scanned")).isTrue();
     }
 
+    @Test
+    @FixFor("DBZ-6499")
+    @SkipWhenAdapterNameIsNot(value = SkipWhenAdapterNameIsNot.AdapterName.LOGMINER, reason = "Applies only to LogMiner")
+    public void shouldRestartOracleJdbcConnectionAtMaxSessionThreshold() throws Exception {
+        // In order to guarantee there are no log switches during this test, this test will preemptively
+        // perform a transaction log switch before initiating the test.
+        TestHelper.forceLogfileSwitch();
+
+        Configuration config = TestHelper.defaultConfig()
+                .with(OracleConnectorConfig.SNAPSHOT_MODE, SnapshotMode.INITIAL)
+                .with(OracleConnectorConfig.TABLE_INCLUDE_LIST, "DEBEZIUM\\.CUSTOMER")
+                .with(OracleConnectorConfig.LOG_MINING_SESSION_MAX_MS, "30000")
+                .with(OracleConnectorConfig.LOG_MINING_RESTART_CONNECTION, "true")
+                .build();
+
+        LogInterceptor logInterceptor = new LogInterceptor(LogMinerStreamingChangeEventSource.class);
+        logInterceptor.setLoggerLevel(LogMinerStreamingChangeEventSource.class, Level.DEBUG);
+
+        start(OracleConnector.class, config);
+        assertConnectorIsRunning();
+        waitForStreamingRunning(TestHelper.CONNECTOR_NAME, TestHelper.SERVER_NAME);
+
+        // Session should trigger a restart after 30 seconds of streaming
+        // After the restart has been triggered, it is safe to stop the connector
+        Awaitility.await()
+                .atMost(Duration.ofMinutes(2))
+                .until(() -> logInterceptor.containsMessage("restarting Oracle JDBC connection"));
+
+        // Give the connector a few seconds to restart the mining loop before stopping
+        Thread.sleep(5000);
+
+        stopConnector();
+    }
+
+    @Test
+    @FixFor("DBZ-6499")
+    @SkipWhenAdapterNameIsNot(value = SkipWhenAdapterNameIsNot.AdapterName.LOGMINER, reason = "Applies only to LogMiner")
+    public void shouldRestartOracleJdbcConnectionUponLogSwitch() throws Exception {
+        Configuration config = TestHelper.defaultConfig()
+                .with(OracleConnectorConfig.SNAPSHOT_MODE, SnapshotMode.INITIAL)
+                .with(OracleConnectorConfig.TABLE_INCLUDE_LIST, "DEBEZIUM\\.CUSTOMER")
+                .with(OracleConnectorConfig.LOG_MINING_RESTART_CONNECTION, "true")
+                .build();
+
+        LogInterceptor logInterceptor = new LogInterceptor(LogMinerStreamingChangeEventSource.class);
+        logInterceptor.setLoggerLevel(LogMinerStreamingChangeEventSource.class, Level.DEBUG);
+
+        start(OracleConnector.class, config);
+        assertConnectorIsRunning();
+        waitForStreamingRunning(TestHelper.CONNECTOR_NAME, TestHelper.SERVER_NAME);
+
+        // Wait cycle
+        // 1. Waits for one mining loop, after which a Log Switch is forced.
+        // 2. Once Log Switch is forced, wait for connector to detect and write log entry.
+        // 3. Once connector has restarted the JDBC connection, wait loop exits.
+        final AtomicBoolean completedOneMiningLoop = new AtomicBoolean();
+        final AtomicBoolean logSwitchForced = new AtomicBoolean();
+        Awaitility.await()
+                .atMost(Duration.ofMinutes(2))
+                .until(() -> {
+                    if (!completedOneMiningLoop.get()) {
+                        if (!logInterceptor.containsMessage("Oracle Session UGA")) {
+                            return false;
+                        }
+                        else {
+                            completedOneMiningLoop.set(true);
+                        }
+                    }
+                    if (!logSwitchForced.get()) {
+                        logSwitchForced.set(true);
+                        TestHelper.forceLogfileSwitch();
+                        return false;
+                    }
+                    return logInterceptor.containsMessage("restarting Oracle JDBC connection");
+                });
+
+        // Give the connector a few seconds to restart the mining loop before stopping
+        Thread.sleep(5000);
+
+        stopConnector();
+    }
+
     private void waitForCurrentScnToHaveBeenSeenByConnector() throws SQLException {
         try (OracleConnection admin = TestHelper.adminConnection(true)) {
             final Scn scn = admin.getCurrentScn();
diff --git a/documentation/modules/ROOT/pages/connectors/oracle.adoc b/documentation/modules/ROOT/pages/connectors/oracle.adoc
index 7cefa6174..6107c2d4a 100644
--- a/documentation/modules/ROOT/pages/connectors/oracle.adoc
+++ b/documentation/modules/ROOT/pages/connectors/oracle.adoc
@@ -3203,6 +3203,13 @@
 For low volume systems, a LogMiner session may consume too much PGA memory when the same session is used for a long time.
 The default behavior is to only use a new LogMiner session when a log switch is detected.
 By setting this value to something greater than `0`, this specifies the maximum number of milliseconds a LogMiner session can be active before it gets stopped and started to deallocate and reallocate PGA memory.
+|[[oracle-property-log-mining-restart-connection]]<<oracle-property-log-mining-restart-connection, `+log.mining.restart.connection+`>>
+|`false`
+|Specifies whether the JDBC connection will be closed and re-opened on log switches or when the mining session has reached its maximum lifetime threshold. +
+ +
+By default, the JDBC connection is not closed across log switches or maximum session lifetimes. +
+This should be enabled if you experience excessive Oracle SGA growth with LogMiner.
+
 |[[oracle-property-log-mining-batch-size-min]]<<oracle-property-log-mining-batch-size-min, `+log.mining.batch.size.min+`>>
 |`1000`
 |The minimum SCN interval size that this connector attempts to read from redo/archive logs. Active batch size is also increased/decreased by this amount for tuning connector throughput when needed.
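
Not part of the patch: a minimal sketch of how the new option could be enabled programmatically with Debezium's Configuration builder, mirroring the integration tests above. The class name, connection values, topic prefix, and the 30-second session limit are illustrative placeholders, not values mandated by this change.

    import io.debezium.config.Configuration;
    import io.debezium.connector.oracle.OracleConnectorConfig;

    // Hypothetical helper class, for illustration only.
    public class RestartConnectionConfigSketch {

        public static Configuration build() {
            return Configuration.create()
                    .with("connector.class", "io.debezium.connector.oracle.OracleConnector")
                    .with("database.hostname", "oracle.example.com")   // placeholder
                    .with("database.port", "1521")                     // placeholder
                    .with("database.user", "c##dbzuser")               // placeholder
                    .with("database.password", "dbz")                  // placeholder
                    .with("database.dbname", "ORCLCDB")                // placeholder
                    .with("topic.prefix", "server1")                   // placeholder
                    // Option introduced by this patch: close and re-open the JDBC
                    // connection on every detected log switch or when the mining
                    // session maximum lifetime is reached.
                    .with(OracleConnectorConfig.LOG_MINING_RESTART_CONNECTION, "true")
                    // Optional companion setting: bound each LogMiner session to 30 seconds.
                    .with(OracleConnectorConfig.LOG_MINING_SESSION_MAX_MS, "30000")
                    .build();
        }
    }

With a configuration like this, each log switch (and each elapsed 30-second session window) ends the current LogMiner session, and because log.mining.restart.connection is enabled, the connector also closes and re-establishes the underlying JDBC connection before mining resumes.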