DBZ-3001 Control MAX Scn for LogMiner based on Oracle version

This commit is contained in:
Chris Cranford 2021-02-15 13:25:42 -05:00 committed by Gunnar Morling
parent 9f2e2971b7
commit d061f58b05
8 changed files with 418 additions and 148 deletions

View File

@ -53,6 +53,11 @@ public class OracleConnection extends JdbcConnection {
*/
private static final Field URL = Field.create("url", "Raw JDBC url");
/**
* The database version.
*/
private OracleDatabaseVersion databaseVersion;
public OracleConnection(Configuration config, Supplier<ClassLoader> classLoaderSupplier) {
super(config, resolveConnectionFactory(config), classLoaderSupplier);
}
@ -101,6 +106,60 @@ public void resetSessionToCdb() {
}
}
/**
 * Returns the Oracle database version, resolving it from the database on first
 * access and caching the result for subsequent calls.
 *
 * @return the resolved {@link OracleDatabaseVersion}, never {@code null}
 */
public OracleDatabaseVersion getOracleVersion() {
    OracleDatabaseVersion version = databaseVersion;
    if (version == null) {
        version = resolveOracleDatabaseVersion();
        LOGGER.info("Database Version: {}", version.getBanner());
        databaseVersion = version;
    }
    return version;
}
/**
 * Resolves the Oracle database version by querying {@code V$VERSION}.
 * <p>
 * Oracle 18.1 introduced the {@code BANNER_FULL} column, which carries the full
 * five-component version in a new format; older databases only expose the legacy
 * {@code BANNER} column. The method first attempts {@code BANNER_FULL} and falls
 * back to {@code BANNER} when the column does not exist.
 *
 * @return the parsed database version, never {@code null}
 * @throws RuntimeException if the version query fails or no banner row is found
 */
private OracleDatabaseVersion resolveOracleDatabaseVersion() {
    String versionStr;
    try {
        try {
            // Oracle 18.1 introduced BANNER_FULL as the new column rather than BANNER
            // This column uses a different format than the legacy BANNER.
            versionStr = queryAndMap("SELECT BANNER_FULL FROM V$VERSION WHERE BANNER_FULL LIKE 'Oracle Database%'", (rs) -> {
                if (rs.next()) {
                    return rs.getString(1);
                }
                return null;
            });
        }
        catch (SQLException e) {
            // ORA-00904 (vendor error code 904, "invalid identifier") means BANNER_FULL
            // does not exist, i.e. the database is older than 18.1. Prefer the vendor
            // error code over the message text: the message is locale-dependent and may
            // be null, which previously risked a NullPointerException here.
            if (e.getErrorCode() == 904
                    || (e.getMessage() != null && e.getMessage().contains("ORA-00904: \"BANNER_FULL\": invalid identifier"))) {
                LOGGER.debug("BANNER_FULL column not in V$VERSION, using BANNER column as fallback");
                versionStr = null;
            }
            else {
                throw e;
            }
        }
        // For databases prior to 18.1, a SQLException will be thrown due to BANNER_FULL not being a column and
        // this will cause versionStr to remain null, use fallback column BANNER for versions prior to 18.1.
        if (versionStr == null) {
            versionStr = queryAndMap("SELECT BANNER FROM V$VERSION WHERE BANNER LIKE 'Oracle Database%'", (rs) -> {
                if (rs.next()) {
                    return rs.getString(1);
                }
                return null;
            });
        }
    }
    catch (SQLException e) {
        throw new RuntimeException("Failed to resolve Oracle database version", e);
    }
    if (versionStr == null) {
        throw new RuntimeException("Failed to resolve Oracle database version");
    }
    return OracleDatabaseVersion.parse(versionStr);
}
@Override
public Set<TableId> readTableNames(String databaseCatalog, String schemaNamePattern, String tableNamePattern,
String[] tableTypes)

View File

@ -0,0 +1,91 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.connector.oracle;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
/**
* Represents the Oracle database version.
*
* @author Chris Cranford
*/
/**
 * Represents a parsed Oracle database version, decomposed into its five
 * dot-separated components (e.g. {@code 12.2.0.1.0}): major, maintenance,
 * application server, component, and platform.
 * <p>
 * Instances are immutable and created via {@link #parse(String)}.
 *
 * @author Chris Cranford
 */
public class OracleDatabaseVersion {
    // Legacy banner format (pre-18.1), e.g.
    // "Oracle Database 12c Enterprise Edition Release 12.2.0.1.0 - 64bit Production".
    // NOTE: modifier order fixed to the conventional "static final".
    private static final Pattern VERSION_PATTERN = Pattern
            .compile("(?:.*)(?:Release )([0-9]+)\\.([0-9]+)\\.([0-9]+)\\.([0-9]+)\\.([0-9]+)(?:.*)");

    // BANNER_FULL format introduced in Oracle 18.1, where the full version follows
    // "Version " on a second line after "- Production".
    private static final Pattern VERSION_18_1_PATTERN = Pattern
            .compile("(?:.*)(?:\\- Production(?:\\r\\n|\\r|\\n)(?:Version ))([0-9]+)\\.([0-9]+)\\.([0-9]+)\\.([0-9]+)\\.([0-9]+)");

    private final int major;
    private final int maintenance;
    private final int appServer;
    private final int component;
    private final int platform;
    private final String banner;

    private OracleDatabaseVersion(int major, int maintenance, int appServer, int component, int platform, String banner) {
        this.major = major;
        this.maintenance = maintenance;
        this.appServer = appServer;
        this.component = component;
        this.platform = platform;
        this.banner = banner;
    }

    public int getMajor() {
        return major;
    }

    public int getMaintenance() {
        return maintenance;
    }

    public int getAppServer() {
        return appServer;
    }

    public int getComponent() {
        return component;
    }

    public int getPlatform() {
        return platform;
    }

    /**
     * @return the original, unparsed banner text this version was derived from
     */
    public String getBanner() {
        return banner;
    }

    @Override
    public String toString() {
        return major + "." + maintenance + "." + appServer + "." + component + "." + platform;
    }

    /**
     * Parse the Oracle database version banner.
     * <p>
     * The 18.1+ {@code BANNER_FULL} format is attempted first; if it does not
     * match, the legacy {@code BANNER} format is tried as a fallback.
     *
     * @param banner the banner text
     * @return the parsed OracleDatabaseVersion.
     * @throws RuntimeException if the banner is {@code null} or cannot be parsed
     */
    public static OracleDatabaseVersion parse(String banner) {
        if (banner == null) {
            // Explicit guard: a null banner previously surfaced as an opaque NPE.
            throw new RuntimeException("Failed to resolve Oracle database version: 'null'");
        }
        Matcher matcher = VERSION_18_1_PATTERN.matcher(banner);
        if (!matcher.matches()) {
            matcher = VERSION_PATTERN.matcher(banner);
            if (!matcher.matches()) {
                throw new RuntimeException("Failed to resolve Oracle database version: '" + banner + "'");
            }
        }
        int major = Integer.parseInt(matcher.group(1));
        int maintenance = Integer.parseInt(matcher.group(2));
        int app = Integer.parseInt(matcher.group(3));
        int component = Integer.parseInt(matcher.group(4));
        int platform = Integer.parseInt(matcher.group(5));
        return new OracleDatabaseVersion(major, maintenance, app, component, platform, banner);
    }
}

View File

@ -130,7 +130,7 @@ protected void determineSnapshotOffset(RelationalSnapshotContext ctx) throws Exc
private long getCurrentScn(SnapshotContext ctx) throws SQLException {
if (connectorConfig.getAdapter().equals(OracleConnectorConfig.ConnectorAdapter.LOG_MINER)) {
return LogMinerHelper.getCurrentScn(jdbcConnection.connection());
return LogMinerHelper.getCurrentScn(jdbcConnection);
}
try (Statement statement = jdbcConnection.connection().createStatement();

View File

@ -7,7 +7,6 @@
import java.math.BigInteger;
import java.sql.CallableStatement;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
@ -33,6 +32,7 @@
import io.debezium.connector.oracle.OracleConnection;
import io.debezium.connector.oracle.OracleConnectorConfig;
import io.debezium.connector.oracle.OracleDatabaseSchema;
import io.debezium.connector.oracle.OracleDatabaseVersion;
import io.debezium.jdbc.JdbcConfiguration;
import io.debezium.jdbc.JdbcConnection;
import io.debezium.relational.TableId;
@ -44,12 +44,12 @@
*/
public class LogMinerHelper {
private final static String UNKNOWN = "unknown";
private static final String UNKNOWN = "unknown";
private static final String TOTAL = "TOTAL";
private final static Logger LOGGER = LoggerFactory.getLogger(LogMinerHelper.class);
public final static String MAX_SCN_S = "18446744073709551615";
public final static BigInteger MAX_SCN_BI = new BigInteger(MAX_SCN_S);
private static final String MAX_SCN_11_2 = "281474976710655";
private static final String MAX_SCN_12_2 = "18446744073709551615";
private static final String MAX_SCN_19_6 = "9295429630892703743";
private static final Logger LOGGER = LoggerFactory.getLogger(LogMinerHelper.class);
public enum DATATYPE {
LONG,
@ -91,7 +91,7 @@ static void instantiateFlushConnections(JdbcConfiguration config, Set<String> ho
* @param connection connection to the database as LogMiner user (connection to the container)
* @throws SQLException any exception
*/
static void buildDataDictionary(Connection connection) throws SQLException {
static void buildDataDictionary(OracleConnection connection) throws SQLException {
LOGGER.trace("Building data dictionary");
executeCallableStatement(connection, SqlUtils.BUILD_DICTIONARY);
}
@ -103,8 +103,8 @@ static void buildDataDictionary(Connection connection) throws SQLException {
* @return current SCN
* @throws SQLException if anything unexpected happens
*/
public static long getCurrentScn(Connection connection) throws SQLException {
try (Statement statement = connection.createStatement();
public static long getCurrentScn(OracleConnection connection) throws SQLException {
try (Statement statement = connection.connection(false).createStatement();
ResultSet rs = statement.executeQuery(SqlUtils.currentScnQuery())) {
if (!rs.next()) {
@ -115,7 +115,7 @@ public static long getCurrentScn(Connection connection) throws SQLException {
}
}
static void createFlushTable(Connection connection) throws SQLException {
static void createFlushTable(OracleConnection connection) throws SQLException {
String tableExists = (String) getSingleResult(connection, SqlUtils.tableExistsQuery(SqlUtils.LOGMNR_FLUSH_TABLE), DATATYPE.STRING);
if (tableExists == null) {
executeCallableStatement(connection, SqlUtils.CREATE_FLUSH_TABLE);
@ -124,13 +124,11 @@ static void createFlushTable(Connection connection) throws SQLException {
String recordExists = (String) getSingleResult(connection, SqlUtils.FLUSH_TABLE_NOT_EMPTY, DATATYPE.STRING);
if (recordExists == null) {
executeCallableStatement(connection, SqlUtils.INSERT_FLUSH_TABLE);
if (!connection.getAutoCommit()) {
connection.commit();
}
connection.commit();
}
}
static void createLogMiningHistoryObjects(Connection connection, String historyTableName) throws SQLException {
static void createLogMiningHistoryObjects(OracleConnection connection, String historyTableName) throws SQLException {
String tableExists = (String) getSingleResult(connection, SqlUtils.tableExistsQuery(SqlUtils.LOGMNR_HISTORY_TEMP_TABLE), DATATYPE.STRING);
if (tableExists == null) {
@ -146,7 +144,7 @@ static void createLogMiningHistoryObjects(Connection connection, String historyT
}
}
static void deleteOutdatedHistory(Connection connection, long retention) throws SQLException {
static void deleteOutdatedHistory(OracleConnection connection, long retention) throws SQLException {
Set<String> tableNames = getMap(connection, SqlUtils.getHistoryTableNamesQuery(), "-1").keySet();
for (String tableName : tableNames) {
long hoursAgo = SqlUtils.parseRetentionFromName(tableName);
@ -169,7 +167,7 @@ static void deleteOutdatedHistory(Connection connection, long retention) throws
* @return next SCN to mine up to
* @throws SQLException if anything unexpected happens
*/
static long getEndScn(Connection connection, long startScn, LogMinerMetrics metrics, int defaultBatchSize) throws SQLException {
static long getEndScn(OracleConnection connection, long startScn, LogMinerMetrics metrics, int defaultBatchSize) throws SQLException {
long currentScn = getCurrentScn(connection);
metrics.setCurrentScn(currentScn);
@ -207,7 +205,7 @@ static long getEndScn(Connection connection, long startScn, LogMinerMetrics metr
* @param racHosts set of RAC host
* @throws SQLException exception
*/
static void flushLogWriter(Connection connection, JdbcConfiguration config,
static void flushLogWriter(OracleConnection connection, JdbcConfiguration config,
boolean isRac, Set<String> racHosts)
throws SQLException {
long currentScn = getCurrentScn(connection);
@ -217,9 +215,7 @@ static void flushLogWriter(Connection connection, JdbcConfiguration config,
else {
LOGGER.trace("Updating {} with SCN {}", SqlUtils.LOGMNR_FLUSH_TABLE, currentScn);
executeCallableStatement(connection, SqlUtils.UPDATE_FLUSH_TABLE + currentScn);
if (!connection.getAutoCommit()) {
connection.commit();
}
connection.commit();
}
}
@ -228,7 +224,7 @@ static void flushLogWriter(Connection connection, JdbcConfiguration config,
* @param connection connection
* @return the time difference as a {@link Duration}
*/
static Duration getTimeDifference(Connection connection) throws SQLException {
static Duration getTimeDifference(OracleConnection connection) throws SQLException {
Timestamp dbCurrentMillis = (Timestamp) getSingleResult(connection, SqlUtils.CURRENT_TIMESTAMP, DATATYPE.TIMESTAMP);
if (dbCurrentMillis == null) {
return Duration.ZERO;
@ -258,7 +254,7 @@ static void startLogMining(OracleConnection connection, Long startScn, Long endS
LOGGER.trace("Starting log mining startScn={}, endScn={}, strategy={}, continuous={}", startScn, endScn, strategy, isContinuousMining);
String statement = SqlUtils.startLogMinerStatement(startScn, endScn, strategy, isContinuousMining);
try {
executeCallableStatement(connection.connection(), statement);
executeCallableStatement(connection, statement);
}
catch (SQLException e) {
// Capture database state before throwing exception
@ -276,9 +272,9 @@ static void startLogMining(OracleConnection connection, Long startScn, Long endS
* @return full redo log file name(s), including path
* @throws SQLException if anything unexpected happens
*/
static Set<String> getCurrentRedoLogFiles(Connection connection, LogMinerMetrics metrics) throws SQLException {
static Set<String> getCurrentRedoLogFiles(OracleConnection connection, LogMinerMetrics metrics) throws SQLException {
Set<String> fileNames = new HashSet<>();
try (PreparedStatement st = connection.prepareStatement(SqlUtils.currentRedoNameQuery()); ResultSet result = st.executeQuery()) {
try (PreparedStatement st = connection.connection(false).prepareStatement(SqlUtils.currentRedoNameQuery()); ResultSet result = st.executeQuery()) {
while (result.next()) {
fileNames.add(result.getString(1));
}
@ -297,15 +293,16 @@ static Set<String> getCurrentRedoLogFiles(Connection connection, LogMinerMetrics
* @return oldest SCN from online redo log
* @throws SQLException if anything unexpected happens
*/
static long getFirstOnlineLogScn(Connection connection, Duration archiveLogRetention) throws SQLException {
static long getFirstOnlineLogScn(OracleConnection connection, Duration archiveLogRetention) throws SQLException {
LOGGER.trace("Getting first scn of all online logs");
Statement s = connection.createStatement();
ResultSet res = s.executeQuery(SqlUtils.oldestFirstChangeQuery(archiveLogRetention));
res.next();
long firstScnOfOnlineLog = res.getLong(1);
LOGGER.trace("First SCN in online logs is {}", firstScnOfOnlineLog);
res.close();
return firstScnOfOnlineLog;
try (Statement s = connection.connection(false).createStatement()) {
try (ResultSet rs = s.executeQuery(SqlUtils.oldestFirstChangeQuery(archiveLogRetention))) {
rs.next();
long firstScnOfOnlineLog = rs.getLong(1);
LOGGER.trace("First SCN in online logs is {}", firstScnOfOnlineLog);
return firstScnOfOnlineLog;
}
}
}
/**
@ -324,7 +321,7 @@ static void setNlsSessionParameters(JdbcConnection connection) throws SQLExcepti
* @param fileNames name of current REDO LOG files
* @param metrics current metrics
*/
private static void updateRedoLogMetrics(Connection connection, LogMinerMetrics metrics, Set<String> fileNames) {
private static void updateRedoLogMetrics(OracleConnection connection, LogMinerMetrics metrics, Set<String> fileNames) {
try {
// update metrics
Map<String, String> logStatuses = getRedoLogStatus(connection);
@ -345,7 +342,7 @@ private static void updateRedoLogMetrics(Connection connection, LogMinerMetrics
* @return REDO LOG statuses Map, where key is the REDO name and value is the status
* @throws SQLException if anything unexpected happens
*/
private static Map<String, String> getRedoLogStatus(Connection connection) throws SQLException {
private static Map<String, String> getRedoLogStatus(OracleConnection connection) throws SQLException {
return getMap(connection, SqlUtils.redoLogStatusQuery(), UNKNOWN);
}
@ -354,7 +351,7 @@ private static Map<String, String> getRedoLogStatus(Connection connection) throw
* @param connection privileged connection
* @return counter
*/
private static int getSwitchCount(Connection connection) {
private static int getSwitchCount(OracleConnection connection) {
try {
Map<String, String> total = getMap(connection, SqlUtils.switchHistoryQuery(), UNKNOWN);
if (total != null && total.get(TOTAL) != null) {
@ -390,7 +387,7 @@ private static void flushRacLogWriters(long currentScn, JdbcConfiguration config
continue;
}
LOGGER.trace("Flushing Log Writer buffer of node {}", host);
executeCallableStatement(conn.connection(), SqlUtils.UPDATE_FLUSH_TABLE + currentScn);
executeCallableStatement(conn, SqlUtils.UPDATE_FLUSH_TABLE + currentScn);
conn.commit();
}
catch (Exception e) {
@ -437,17 +434,17 @@ static void checkSupplementalLogging(OracleConnection connection, String pdbName
}
// Check if ALL supplemental logging is enabled at the database
Map<String, String> globalAll = getMap(connection.connection(false), SqlUtils.databaseSupplementalLoggingAllCheckQuery(), UNKNOWN);
Map<String, String> globalAll = getMap(connection, SqlUtils.databaseSupplementalLoggingAllCheckQuery(), UNKNOWN);
if ("NO".equalsIgnoreCase(globalAll.get("KEY"))) {
// Check if MIN supplemental logging is enabled at the database
Map<String, String> globalMin = getMap(connection.connection(false), SqlUtils.databaseSupplementalLoggingMinCheckQuery(), UNKNOWN);
Map<String, String> globalMin = getMap(connection, SqlUtils.databaseSupplementalLoggingMinCheckQuery(), UNKNOWN);
if ("NO".equalsIgnoreCase(globalMin.get("KEY"))) {
throw new DebeziumException("Supplemental logging not properly configured. Use: ALTER DATABASE ADD SUPPLEMENTAL LOG DATA");
}
// If ALL supplemental logging is not enabled, then each monitored table should be set to ALL COLUMNS
for (TableId tableId : schema.getTables().tableIds()) {
Map<String, String> tableAll = getMap(connection.connection(false), SqlUtils.tableSupplementalLoggingCheckQuery(tableId), UNKNOWN);
Map<String, String> tableAll = getMap(connection, SqlUtils.tableSupplementalLoggingCheckQuery(tableId), UNKNOWN);
if (!"ALL COLUMN LOGGING".equalsIgnoreCase(tableAll.get("KEY"))) {
throw new DebeziumException("Supplemental logging not configured for table " + tableId + ". " +
"Use command: ALTER TABLE " + tableId.schema() + "." + tableId.table() + " ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS");
@ -468,7 +465,7 @@ static void checkSupplementalLogging(OracleConnection connection, String pdbName
*
* @param connection container level database connection
*/
public static void endMining(Connection connection) {
public static void endMining(OracleConnection connection) {
String stopMining = SqlUtils.END_LOGMNR;
try {
executeCallableStatement(connection, stopMining);
@ -491,7 +488,7 @@ public static void endMining(Connection connection) {
* @throws SQLException if anything unexpected happens
*/
// todo: check RAC resiliency
public static void setRedoLogFilesForMining(Connection connection, Long lastProcessedScn, Duration archiveLogRetention) throws SQLException {
public static void setRedoLogFilesForMining(OracleConnection connection, Long lastProcessedScn, Duration archiveLogRetention) throws SQLException {
removeLogFilesFromMining(connection);
@ -526,7 +523,7 @@ public static void setRedoLogFilesForMining(Connection connection, Long lastProc
* @param transactionRetention duration to tolerate long running transactions
* @return optional SCN as a watermark for abandonment
*/
public static Optional<Long> getLastScnToAbandon(Connection connection, Long offsetScn, Duration transactionRetention) {
public static Optional<Long> getLastScnToAbandon(OracleConnection connection, Long offsetScn, Duration transactionRetention) {
try {
String query = SqlUtils.diffInDaysQuery(offsetScn);
Float diffInDays = (Float) getSingleResult(connection, query, DATATYPE.FLOAT);
@ -556,35 +553,58 @@ static void logError(TransactionalBufferMetrics metrics, String format, Object..
* @param connection connection
* @return size
*/
private static int getRedoLogGroupSize(Connection connection) throws SQLException {
return getMap(connection, SqlUtils.allOnlineLogsQuery(), MAX_SCN_S).size();
private static int getRedoLogGroupSize(OracleConnection connection) throws SQLException {
return getMap(connection, SqlUtils.allOnlineLogsQuery(), getDatabaseMaxScnValue(connection)).size();
}
/**
 * Get the database maximum SCN value.
 * <p>
 * The maximum system change number depends on the Oracle release:
 * 19.6+ raised the ceiling again, 12.2 widened it to an unsigned 64-bit
 * value, and 11.2 through 12.1 used the original 48-bit limit.
 *
 * @param connection the oracle database connection
 * @return the maximum scn value as a string value
 * @throws RuntimeException if the version is older than any known SCN limit
 */
public static String getDatabaseMaxScnValue(OracleConnection connection) {
    final OracleDatabaseVersion version = connection.getOracleVersion();
    final int major = version.getMajor();
    final int maintenance = version.getMaintenance();
    if (major > 19 || (major == 19 && maintenance >= 6)) {
        return MAX_SCN_19_6;
    }
    if (major > 12 || (major == 12 && maintenance >= 2)) {
        return MAX_SCN_12_2;
    }
    if ((major == 11 && maintenance >= 2) || (major == 12 && maintenance < 2)) {
        return MAX_SCN_11_2;
    }
    throw new RuntimeException("Max SCN cannot be resolved for database version " + version);
}
/**
* This method returns all online log files, starting from one which contains offset SCN and ending with one containing largest SCN
* 18446744073709551615 on Ora 19c is the max value of the nextScn in the current redo todo replace all Long with BigInteger for SCN
*/
public static Map<String, BigInteger> getOnlineLogFilesForOffsetScn(Connection connection, Long offsetScn) throws SQLException {
public static Map<String, BigInteger> getOnlineLogFilesForOffsetScn(OracleConnection connection, Long offsetScn) throws SQLException {
// TODO: Make offset a BigInteger and refactor upstream
BigInteger offsetScnBi = BigInteger.valueOf(offsetScn);
LOGGER.trace("Getting online redo logs for offset scn {}", offsetScnBi);
Map<String, ScnRange> redoLogFiles = new LinkedHashMap<>();
try (PreparedStatement s = connection.prepareStatement(SqlUtils.allOnlineLogsQuery())) {
String maxScnStr = getDatabaseMaxScnValue(connection);
final BigInteger maxScn = new BigInteger(maxScnStr);
try (PreparedStatement s = connection.connection(false).prepareStatement(SqlUtils.allOnlineLogsQuery())) {
try (ResultSet rs = s.executeQuery()) {
while (rs.next()) {
ScnRange range = new ScnRange(rs.getString(4), defaultIfNull(rs.getString(2), MAX_SCN_S));
ScnRange range = new ScnRange(rs.getString(4), defaultIfNull(rs.getString(2), maxScnStr));
redoLogFiles.put(rs.getString(1), range);
}
}
}
return redoLogFiles.entrySet().stream()
.filter(entry -> filterRedoLogEntry(entry, offsetScnBi)).collect(Collectors
.toMap(Map.Entry::getKey, entry -> resolveNextChangeFromScnRange(entry.getValue().getNextChange())));
.filter(entry -> filterRedoLogEntry(entry, offsetScnBi, maxScn)).collect(Collectors
.toMap(Map.Entry::getKey, entry -> resolveNextChangeFromScnRange(entry.getValue().getNextChange(), maxScn)));
}
private static boolean filterRedoLogEntry(Map.Entry<String, ScnRange> entry, BigInteger offsetScn) {
private static boolean filterRedoLogEntry(Map.Entry<String, ScnRange> entry, BigInteger offsetScn, BigInteger maxScn) {
final BigInteger nextChangeNumber = new BigInteger(entry.getValue().getNextChange());
if (nextChangeNumber.compareTo(offsetScn) >= 0 || nextChangeNumber.equals(MAX_SCN_BI)) {
if (nextChangeNumber.compareTo(offsetScn) >= 0 || nextChangeNumber.equals(maxScn)) {
LOGGER.trace("Online redo log {} with SCN range {} to {} to be added.", entry.getKey(), entry.getValue().getFirstChange(), entry.getValue().getNextChange());
return true;
}
@ -592,9 +612,9 @@ private static boolean filterRedoLogEntry(Map.Entry<String, ScnRange> entry, Big
return false;
}
private static BigInteger resolveNextChangeFromScnRange(String nextChangeValue) {
private static BigInteger resolveNextChangeFromScnRange(String nextChangeValue, BigInteger maxScn) {
final BigInteger value = new BigInteger(nextChangeValue);
return value.equals(MAX_SCN_BI) ? MAX_SCN_BI : value;
return value.equals(maxScn) ? maxScn : value;
}
/**
@ -676,19 +696,21 @@ private static void logQueryResults(OracleConnection connection, String query) t
* @return Map of archived files
* @throws SQLException if something happens
*/
public static Map<String, BigInteger> getArchivedLogFilesForOffsetScn(Connection connection, Long offsetScn, Duration archiveLogRetention) throws SQLException {
public static Map<String, BigInteger> getArchivedLogFilesForOffsetScn(OracleConnection connection, Long offsetScn, Duration archiveLogRetention) throws SQLException {
final Map<String, String> redoLogFiles = new LinkedHashMap<>();
try (PreparedStatement s = connection.prepareStatement(SqlUtils.archiveLogsQuery(offsetScn, archiveLogRetention))) {
final String maxScnStr = getDatabaseMaxScnValue(connection);
final BigInteger maxScn = new BigInteger(maxScnStr);
try (PreparedStatement s = connection.connection(false).prepareStatement(SqlUtils.archiveLogsQuery(offsetScn, archiveLogRetention))) {
try (ResultSet rs = s.executeQuery()) {
while (rs.next()) {
if (LOGGER.isTraceEnabled()) {
LOGGER.trace("Archive log {} with SCN range {} to {} to be added.", rs.getString(1), rs.getString(3), rs.getString(2));
}
redoLogFiles.put(rs.getString(1), defaultIfNull(rs.getString(2), MAX_SCN_S));
redoLogFiles.put(rs.getString(1), defaultIfNull(rs.getString(2), maxScnStr));
}
}
}
return redoLogFiles.entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey, e -> resolveNextChangeFromScnRange(e.getValue())));
return redoLogFiles.entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey, e -> resolveNextChangeFromScnRange(e.getValue(), maxScn)));
}
/**
@ -696,8 +718,8 @@ public static Map<String, BigInteger> getArchivedLogFilesForOffsetScn(Connection
* @param conn connection
* @throws SQLException something happened
*/
public static void removeLogFilesFromMining(Connection conn) throws SQLException {
try (PreparedStatement ps = conn.prepareStatement(SqlUtils.FILES_FOR_MINING);
public static void removeLogFilesFromMining(OracleConnection conn) throws SQLException {
try (PreparedStatement ps = conn.connection(false).prepareStatement(SqlUtils.FILES_FOR_MINING);
ResultSet result = ps.executeQuery()) {
while (result.next()) {
String fileName = result.getString(1);
@ -707,17 +729,17 @@ public static void removeLogFilesFromMining(Connection conn) throws SQLException
}
}
private static void executeCallableStatement(Connection connection, String statement) throws SQLException {
private static void executeCallableStatement(OracleConnection connection, String statement) throws SQLException {
Objects.requireNonNull(statement);
try (CallableStatement s = connection.prepareCall(statement)) {
try (CallableStatement s = connection.connection(false).prepareCall(statement)) {
s.execute();
}
}
public static Map<String, String> getMap(Connection connection, String query, String nullReplacement) throws SQLException {
public static Map<String, String> getMap(OracleConnection connection, String query, String nullReplacement) throws SQLException {
Map<String, String> result = new LinkedHashMap<>();
try (
PreparedStatement statement = connection.prepareStatement(query);
PreparedStatement statement = connection.connection(false).prepareStatement(query);
ResultSet rs = statement.executeQuery()) {
while (rs.next()) {
String value = rs.getString(2);
@ -728,8 +750,8 @@ public static Map<String, String> getMap(Connection connection, String query, St
}
}
public static Object getSingleResult(Connection connection, String query, DATATYPE type) throws SQLException {
try (PreparedStatement statement = connection.prepareStatement(query);
public static Object getSingleResult(OracleConnection connection, String query, DATATYPE type) throws SQLException {
try (PreparedStatement statement = connection.connection(false).prepareStatement(query);
ResultSet rs = statement.executeQuery()) {
if (rs.next()) {
switch (type) {

View File

@ -21,7 +21,6 @@
import static io.debezium.connector.oracle.logminer.LogMinerHelper.setRedoLogFilesForMining;
import static io.debezium.connector.oracle.logminer.LogMinerHelper.startLogMining;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
@ -111,7 +110,8 @@ public LogMinerStreamingChangeEventSource(OracleConnectorConfig connectorConfig,
/**
* This is the loop to get changes from LogMiner
*
* @param context change event source context
* @param context
* change event source context
*/
@Override
public void execute(ChangeEventSourceContext context) {
@ -120,85 +120,82 @@ public void execute(ChangeEventSourceContext context) {
// Perform registration
registerLogMinerMetrics();
try (Connection connection = jdbcConnection.connection(false)) {
long databaseTimeMs = getTimeDifference(connection).toMillis();
long databaseTimeMs = getTimeDifference(jdbcConnection).toMillis();
LOGGER.trace("Current time {} ms, database difference {} ms", System.currentTimeMillis(), databaseTimeMs);
transactionalBuffer.setDatabaseTimeDifference(databaseTimeMs);
LOGGER.trace("Current time {} ms, database difference {} ms", System.currentTimeMillis(), databaseTimeMs);
transactionalBuffer.setDatabaseTimeDifference(databaseTimeMs);
startScn = offsetContext.getScn();
createFlushTable(jdbcConnection);
startScn = offsetContext.getScn();
createFlushTable(connection);
if (!isContinuousMining && startScn < getFirstOnlineLogScn(jdbcConnection, archiveLogRetention)) {
throw new DebeziumException(
"Online REDO LOG files or archive log files do not contain the offset scn " + startScn + ". Please perform a new snapshot.");
}
if (!isContinuousMining && startScn < getFirstOnlineLogScn(connection, archiveLogRetention)) {
throw new DebeziumException(
"Online REDO LOG files or archive log files do not contain the offset scn " + startScn + ". Please perform a new snapshot.");
}
setNlsSessionParameters(jdbcConnection);
checkSupplementalLogging(jdbcConnection, connectorConfig.getPdbName(), schema);
setNlsSessionParameters(jdbcConnection);
checkSupplementalLogging(jdbcConnection, connectorConfig.getPdbName(), schema);
initializeRedoLogsForMining(jdbcConnection, false, archiveLogRetention);
initializeRedoLogsForMining(connection, false, archiveLogRetention);
HistoryRecorder historyRecorder = connectorConfig.getLogMiningHistoryRecorder();
try {
// todo: why can't OracleConnection be used rather than a Factory+JdbcConfiguration?
historyRecorder.prepare(logMinerMetrics, jdbcConfiguration, connectorConfig.getLogMinerHistoryRetentionHours());
HistoryRecorder historyRecorder = connectorConfig.getLogMiningHistoryRecorder();
try {
// todo: why can't OracleConnection be used rather than a Factory+JdbcConfiguration?
historyRecorder.prepare(logMinerMetrics, jdbcConfiguration, connectorConfig.getLogMinerHistoryRetentionHours());
final LogMinerQueryResultProcessor processor = new LogMinerQueryResultProcessor(context, logMinerMetrics,
transactionalBuffer, dmlParser, offsetContext, schema, dispatcher, catalogName, clock, historyRecorder);
final LogMinerQueryResultProcessor processor = new LogMinerQueryResultProcessor(context, logMinerMetrics,
transactionalBuffer, dmlParser, offsetContext, schema, dispatcher, catalogName, clock, historyRecorder);
try (PreparedStatement miningView = jdbcConnection.connection()
.prepareStatement(SqlUtils.logMinerContentsQuery(connectorConfig.getSchemaName(), jdbcConnection.username(), schema))) {
Set<String> currentRedoLogFiles = getCurrentRedoLogFiles(jdbcConnection, logMinerMetrics);
try (PreparedStatement miningView = connection
.prepareStatement(SqlUtils.logMinerContentsQuery(connectorConfig.getSchemaName(), jdbcConnection.username(), schema))) {
Set<String> currentRedoLogFiles = getCurrentRedoLogFiles(connection, logMinerMetrics);
Stopwatch stopwatch = Stopwatch.reusable();
while (context.isRunning()) {
endScn = getEndScn(jdbcConnection, startScn, logMinerMetrics, connectorConfig.getLogMiningBatchSizeDefault());
flushLogWriter(jdbcConnection, jdbcConfiguration, isRac, racHosts);
Stopwatch stopwatch = Stopwatch.reusable();
while (context.isRunning()) {
endScn = getEndScn(connection, startScn, logMinerMetrics, connectorConfig.getLogMiningBatchSizeDefault());
flushLogWriter(connection, jdbcConfiguration, isRac, racHosts);
pauseBetweenMiningSessions();
pauseBetweenMiningSessions();
Set<String> possibleNewCurrentLogFile = getCurrentRedoLogFiles(jdbcConnection, logMinerMetrics);
if (!currentRedoLogFiles.equals(possibleNewCurrentLogFile)) {
LOGGER.debug("Redo log switch detected, from {} to {}", currentRedoLogFiles, possibleNewCurrentLogFile);
Set<String> possibleNewCurrentLogFile = getCurrentRedoLogFiles(connection, logMinerMetrics);
if (!currentRedoLogFiles.equals(possibleNewCurrentLogFile)) {
LOGGER.debug("Redo log switch detected, from {} to {}", currentRedoLogFiles, possibleNewCurrentLogFile);
// This is the way to mitigate PGA leaks.
// With one mining session, it grows and maybe there is another way to flush PGA.
// At this point we use a new mining session
LOGGER.trace("Ending log mining startScn={}, endScn={}, offsetContext.getScn={}, strategy={}, continuous={}",
startScn, endScn, offsetContext.getScn(), strategy, isContinuousMining);
endMining(jdbcConnection);
// This is the way to mitigate PGA leaks.
// With one mining session, it grows and maybe there is another way to flush PGA.
// At this point we use a new mining session
LOGGER.trace("Ending log mining startScn={}, endScn={}, offsetContext.getScn={}, strategy={}, continuous={}",
startScn, endScn, offsetContext.getScn(), strategy, isContinuousMining);
endMining(connection);
initializeRedoLogsForMining(jdbcConnection, true, archiveLogRetention);
initializeRedoLogsForMining(connection, true, archiveLogRetention);
abandonOldTransactionsIfExist(jdbcConnection, transactionalBuffer);
currentRedoLogFiles = getCurrentRedoLogFiles(jdbcConnection, logMinerMetrics);
}
abandonOldTransactionsIfExist(connection, transactionalBuffer);
currentRedoLogFiles = getCurrentRedoLogFiles(connection, logMinerMetrics);
}
startLogMining(jdbcConnection, startScn, endScn, strategy, isContinuousMining);
startLogMining(jdbcConnection, startScn, endScn, strategy, isContinuousMining);
stopwatch.start();
miningView.setFetchSize(LOG_MINING_VIEW_FETCH_SIZE);
miningView.setLong(1, startScn);
miningView.setLong(2, endScn);
try (ResultSet rs = miningView.executeQuery()) {
Duration lastDurationOfBatchCapturing = stopwatch.stop().durations().statistics().getTotal();
logMinerMetrics.setLastDurationOfBatchCapturing(lastDurationOfBatchCapturing);
processor.processResult(rs);
stopwatch.start();
miningView.setFetchSize(LOG_MINING_VIEW_FETCH_SIZE);
miningView.setLong(1, startScn);
miningView.setLong(2, endScn);
try (ResultSet rs = miningView.executeQuery()) {
Duration lastDurationOfBatchCapturing = stopwatch.stop().durations().statistics().getTotal();
logMinerMetrics.setLastDurationOfBatchCapturing(lastDurationOfBatchCapturing);
processor.processResult(rs);
startScn = endScn;
startScn = endScn;
if (transactionalBuffer.isEmpty()) {
LOGGER.debug("Transactional buffer empty, updating offset's SCN {}", startScn);
offsetContext.setScn(startScn);
}
if (transactionalBuffer.isEmpty()) {
LOGGER.debug("Transactional buffer empty, updating offset's SCN {}", startScn);
offsetContext.setScn(startScn);
}
}
}
}
finally {
historyRecorder.close();
}
}
finally {
historyRecorder.close();
}
}
catch (Throwable t) {
@ -208,7 +205,7 @@ public void execute(ChangeEventSourceContext context) {
finally {
LOGGER.info("startScn={}, endScn={}, offsetContext.getScn()={}", startScn, endScn, offsetContext.getScn());
LOGGER.info("Transactional buffer metrics dump: {}", transactionalBuffer.getMetrics().toString());
LOGGER.info("Transactional buffer dump: {}", transactionalBuffer.toString());
LOGGER.info("Transactional buffer dump: {}", transactionalBuffer.getMetrics().toString());
LOGGER.info("LogMiner metrics dump: {}", logMinerMetrics.toString());
// Perform unregistration
@ -231,7 +228,7 @@ private void unregisterLogMinerMetrics() {
}
}
private void abandonOldTransactionsIfExist(Connection connection, TransactionalBuffer transactionalBuffer) {
private void abandonOldTransactionsIfExist(OracleConnection connection, TransactionalBuffer transactionalBuffer) {
Optional<Long> lastScnToAbandonTransactions = getLastScnToAbandon(connection, offsetContext.getScn(), connectorConfig.getLogMiningTransactionRetention());
lastScnToAbandonTransactions.ifPresent(thresholdScn -> {
transactionalBuffer.abandonLongTransactions(thresholdScn, offsetContext);
@ -240,7 +237,7 @@ private void abandonOldTransactionsIfExist(Connection connection, TransactionalB
});
}
private void initializeRedoLogsForMining(Connection connection, boolean postEndMiningSession, Duration archiveLogRetention) throws SQLException {
private void initializeRedoLogsForMining(OracleConnection connection, boolean postEndMiningSession, Duration archiveLogRetention) throws SQLException {
if (!postEndMiningSession) {
if (OracleConnectorConfig.LogMiningStrategy.CATALOG_IN_REDO.equals(strategy)) {
buildDataDictionary(connection);

View File

@ -9,7 +9,6 @@
import java.math.BigDecimal;
import java.math.BigInteger;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
@ -49,14 +48,14 @@ public class LogMinerHelperIT extends AbstractConnectorTest {
private static OracleConnection connection;
private static OracleConnection lmConnection;
private static Connection conn;
private static OracleConnection conn;
@BeforeClass
public static void beforeSuperClass() throws SQLException {
connection = TestHelper.testConnection();
lmConnection = TestHelper.testConnection();
conn = lmConnection.connection(false);
conn = lmConnection;
lmConnection.resetSessionToCdb();
LogMinerHelper.removeLogFilesFromMining(conn);
@ -64,9 +63,6 @@ public static void beforeSuperClass() throws SQLException {
@AfterClass
public static void closeConnection() throws SQLException {
if (conn != null && !conn.isClosed()) {
conn.close();
}
if (lmConnection != null && lmConnection.isConnected()) {
lmConnection.close();
}
@ -156,7 +152,7 @@ public void shouldCalculateAbandonTransactions() throws Exception {
long twoHoursAgoScn;
String scnQuery = "with minus_one as (select (systimestamp - INTERVAL '2' HOUR) as diff from dual) " +
"select timestamp_to_scn(diff) from minus_one";
try (PreparedStatement ps = conn.prepareStatement(scnQuery);
try (PreparedStatement ps = conn.connection(false).prepareStatement(scnQuery);
ResultSet rs = ps.executeQuery()) {
rs.next();
twoHoursAgoScn = rs.getBigDecimal(1).longValue();
@ -203,9 +199,9 @@ private Long getOldestArchivedScn(List<BigDecimal> oneDayArchivedNextScn) throws
return oldestArchivedScn;
}
private static int getNumberOfAddedLogFiles(Connection conn) throws SQLException {
private static int getNumberOfAddedLogFiles(OracleConnection conn) throws SQLException {
int counter = 0;
try (PreparedStatement ps = conn.prepareStatement("select * from V$LOGMNR_LOGS");
try (PreparedStatement ps = conn.connection(false).prepareStatement("select * from V$LOGMNR_LOGS");
ResultSet result = ps.executeQuery()) {
while (result.next()) {
counter++;
@ -214,10 +210,10 @@ private static int getNumberOfAddedLogFiles(Connection conn) throws SQLException
return counter;
}
private List<BigDecimal> getOneDayArchivedLogNextScn(Connection conn) throws SQLException {
private List<BigDecimal> getOneDayArchivedLogNextScn(OracleConnection conn) throws SQLException {
List<BigDecimal> allArchivedNextScn = new ArrayList<>();
try (
PreparedStatement st = conn.prepareStatement("SELECT NAME AS FILE_NAME, NEXT_CHANGE# AS NEXT_CHANGE FROM V$ARCHIVED_LOG " +
PreparedStatement st = conn.connection(false).prepareStatement("SELECT NAME AS FILE_NAME, NEXT_CHANGE# AS NEXT_CHANGE FROM V$ARCHIVED_LOG " +
" WHERE NAME IS NOT NULL AND FIRST_TIME >= SYSDATE - 1 AND ARCHIVED = 'YES' " +
" AND STATUS = 'A' ORDER BY 2");
ResultSet rs = st.executeQuery()) {

View File

@ -5,6 +5,7 @@
*/
package io.debezium.connector.oracle;
import static org.fest.assertions.Assertions.assertThat;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
import static org.mockito.ArgumentMatchers.anyInt;
@ -23,12 +24,15 @@
import org.mockito.Mockito;
import io.debezium.connector.oracle.logminer.LogMinerHelper;
import io.debezium.doc.FixFor;
public class LogMinerHelperTest {
private Connection connection = Mockito.mock(Connection.class);
private OracleConnection connection = Mockito.mock(OracleConnection.class);
private int current;
private String[][] mockRows;
private String maxScnStr;
private BigInteger maxScn;
@Before
public void beforeEach() throws Exception {
@ -37,8 +41,16 @@ public void beforeEach() throws Exception {
mockRows = new String[][]{};
ResultSet rs = Mockito.mock(ResultSet.class);
Connection conn = Mockito.mock(Connection.class);
Mockito.when(connection.connection()).thenReturn(conn);
Mockito.when(connection.connection(false)).thenReturn(conn);
Mockito.when(connection.getOracleVersion())
.thenReturn(OracleDatabaseVersion.parse("Oracle Database 12c Enterprise Edition Release 12.2.0.1.0 - 64bit Production"));
maxScnStr = LogMinerHelper.getDatabaseMaxScnValue(connection);
maxScn = new BigInteger(maxScnStr);
PreparedStatement pstmt = Mockito.mock(PreparedStatement.class);
Mockito.when(connection.prepareStatement(anyString())).thenReturn(pstmt);
Mockito.when(conn.prepareStatement(anyString())).thenReturn(pstmt);
Mockito.when(pstmt.executeQuery()).thenReturn(rs);
Mockito.when(rs.next()).thenAnswer(it -> ++current > mockRows.length ? false : true);
Mockito.when(rs.getString(anyInt())).thenAnswer(it -> {
@ -85,7 +97,7 @@ public void nullsHandledAsMaxScn() throws Exception {
Map<String, BigInteger> onlineLogs = LogMinerHelper.getOnlineLogFilesForOffsetScn(connection, 600L);
assertEquals(onlineLogs.size(), 3);
assertEquals(onlineLogs.get("logfile3"), LogMinerHelper.MAX_SCN_BI);
assertEquals(onlineLogs.get("logfile3"), maxScn);
}
@Test
@ -94,12 +106,12 @@ public void canHandleMaxScn() throws Exception {
mockRows = new String[][]{
new String[]{ "logfile1", "103400", "11", "103700" },
new String[]{ "logfile2", "103700", "12", "104000" },
new String[]{ "logfile3", LogMinerHelper.MAX_SCN_S, "13", "104300" },
new String[]{ "logfile3", maxScnStr, "13", "104300" },
};
Map<String, BigInteger> onlineLogs = LogMinerHelper.getOnlineLogFilesForOffsetScn(connection, 600L);
assertEquals(onlineLogs.size(), 3);
assertEquals(onlineLogs.get("logfile3"), LogMinerHelper.MAX_SCN_BI);
assertEquals(onlineLogs.get("logfile3"), maxScn);
}
@Test
@ -160,7 +172,7 @@ public void archiveNullsHandledAsMaxScn() throws Exception {
Map<String, BigInteger> onlineLogs = LogMinerHelper.getArchivedLogFilesForOffsetScn(connection, 500L, Duration.ofDays(60));
assertEquals(onlineLogs.size(), 3);
assertEquals(onlineLogs.get("logfile3"), LogMinerHelper.MAX_SCN_BI);
assertEquals(onlineLogs.get("logfile3"), maxScn);
}
@Test
@ -169,12 +181,12 @@ public void archiveCanHandleMaxScn() throws Exception {
mockRows = new String[][]{
new String[]{ "logfile1", "103400", "11" },
new String[]{ "logfile2", "103700", "12" },
new String[]{ "logfile3", LogMinerHelper.MAX_SCN_S, "13" },
new String[]{ "logfile3", maxScnStr, "13" },
};
Map<String, BigInteger> onlineLogs = LogMinerHelper.getArchivedLogFilesForOffsetScn(connection, 500L, Duration.ofDays(60));
assertEquals(onlineLogs.size(), 3);
assertEquals(onlineLogs.get("logfile3"), LogMinerHelper.MAX_SCN_BI);
assertEquals(onlineLogs.get("logfile3"), maxScn);
}
@Test
@ -193,4 +205,89 @@ public void archiveLogsWithLongerScnAreSupported() throws Exception {
assertEquals(onlineLogs.size(), 3);
assertEquals(onlineLogs.get("logfile3"), new BigInteger(scnLonger));
}
@Test
@FixFor("DBZ-3001")
public void testOracleMaxScn() throws Exception {
    final OracleConnection connection = Mockito.mock(OracleConnection.class);
    final Connection conn = Mockito.mock(Connection.class);
    Mockito.when(connection.connection()).thenReturn(conn);
    Mockito.when(connection.connection(false)).thenReturn(conn);

    // Maximum SCN values reported per Oracle version family:
    // versions prior to 12.2 are limited to a 48-bit SCN (2^48 - 1).
    final String maxScnPre122 = "281474976710655";
    // 12.2 up to (but excluding) 19.6 allow the full unsigned 64-bit range (2^64 - 1).
    final String maxScn122 = "18446744073709551615";
    // 19.6 onward report a lower cap (0x80FFFFFFFFFFFFFF).
    final String maxScn196 = "9295429630892703743";

    // 18.2+ banners span two lines, with the precise version on the second line.
    final String nl = System.lineSeparator();

    // Test Oracle 11
    assertMaxScnForBanner(connection, "Oracle Database 11g Enterprise Edition Release 11.2.0.4.0 - 64bit Production", maxScnPre122);
    // Test Oracle 12.1
    assertMaxScnForBanner(connection, "Oracle Database 12c Enterprise Edition Release 12.1.0.0.0 - 64bit Production", maxScnPre122);
    // Test Oracle 12.2
    assertMaxScnForBanner(connection, "Oracle Database 12c Enterprise Edition Release 12.2.0.4.0 - 64bit Production", maxScn122);
    // Test Oracle 18.0
    assertMaxScnForBanner(connection, "Oracle Database 18c Enterprise Edition Release 18.0.0.0.0 - 64bit Production", maxScn122);
    // Test Oracle 18.1
    assertMaxScnForBanner(connection, "Oracle Database 18c Enterprise Edition Release 18.1.0.0.0 - 64bit Production", maxScn122);
    // Test Oracle 18.2
    assertMaxScnForBanner(connection,
            "Oracle Database 18c Enterprise Edition Release 18.0.0.0.0 - Production" + nl + "Version 18.2.0.0.0", maxScn122);
    // Test Oracle 19.0 through 19.5
    assertMaxScnForBanner(connection,
            "Oracle Database 19c Enterprise Edition Release 19.0.0.0.0 - Production" + nl + "Version 19.0.0.0.0", maxScn122);
    assertMaxScnForBanner(connection,
            "Oracle Database 19c Enterprise Edition Release 19.0.0.0.0 - Production" + nl + "Version 19.1.0.0.0", maxScn122);
    assertMaxScnForBanner(connection,
            "Oracle Database 19c Enterprise Edition Release 19.0.0.0.0 - Production" + nl + "Version 19.2.0.0.0", maxScn122);
    assertMaxScnForBanner(connection,
            "Oracle Database 19c Enterprise Edition Release 19.0.0.0.0 - Production" + nl + "Version 19.3.0.0.0", maxScn122);
    assertMaxScnForBanner(connection,
            "Oracle Database 19c Enterprise Edition Release 19.0.0.0.0 - Production" + nl + "Version 19.4.0.0.0", maxScn122);
    assertMaxScnForBanner(connection,
            "Oracle Database 19c Enterprise Edition Release 19.0.0.0.0 - Production" + nl + "Version 19.5.0.0.0", maxScn122);
    // Test Oracle 19.6 and 19.7
    assertMaxScnForBanner(connection,
            "Oracle Database 19c Enterprise Edition Release 19.0.0.0.0 - Production" + nl + "Version 19.6.0.0.0", maxScn196);
    assertMaxScnForBanner(connection,
            "Oracle Database 19c Enterprise Edition Release 19.0.0.0.0 - Production" + nl + "Version 19.7.0.0.0", maxScn196);
    // Test Oracle 21
    assertMaxScnForBanner(connection,
            "Oracle Database 21c Enterprise Edition Release 21.0.0.0.0 - Production" + nl + "Version 21.0.0.0.0", maxScn196);
}

/**
 * Stubs the mocked connection so that its resolved database version is parsed from
 * {@code banner} and then asserts that {@link LogMinerHelper#getDatabaseMaxScnValue}
 * reports {@code expectedMaxScn} for that version.
 *
 * @param connection the mocked {@link OracleConnection}; must not be {@code null}
 * @param banner the {@code V$VERSION} banner text to parse the version from
 * @param expectedMaxScn the maximum SCN string expected for that version
 */
private static void assertMaxScnForBanner(OracleConnection connection, String banner, String expectedMaxScn) {
    Mockito.when(connection.getOracleVersion()).thenReturn(OracleDatabaseVersion.parse(banner));
    assertThat(LogMinerHelper.getDatabaseMaxScnValue(connection)).isEqualTo(expectedMaxScn);
}
}

View File

@ -1372,6 +1372,14 @@ public void shouldReadTableUniqueIndicesWithCharactersThatRequireExplicitQuotes(
}
}
@Test
@FixFor("DBZ-3001")
public void shouldGetOracleDatabaseVersion() throws Exception {
    // Resolves the banner from the live test database; any supported Oracle
    // release must yield a non-null version with a positive major component.
    final OracleDatabaseVersion resolvedVersion = connection.getOracleVersion();
    assertThat(resolvedVersion).isNotNull();
    assertThat(resolvedVersion.getMajor()).isGreaterThan(0);
}
private String generateAlphaNumericStringColumn(int size) {
final String alphaNumericString = "ABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789abcdefghijklmnopqrstuvwxyz";
final StringBuilder sb = new StringBuilder(size);