DBZ-2825 Suggested changes
This commit is contained in:
parent
60e53e561a
commit
7b5f0401c8
@ -711,7 +711,7 @@ public boolean isContinuousMining() {
|
||||
/**
|
||||
* @return the duration that archive logs are scanned for log mining
|
||||
*/
|
||||
public Duration getLogMiningArchiveLogHours() {
|
||||
public Duration getLogMiningArchiveLogRetention() {
|
||||
return Duration.ofHours(getConfig().getLong(LOG_MINING_ARCHIVE_LOG_HOURS));
|
||||
}
|
||||
|
||||
|
@ -277,14 +277,14 @@ static Set<String> getCurrentRedoLogFiles(Connection connection, LogMinerMetrics
|
||||
* This method fetches the oldest SCN from online redo log files
|
||||
*
|
||||
* @param connection container level database connection
|
||||
* @param archiveLogHours duration that archive logs are mined
|
||||
* @param archiveLogRetention duration that archive logs are mined
|
||||
* @return oldest SCN from online redo log
|
||||
* @throws SQLException if anything unexpected happens
|
||||
*/
|
||||
static long getFirstOnlineLogScn(Connection connection, Duration archiveLogHours) throws SQLException {
|
||||
static long getFirstOnlineLogScn(Connection connection, Duration archiveLogRetention) throws SQLException {
|
||||
LOGGER.trace("getting first scn of all online logs");
|
||||
Statement s = connection.createStatement();
|
||||
ResultSet res = s.executeQuery(SqlUtils.oldestFirstChangeQuery(archiveLogHours));
|
||||
ResultSet res = s.executeQuery(SqlUtils.oldestFirstChangeQuery(archiveLogRetention));
|
||||
res.next();
|
||||
long firstScnOfOnlineLog = res.getLong(1);
|
||||
res.close();
|
||||
@ -454,16 +454,16 @@ public static void endMining(Connection connection) {
|
||||
* This method substitutes CONTINUOUS_MINE functionality
|
||||
* @param connection connection
|
||||
* @param lastProcessedScn current offset
|
||||
* @param archiveLogHours the duration that archive logs will be mined
|
||||
* @param archiveLogRetention the duration that archive logs will be mined
|
||||
* @throws SQLException if anything unexpected happens
|
||||
*/
|
||||
// todo: check RAC resiliency
|
||||
public static void setRedoLogFilesForMining(Connection connection, Long lastProcessedScn, Duration archiveLogHours) throws SQLException {
|
||||
public static void setRedoLogFilesForMining(Connection connection, Long lastProcessedScn, Duration archiveLogRetention) throws SQLException {
|
||||
|
||||
removeLogFilesFromMining(connection);
|
||||
|
||||
Map<String, Long> onlineLogFilesForMining = getOnlineLogFilesForOffsetScn(connection, lastProcessedScn);
|
||||
Map<String, Long> archivedLogFilesForMining = getArchivedLogFilesForOffsetScn(connection, lastProcessedScn, archiveLogHours);
|
||||
Map<String, Long> archivedLogFilesForMining = getArchivedLogFilesForOffsetScn(connection, lastProcessedScn, archiveLogRetention);
|
||||
|
||||
if (onlineLogFilesForMining.size() + archivedLogFilesForMining.size() == 0) {
|
||||
throw new IllegalStateException("None of log files contains offset SCN: " + lastProcessedScn + ", re-snapshot is required.");
|
||||
@ -542,12 +542,12 @@ public static Map<String, Long> getOnlineLogFilesForOffsetScn(Connection connect
|
||||
* This method returns all archived log files for one day, containing given offset scn
|
||||
* @param connection connection
|
||||
* @param offsetScn offset scn
|
||||
* @param archiveLogHours duration that archive logs will be mined
|
||||
* @param archiveLogRetention duration that archive logs will be mined
|
||||
* @return Map of archived files
|
||||
* @throws SQLException if something happens
|
||||
*/
|
||||
public static Map<String, Long> getArchivedLogFilesForOffsetScn(Connection connection, Long offsetScn, Duration archiveLogHours) throws SQLException {
|
||||
Map<String, String> redoLogFiles = getMap(connection, SqlUtils.archiveLogsQuery(offsetScn, archiveLogHours), "-1");
|
||||
public static Map<String, Long> getArchivedLogFilesForOffsetScn(Connection connection, Long offsetScn, Duration archiveLogRetention) throws SQLException {
|
||||
Map<String, String> redoLogFiles = getMap(connection, SqlUtils.archiveLogsQuery(offsetScn, archiveLogRetention), "-1");
|
||||
return redoLogFiles.entrySet().stream().collect(
|
||||
Collectors.toMap(Map.Entry::getKey, e -> new BigDecimal(e.getValue()).longValue() == -1 ? Long.MAX_VALUE : new BigDecimal(e.getValue()).longValue()));
|
||||
}
|
||||
|
@ -85,7 +85,7 @@ public class LogMinerStreamingChangeEventSource implements StreamingChangeEventS
|
||||
private TransactionalBuffer transactionalBuffer;
|
||||
private long startScn;
|
||||
private long endScn;
|
||||
private Duration archiveLogHours;
|
||||
private Duration archiveLogRetention;
|
||||
|
||||
public LogMinerStreamingChangeEventSource(OracleConnectorConfig connectorConfig, OracleOffsetContext offsetContext,
|
||||
OracleConnection jdbcConnection, EventDispatcher<TableId> dispatcher,
|
||||
@ -110,7 +110,7 @@ public LogMinerStreamingChangeEventSource(OracleConnectorConfig connectorConfig,
|
||||
this.racHosts.addAll(connectorConfig.getRacNodes().stream().map(String::toUpperCase).collect(Collectors.toSet()));
|
||||
instantiateFlushConnections(jdbcConfiguration, racHosts);
|
||||
}
|
||||
this.archiveLogHours = connectorConfig.getLogMiningArchiveLogHours();
|
||||
this.archiveLogRetention = connectorConfig.getLogMiningArchiveLogRetention();
|
||||
}
|
||||
|
||||
/**
|
||||
@ -134,7 +134,7 @@ public void execute(ChangeEventSourceContext context) {
|
||||
startScn = offsetContext.getScn();
|
||||
createFlushTable(connection);
|
||||
|
||||
if (!isContinuousMining && startScn < getFirstOnlineLogScn(connection, archiveLogHours)) {
|
||||
if (!isContinuousMining && startScn < getFirstOnlineLogScn(connection, archiveLogRetention)) {
|
||||
throw new DebeziumException(
|
||||
"Online REDO LOG files or archive log files do not contain the offset scn " + startScn + ". Please perform a new snapshot.");
|
||||
}
|
||||
@ -142,7 +142,7 @@ public void execute(ChangeEventSourceContext context) {
|
||||
setNlsSessionParameters(jdbcConnection);
|
||||
checkSupplementalLogging(jdbcConnection, connectorConfig.getPdbName());
|
||||
|
||||
initializeRedoLogsForMining(connection, false, archiveLogHours);
|
||||
initializeRedoLogsForMining(connection, false, archiveLogRetention);
|
||||
|
||||
HistoryRecorder historyRecorder = connectorConfig.getLogMiningHistoryRecorder();
|
||||
try {
|
||||
@ -173,7 +173,7 @@ public void execute(ChangeEventSourceContext context) {
|
||||
// At this point we use a new mining session
|
||||
endMining(connection);
|
||||
|
||||
initializeRedoLogsForMining(connection, true, archiveLogHours);
|
||||
initializeRedoLogsForMining(connection, true, archiveLogRetention);
|
||||
|
||||
abandonOldTransactionsIfExist(connection);
|
||||
currentRedoLogFiles = getCurrentRedoLogFiles(connection, logMinerMetrics);
|
||||
@ -275,13 +275,13 @@ private void updateStartScn() {
|
||||
startScn = endScn;
|
||||
}
|
||||
|
||||
private void initializeRedoLogsForMining(Connection connection, boolean postEndMiningSession, Duration archiveLogHours) throws SQLException {
|
||||
private void initializeRedoLogsForMining(Connection connection, boolean postEndMiningSession, Duration archiveLogRetention) throws SQLException {
|
||||
if (!postEndMiningSession) {
|
||||
if (OracleConnectorConfig.LogMiningStrategy.CATALOG_IN_REDO.equals(strategy)) {
|
||||
buildDataDictionary(connection);
|
||||
}
|
||||
if (!isContinuousMining) {
|
||||
setRedoLogFilesForMining(connection, startScn, archiveLogHours);
|
||||
setRedoLogFilesForMining(connection, startScn, archiveLogRetention);
|
||||
}
|
||||
}
|
||||
else {
|
||||
@ -289,7 +289,7 @@ private void initializeRedoLogsForMining(Connection connection, boolean postEndM
|
||||
if (OracleConnectorConfig.LogMiningStrategy.CATALOG_IN_REDO.equals(strategy)) {
|
||||
buildDataDictionary(connection);
|
||||
}
|
||||
setRedoLogFilesForMining(connection, startScn, archiveLogHours);
|
||||
setRedoLogFilesForMining(connection, startScn, archiveLogRetention);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -127,11 +127,11 @@ static String currentScnQuery() {
|
||||
return String.format("SELECT CURRENT_SCN FROM %s", DATABASE_VIEW);
|
||||
}
|
||||
|
||||
static String oldestFirstChangeQuery(Duration archiveLogHours) {
|
||||
if (!archiveLogHours.isNegative() && !archiveLogHours.isZero()) {
|
||||
static String oldestFirstChangeQuery(Duration archiveLogRetention) {
|
||||
if (!archiveLogRetention.isNegative() && !archiveLogRetention.isZero()) {
|
||||
return String.format("SELECT MIN(FIRST_CHANGE#) FROM (SELECT MIN(FIRST_CHANGE#) AS FIRST_CHANGE# FROM %s " +
|
||||
"UNION SELECT MIN(FIRST_CHANGE#) AS FIRST_CHANGE# FROM %s " +
|
||||
"WHERE FIRST_TIME >= SYSDATE - (%d/24))", LOG_VIEW, ARCHIVED_LOG_VIEW, archiveLogHours.toHours());
|
||||
"WHERE FIRST_TIME >= SYSDATE - (%d/24))", LOG_VIEW, ARCHIVED_LOG_VIEW, archiveLogRetention.toHours());
|
||||
}
|
||||
return String.format("SELECT MIN(FIRST_CHANGE#) FROM (SELECT MIN(FIRST_CHANGE#) AS FIRST_CHANGE# FROM %s " +
|
||||
"UNION SELECT MIN(FIRST_CHANGE#) AS FIRST_CHANGE# FROM %s)", LOG_VIEW, ARCHIVED_LOG_VIEW);
|
||||
@ -148,14 +148,14 @@ public static String allOnlineLogsQuery() {
|
||||
* Obtain the query to be used to fetch archive logs.
|
||||
*
|
||||
* @param scn oldest scn to search for
|
||||
* @param archiveLogHours duration archive logs will be mined
|
||||
* @param archiveLogRetention duration archive logs will be mined
|
||||
* @return query
|
||||
*/
|
||||
public static String archiveLogsQuery(Long scn, Duration archiveLogHours) {
|
||||
if (!archiveLogHours.isNegative() && !archiveLogHours.isZero()) {
|
||||
public static String archiveLogsQuery(Long scn, Duration archiveLogRetention) {
|
||||
if (!archiveLogRetention.isNegative() && !archiveLogRetention.isZero()) {
|
||||
return String.format("SELECT NAME AS FILE_NAME, NEXT_CHANGE# AS NEXT_CHANGE FROM %s " +
|
||||
" WHERE NAME IS NOT NULL AND FIRST_TIME >= SYSDATE - (%d/24) AND ARCHIVED = 'YES' " +
|
||||
" AND STATUS = 'A' AND NEXT_CHANGE# > %s ORDER BY 2", ARCHIVED_LOG_VIEW, archiveLogHours.toHours(), scn);
|
||||
" AND STATUS = 'A' AND NEXT_CHANGE# > %s ORDER BY 2", ARCHIVED_LOG_VIEW, archiveLogRetention.toHours(), scn);
|
||||
}
|
||||
return String.format("SELECT NAME AS FILE_NAME, NEXT_CHANGE# AS NEXT_CHANGE FROM %s " +
|
||||
"WHERE NAME IS NOT NULL AND ARCHIVED = 'YES' " +
|
||||
|
@ -209,11 +209,9 @@ public static void forceLogfileSwitch() {
|
||||
Configuration config = adminConfig().build();
|
||||
Configuration jdbcConfig = config.subset("database.", true);
|
||||
|
||||
OracleConnection jdbcConnection = new OracleConnection(jdbcConfig, TestHelper.class::getClassLoader);
|
||||
try {
|
||||
try(OracleConnection jdbcConnection = new OracleConnection(jdbcConfig, TestHelper.class::getClassLoader)) {
|
||||
jdbcConnection.resetSessionToCdb();
|
||||
jdbcConnection.execute("ALTER SYSTEM SWITCH LOGFILE");
|
||||
jdbcConnection.close();
|
||||
}
|
||||
catch (SQLException e) {
|
||||
throw new RuntimeException("Failed to switch logfile", e);
|
||||
@ -224,8 +222,7 @@ public static int getNumberOfOnlineLogGroups() {
|
||||
Configuration config = adminConfig().build();
|
||||
Configuration jdbcConfig = config.subset("database.", true);
|
||||
|
||||
OracleConnection jdbcConnection = new OracleConnection(jdbcConfig, TestHelper.class::getClassLoader);
|
||||
try {
|
||||
try(OracleConnection jdbcConnection = new OracleConnection(jdbcConfig, TestHelper.class::getClassLoader)) {
|
||||
jdbcConnection.resetSessionToCdb();
|
||||
return jdbcConnection.queryAndMap("SELECT COUNT(GROUP#) FROM V$LOG", rs -> {
|
||||
rs.next();
|
||||
@ -235,14 +232,6 @@ public static int getNumberOfOnlineLogGroups() {
|
||||
catch (SQLException e) {
|
||||
throw new RuntimeException("Failed to get redo log groups", e);
|
||||
}
|
||||
finally {
|
||||
try {
|
||||
jdbcConnection.close();
|
||||
}
|
||||
catch (SQLException e) {
|
||||
// ignored
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public static void forceFlushOfRedoLogsToArchiveLogs() {
|
||||
|
Loading…
Reference in New Issue
Block a user