From 933ca55b7479610504edc5f63dd3b13ece83193f Mon Sep 17 00:00:00 2001 From: Chris Cranford Date: Thu, 4 Feb 2021 16:21:03 -0500 Subject: [PATCH] DBZ-3055 Use JdbcConnection#query instead --- .../connector/oracle/logminer/LogMinerHelper.java | 12 ++++++------ .../logminer/LogMinerStreamingChangeEventSource.java | 2 +- 2 files changed, 7 insertions(+), 7 deletions(-) diff --git a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/LogMinerHelper.java b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/LogMinerHelper.java index 8c7bee4b7..c3eaa62e3 100644 --- a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/LogMinerHelper.java +++ b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/LogMinerHelper.java @@ -252,13 +252,13 @@ static Duration getTimeDifference(Connection connection) throws SQLException { * @param isContinuousMining works < 19 version only * @throws SQLException if anything unexpected happens */ - static void startLogMining(Connection connection, Long startScn, Long endScn, + static void startLogMining(OracleConnection connection, Long startScn, Long endScn, OracleConnectorConfig.LogMiningStrategy strategy, boolean isContinuousMining) throws SQLException { LOGGER.trace("Starting log mining startScn={}, endScn={}, strategy={}, continuous={}", startScn, endScn, strategy, isContinuousMining); String statement = SqlUtils.startLogMinerStatement(startScn, endScn, strategy, isContinuousMining); try { - executeCallableStatement(connection, statement); + executeCallableStatement(connection.connection(), statement); } catch (SQLException e) { // Capture database state before throwing exception @@ -603,7 +603,7 @@ private static BigInteger resolveNextChangeFromScnRange(String nextChangeValue) * * @param connection the database connection */ - private static void logDatabaseState(Connection connection) { + private static void logDatabaseState(OracleConnection connection) { if (LOGGER.isDebugEnabled()) { LOGGER.debug("Configured redo logs are:"); try { @@ -650,8 +650,8 @@ private static void logDatabaseState(Connection connection) { * @param query the query to execute * @throws SQLException thrown if an exception occurs performing a SQL operation */ - private static void logQueryResults(Connection connection, String query) throws SQLException { - try (PreparedStatement s = connection.prepareStatement(query); ResultSet rs = s.executeQuery()) { + private static void logQueryResults(OracleConnection connection, String query) throws SQLException { + connection.query(query, rs -> { int columns = rs.getMetaData().getColumnCount(); List columnNames = new ArrayList<>(); for (int index = 1; index <= columns; ++index) { @@ -665,7 +665,7 @@ private static void logQueryResults(Connection connection, String query) throws } LOGGER.debug("{}", columnValues); } - } + }); } /** diff --git a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/LogMinerStreamingChangeEventSource.java b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/LogMinerStreamingChangeEventSource.java index 9a7aa2751..ac0dd4d8a 100644 --- a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/LogMinerStreamingChangeEventSource.java +++ b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/LogMinerStreamingChangeEventSource.java @@ -180,7 +180,7 @@ public void execute(ChangeEventSourceContext context) { currentRedoLogFiles = getCurrentRedoLogFiles(connection, logMinerMetrics); } - startLogMining(connection, startScn, endScn, strategy, isContinuousMining); + startLogMining(jdbcConnection, startScn, endScn, strategy, isContinuousMining); stopwatch.start(); miningView.setFetchSize(LOG_MINING_VIEW_FETCH_SIZE);