DBZ-2531 Implement connection retry for Oracle

This commit is contained in:
Chris Cranford 2020-09-17 15:10:17 -04:00 committed by Gunnar Morling
parent dda9ba96e3
commit aedb0b9a57
4 changed files with 121 additions and 105 deletions

View File

@ -75,7 +75,7 @@ public ChangeEventSourceCoordinator start(Configuration config) {
.loggingContextSupplier(() -> taskContext.configureLoggingContext(CONTEXT_NAME))
.build();
errorHandler = new ErrorHandler(OracleConnector.class, connectorConfig.getLogicalName(), queue);
errorHandler = new OracleErrorHandler(connectorConfig.getLogicalName(), queue);
final OracleEventMetadataProvider metadataProvider = new OracleEventMetadataProvider();

View File

@ -0,0 +1,42 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.connector.oracle;
import java.io.IOException;
import java.sql.SQLRecoverableException;
import io.debezium.connector.base.ChangeEventQueue;
import io.debezium.pipeline.ErrorHandler;
import oracle.net.ns.NetException;
/**
 * Error handler for Oracle.
 *
 * @author Chris Cranford
 */
public class OracleErrorHandler extends ErrorHandler {

    public OracleErrorHandler(String logicalName, ChangeEventQueue<?> queue) {
        super(OracleConnector.class, logicalName, queue);
    }

    /**
     * Determines whether the given failure is transient (for example a lost
     * network connection) so the connector can retry rather than fail.
     *
     * @param throwable the failure raised by the streaming source
     * @return {@code true} if the error, or any of its causes, indicates a
     *         recoverable connection problem; {@code false} otherwise
     */
    @Override
    protected boolean isRetriable(Throwable throwable) {
        // Walk the entire cause chain. The previous implementation returned
        // false whenever either getMessage() or getCause() was null, which
        // wrongly classified e.g. a bare SQLRecoverableException (no cause) or
        // an exception with a null message wrapping an IOException as
        // non-retriable, and it only inspected two cause levels.
        Throwable t = throwable;
        while (t != null) {
            if (t instanceof SQLRecoverableException
                    || t instanceof IOException
                    || t instanceof NetException) {
                return true;
            }
            final String message = t.getMessage();
            if (message != null
                    && (message.startsWith("ORA-03135") || // connection lost
                            message.startsWith("ORA-12543") || // TNS:destination host unreachable
                            message.startsWith("ORA-00604") || // error occurred at recursive SQL level 1
                            message.startsWith("ORA-01089") || // Oracle immediate shutdown in progress
                            message.toUpperCase().startsWith("NO MORE DATA TO READ FROM SOCKET"))) {
                return true;
            }
            t = t.getCause();
        }
        return false;
    }
}

View File

@ -20,13 +20,11 @@
import static io.debezium.connector.oracle.logminer.LogMinerHelper.setRedoLogFilesForMining;
import static io.debezium.connector.oracle.logminer.LogMinerHelper.startOnlineMining;
import java.io.IOException;
import java.math.BigDecimal;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.SQLRecoverableException;
import java.time.Duration;
import java.time.Instant;
import java.util.Map;
@ -50,8 +48,6 @@
import io.debezium.util.Clock;
import io.debezium.util.Metronome;
import oracle.net.ns.NetException;
/**
* A {@link StreamingChangeEventSource} based on Oracle's LogMiner utility.
* The event handler loop is executed in a separate executor.
@ -114,111 +110,103 @@ public void execute(ChangeEventSourceContext context) {
this.logMinerMetrics = new LogMinerMetrics(taskContext);
this.logMinerMetrics.register(LOGGER);
// The top outer loop gives the resiliency on the network disconnections. This is critical for cloud deployment.
while (context.isRunning()) {
try (Connection connection = jdbcConnection.connection(false);
PreparedStatement fetchFromMiningView = connection
.prepareStatement(SqlUtils.queryLogMinerContents(connectorConfig.getSchemaName(), jdbcConnection.username(), schema))) {
try (Connection connection = jdbcConnection.connection(false);
PreparedStatement fetchFromMiningView = connection
.prepareStatement(SqlUtils.queryLogMinerContents(connectorConfig.getSchemaName(), jdbcConnection.username(), schema))) {
startScn = offsetContext.getScn();
createAuditTable(connection);
LOGGER.trace("current millis {}, db time {}", System.currentTimeMillis(), getTimeDifference(connection).toMillis());
transactionalBufferMetrics.setTimeDifference(new AtomicLong(getTimeDifference(connection).toMillis()));
startScn = offsetContext.getScn();
createAuditTable(connection);
LOGGER.trace("current millis {}, db time {}", System.currentTimeMillis(), getTimeDifference(connection).toMillis());
transactionalBufferMetrics.setTimeDifference(new AtomicLong(getTimeDifference(connection).toMillis()));
if (!isContinuousMining && startScn < getFirstOnlineLogScn(connection)) {
throw new RuntimeException("Online REDO LOG files don't contain the offset SCN. Clean offset and start over");
}
if (!isContinuousMining && startScn < getFirstOnlineLogScn(connection)) {
throw new RuntimeException("Online REDO LOG files don't contain the offset SCN. Clean offset and start over");
}
// 1. Configure Log Miner to mine online redo logs
// todo: DBZ-137 this step can actually be executed outside the loop at start-up.
setNlsSessionParameters(jdbcConnection);
checkSupplementalLogging(jdbcConnection, connectorConfig.getPdbName());
// 1. Configure Log Miner to mine online redo logs
setNlsSessionParameters(jdbcConnection);
checkSupplementalLogging(jdbcConnection, connectorConfig.getPdbName());
if (strategy == OracleConnectorConfig.LogMiningStrategy.CATALOG_IN_REDO) {
buildDataDictionary(connection);
}
if (strategy == OracleConnectorConfig.LogMiningStrategy.CATALOG_IN_REDO) {
buildDataDictionary(connection);
}
if (!isContinuousMining) {
setRedoLogFilesForMining(connection, startScn);
}
if (!isContinuousMining) {
setRedoLogFilesForMining(connection, startScn);
}
LogMinerQueryResultProcessor processor = new LogMinerQueryResultProcessor(context, logMinerMetrics, transactionalBuffer,
dmlParser, offsetContext, schema, dispatcher, transactionalBufferMetrics, catalogName, clock);
LogMinerQueryResultProcessor processor = new LogMinerQueryResultProcessor(context, logMinerMetrics, transactionalBuffer,
dmlParser, offsetContext, schema, dispatcher, transactionalBufferMetrics, catalogName, clock);
// 2. Querying LogMiner view while running
Set<String> currentRedoLogFiles = getCurrentRedoLogFiles(connection, logMinerMetrics);
while (context.isRunning()) {
// 2. Querying LogMiner view while running
Set<String> currentRedoLogFiles = getCurrentRedoLogFiles(connection, logMinerMetrics);
while (context.isRunning()) {
endScn = getEndScn(connection, startScn, logMinerMetrics);
// LOGGER.trace("startScn: {}, endScn: {}", startScn, endScn);
endScn = getEndScn(connection, startScn, logMinerMetrics);
// LOGGER.trace("startScn: {}, endScn: {}", startScn, endScn);
metronome = Metronome.sleeper(Duration.ofMillis(logMinerMetrics.getMillisecondToSleepBetweenMiningQuery()), clock);
metronome.pause();
metronome = Metronome.sleeper(Duration.ofMillis(logMinerMetrics.getMillisecondToSleepBetweenMiningQuery()), clock);
metronome.pause();
Set<String> possibleNewCurrentLogFile = getCurrentRedoLogFiles(connection, logMinerMetrics);
if (!currentRedoLogFiles.equals(possibleNewCurrentLogFile)) {
LOGGER.debug("\n\n***** SWITCH occurred *****\n" + " from:{} , to:{} \n\n", currentRedoLogFiles, possibleNewCurrentLogFile);
Set<String> possibleNewCurrentLogFile = getCurrentRedoLogFiles(connection, logMinerMetrics);
if (!currentRedoLogFiles.equals(possibleNewCurrentLogFile)) {
LOGGER.debug("\n\n***** SWITCH occurred *****\n" + " from:{} , to:{} \n\n", currentRedoLogFiles, possibleNewCurrentLogFile);
// This is the way to mitigate PGA leak.
// With one mining session it grows and maybe there is another way to flush PGA, but at this point we use new mining session
endMining(connection);
// This is the way to mitigate PGA leak.
// With one mining session it grows and maybe there is another way to flush PGA, but at this point we use new mining session
endMining(connection);
if (!isContinuousMining) {
if (strategy == OracleConnectorConfig.LogMiningStrategy.CATALOG_IN_REDO) {
buildDataDictionary(connection);
}
abandonOldTransactionsIfExist(connection);
setRedoLogFilesForMining(connection, startScn);
if (!isContinuousMining) {
if (strategy == OracleConnectorConfig.LogMiningStrategy.CATALOG_IN_REDO) {
buildDataDictionary(connection);
}
currentRedoLogFiles = getCurrentRedoLogFiles(connection, logMinerMetrics);
abandonOldTransactionsIfExist(connection);
setRedoLogFilesForMining(connection, startScn);
}
startOnlineMining(connection, startScn, endScn, strategy, isContinuousMining);
Instant startTime = Instant.now();
fetchFromMiningView.setFetchSize(10_000);
fetchFromMiningView.setLong(1, startScn);
fetchFromMiningView.setLong(2, endScn);
ResultSet res = fetchFromMiningView.executeQuery();
logMinerMetrics.setLastLogMinerQueryDuration(Duration.between(startTime, Instant.now()));
processor.processResult(res);
updateStartScn();
// LOGGER.trace("largest scn = {}", transactionalBuffer.getLargestScn());
// update SCN in offset context only if buffer is empty, otherwise we update offset in TransactionalBuffer
if (transactionalBuffer.isEmpty()) {
offsetContext.setScn(startScn);
transactionalBuffer.resetLargestScn(null);
}
res.close();
// we don't do it for other modes to save time on building data dictionary
// if (strategy == OracleConnectorConfig.LogMiningStrategy.ONLINE_CATALOG) {
// endMining(connection);
// updateRedoLogMetrics(connection, logMinerMetrics);
// currentRedoLogFiles = getCurrentRedoLogFiles(connection, logMinerMetrics);
// }
currentRedoLogFiles = getCurrentRedoLogFiles(connection, logMinerMetrics);
}
}
catch (Throwable e) {
if (connectionProblem(e)) {
logWarn(transactionalBufferMetrics, "Disconnection occurred. {} ", e.toString());
continue;
startOnlineMining(connection, startScn, endScn, strategy, isContinuousMining);
Instant startTime = Instant.now();
fetchFromMiningView.setFetchSize(10_000);
fetchFromMiningView.setLong(1, startScn);
fetchFromMiningView.setLong(2, endScn);
ResultSet res = fetchFromMiningView.executeQuery();
logMinerMetrics.setLastLogMinerQueryDuration(Duration.between(startTime, Instant.now()));
processor.processResult(res);
updateStartScn();
// LOGGER.trace("largest scn = {}", transactionalBuffer.getLargestScn());
// update SCN in offset context only if buffer is empty, otherwise we update offset in TransactionalBuffer
if (transactionalBuffer.isEmpty()) {
offsetContext.setScn(startScn);
transactionalBuffer.resetLargestScn(null);
}
logError(transactionalBufferMetrics, "Mining session was stopped due to the {} ", e.toString());
throw new RuntimeException(e);
}
finally {
LOGGER.info("startScn={}, endScn={}, offsetContext.getScn()={}", startScn, endScn, offsetContext.getScn());
LOGGER.info("Transactional buffer metrics dump: {}", transactionalBufferMetrics.toString());
LOGGER.info("Transactional buffer dump: {}", transactionalBuffer.toString());
LOGGER.info("LogMiner metrics dump: {}", logMinerMetrics.toString());
res.close();
// we don't do it for other modes to save time on building data dictionary
// if (strategy == OracleConnectorConfig.LogMiningStrategy.ONLINE_CATALOG) {
// endMining(connection);
// updateRedoLogMetrics(connection, logMinerMetrics);
// currentRedoLogFiles = getCurrentRedoLogFiles(connection, logMinerMetrics);
// }
}
}
catch (Throwable e) {
logError(transactionalBufferMetrics, "Mining session stopped due to the {} ", e.toString());
errorHandler.setProducerThrowable(e);
}
finally {
LOGGER.info("startScn={}, endScn={}, offsetContext.getScn()={}", startScn, endScn, offsetContext.getScn());
LOGGER.info("Transactional buffer metrics dump: {}", transactionalBufferMetrics.toString());
LOGGER.info("Transactional buffer dump: {}", transactionalBuffer.toString());
LOGGER.info("LogMiner metrics dump: {}", logMinerMetrics.toString());
}
logMinerMetrics.unregister(LOGGER);
transactionalBufferMetrics.unregister(LOGGER);
@ -247,18 +235,4 @@ private void updateStartScn() {
/**
 * Reacts to an offset commit. Intentionally a no-op for this source: the
 * method body does no work, so there is nothing to flush or acknowledge here.
 *
 * @param offset the committed offset values; ignored
 */
public void commitOffset(Map<String, ?> offset) {
// nothing to do
}
/**
 * Classifies a failure as a recoverable connection problem (triggering a
 * mining-session restart) versus a fatal error.
 *
 * NOTE(review): the early return below requires BOTH a non-null message and a
 * non-null cause; a SQLRecoverableException or IOException-caused failure that
 * lacks either one is therefore treated as non-retriable even though those
 * exact conditions are checked afterwards. Only two cause levels are inspected
 * for {@code NetException}.
 *
 * @param e the failure to classify
 * @return true if the error indicates a lost or unreachable database connection
 */
private boolean connectionProblem(Throwable e) {
// Guard: the checks below dereference both getMessage() and getCause().
if (e.getMessage() == null || e.getCause() == null) {
return false;
}
return e.getMessage().startsWith("ORA-03135") || // connection lost contact
e.getMessage().startsWith("ORA-12543") || // TNS:destination host unreachable
e.getMessage().startsWith("ORA-00604") || // error occurred at recursive SQL level 1
e.getMessage().startsWith("ORA-01089") || // Oracle immediate shutdown in progress
e.getCause() instanceof IOException ||
e instanceof SQLRecoverableException ||
e.getMessage().toUpperCase().startsWith("NO MORE DATA TO READ FROM SOCKET") ||
e.getCause().getCause() instanceof NetException; // instanceof is null-safe if there is no second-level cause
}
}

View File

@ -79,7 +79,7 @@ public void execute(ChangeEventSourceContext context) throws InterruptedExceptio
}
}
catch (Throwable e) {
throw new RuntimeException(e);
errorHandler.setProducerThrowable(e);
}
finally {
// 3. disconnect