diff --git a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/logwriter/CommitLogWriterFlushStrategy.java b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/logwriter/CommitLogWriterFlushStrategy.java index af2c6b5a7..ef2052e49 100644 --- a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/logwriter/CommitLogWriterFlushStrategy.java +++ b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/logwriter/CommitLogWriterFlushStrategy.java @@ -14,7 +14,6 @@ import io.debezium.connector.oracle.OracleConnection; import io.debezium.connector.oracle.OracleConnectorConfig; import io.debezium.connector.oracle.Scn; -import io.debezium.jdbc.JdbcConfiguration; import io.debezium.relational.TableId; import io.debezium.util.Strings; @@ -37,7 +36,6 @@ public class CommitLogWriterFlushStrategy implements LogWriterFlushStrategy { private final TableId flushTableId; private final String databasePdbName; private final OracleConnection connection; - private final boolean closeConnectionOnClose; /** * Creates a transaction-commit Oracle LogWriter (LGWR) process flush strategy. @@ -53,40 +51,11 @@ public CommitLogWriterFlushStrategy(OracleConnectorConfig connectorConfig, Oracl this.flushTableName = flushTableId.toDoubleQuotedString(); this.databasePdbName = connectorConfig.getPdbName(); this.connection = connection; - this.closeConnectionOnClose = false; - createFlushTableIfNotExists(); - } - - /** - * Creates a transaction-commit Oracle LogWriter (LGWR) process flush strategy. - * - * This will create a new database connection based on the supplied JDBC configuration and the - * connection will automatically be closed when the strategy is closed. - * - * @param connectorConfig the connector configuration, must not be {@code null} - * @param jdbcConfig the jdbc configuration - * @throws SQLException if there was a database problem - */ - public CommitLogWriterFlushStrategy(OracleConnectorConfig connectorConfig, JdbcConfiguration jdbcConfig) throws SQLException { - this.flushTableId = TableId.parse(connectorConfig.getLogMiningFlushTableName()); - this.flushTableName = flushTableId.toDoubleQuotedString(); - this.databasePdbName = connectorConfig.getPdbName(); - this.connection = new OracleConnection(jdbcConfig); - this.connection.setAutoCommit(false); - this.closeConnectionOnClose = true; createFlushTableIfNotExists(); } @Override public void close() { - if (closeConnectionOnClose) { - try { - connection.close(); - } - catch (SQLException e) { - throw new DebeziumException("Failed to close connection to host '" + getHost() + "'", e); - } - } } @Override diff --git a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/logwriter/RacCommitLogWriterFlushStrategy.java b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/logwriter/RacCommitLogWriterFlushStrategy.java index 64ddfe8ff..6aca14617 100644 --- a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/logwriter/RacCommitLogWriterFlushStrategy.java +++ b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/logwriter/RacCommitLogWriterFlushStrategy.java @@ -8,8 +8,8 @@ import java.sql.SQLException; import java.time.Duration; import java.time.Instant; -import java.util.HashMap; -import java.util.Map; +import java.util.ArrayList; +import java.util.List; import java.util.Set; import java.util.stream.Collectors; @@ -18,6 +18,7 @@ import io.debezium.DebeziumException; import io.debezium.config.Configuration; +import io.debezium.connector.oracle.OracleConnection; import io.debezium.connector.oracle.OracleConnectorConfig; import io.debezium.connector.oracle.Scn; import io.debezium.connector.oracle.logminer.LogMinerStreamingChangeEventSourceMetrics; @@ -41,7 +42,7 @@ public class RacCommitLogWriterFlushStrategy implements LogWriterFlushStrategy { private static final Logger LOGGER = LoggerFactory.getLogger(RacCommitLogWriterFlushStrategy.class); - private final Map flushStrategies = new HashMap<>(); + private final List racNodes = new ArrayList<>(); private final LogMinerStreamingChangeEventSourceMetrics streamingMetrics; private final JdbcConfiguration jdbcConfiguration; private final OracleConnectorConfig connectorConfig; @@ -60,13 +61,15 @@ public RacCommitLogWriterFlushStrategy(OracleConnectorConfig connectorConfig, Jd this.streamingMetrics = streamingMetrics; this.connectorConfig = connectorConfig; this.hosts = connectorConfig.getRacNodes().stream().map(String::toUpperCase).collect(Collectors.toSet()); - recreateRacNodeFlushStrategies(); + createRacNodesList(); } @Override public void close() { - closeRacNodeFlushStrategies(); - flushStrategies.clear(); + for (RacNode node : racNodes) { + node.close(); + } + racNodes.clear(); } @Override @@ -80,25 +83,41 @@ public void flush(Scn currentScn) throws InterruptedException { // For this configuration, all LGWR processes across all instances must be flushed. // Queries cannot be used such as gv_instance as not all nodes could be load balanced. Instant startTime = Instant.now(); - if (flushStrategies.isEmpty()) { + if (racNodes.isEmpty()) { + // In this case it means all nodes have disappeared and main connection will likely throw + // a database error too, so it's safe to throw here. throw new DebeziumException("No RAC node addresses supplied or currently connected"); } - boolean recreateConnections = false; - for (Map.Entry entry : flushStrategies.entrySet()) { - final CommitLogWriterFlushStrategy strategy = entry.getValue(); - try { - // Flush the node's commit log writer - strategy.flush(currentScn); - } - catch (Exception e) { - LOGGER.warn("Failed to flush LGWR buffer on RAC node '{}'", strategy.getHost(), e); - recreateConnections = true; + // Before flushing, verify that nodes are connected + for (RacNode node : racNodes) { + if (!node.isConnected()) { + // Attempt to reconnect + node.reconnect(); } } - if (recreateConnections) { - recreateRacNodeFlushStrategies(); + // Flush to nodes + boolean allNodesFlushed = true; + for (RacNode node : racNodes) { + if (node.isConnected()) { + final LogWriterFlushStrategy strategy = node.getFlushStrategy(); + try { + // Flush the node's commit log writer + strategy.flush(currentScn); + } + catch (Exception e) { + LOGGER.warn("Failed to flush LGWR buffer on RAC node '{}': {}", node.getHostName(), e.getMessage()); + node.close(); + allNodesFlushed = false; + } + } + else { + allNodesFlushed = false; + } + } + + if (!allNodesFlushed) { LOGGER.warn("Not all LGWR buffers were flushed, waiting 3 seconds for Oracle to flush automatically."); Metronome metronome = Metronome.sleeper(Duration.ofSeconds(3), Clock.SYSTEM); try { @@ -113,18 +132,13 @@ public void flush(Scn currentScn) throws InterruptedException { LOGGER.trace("LGWR flush took {} to complete.", Duration.between(startTime, Instant.now())); } - private void recreateRacNodeFlushStrategies() { - // Close existing flush strategies to RAC nodes - closeRacNodeFlushStrategies(); - - // Clear map - flushStrategies.clear(); - - // Create strategies by host + private void createRacNodesList() { for (String hostName : hosts) { try { - final String[] parts = parseHostName(hostName); - flushStrategies.put(hostName, createHostFlushStrategy(parts[0], Integer.parseInt(parts[1]), parts[2])); + final RacNode node = new RacNode(hostName); + node.connect(); + + racNodes.add(node); } catch (SQLException e) { throw new DebeziumException("Cannot connect to RAC node '" + hostName + "'", e); @@ -132,54 +146,143 @@ private void recreateRacNodeFlushStrategies() { } } - private String[] parseHostName(String hostName) { - final String[] parts = new String[3]; - final String[] colonParts = hostName.split(":"); - parts[0] = colonParts[0]; - if (colonParts[1].contains("/")) { - // SID provided - final int slashIndex = colonParts[1].indexOf('/'); - parts[1] = colonParts[1].substring(0, slashIndex); - parts[2] = colonParts[1].substring(slashIndex + 1); - return parts; - } - else { - // No SID provided - parts[1] = colonParts[1]; - parts[2] = null; + private class RacNode { + private final String hostName; + + private OracleConnection connection; + private LogWriterFlushStrategy flushStrategy; + + RacNode(String hostName) { + this.hostName = hostName; } - return parts; - } - - private CommitLogWriterFlushStrategy createHostFlushStrategy(String hostName, Integer port, String sid) throws SQLException { - Configuration.Builder jdbcConfigBuilder = jdbcConfiguration.edit() - .with(JdbcConfiguration.HOSTNAME, hostName) - .with(JdbcConfiguration.PORT, port); - - if (!Strings.isNullOrBlank(sid)) { - jdbcConfigBuilder = jdbcConfigBuilder.with(JdbcConfiguration.DATABASE, sid); + /** + * Get the hostname for the RAC node. + * @return the hostname + */ + public String getHostName() { + return this.hostName; } - final JdbcConfiguration jdbcHostConfig = JdbcConfiguration.adapt(jdbcConfigBuilder.build()); + /** + * Get the strategy used by the node to flush + * + * @return the flush strategy + */ + public LogWriterFlushStrategy getFlushStrategy() { + return flushStrategy; + } - LOGGER.debug("Creating flush connection to RAC node '{}'", hostName); - return new CommitLogWriterFlushStrategy(connectorConfig, jdbcHostConfig); - } - - /** - * Closes the RAC node flush strategies. - */ - private void closeRacNodeFlushStrategies() { - for (CommitLogWriterFlushStrategy strategy : flushStrategies.values()) { + /** + * Return whether the RAC node is current connected or not. + */ + public boolean isConnected() { try { - // close the strategy's connection - strategy.close(); + return connection != null && connection.isConnected(); } - catch (Exception e) { - LOGGER.warn("Failed to close RAC connection to node '{}'", strategy.getHost(), e); - streamingMetrics.incrementWarningCount(); + catch (SQLException e) { + return false; } } + + /** + * Connects to the RAC node + * + * @throws SQLException if a connection failure occurred + */ + public void connect() throws SQLException { + final String[] parts = parseHostName(hostName); + + final String databaseHostName = parts[0]; + final int port = Integer.parseInt(parts[1]); + final String sid = parts[2]; + + Configuration.Builder jdbcConfigBuilder = jdbcConfiguration.edit() + .with(JdbcConfiguration.HOSTNAME, databaseHostName) + .with(JdbcConfiguration.PORT, port); + + if (!Strings.isNullOrBlank(sid)) { + jdbcConfigBuilder = jdbcConfigBuilder.with(JdbcConfiguration.DATABASE, sid); + } + + final JdbcConfiguration jdbcHostConfig = JdbcConfiguration.adapt(jdbcConfigBuilder.build()); + + this.connection = new OracleConnection(jdbcHostConfig); + this.connection.setAutoCommit(false); + + LOGGER.info("Created flush connection to RAC node '{}'", hostName); + this.flushStrategy = new CommitLogWriterFlushStrategy(connectorConfig, connection); + } + + /** + * Reconnect to the RAC node + */ + void reconnect() { + try { + if (connection == null) { + // Close was called + connect(); + } + else { + // Close wasn't called but the connection lost. + connection.reconnect(); + + // Recreate the flush strategy if needed + if (this.flushStrategy == null) { + this.flushStrategy = new CommitLogWriterFlushStrategy(connectorConfig, connection); + } + } + LOGGER.info("Successfully reconnected to Oracle RAC node '{}'", hostName); + } + catch (SQLException e) { + LOGGER.warn("Failed to reconnect to RAC node '{}': {}", hostName, e.getMessage()); + close(); + } + } + + /** + * Closes the connection with the RAC node. + */ + public void close() { + if (flushStrategy != null) { + final String hostName = flushStrategy.getHost(); + try { + flushStrategy.close(); + } + catch (Exception e) { + LOGGER.warn("Failed to close RAC flush strategy to node '{}'", hostName, e); + streamingMetrics.incrementWarningCount(); + } + } + if (connection != null) { + try { + connection.close(); + } + catch (Exception e) { + LOGGER.warn("Failed to close RAC connection to node '{}'", hostName, e); + } + } + flushStrategy = null; + } + + private String[] parseHostName(String hostName) { + final String[] parts = new String[3]; + final String[] colonParts = hostName.split(":"); + parts[0] = colonParts[0]; + if (colonParts[1].contains("/")) { + // SID provided + final int slashIndex = colonParts[1].indexOf('/'); + parts[1] = colonParts[1].substring(0, slashIndex); + parts[2] = colonParts[1].substring(slashIndex + 1); + return parts; + } + else { + // No SID provided + parts[1] = colonParts[1]; + parts[2] = null; + } + + return parts; + } } }