DBZ-2552 Refactor & introduce LogWriterFlushStrategy

This commit is contained in:
Chris Cranford 2021-07-06 10:05:04 -04:00 committed by Jiri Pechanec
parent 1fd68e6fa9
commit 10ab20a157
8 changed files with 328 additions and 155 deletions

View File

@ -26,7 +26,7 @@
import io.debezium.config.Instantiator; import io.debezium.config.Instantiator;
import io.debezium.connector.AbstractSourceInfo; import io.debezium.connector.AbstractSourceInfo;
import io.debezium.connector.SourceInfoStructMaker; import io.debezium.connector.SourceInfoStructMaker;
import io.debezium.connector.oracle.logminer.SqlUtils; import io.debezium.connector.oracle.logminer.logwriter.LogWriterFlushStrategy;
import io.debezium.jdbc.JdbcConfiguration; import io.debezium.jdbc.JdbcConfiguration;
import io.debezium.relational.ColumnFilterMode; import io.debezium.relational.ColumnFilterMode;
import io.debezium.relational.HistorizedRelationalDatabaseConnectorConfig; import io.debezium.relational.HistorizedRelationalDatabaseConnectorConfig;
@ -790,8 +790,7 @@ private boolean isExcludedSchema(TableId id) {
} }
private boolean isFlushTable(TableId id) { private boolean isFlushTable(TableId id) {
final String schema = config.getString(USER); return LogWriterFlushStrategy.isFlushTable(id, config.getString(USER));
return id.table().equalsIgnoreCase(SqlUtils.LOGMNR_FLUSH_TABLE) && id.schema().equalsIgnoreCase(schema);
} }
} }

View File

@ -11,6 +11,7 @@
import java.util.regex.Pattern; import java.util.regex.Pattern;
import io.debezium.connector.oracle.OracleConnectorConfig; import io.debezium.connector.oracle.OracleConnectorConfig;
import io.debezium.connector.oracle.logminer.logwriter.LogWriterFlushStrategy;
import io.debezium.util.Strings; import io.debezium.util.Strings;
/** /**
@ -92,7 +93,7 @@ public static String build(OracleConnectorConfig connectorConfig) {
} }
// Always ignore the flush table // Always ignore the flush table
query.append("AND TABLE_NAME != '").append(SqlUtils.LOGMNR_FLUSH_TABLE).append("' "); query.append("AND TABLE_NAME != '").append(LogWriterFlushStrategy.LOGMNR_FLUSH_TABLE).append("' ");
// There are some common schemas that we automatically ignore when building the runtime Filter // There are some common schemas that we automatically ignore when building the runtime Filter
// predicates and we put that same list of schemas here and apply those in the generated SQL. // predicates and we put that same list of schemas here and apply those in the generated SQL.

View File

@ -17,15 +17,12 @@
import java.time.Instant; import java.time.Instant;
import java.time.OffsetDateTime; import java.time.OffsetDateTime;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet; import java.util.HashSet;
import java.util.LinkedHashMap; import java.util.LinkedHashMap;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Optional; import java.util.Optional;
import java.util.Set; import java.util.Set;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -39,6 +36,9 @@
import io.debezium.connector.oracle.OraclePartition; import io.debezium.connector.oracle.OraclePartition;
import io.debezium.connector.oracle.OracleStreamingChangeEventSourceMetrics; import io.debezium.connector.oracle.OracleStreamingChangeEventSourceMetrics;
import io.debezium.connector.oracle.Scn; import io.debezium.connector.oracle.Scn;
import io.debezium.connector.oracle.logminer.logwriter.CommitLogWriterFlushStrategy;
import io.debezium.connector.oracle.logminer.logwriter.LogWriterFlushStrategy;
import io.debezium.connector.oracle.logminer.logwriter.RacCommitLogWriterFlushStrategy;
import io.debezium.jdbc.JdbcConfiguration; import io.debezium.jdbc.JdbcConfiguration;
import io.debezium.pipeline.ErrorHandler; import io.debezium.pipeline.ErrorHandler;
import io.debezium.pipeline.EventDispatcher; import io.debezium.pipeline.EventDispatcher;
@ -61,7 +61,6 @@ public class LogMinerStreamingChangeEventSource implements StreamingChangeEventS
private final EventDispatcher<TableId> dispatcher; private final EventDispatcher<TableId> dispatcher;
private final Clock clock; private final Clock clock;
private final OracleDatabaseSchema schema; private final OracleDatabaseSchema schema;
private final Set<String> racHosts = new HashSet<>();
private final JdbcConfiguration jdbcConfiguration; private final JdbcConfiguration jdbcConfiguration;
private final OracleConnectorConfig.LogMiningStrategy strategy; private final OracleConnectorConfig.LogMiningStrategy strategy;
private final ErrorHandler errorHandler; private final ErrorHandler errorHandler;
@ -75,7 +74,6 @@ public class LogMinerStreamingChangeEventSource implements StreamingChangeEventS
private Scn startScn; private Scn startScn;
private Scn endScn; private Scn endScn;
private List<BigInteger> currentRedoLogSequences; private List<BigInteger> currentRedoLogSequences;
private Map<String, OracleConnection> flushConnections;
public LogMinerStreamingChangeEventSource(OracleConnectorConfig connectorConfig, public LogMinerStreamingChangeEventSource(OracleConnectorConfig connectorConfig,
OracleConnection jdbcConnection, EventDispatcher<TableId> dispatcher, OracleConnection jdbcConnection, EventDispatcher<TableId> dispatcher,
@ -91,10 +89,6 @@ public LogMinerStreamingChangeEventSource(OracleConnectorConfig connectorConfig,
this.errorHandler = errorHandler; this.errorHandler = errorHandler;
this.streamingMetrics = streamingMetrics; this.streamingMetrics = streamingMetrics;
this.jdbcConfiguration = JdbcConfiguration.adapt(jdbcConfig); this.jdbcConfiguration = JdbcConfiguration.adapt(jdbcConfig);
if (connectorConfig.isRacSystem()) {
this.racHosts.addAll(connectorConfig.getRacNodes().stream().map(String::toUpperCase).collect(Collectors.toSet()));
instantiateRacFlushConnections(jdbcConfiguration, racHosts);
}
this.archiveLogRetention = connectorConfig.getLogMiningArchiveLogRetention(); this.archiveLogRetention = connectorConfig.getLogMiningArchiveLogRetention();
this.archiveLogOnlyMode = connectorConfig.isArchiveLogOnlyMode(); this.archiveLogOnlyMode = connectorConfig.isArchiveLogOnlyMode();
this.archiveDestinationName = connectorConfig.getLogMiningArchiveDestinationName(); this.archiveDestinationName = connectorConfig.getLogMiningArchiveDestinationName();
@ -109,10 +103,9 @@ public LogMinerStreamingChangeEventSource(OracleConnectorConfig connectorConfig,
@Override @Override
public void execute(ChangeEventSourceContext context, OraclePartition partition, OracleOffsetContext offsetContext) { public void execute(ChangeEventSourceContext context, OraclePartition partition, OracleOffsetContext offsetContext) {
try (TransactionalBuffer transactionalBuffer = new TransactionalBuffer(connectorConfig, schema, clock, errorHandler, streamingMetrics)) { try (TransactionalBuffer transactionalBuffer = new TransactionalBuffer(connectorConfig, schema, clock, errorHandler, streamingMetrics)) {
try {
startScn = offsetContext.getScn(); startScn = offsetContext.getScn();
createFlushTable(jdbcConnection);
try (LogWriterFlushStrategy flushStrategy = resolveFlushStrategy()) {
if (!isContinuousMining && startScn.compareTo(getFirstScnInLogs(jdbcConnection)) < 0) { if (!isContinuousMining && startScn.compareTo(getFirstScnInLogs(jdbcConnection)) < 0) {
throw new DebeziumException( throw new DebeziumException(
"Online REDO LOG files or archive log files do not contain the offset scn " + startScn + ". Please perform a new snapshot."); "Online REDO LOG files or archive log files do not contain the offset scn " + startScn + ". Please perform a new snapshot.");
@ -136,7 +129,7 @@ public void execute(ChangeEventSourceContext context, OraclePartition partition,
Instant start = Instant.now(); Instant start = Instant.now();
endScn = calculateEndScn(jdbcConnection, startScn, endScn); endScn = calculateEndScn(jdbcConnection, startScn, endScn);
flushLogWriter(jdbcConnection, jdbcConfiguration, racHosts); flushStrategy.flush(jdbcConnection.getCurrentScn());
if (hasLogSwitchOccurred()) { if (hasLogSwitchOccurred()) {
// This is the way to mitigate PGA leaks. // This is the way to mitigate PGA leaks.
@ -394,139 +387,6 @@ private void pauseBetweenMiningSessions() throws InterruptedException {
Metronome.sleeper(period, clock).pause(); Metronome.sleeper(period, clock).pause();
} }
/**
* Flushes the Oracle LGWR buffer if connected to a standalone Oracle instance or all LGWR buffers on
* each Oracle RAC node when connected to an Oracle RAC cluster.
*
* @param connection database connection the primary instance, must not be {@code null}
* @param jdbcConfig database configuration, must not be {@code null}
* @param hosts set of RAC hosts or ip addresses, should not be {@code null} but may be empty.
* @throws SQLException if a database exception occurred
* @throws InterruptedException if the flushing to an Oracle RAC cluster was interrupted
*/
private void flushLogWriter(OracleConnection connection, JdbcConfiguration jdbcConfig, Set<String> hosts) throws SQLException, InterruptedException {
Scn currentScn = connection.getCurrentScn();
if (!hosts.isEmpty()) {
flushRacLogWriters(currentScn, jdbcConfig, hosts);
}
else {
LOGGER.trace("Flushing LGWR buffer on instance '{}'", jdbcConfig.getHostname());
connection.executeWithoutCommitting("UPDATE " + SqlUtils.LOGMNR_FLUSH_TABLE + " SET LAST_SCN =" + currentScn);
connection.commit();
}
}
/**
* An Oracle RAC cluster has one LGWR process per node and each needs to be flushed.
* Queries to {@code GV$INSTANCE} cannot be used because not all nodes may be load balanced by the cluster.
*
* @param currentScn value to be flushed to the node's local flush table, must not be {@code null}
* @param jdbcConfig database configuration, must not be {@code null}
* @param hosts set of RAC hosts or ip addresses, should not be {@code null} or empty.
* @throws InterruptedException if the flushing to the Oracle RAC cluster was interrupted
*/
private void flushRacLogWriters(Scn currentScn, JdbcConfiguration jdbcConfig, Set<String> hosts) throws InterruptedException {
Instant startTime = Instant.now();
boolean recreateConnections = false;
for (String hostName : hosts) {
try {
final OracleConnection connection = flushConnections.get(hostName);
if (connection == null) {
LOGGER.warn("Connection to RAC node '{}' does not exist; will be re-created.", hostName);
recreateConnections = true;
continue;
}
LOGGER.trace("Flushing LGWR buffer on RAC node '{}'", hostName);
connection.executeWithoutCommitting("UPDATE " + SqlUtils.LOGMNR_FLUSH_TABLE + " SET LAST_SCN =" + currentScn);
connection.commit();
}
catch (Exception e) {
LOGGER.warn("Failed to flush LGWR buffer on RAC node '{}'", hostName, e);
recreateConnections = true;
}
}
if (recreateConnections) {
instantiateRacFlushConnections(jdbcConfig, hosts);
LOGGER.warn("Not all LGWR buffers were flushed, waiting 3 seconds for Oracle to flush automatically.");
Metronome pause = Metronome.sleeper(Duration.ofSeconds(3), Clock.system());
try {
pause.pause();
}
catch (InterruptedException e) {
LOGGER.warn("The LGWR buffer wait was interrupted.");
throw e;
}
}
LOGGER.trace("LGWR flush took {} to complete.", Duration.between(startTime, Instant.now()));
}
/**
* Instantiates a RAC flush connection to each RAC cluster node.
*
* @param jdbcConfig database connection configuration
* @param hosts set of Oracle RAC node hosts or ip addresses
*/
private void instantiateRacFlushConnections(JdbcConfiguration jdbcConfig, Set<String> hosts) {
if (flushConnections == null) {
flushConnections = new HashMap<>();
}
// If any existing connections, close them.
for (Map.Entry<String, OracleConnection> entry : flushConnections.entrySet()) {
final String hostName = entry.getKey();
final OracleConnection connection = entry.getValue();
if (connection != null) {
try {
connection.close();
}
catch (SQLException e) {
// It's fine not to throw this exception, a new connection will be established
LOGGER.warn("Failed to close RAC connection to node '{}'", hostName, e);
streamingMetrics.incrementWarningCount();
}
}
}
flushConnections.clear();
// Create new connections
final Supplier<ClassLoader> classLoaderSupplier = LogMinerStreamingChangeEventSource.class::getClassLoader;
for (String hostName : hosts) {
try {
JdbcConfiguration jdbcHostConfig = JdbcConfiguration.adapt(jdbcConfig.edit()
.with(JdbcConfiguration.HOSTNAME, hostName).build());
LOGGER.debug("Creating flush connection to RAC node '{}'", hostName);
final OracleConnection flushConnection = new OracleConnection(jdbcHostConfig, classLoaderSupplier);
flushConnection.setAutoCommit(false);
flushConnections.put(hostName, flushConnection);
}
catch (SQLException e) {
throw new DebeziumException("Cannot connect to RAC node '" + hostName + "'", e);
}
}
}
/**
* Creates the flush table used to force flushing of the Oracle LGWR buffers.
*
* @param connection database connection
* @throws SQLException if a database exception occurred
*/
private void createFlushTable(OracleConnection connection) throws SQLException {
if (!connection.isTableExists(SqlUtils.LOGMNR_FLUSH_TABLE)) {
connection.executeWithoutCommitting("CREATE TABLE " + SqlUtils.LOGMNR_FLUSH_TABLE + "(LAST_SCN NUMBER(19,0))");
}
if (connection.isTableEmpty(SqlUtils.LOGMNR_FLUSH_TABLE)) {
connection.executeWithoutCommitting("INSERT INTO " + SqlUtils.LOGMNR_FLUSH_TABLE + " VALUES(0)");
connection.commit();
}
}
/** /**
* Sets the NLS parameters for the mining session. * Sets the NLS parameters for the mining session.
* *
@ -762,6 +622,18 @@ private boolean isTableAllColumnsSupplementalLoggingEnabled(OracleConnection con
}); });
} }
/**
* Resolves the Oracle LGWR buffer flushing strategy.
*
* @return the strategy to be used to flush Oracle's LGWR process, never {@code null}.
*/
private LogWriterFlushStrategy resolveFlushStrategy() {
if (connectorConfig.isRacSystem()) {
return new RacCommitLogWriterFlushStrategy(connectorConfig, jdbcConfiguration, streamingMetrics);
}
return new CommitLogWriterFlushStrategy(jdbcConnection);
}
@Override @Override
public void commitOffset(Map<String, ?> offset) { public void commitOffset(Map<String, ?> offset) {
// nothing to do // nothing to do

View File

@ -42,9 +42,6 @@ public class SqlUtils {
private static final String ARCHIVE_DEST_STATUS_VIEW = "V$ARCHIVE_DEST_STATUS"; private static final String ARCHIVE_DEST_STATUS_VIEW = "V$ARCHIVE_DEST_STATUS";
private static final String ALL_LOG_GROUPS = "ALL_LOG_GROUPS"; private static final String ALL_LOG_GROUPS = "ALL_LOG_GROUPS";
// log writer flush statements
public static final String LOGMNR_FLUSH_TABLE = "LOG_MINING_FLUSH";
static String redoLogStatusQuery() { static String redoLogStatusQuery() {
return String.format("SELECT F.MEMBER, R.STATUS FROM %s F, %s R WHERE F.GROUP# = R.GROUP# ORDER BY 2", LOGFILE_VIEW, LOG_VIEW); return String.format("SELECT F.MEMBER, R.STATUS FROM %s F, %s R WHERE F.GROUP# = R.GROUP# ORDER BY 2", LOGFILE_VIEW, LOG_VIEW);
} }

View File

@ -0,0 +1,105 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.connector.oracle.logminer.logwriter;
import java.sql.SQLException;
import io.debezium.DebeziumException;
import io.debezium.connector.oracle.OracleConnection;
import io.debezium.connector.oracle.Scn;
import io.debezium.jdbc.JdbcConfiguration;
/**
* A {@link LogWriterFlushStrategy} that uses a transaction commit to force the provided
* connection's Oracle LogWriter (LGWR) process to flush to disk.
*
* @author Chris Cranford
*/
public class CommitLogWriterFlushStrategy implements LogWriterFlushStrategy {
private static final String CREATE_FLUSH_TABLE = "CREATE TABLE " + LOGMNR_FLUSH_TABLE + "(LAST_SCN NUMBER(19,0))";
private static final String INSERT_FLUSH_TABLE = "INSERT INTO " + LOGMNR_FLUSH_TABLE + " VALUES (0)";
private static final String UPDATE_FLUSH_TABLE = "UPDATE " + LOGMNR_FLUSH_TABLE + " SET LAST_SCN = ";
private final OracleConnection connection;
private final boolean closeConnectionOnClose;
/**
* Creates a transaction-commit Oracle LogWriter (LGWR) process flush strategy.
*
* This will use the existing database connection to make the flush and the connection will not
* be automatically closed when the strategy is closed.
*
* @param connection the connection to be used to force the flush, must not be {@code null}
*/
public CommitLogWriterFlushStrategy(OracleConnection connection) {
this.connection = connection;
this.closeConnectionOnClose = false;
createFlushTableIfNotExists();
}
/**
* Creates a transaction-commit Oracle LogWriter (LGWR) process flush strategy.
*
* This will create a new database connection based on the supplied JDBC configuration and the
* connection will automatically be closed when the strategy is closed.
*
* @param jdbcConfig the jdbc configuration
* @throws SQLException if there was a database problem
*/
public CommitLogWriterFlushStrategy(JdbcConfiguration jdbcConfig) throws SQLException {
this.connection = new OracleConnection(jdbcConfig, () -> getClass().getClassLoader());
this.connection.setAutoCommit(false);
this.closeConnectionOnClose = true;
createFlushTableIfNotExists();
}
@Override
public void close() {
if (closeConnectionOnClose) {
try {
connection.close();
}
catch (SQLException e) {
throw new DebeziumException("Failed to close connection to host '" + getHost() + "'", e);
}
}
}
@Override
public String getHost() {
return connection.config().getHostname();
}
@Override
public void flush(Scn currentScn) {
try {
connection.execute(UPDATE_FLUSH_TABLE + currentScn);
}
catch (SQLException e) {
throw new DebeziumException("Failed to flush Oracle LogWriter (LGWR) buffers to disk", e);
}
}
/**
* Makes sure that the flush table is created in the database and that it at least has 1 row of data
* so that when flushes occur that the update succeeds without failure.
*/
private void createFlushTableIfNotExists() {
try {
if (!connection.isTableExists(LOGMNR_FLUSH_TABLE)) {
connection.executeWithoutCommitting(CREATE_FLUSH_TABLE);
}
if (!connection.isTableEmpty(LOGMNR_FLUSH_TABLE)) {
connection.executeWithoutCommitting(INSERT_FLUSH_TABLE);
connection.commit();
}
}
catch (SQLException e) {
throw new DebeziumException("Failed to create flush table", e);
}
}
}

View File

@ -0,0 +1,45 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.connector.oracle.logminer.logwriter;
import io.debezium.connector.oracle.Scn;
import io.debezium.relational.TableId;
/**
* Strategy that controls how the Oracle LGWR (LogWriter) process is to be flushed.
*
* @author Chris Cranford
*/
public interface LogWriterFlushStrategy extends AutoCloseable {
/**
* The LogMiner implemenation's flush table name.
*/
String LOGMNR_FLUSH_TABLE = "LOG_MINING_FLUSH";
/**
* @return the host or ip address that will be flushed by the strategy
*/
String getHost();
/**
* Perform the Oracle LGWR process flush.
*
* @param currentScn the current system change number
*/
void flush(Scn currentScn) throws InterruptedException;
/**
* Checks whether the supplied {@code TableId} is the flush table.
*
* @param id the table id
* @param schemaName the schema name
* @return true if the table is the flush table, false otherwise
*/
static boolean isFlushTable(TableId id, String schemaName) {
return id.table().equalsIgnoreCase(LOGMNR_FLUSH_TABLE) && id.schema().equalsIgnoreCase(schemaName);
}
}

View File

@ -0,0 +1,153 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.connector.oracle.logminer.logwriter;
import java.sql.SQLException;
import java.time.Duration;
import java.time.Instant;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import io.debezium.DebeziumException;
import io.debezium.connector.oracle.OracleConnectorConfig;
import io.debezium.connector.oracle.OracleStreamingChangeEventSourceMetrics;
import io.debezium.connector.oracle.Scn;
import io.debezium.jdbc.JdbcConfiguration;
import io.debezium.util.Clock;
import io.debezium.util.Metronome;
/**
* A {@link LogWriterFlushStrategy} for Oracle RAC that performs a transaction-scoped commit
* to flush the Oracle LogWriter (LGWR) process on each RAC node.
*
* This strategy builds atop of {@link CommitLogWriterFlushStrategy} by creating a commit strategy
* to each Oracle RAC node and orchestrating the flushes simultaneously for each node when a flush
* is needed. In the event that a node fails to flush, this strategy will delay for 3 seconds to
* allow Oracle to automatically flush the buffers before proceeding.
*
* @author Chris Cranford
*/
public class RacCommitLogWriterFlushStrategy implements LogWriterFlushStrategy {
private static final Logger LOGGER = LoggerFactory.getLogger(RacCommitLogWriterFlushStrategy.class);
private final Map<String, CommitLogWriterFlushStrategy> flushStrategies = new HashMap<>();
private final OracleStreamingChangeEventSourceMetrics streamingMetrics;
private final JdbcConfiguration jdbcConfiguration;
private final Set<String> hosts;
/**
* Creates an Oracle RAC LogWriter (LGWR) flushing strategy.
*
* @param connectorConfig the connector configuration, must not be {@code null}
* @param jdbcConfig the mining session JDBC connection configuration, must not be {@code null}
* @param streamingMetrics the streaming metrics, must not be {@code null}
*/
public RacCommitLogWriterFlushStrategy(OracleConnectorConfig connectorConfig, JdbcConfiguration jdbcConfig,
OracleStreamingChangeEventSourceMetrics streamingMetrics) {
this.jdbcConfiguration = jdbcConfig;
this.streamingMetrics = streamingMetrics;
this.hosts = connectorConfig.getRacNodes().stream().map(String::toUpperCase).collect(Collectors.toSet());
recreateRacNodeFlushStrategies();
}
@Override
public void close() {
closeRacNodeFlushStrategies();
flushStrategies.clear();
}
@Override
public String getHost() {
return String.join(", ", hosts);
}
@Override
public void flush(Scn currentScn) throws InterruptedException {
// Oracle RAC has one LogWriter (LGWR) process per node (instance).
// For this configuration, all LGWR processes across all instances must be flushed.
// Queries cannot be used such as gv_instance as not all nodes could be load balanced.
Instant startTime = Instant.now();
if (flushStrategies.isEmpty()) {
throw new DebeziumException("No RAC node addresses supplied or currently connected");
}
boolean recreateConnections = false;
for (Map.Entry<String, CommitLogWriterFlushStrategy> entry : flushStrategies.entrySet()) {
final CommitLogWriterFlushStrategy strategy = entry.getValue();
try {
// Flush the node's commit log writer
strategy.flush(currentScn);
}
catch (Exception e) {
LOGGER.warn("Failed to flush LGWR buffer on RAC node '{}'", strategy.getHost(), e);
recreateConnections = true;
}
}
if (recreateConnections) {
recreateRacNodeFlushStrategies();
LOGGER.warn("Not all LGWR buffers were flushed, waiting 3 seconds for Oracle to flush automatically.");
Metronome metronome = Metronome.sleeper(Duration.ofSeconds(3), Clock.SYSTEM);
try {
metronome.pause();
}
catch (InterruptedException e) {
LOGGER.warn("The LGWR buffer wait was interrupted.");
throw e;
}
}
LOGGER.trace("LGWR flush took {} to complete.", Duration.between(startTime, Instant.now()));
}
private void recreateRacNodeFlushStrategies() {
// Close existing flush strategies to RAC nodes
closeRacNodeFlushStrategies();
// Clear map
flushStrategies.clear();
// Create strategies by host
for (String hostName : hosts) {
try {
flushStrategies.put(hostName, createHostFlushStrategy(hostName));
}
catch (SQLException e) {
throw new DebeziumException("Cannot connect to RAC node '" + hostName + "'", e);
}
}
}
private CommitLogWriterFlushStrategy createHostFlushStrategy(String hostName) throws SQLException {
JdbcConfiguration jdbcHostConfig = JdbcConfiguration.adapt(jdbcConfiguration.edit()
.with(JdbcConfiguration.HOSTNAME, hostName).build());
LOGGER.debug("Creating flush connection to RAC node '{}'", hostName);
return new CommitLogWriterFlushStrategy(jdbcHostConfig);
}
/**
* Closes the RAC node flush strategies.
*/
private void closeRacNodeFlushStrategies() {
for (CommitLogWriterFlushStrategy strategy : flushStrategies.values()) {
try {
// close the strategy's connection
strategy.close();
}
catch (Exception e) {
LOGGER.warn("Failed to close RAC connection to node '{}'", strategy.getHost(), e);
streamingMetrics.incrementWarningCount();
}
}
}
}

View File

@ -24,6 +24,7 @@
import io.debezium.connector.oracle.OracleConnectorConfig; import io.debezium.connector.oracle.OracleConnectorConfig;
import io.debezium.connector.oracle.junit.SkipTestDependingOnAdapterNameRule; import io.debezium.connector.oracle.junit.SkipTestDependingOnAdapterNameRule;
import io.debezium.connector.oracle.junit.SkipWhenAdapterNameIsNot; import io.debezium.connector.oracle.junit.SkipWhenAdapterNameIsNot;
import io.debezium.connector.oracle.logminer.logwriter.LogWriterFlushStrategy;
import io.debezium.connector.oracle.util.TestHelper; import io.debezium.connector.oracle.util.TestHelper;
import io.debezium.doc.FixFor; import io.debezium.doc.FixFor;
@ -54,7 +55,7 @@ public class LogMinerQueryBuilderTest {
"AND INFO NOT LIKE 'INTERNAL DDL%' " + "AND INFO NOT LIKE 'INTERNAL DDL%' " +
"AND (TABLE_NAME IS NULL OR TABLE_NAME NOT LIKE 'ORA_TEMP_%')) ) " + "AND (TABLE_NAME IS NULL OR TABLE_NAME NOT LIKE 'ORA_TEMP_%')) ) " +
"OR (OPERATION_CODE IN ${operationCodes} " + "OR (OPERATION_CODE IN ${operationCodes} " +
"AND TABLE_NAME != '" + SqlUtils.LOGMNR_FLUSH_TABLE + "' " + "AND TABLE_NAME != '" + LogWriterFlushStrategy.LOGMNR_FLUSH_TABLE + "' " +
"${systemTablePredicate}" + "${systemTablePredicate}" +
"${schemaPredicate}" + "${schemaPredicate}" +
"${tablePredicate}" + "${tablePredicate}" +
@ -74,7 +75,7 @@ public class LogMinerQueryBuilderTest {
"(OPERATION_CODE = 5 AND USERNAME NOT IN ('SYS','SYSTEM') " + "(OPERATION_CODE = 5 AND USERNAME NOT IN ('SYS','SYSTEM') " +
"AND INFO NOT LIKE 'INTERNAL DDL%' " + "AND INFO NOT LIKE 'INTERNAL DDL%' " +
"AND (TABLE_NAME IS NULL OR TABLE_NAME NOT LIKE 'ORA_TEMP_%'))) " + "AND (TABLE_NAME IS NULL OR TABLE_NAME NOT LIKE 'ORA_TEMP_%'))) " +
"AND TABLE_NAME != '" + SqlUtils.LOGMNR_FLUSH_TABLE + "' " + "AND TABLE_NAME != '" + LogWriterFlushStrategy.LOGMNR_FLUSH_TABLE + "' " +
"${systemTablePredicate}" + "${systemTablePredicate}" +
"${schemaPredicate}" + "${schemaPredicate}" +
"${tablePredicate}" + "${tablePredicate}" +