DBZ-7258 Simplify exception handling

This commit is contained in:
Jiri Pechanec 2024-03-22 11:20:48 +01:00
parent 1513227578
commit b63e5ce048
2 changed files with 10 additions and 27 deletions

View File

@ -5,11 +5,9 @@
*/
package io.debezium.storage.jdbc;
import java.io.UncheckedIOException;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.sql.SQLRecoverableException;
import java.time.Duration;
import java.util.Objects;
@ -53,7 +51,7 @@ public RetriableConnection(String url, String user, String pwd, Duration waitRet
try {
createConnection();
}
catch (SQLRecoverableException e) {
catch (SQLException e) {
LOGGER.error("Unable to create connection. It will be re-attempted during its first use: " + e.getMessage(), e);
if (conn != null) {
try {
@ -87,10 +85,9 @@ public boolean isConnectionCreated() {
* @param name name of the operation being executed (for logging purposes)
* @param rollback if set to true, the rollback will be called in case of SQLException
* @throws SQLException sql connection related exception
* @throws UncheckedIOException exception that can be thrown by code snippet
*/
public synchronized void executeWithRetry(ConnectionConsumer consumer, String name, boolean rollback)
throws SQLException, UncheckedIOException {
throws SQLException {
executeWithRetry(null, consumer, name, rollback);
}
@ -100,15 +97,14 @@ public synchronized void executeWithRetry(ConnectionConsumer consumer, String na
* @param name name of the operation being executed (for logging purposes)
* @param rollback if set to true, the rollback will be called in case of SQLException
* @throws SQLException sql connection related exception
* @throws UncheckedIOException exception that can be thrown by code snippet
*/
public synchronized <T> T executeWithRetry(ConnectionFunction<T> func, String name, boolean rollback)
throws SQLException, UncheckedIOException {
throws SQLException {
return executeWithRetry(func, null, name, rollback);
}
private synchronized <T> T executeWithRetry(ConnectionFunction<T> func, ConnectionConsumer consumer, String name, boolean rollback)
throws SQLException, UncheckedIOException {
throws SQLException {
int attempt = 1;
while (true) {
if (conn == null) {
@ -138,7 +134,7 @@ private synchronized <T> T executeWithRetry(ConnectionFunction<T> func, Connecti
return null;
}
}
catch (SQLRecoverableException e) {
catch (SQLException e) {
LOGGER.warn("Attempt {} to call '{}' failed.", attempt, name, e);
if (rollback) {
LOGGER.warn("'{}': doing rollback.", name);
@ -158,19 +154,6 @@ private synchronized <T> T executeWithRetry(ConnectionFunction<T> func, Connecti
conn = null;
}
catch (SQLException e) {
LOGGER.warn("Call '{}' failed.", name, e);
if (rollback) {
LOGGER.warn("'{}': doing rollback.", name);
try {
conn.rollback();
}
catch (SQLException ex) {
// ignore
}
}
throw e;
}
}
}

View File

@ -6,7 +6,6 @@
package io.debezium.storage.jdbc.history;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.sql.DatabaseMetaData;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
@ -23,6 +22,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import io.debezium.DebeziumException;
import io.debezium.annotation.ThreadSafe;
import io.debezium.annotation.VisibleForTesting;
import io.debezium.common.annotation.Incubating;
@ -111,7 +111,7 @@ protected void storeRecord(HistoryRecord record) throws SchemaHistoryException {
line = writer.write(record.document());
}
catch (IOException e) {
throw new UncheckedIOException(e);
throw new DebeziumException(e);
}
Timestamp currentTs = new Timestamp(System.currentTimeMillis());
List<String> substrings = split(line, 65000);
@ -129,7 +129,7 @@ protected void storeRecord(HistoryRecord record) throws SchemaHistoryException {
conn.commit();
}, "store history record", true);
}
catch (UncheckedIOException | SQLException e) {
catch (SQLException e) {
throw new SchemaHistoryException("Failed to store record: " + record, e);
}
});
@ -174,7 +174,7 @@ protected synchronized void recoverRecords(Consumer<HistoryRecord> records) {
records.accept(new HistoryRecord(reader.read(historyData)));
}
catch (IOException e) {
throw new UncheckedIOException(e);
throw new DebeziumException(e);
}
}
}
@ -184,7 +184,7 @@ protected synchronized void recoverRecords(Consumer<HistoryRecord> records) {
LOG.error("Storage does not exist when recovering records");
}
}
catch (UncheckedIOException | SQLException e) {
catch (SQLException e) {
throw new SchemaHistoryException("Failed to recover records", e);
}
});