DBZ-7645 Revert removal of LogMiner continuous mining

This commit is contained in:
Chris Cranford 2024-03-14 17:28:28 -04:00 committed by Jiri Pechanec
parent 95a8145015
commit e26d3efedc
5 changed files with 57 additions and 24 deletions

View File

@ -621,6 +621,17 @@ public class OracleConnectorConfig extends HistorizedRelationalDatabaseConnector
.withValidation(Field::isNonNegativeInteger)
.withDescription("The number of attempts to retry database errors during snapshots before failing.");
@Deprecated
public static final Field LOG_MINING_CONTINUOUS_MINE = Field.create("log.mining.continuous.mine")
.withDisplayName("Should log mining session configured with CONTINUOUS_MINE setting?")
.withType(Type.BOOLEAN)
.withWidth(Width.SHORT)
.withImportance(Importance.LOW)
.withDefault(false)
.withValidation(Field::isBoolean)
.withDescription("(Deprecated) if true, CONTINUOUS_MINE option will be added to the log mining session. " +
"This will manage log files switches seamlessly.");
private static final ConfigDefinition CONFIG_DEFINITION = HistorizedRelationalDatabaseConnectorConfig.CONFIG_DEFINITION.edit()
.name("Oracle")
.excluding(
@ -692,7 +703,8 @@ public class OracleConnectorConfig extends HistorizedRelationalDatabaseConnector
OLR_SOURCE,
OLR_HOST,
OLR_PORT,
SNAPSHOT_DATABASE_ERRORS_MAX_RETRIES)
SNAPSHOT_DATABASE_ERRORS_MAX_RETRIES,
LOG_MINING_CONTINUOUS_MINE)
.events(SOURCE_INFO_STRUCT_MAKER)
.create();
@ -760,6 +772,7 @@ public static ConfigDef configDef() {
private final String logMiningInifispanGlobalConfiguration;
private final Set<String> logMiningSchemaChangesUsernameExcludes;
private final Boolean logMiningIncludeRedoSql;
private final boolean logMiningContinuousMining;
private final String openLogReplicatorSource;
private final String openLogReplicatorHostname;
@ -828,6 +841,7 @@ public OracleConnectorConfig(Configuration config) {
this.logMiningInifispanGlobalConfiguration = config.getString(LOG_MINING_BUFFER_INFINISPAN_CACHE_GLOBAL);
this.logMiningSchemaChangesUsernameExcludes = Strings.setOf(config.getString(LOG_MINING_SCHEMA_CHANGES_USERNAME_EXCLUDE_LIST), String::new);
this.logMiningIncludeRedoSql = config.getBoolean(LOG_MINING_INCLUDE_REDO_SQL);
this.logMiningContinuousMining = config.getBoolean(LOG_MINING_CONTINUOUS_MINE);
// OpenLogReplicator
this.openLogReplicatorSource = config.getString(OLR_SOURCE);
@ -1875,6 +1889,16 @@ public boolean isLogMiningIncludeRedoSql() {
return logMiningIncludeRedoSql;
}
/**
* Returns whether the LogMiner adapter should use continuous mining or not.
*
* @return true continuous mining should be used
*/
@Deprecated
public boolean isLogMiningContinuousMining() {
return logMiningContinuousMining;
}
/**
* Returns the logical source to stream changes from when connecting to OpenLogReplicator.
*

View File

@ -82,6 +82,7 @@ public class LogMinerStreamingChangeEventSource implements StreamingChangeEventS
private final int logFileQueryMaxRetries;
private final Duration initialDelay;
private final Duration maxDelay;
private final boolean continuousMining;
private Scn startScn; // startScn is the **exclusive** lower bound for mining
private Scn endScn;
@ -115,6 +116,8 @@ public LogMinerStreamingChangeEventSource(OracleConnectorConfig connectorConfig,
this.maxDelay = connectorConfig.getLogMiningMaxDelay();
this.currentBatchSize = connectorConfig.getLogMiningBatchSizeDefault();
this.currentSleepTime = connectorConfig.getLogMiningSleepTimeDefault().toMillis();
this.continuousMining = connectorConfig.isLogMiningContinuousMining();
this.snapshotterService = snapshotterService;
this.streamingMetrics.setBatchSize(this.currentBatchSize);
@ -166,7 +169,7 @@ public void execute(ChangeEventSourceContext context, OraclePartition partition,
}
try (LogWriterFlushStrategy flushStrategy = resolveFlushStrategy()) {
if (startScn.compareTo(firstScn.subtract(Scn.ONE)) < 0) {
if (!continuousMining && startScn.compareTo(firstScn.subtract(Scn.ONE)) < 0) {
// startScn is the exclusive lower bound, so must be >= (firstScn - 1)
throw new DebeziumException(
"Online REDO LOG files or archive log files do not contain the offset scn " + startScn + ". Please perform a new snapshot.");
@ -181,7 +184,7 @@ public void execute(ChangeEventSourceContext context, OraclePartition partition,
return;
}
initializeRedoLogsForMining(jdbcConnection, startScn);
initializeRedoLogsForMining(jdbcConnection, false, startScn);
int retryAttempts = 1;
Stopwatch sw = Stopwatch.accumulating().start();
@ -237,7 +240,7 @@ public void execute(ChangeEventSourceContext context, OraclePartition partition,
if (connectorConfig.isLogMiningRestartConnection()) {
prepareConnection(true);
}
initializeRedoLogsForMining(jdbcConnection, startScn);
initializeRedoLogsForMining(jdbcConnection, true, startScn);
// log switch or restart required, re-create a new stop watch
sw = Stopwatch.accumulating().start();
@ -393,14 +396,16 @@ private LogMinerEventProcessor createProcessor(ChangeEventSourceContext context,
return bufferType.createProcessor(context, connectorConfig, jdbcConnection, dispatcher, partition, offsetContext, schema, streamingMetrics);
}
private void initializeRedoLogsForMining(OracleConnection connection, Scn startScn) throws SQLException {
if (OracleConnectorConfig.LogMiningStrategy.CATALOG_IN_REDO.equals(strategy)) {
private void initializeRedoLogsForMining(OracleConnection connection, boolean postEndMiningSession, Scn startScn) throws SQLException {
if ((!postEndMiningSession || !continuousMining)
&& OracleConnectorConfig.LogMiningStrategy.CATALOG_IN_REDO.equals(strategy)) {
buildDataDictionary(connection);
}
currentLogFiles = setLogFilesForMining(connection, startScn, archiveLogRetention, archiveLogOnlyMode,
archiveDestinationName, logFileQueryMaxRetries, initialDelay, maxDelay);
currentRedoLogSequences = getCurrentLogFileSequences(currentLogFiles);
if (!continuousMining) {
currentLogFiles = setLogFilesForMining(connection, startScn, archiveLogRetention, archiveLogOnlyMode,
archiveDestinationName, logFileQueryMaxRetries, initialDelay, maxDelay);
currentRedoLogSequences = getCurrentLogFileSequences(currentLogFiles);
}
updateRedoLogMetrics();
}
@ -600,12 +605,12 @@ private OffsetDateTime getDatabaseSystemTime(OracleConnection connection) throws
* @throws SQLException if mining session failed to start
*/
public boolean startMiningSession(OracleConnection connection, Scn startScn, Scn endScn, int attempts) throws SQLException {
LOGGER.debug("Starting mining session startScn={}, endScn={}, strategy={}", startScn, endScn, strategy);
LOGGER.debug("Starting mining session startScn={}, endScn={}, strategy={}, continuous={}", startScn, endScn, strategy, continuousMining);
try {
Instant start = Instant.now();
// NOTE: we treat startSCN as the _exclusive_ lower bound for mining,
// whereas START_LOGMNR takes an _inclusive_ lower bound, hence the increment.
connection.executeWithoutCommitting(SqlUtils.startLogMinerStatement(startScn.add(Scn.ONE), endScn, strategy));
connection.executeWithoutCommitting(SqlUtils.startLogMinerStatement(startScn.add(Scn.ONE), endScn, strategy, continuousMining));
streamingMetrics.setLastMiningSessionStartDuration(Duration.between(start, Instant.now()));
return true;
}
@ -636,8 +641,8 @@ public boolean startMiningSession(OracleConnection connection, Scn startScn, Scn
*/
public void endMiningSession(OracleConnection connection, OracleOffsetContext offsetContext) throws SQLException {
try {
LOGGER.trace("Ending log mining startScn={}, endScn={}, offsetContext.getScn={}, strategy={}",
startScn, endScn, offsetContext.getScn(), strategy);
LOGGER.trace("Ending log mining startScn={}, endScn={}, offsetContext.getScn={}, strategy={}, continuous={}",
startScn, endScn, offsetContext.getScn(), strategy, continuousMining);
connection.executeWithoutCommitting("BEGIN SYS.DBMS_LOGMNR.END_LOGMNR(); END;");
}
catch (SQLException e) {

View File

@ -197,9 +197,10 @@ private static String localArchiveLogDestinationsOnlyQuery(String archiveDestina
* @param startScn mine from
* @param endScn mine till
* @param strategy Log Mining strategy
* @param continuousMining whether to use continuous mining
* @return statement todo: handle corruption. STATUS (Double) value of 0 indicates it is executable
*/
static String startLogMinerStatement(Scn startScn, Scn endScn, OracleConnectorConfig.LogMiningStrategy strategy) {
static String startLogMinerStatement(Scn startScn, Scn endScn, OracleConnectorConfig.LogMiningStrategy strategy, boolean continuousMining) {
String miningStrategy;
if (strategy.equals(OracleConnectorConfig.LogMiningStrategy.CATALOG_IN_REDO)) {
miningStrategy = "DBMS_LOGMNR.DICT_FROM_REDO_LOGS + DBMS_LOGMNR.DDL_DICT_TRACKING ";
@ -207,6 +208,9 @@ static String startLogMinerStatement(Scn startScn, Scn endScn, OracleConnectorCo
else {
miningStrategy = "DBMS_LOGMNR.DICT_FROM_ONLINE_CATALOG ";
}
if (continuousMining) {
miningStrategy += " + DBMS_LOGMNR.CONTINUOUS_MINE ";
}
return "BEGIN sys.dbms_logmnr.start_logmnr(" +
"startScn => '" + startScn + "', " +
"endScn => '" + endScn + "', " +

View File

@ -34,26 +34,26 @@ public class LogMinerUtilsTest {
@Test
public void testStartLogMinerStatement() {
String statement = SqlUtils.startLogMinerStatement(SCN, OTHER_SCN, OracleConnectorConfig.LogMiningStrategy.CATALOG_IN_REDO);
String statement = SqlUtils.startLogMinerStatement(SCN, OTHER_SCN, OracleConnectorConfig.LogMiningStrategy.CATALOG_IN_REDO, false);
assertThat(statement.contains("DBMS_LOGMNR.DICT_FROM_REDO_LOGS")).isTrue();
assertThat(statement.contains("DBMS_LOGMNR.DDL_DICT_TRACKING")).isTrue();
assertThat(statement.contains("DBMS_LOGMNR.DICT_FROM_ONLINE_CATALOG")).isFalse();
assertThat(statement.contains("DBMS_LOGMNR.CONTINUOUS_MINE")).isFalse();
statement = SqlUtils.startLogMinerStatement(SCN, OTHER_SCN, OracleConnectorConfig.LogMiningStrategy.ONLINE_CATALOG);
statement = SqlUtils.startLogMinerStatement(SCN, OTHER_SCN, OracleConnectorConfig.LogMiningStrategy.ONLINE_CATALOG, false);
assertThat(statement.contains("DBMS_LOGMNR.DICT_FROM_REDO_LOGS")).isFalse();
assertThat(statement.contains("DBMS_LOGMNR.DDL_DICT_TRACKING")).isFalse();
assertThat(statement.contains("DBMS_LOGMNR.DICT_FROM_ONLINE_CATALOG")).isTrue();
assertThat(statement.contains("DBMS_LOGMNR.CONTINUOUS_MINE")).isFalse();
statement = SqlUtils.startLogMinerStatement(SCN, OTHER_SCN, OracleConnectorConfig.LogMiningStrategy.CATALOG_IN_REDO);
statement = SqlUtils.startLogMinerStatement(SCN, OTHER_SCN, OracleConnectorConfig.LogMiningStrategy.CATALOG_IN_REDO, true);
assertThat(statement.contains("DBMS_LOGMNR.DICT_FROM_REDO_LOGS")).isTrue();
assertThat(statement.contains("DBMS_LOGMNR.DDL_DICT_TRACKING")).isTrue();
assertThat(statement.contains("DBMS_LOGMNR.DICT_FROM_ONLINE_CATALOG")).isFalse();
assertThat(statement.contains("DBMS_LOGMNR.CONTINUOUS_MINE")).isFalse();
statement = SqlUtils.startLogMinerStatement(SCN, OTHER_SCN, OracleConnectorConfig.LogMiningStrategy.ONLINE_CATALOG);
assertThat(statement.contains("DBMS_LOGMNR.CONTINUOUS_MINE")).isTrue();
statement = SqlUtils.startLogMinerStatement(SCN, OTHER_SCN, OracleConnectorConfig.LogMiningStrategy.ONLINE_CATALOG, true);
assertThat(statement.contains("DBMS_LOGMNR.DICT_FROM_REDO_LOGS")).isFalse();
assertThat(statement.contains("DBMS_LOGMNR.DDL_DICT_TRACKING")).isFalse();
assertThat(statement.contains("DBMS_LOGMNR.DICT_FROM_ONLINE_CATALOG")).isTrue();
assertThat(statement.contains("DBMS_LOGMNR.CONTINUOUS_MINE")).isFalse();
assertThat(statement.contains("DBMS_LOGMNR.CONTINUOUS_MINE")).isTrue();
}
// todo delete after replacement == -1 in the code

View File

@ -40,12 +40,12 @@ public void testStatements() {
expected = "SELECT 'KEY', LOG_GROUP_TYPE FROM ALL_LOG_GROUPS WHERE OWNER = 's' AND TABLE_NAME = 't'";
assertThat(result).isEqualTo(expected);
result = SqlUtils.startLogMinerStatement(Scn.valueOf(10L), Scn.valueOf(20L), OracleConnectorConfig.LogMiningStrategy.ONLINE_CATALOG);
result = SqlUtils.startLogMinerStatement(Scn.valueOf(10L), Scn.valueOf(20L), OracleConnectorConfig.LogMiningStrategy.ONLINE_CATALOG, true);
expected = "BEGIN sys.dbms_logmnr.start_logmnr(startScn => '10', endScn => '20', " +
"OPTIONS => DBMS_LOGMNR.DICT_FROM_ONLINE_CATALOG + DBMS_LOGMNR.NO_ROWID_IN_STMT);END;";
"OPTIONS => DBMS_LOGMNR.DICT_FROM_ONLINE_CATALOG + DBMS_LOGMNR.CONTINUOUS_MINE + DBMS_LOGMNR.NO_ROWID_IN_STMT);END;";
assertThat(result).isEqualTo(expected);
result = SqlUtils.startLogMinerStatement(Scn.valueOf(10L), Scn.valueOf(20L), OracleConnectorConfig.LogMiningStrategy.CATALOG_IN_REDO);
result = SqlUtils.startLogMinerStatement(Scn.valueOf(10L), Scn.valueOf(20L), OracleConnectorConfig.LogMiningStrategy.CATALOG_IN_REDO, false);
expected = "BEGIN sys.dbms_logmnr.start_logmnr(startScn => '10', endScn => '20', " +
"OPTIONS => DBMS_LOGMNR.DICT_FROM_REDO_LOGS + DBMS_LOGMNR.DDL_DICT_TRACKING + DBMS_LOGMNR.NO_ROWID_IN_STMT);END;";
assertThat(result).isEqualTo(expected);