DBZ-8149 Rework RAC flush connections to be elastic

This commit is contained in:
Chris Cranford 2024-08-14 10:45:17 -04:00 committed by Chris Cranford
parent 63c467f1d8
commit c119016593
2 changed files with 172 additions and 100 deletions

View File

@ -14,7 +14,6 @@
import io.debezium.connector.oracle.OracleConnection; import io.debezium.connector.oracle.OracleConnection;
import io.debezium.connector.oracle.OracleConnectorConfig; import io.debezium.connector.oracle.OracleConnectorConfig;
import io.debezium.connector.oracle.Scn; import io.debezium.connector.oracle.Scn;
import io.debezium.jdbc.JdbcConfiguration;
import io.debezium.relational.TableId; import io.debezium.relational.TableId;
import io.debezium.util.Strings; import io.debezium.util.Strings;
@ -37,7 +36,6 @@ public class CommitLogWriterFlushStrategy implements LogWriterFlushStrategy {
private final TableId flushTableId; private final TableId flushTableId;
private final String databasePdbName; private final String databasePdbName;
private final OracleConnection connection; private final OracleConnection connection;
private final boolean closeConnectionOnClose;
/** /**
* Creates a transaction-commit Oracle LogWriter (LGWR) process flush strategy. * Creates a transaction-commit Oracle LogWriter (LGWR) process flush strategy.
@ -53,40 +51,11 @@ public CommitLogWriterFlushStrategy(OracleConnectorConfig connectorConfig, Oracl
this.flushTableName = flushTableId.toDoubleQuotedString(); this.flushTableName = flushTableId.toDoubleQuotedString();
this.databasePdbName = connectorConfig.getPdbName(); this.databasePdbName = connectorConfig.getPdbName();
this.connection = connection; this.connection = connection;
this.closeConnectionOnClose = false;
createFlushTableIfNotExists();
}
/**
* Creates a transaction-commit Oracle LogWriter (LGWR) process flush strategy.
*
* This will create a new database connection based on the supplied JDBC configuration and the
* connection will automatically be closed when the strategy is closed.
*
* @param connectorConfig the connector configuration, must not be {@code null}
* @param jdbcConfig the jdbc configuration
* @throws SQLException if there was a database problem
*/
public CommitLogWriterFlushStrategy(OracleConnectorConfig connectorConfig, JdbcConfiguration jdbcConfig) throws SQLException {
this.flushTableId = TableId.parse(connectorConfig.getLogMiningFlushTableName());
this.flushTableName = flushTableId.toDoubleQuotedString();
this.databasePdbName = connectorConfig.getPdbName();
this.connection = new OracleConnection(jdbcConfig);
this.connection.setAutoCommit(false);
this.closeConnectionOnClose = true;
createFlushTableIfNotExists(); createFlushTableIfNotExists();
} }
@Override @Override
public void close() { public void close() {
if (closeConnectionOnClose) {
try {
connection.close();
}
catch (SQLException e) {
throw new DebeziumException("Failed to close connection to host '" + getHost() + "'", e);
}
}
} }
@Override @Override

View File

@ -8,8 +8,8 @@
import java.sql.SQLException; import java.sql.SQLException;
import java.time.Duration; import java.time.Duration;
import java.time.Instant; import java.time.Instant;
import java.util.HashMap; import java.util.ArrayList;
import java.util.Map; import java.util.List;
import java.util.Set; import java.util.Set;
import java.util.stream.Collectors; import java.util.stream.Collectors;
@ -18,6 +18,7 @@
import io.debezium.DebeziumException; import io.debezium.DebeziumException;
import io.debezium.config.Configuration; import io.debezium.config.Configuration;
import io.debezium.connector.oracle.OracleConnection;
import io.debezium.connector.oracle.OracleConnectorConfig; import io.debezium.connector.oracle.OracleConnectorConfig;
import io.debezium.connector.oracle.Scn; import io.debezium.connector.oracle.Scn;
import io.debezium.connector.oracle.logminer.LogMinerStreamingChangeEventSourceMetrics; import io.debezium.connector.oracle.logminer.LogMinerStreamingChangeEventSourceMetrics;
@ -41,7 +42,7 @@ public class RacCommitLogWriterFlushStrategy implements LogWriterFlushStrategy {
private static final Logger LOGGER = LoggerFactory.getLogger(RacCommitLogWriterFlushStrategy.class); private static final Logger LOGGER = LoggerFactory.getLogger(RacCommitLogWriterFlushStrategy.class);
private final Map<String, CommitLogWriterFlushStrategy> flushStrategies = new HashMap<>(); private final List<RacNode> racNodes = new ArrayList<>();
private final LogMinerStreamingChangeEventSourceMetrics streamingMetrics; private final LogMinerStreamingChangeEventSourceMetrics streamingMetrics;
private final JdbcConfiguration jdbcConfiguration; private final JdbcConfiguration jdbcConfiguration;
private final OracleConnectorConfig connectorConfig; private final OracleConnectorConfig connectorConfig;
@ -60,13 +61,15 @@ public RacCommitLogWriterFlushStrategy(OracleConnectorConfig connectorConfig, Jd
this.streamingMetrics = streamingMetrics; this.streamingMetrics = streamingMetrics;
this.connectorConfig = connectorConfig; this.connectorConfig = connectorConfig;
this.hosts = connectorConfig.getRacNodes().stream().map(String::toUpperCase).collect(Collectors.toSet()); this.hosts = connectorConfig.getRacNodes().stream().map(String::toUpperCase).collect(Collectors.toSet());
recreateRacNodeFlushStrategies(); createRacNodesList();
} }
@Override @Override
public void close() { public void close() {
closeRacNodeFlushStrategies(); for (RacNode node : racNodes) {
flushStrategies.clear(); node.close();
}
racNodes.clear();
} }
@Override @Override
@ -80,25 +83,41 @@ public void flush(Scn currentScn) throws InterruptedException {
// For this configuration, all LGWR processes across all instances must be flushed. // For this configuration, all LGWR processes across all instances must be flushed.
// Queries cannot be used such as gv_instance as not all nodes could be load balanced. // Queries cannot be used such as gv_instance as not all nodes could be load balanced.
Instant startTime = Instant.now(); Instant startTime = Instant.now();
if (flushStrategies.isEmpty()) { if (racNodes.isEmpty()) {
// In this case it means all nodes have disappeared and main connection will likely throw
// a database error too, so it's safe to throw here.
throw new DebeziumException("No RAC node addresses supplied or currently connected"); throw new DebeziumException("No RAC node addresses supplied or currently connected");
} }
boolean recreateConnections = false; // Before flushing, verify that nodes are connected
for (Map.Entry<String, CommitLogWriterFlushStrategy> entry : flushStrategies.entrySet()) { for (RacNode node : racNodes) {
final CommitLogWriterFlushStrategy strategy = entry.getValue(); if (!node.isConnected()) {
try { // Attempt to reconnect
// Flush the node's commit log writer node.reconnect();
strategy.flush(currentScn);
}
catch (Exception e) {
LOGGER.warn("Failed to flush LGWR buffer on RAC node '{}'", strategy.getHost(), e);
recreateConnections = true;
} }
} }
if (recreateConnections) { // Flush to nodes
recreateRacNodeFlushStrategies(); boolean allNodesFlushed = true;
for (RacNode node : racNodes) {
if (node.isConnected()) {
final LogWriterFlushStrategy strategy = node.getFlushStrategy();
try {
// Flush the node's commit log writer
strategy.flush(currentScn);
}
catch (Exception e) {
LOGGER.warn("Failed to flush LGWR buffer on RAC node '{}': {}", node.getHostName(), e.getMessage());
node.close();
allNodesFlushed = false;
}
}
else {
allNodesFlushed = false;
}
}
if (!allNodesFlushed) {
LOGGER.warn("Not all LGWR buffers were flushed, waiting 3 seconds for Oracle to flush automatically."); LOGGER.warn("Not all LGWR buffers were flushed, waiting 3 seconds for Oracle to flush automatically.");
Metronome metronome = Metronome.sleeper(Duration.ofSeconds(3), Clock.SYSTEM); Metronome metronome = Metronome.sleeper(Duration.ofSeconds(3), Clock.SYSTEM);
try { try {
@ -113,18 +132,13 @@ public void flush(Scn currentScn) throws InterruptedException {
LOGGER.trace("LGWR flush took {} to complete.", Duration.between(startTime, Instant.now())); LOGGER.trace("LGWR flush took {} to complete.", Duration.between(startTime, Instant.now()));
} }
private void recreateRacNodeFlushStrategies() { private void createRacNodesList() {
// Close existing flush strategies to RAC nodes
closeRacNodeFlushStrategies();
// Clear map
flushStrategies.clear();
// Create strategies by host
for (String hostName : hosts) { for (String hostName : hosts) {
try { try {
final String[] parts = parseHostName(hostName); final RacNode node = new RacNode(hostName);
flushStrategies.put(hostName, createHostFlushStrategy(parts[0], Integer.parseInt(parts[1]), parts[2])); node.connect();
racNodes.add(node);
} }
catch (SQLException e) { catch (SQLException e) {
throw new DebeziumException("Cannot connect to RAC node '" + hostName + "'", e); throw new DebeziumException("Cannot connect to RAC node '" + hostName + "'", e);
@ -132,54 +146,143 @@ private void recreateRacNodeFlushStrategies() {
} }
} }
private String[] parseHostName(String hostName) { private class RacNode {
final String[] parts = new String[3]; private final String hostName;
final String[] colonParts = hostName.split(":");
parts[0] = colonParts[0]; private OracleConnection connection;
if (colonParts[1].contains("/")) { private LogWriterFlushStrategy flushStrategy;
// SID provided
final int slashIndex = colonParts[1].indexOf('/'); RacNode(String hostName) {
parts[1] = colonParts[1].substring(0, slashIndex); this.hostName = hostName;
parts[2] = colonParts[1].substring(slashIndex + 1);
return parts;
}
else {
// No SID provided
parts[1] = colonParts[1];
parts[2] = null;
} }
return parts; /**
} * Get the hostname for the RAC node.
* @return the hostname
private CommitLogWriterFlushStrategy createHostFlushStrategy(String hostName, Integer port, String sid) throws SQLException { */
Configuration.Builder jdbcConfigBuilder = jdbcConfiguration.edit() public String getHostName() {
.with(JdbcConfiguration.HOSTNAME, hostName) return this.hostName;
.with(JdbcConfiguration.PORT, port);
if (!Strings.isNullOrBlank(sid)) {
jdbcConfigBuilder = jdbcConfigBuilder.with(JdbcConfiguration.DATABASE, sid);
} }
final JdbcConfiguration jdbcHostConfig = JdbcConfiguration.adapt(jdbcConfigBuilder.build()); /**
* Get the strategy used by the node to flush
*
* @return the flush strategy
*/
public LogWriterFlushStrategy getFlushStrategy() {
return flushStrategy;
}
LOGGER.debug("Creating flush connection to RAC node '{}'", hostName); /**
return new CommitLogWriterFlushStrategy(connectorConfig, jdbcHostConfig); * Return whether the RAC node is current connected or not.
} */
public boolean isConnected() {
/**
* Closes the RAC node flush strategies.
*/
private void closeRacNodeFlushStrategies() {
for (CommitLogWriterFlushStrategy strategy : flushStrategies.values()) {
try { try {
// close the strategy's connection return connection != null && connection.isConnected();
strategy.close();
} }
catch (Exception e) { catch (SQLException e) {
LOGGER.warn("Failed to close RAC connection to node '{}'", strategy.getHost(), e); return false;
streamingMetrics.incrementWarningCount();
} }
} }
/**
* Connects to the RAC node
*
* @throws SQLException if a connection failure occurred
*/
public void connect() throws SQLException {
final String[] parts = parseHostName(hostName);
final String databaseHostName = parts[0];
final int port = Integer.parseInt(parts[1]);
final String sid = parts[2];
Configuration.Builder jdbcConfigBuilder = jdbcConfiguration.edit()
.with(JdbcConfiguration.HOSTNAME, databaseHostName)
.with(JdbcConfiguration.PORT, port);
if (!Strings.isNullOrBlank(sid)) {
jdbcConfigBuilder = jdbcConfigBuilder.with(JdbcConfiguration.DATABASE, sid);
}
final JdbcConfiguration jdbcHostConfig = JdbcConfiguration.adapt(jdbcConfigBuilder.build());
this.connection = new OracleConnection(jdbcHostConfig);
this.connection.setAutoCommit(false);
LOGGER.info("Created flush connection to RAC node '{}'", hostName);
this.flushStrategy = new CommitLogWriterFlushStrategy(connectorConfig, connection);
}
/**
* Reconnect to the RAC node
*/
void reconnect() {
try {
if (connection == null) {
// Close was called
connect();
}
else {
// Close wasn't called but the connection lost.
connection.reconnect();
// Recreate the flush strategy if needed
if (this.flushStrategy == null) {
this.flushStrategy = new CommitLogWriterFlushStrategy(connectorConfig, connection);
}
}
LOGGER.info("Successfully reconnected to Oracle RAC node '{}'", hostName);
}
catch (SQLException e) {
LOGGER.warn("Failed to reconnect to RAC node '{}': {}", hostName, e.getMessage());
close();
}
}
/**
* Closes the connection with the RAC node.
*/
public void close() {
if (flushStrategy != null) {
final String hostName = flushStrategy.getHost();
try {
flushStrategy.close();
}
catch (Exception e) {
LOGGER.warn("Failed to close RAC flush strategy to node '{}'", hostName, e);
streamingMetrics.incrementWarningCount();
}
}
if (connection != null) {
try {
connection.close();
}
catch (Exception e) {
LOGGER.warn("Failed to close RAC connection to node '{}'", hostName, e);
}
}
flushStrategy = null;
}
private String[] parseHostName(String hostName) {
final String[] parts = new String[3];
final String[] colonParts = hostName.split(":");
parts[0] = colonParts[0];
if (colonParts[1].contains("/")) {
// SID provided
final int slashIndex = colonParts[1].indexOf('/');
parts[1] = colonParts[1].substring(0, slashIndex);
parts[2] = colonParts[1].substring(slashIndex + 1);
return parts;
}
else {
// No SID provided
parts[1] = colonParts[1];
parts[2] = null;
}
return parts;
}
} }
} }