DBZ-7258 Reconnect DB feature for JDBC Storage

This commit is contained in:
Jiri Kulhanek 2024-01-17 17:20:35 +01:00 committed by Jiri Pechanec
parent 8be05c9314
commit 1513227578
4 changed files with 365 additions and 108 deletions

View File

@ -5,6 +5,7 @@
*/ */
package io.debezium.storage.jdbc; package io.debezium.storage.jdbc;
import java.time.Duration;
import java.util.List; import java.util.List;
import org.slf4j.Logger; import org.slf4j.Logger;
@ -39,9 +40,21 @@ public class JdbcCommonConfig {
.withDescription("Password of the database which will be used to access the database storage") .withDescription("Password of the database which will be used to access the database storage")
.withValidation(Field::isRequired); .withValidation(Field::isRequired);
private static final long DEFAULT_WAIT_RETRY_DELAY = 3000L;
public static final Field PROP_WAIT_RETRY_DELAY = Field.create(CONFIGURATION_FIELD_PREFIX_STRING + "wait.retry.delay.ms")
.withDescription("Delay of retry on wait for connection failure")
.withDefault(DEFAULT_WAIT_RETRY_DELAY);
private static final int DEFAULT_MAX_RETRIES = 5;
public static final Field PROP_MAX_RETRIES = Field.create(CONFIGURATION_FIELD_PREFIX_STRING + "retry.max.attempts")
.withDescription("Maximum number of retry attempts before giving up.")
.withDefault(DEFAULT_MAX_RETRIES);
private String jdbcUrl; private String jdbcUrl;
private String user; private String user;
private String password; private String password;
private Duration waitRetryDelay;
private int maxRetryCount;
public JdbcCommonConfig(Configuration config, String prefix) { public JdbcCommonConfig(Configuration config, String prefix) {
config = config.subset(prefix, true); config = config.subset(prefix, true);
@ -54,13 +67,15 @@ public JdbcCommonConfig(Configuration config, String prefix) {
} }
protected List<Field> getAllConfigurationFields() { protected List<Field> getAllConfigurationFields() {
return Collect.arrayListOf(PROP_JDBC_URL, PROP_USER, PROP_PASSWORD); return Collect.arrayListOf(PROP_JDBC_URL, PROP_USER, PROP_PASSWORD, PROP_WAIT_RETRY_DELAY, PROP_MAX_RETRIES);
} }
protected void init(Configuration config) { protected void init(Configuration config) {
jdbcUrl = config.getString(PROP_JDBC_URL); jdbcUrl = config.getString(PROP_JDBC_URL);
user = config.getString(PROP_USER); user = config.getString(PROP_USER);
password = config.getString(PROP_PASSWORD); password = config.getString(PROP_PASSWORD);
waitRetryDelay = Duration.ofMillis(config.getLong(PROP_WAIT_RETRY_DELAY));
maxRetryCount = config.getInteger(PROP_MAX_RETRIES);
} }
public String getJdbcUrl() { public String getJdbcUrl() {
@ -74,4 +89,12 @@ public String getUser() {
public String getPassword() { public String getPassword() {
return password; return password;
} }
public Duration getWaitRetryDelay() {
return waitRetryDelay;
}
public int getMaxRetryCount() {
return maxRetryCount;
}
} }

View File

@ -0,0 +1,219 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.storage.jdbc;
import java.io.UncheckedIOException;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.sql.SQLRecoverableException;
import java.time.Duration;
import java.util.Objects;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import io.debezium.storage.jdbc.offset.JdbcOffsetBackingStore;
import io.debezium.util.DelayStrategy;
/**
* Class encapsulates a java.sql.Connection. It provides {@code executeWithRetry} method to execute code snippet
* that interacts with the connection. If that fails with {@code SQLRecoverableException}, it will try to
* re-create new connection and perform the complete code snippet again (first, it performs rollback
* if specified in params).
* It attempts to reconnect number of times as specified in
* {@code io.debezium.storage.jdbc.JdbcCommonConfig.PROP_MAX_RETRIES} and there is a delay in between per
* {@code io.debezium.storage.jdbc.JdbcCommonConfig.PROP_WAIT_RETRY_DELAY}
*
* The code snippet provided should handle commit of its own if required. The connection is marked as autocommit = false
*
* @author Jiri Kulhanek
*/
public class RetriableConnection implements AutoCloseable {
private static final Logger LOGGER = LoggerFactory.getLogger(JdbcOffsetBackingStore.class);
private final String url;
private final String user;
private final String pwd;
private final Duration waitRetryDelay;
private final int maxRetryCount;
private Connection conn;
public RetriableConnection(String url, String user, String pwd, Duration waitRetryDelay, int maxRetryCount) throws SQLException {
this.url = url;
this.user = user;
this.pwd = pwd;
this.waitRetryDelay = waitRetryDelay;
this.maxRetryCount = maxRetryCount;
try {
createConnection();
}
catch (SQLRecoverableException e) {
LOGGER.error("Unable to create connection. It will be re-attempted during its first use: " + e.getMessage(), e);
if (conn != null) {
try {
conn.close();
}
catch (Exception ex) {
// ignore
}
conn = null;
}
}
}
private void createConnection() throws SQLException {
this.conn = DriverManager.getConnection(url, user, pwd);
this.conn.setAutoCommit(false);
}
@Override
public void close() throws SQLException {
conn.close();
}
public boolean isConnectionCreated() {
return conn != null;
}
/**
* execute code snippet where no value is returned.
* @param consumer code snippet to execute
* @param name name of the operation being executed (for logging purposes)
* @param rollback if set to true, the rollback will be called in case of SQLException
* @throws SQLException sql connection related exception
* @throws UncheckedIOException exception that can be thrown by code snippet
*/
public synchronized void executeWithRetry(ConnectionConsumer consumer, String name, boolean rollback)
throws SQLException, UncheckedIOException {
executeWithRetry(null, consumer, name, rollback);
}
/**
* execute code snippet which returns some value.
* @param func code snippet to execute
* @param name name of the operation being executed (for logging purposes)
* @param rollback if set to true, the rollback will be called in case of SQLException
* @throws SQLException sql connection related exception
* @throws UncheckedIOException exception that can be thrown by code snippet
*/
public synchronized <T> T executeWithRetry(ConnectionFunction<T> func, String name, boolean rollback)
throws SQLException, UncheckedIOException {
return executeWithRetry(func, null, name, rollback);
}
private synchronized <T> T executeWithRetry(ConnectionFunction<T> func, ConnectionConsumer consumer, String name, boolean rollback)
throws SQLException, UncheckedIOException {
int attempt = 1;
while (true) {
if (conn == null) {
LOGGER.debug("Trying to reconnect (attempt {}).", attempt);
try {
createConnection();
}
catch (SQLException e) {
LOGGER.error("SQL Exception while trying to reconnect: " + e.getMessage(), e);
conn = null;
if (attempt >= maxRetryCount) {
throw e;
}
attempt++;
LOGGER.debug("Waiting for reconnect for {} ms.", waitRetryDelay);
DelayStrategy delayStrategy = DelayStrategy.constant(waitRetryDelay);
delayStrategy.sleepWhen(true);
continue;
}
}
try {
if (func != null) {
return func.accept(conn);
}
if (consumer != null) {
consumer.accept(conn);
return null;
}
}
catch (SQLRecoverableException e) {
LOGGER.warn("Attempt {} to call '{}' failed.", attempt, name, e);
if (rollback) {
LOGGER.warn("'{}': doing rollback.", name);
try {
conn.rollback();
}
catch (SQLException ex) {
// ignore
}
}
try {
conn.close();
}
catch (Exception ex) {
// ignore
}
conn = null;
}
catch (SQLException e) {
LOGGER.warn("Call '{}' failed.", name, e);
if (rollback) {
LOGGER.warn("'{}': doing rollback.", name);
try {
conn.rollback();
}
catch (SQLException ex) {
// ignore
}
}
throw e;
}
}
}
@FunctionalInterface
public interface ConnectionFunction<T> {
/**
* Performs this operation on the given connection.
*
* @param conn the input connection
* @return result of the operation
*/
T accept(Connection conn) throws SQLException;
}
@FunctionalInterface
public interface ConnectionConsumer {
/**
* Performs this operation on the given connection.
*
* @param conn the input connection
*/
void accept(Connection conn) throws SQLException;
/**
* Returns a composed {@code ConnectionFunctionVoid} that performs, in sequence, this
* operation followed by the {@code after} operation. If performing either
* operation throws an exception, it is relayed to the caller of the
* composed operation. If performing this operation throws an exception,
* the {@code after} operation will not be performed.
*
* @param after the operation to perform after this operation
* @return a composed {@code ConnectionFunctionVoid} that performs in sequence this
* operation followed by the {@code after} operation
* @throws NullPointerException if {@code after} is null
*/
default ConnectionConsumer andThen(ConnectionConsumer after) {
Objects.requireNonNull(after);
return (Connection c) -> {
accept(c);
after.accept(c);
};
}
}
}

View File

@ -6,9 +6,8 @@
package io.debezium.storage.jdbc.history; package io.debezium.storage.jdbc.history;
import java.io.IOException; import java.io.IOException;
import java.sql.Connection; import java.io.UncheckedIOException;
import java.sql.DatabaseMetaData; import java.sql.DatabaseMetaData;
import java.sql.DriverManager;
import java.sql.PreparedStatement; import java.sql.PreparedStatement;
import java.sql.ResultSet; import java.sql.ResultSet;
import java.sql.SQLException; import java.sql.SQLException;
@ -36,6 +35,7 @@
import io.debezium.relational.history.SchemaHistory; import io.debezium.relational.history.SchemaHistory;
import io.debezium.relational.history.SchemaHistoryException; import io.debezium.relational.history.SchemaHistoryException;
import io.debezium.relational.history.SchemaHistoryListener; import io.debezium.relational.history.SchemaHistoryListener;
import io.debezium.storage.jdbc.RetriableConnection;
import io.debezium.util.FunctionalReadWriteLock; import io.debezium.util.FunctionalReadWriteLock;
/** /**
@ -54,8 +54,7 @@ public final class JdbcSchemaHistory extends AbstractSchemaHistory {
private final DocumentReader reader = DocumentReader.defaultReader(); private final DocumentReader reader = DocumentReader.defaultReader();
private final AtomicBoolean running = new AtomicBoolean(); private final AtomicBoolean running = new AtomicBoolean();
private final AtomicInteger recordInsertSeq = new AtomicInteger(0); private final AtomicInteger recordInsertSeq = new AtomicInteger(0);
private RetriableConnection conn;
private Connection conn;
private JdbcSchemaHistoryConfig config; private JdbcSchemaHistoryConfig config;
@Override @Override
@ -67,8 +66,8 @@ public void configure(Configuration config, HistoryRecordComparator comparator,
super.configure(config, comparator, listener, useCatalogBeforeSchema); super.configure(config, comparator, listener, useCatalogBeforeSchema);
try { try {
conn = DriverManager.getConnection(this.config.getJdbcUrl(), this.config.getUser(), this.config.getPassword()); conn = new RetriableConnection(this.config.getJdbcUrl(), this.config.getUser(), this.config.getPassword(),
conn.setAutoCommit(false); this.config.getWaitRetryDelay(), this.config.getMaxRetryCount());
} }
catch (SQLException e) { catch (SQLException e) {
throw new IllegalStateException("Failed to connect " + this.config.getJdbcUrl(), e); throw new IllegalStateException("Failed to connect " + this.config.getJdbcUrl(), e);
@ -80,7 +79,7 @@ public void start() {
super.start(); super.start();
lock.write(() -> { lock.write(() -> {
if (running.compareAndSet(false, true)) { if (running.compareAndSet(false, true)) {
if (conn == null) { if (!conn.isConnectionCreated()) {
throw new IllegalStateException("Database connection must be set before it is started"); throw new IllegalStateException("Database connection must be set before it is started");
} }
try { try {
@ -106,7 +105,14 @@ protected void storeRecord(HistoryRecord record) throws SchemaHistoryException {
} }
try { try {
String line = writer.write(record.document()); conn.executeWithRetry(conn -> {
String line = null;
try {
line = writer.write(record.document());
}
catch (IOException e) {
throw new UncheckedIOException(e);
}
Timestamp currentTs = new Timestamp(System.currentTimeMillis()); Timestamp currentTs = new Timestamp(System.currentTimeMillis());
List<String> substrings = split(line, 65000); List<String> substrings = split(line, 65000);
int partSeq = 0; int partSeq = 0;
@ -121,14 +127,9 @@ protected void storeRecord(HistoryRecord record) throws SchemaHistoryException {
partSeq++; partSeq++;
} }
conn.commit(); conn.commit();
}, "store history record", true);
} }
catch (IOException | SQLException e) { catch (UncheckedIOException | SQLException e) {
try {
conn.rollback();
}
catch (SQLException ex) {
// ignore
}
throw new SchemaHistoryException("Failed to store record: " + record, e); throw new SchemaHistoryException("Failed to store record: " + record, e);
} }
}); });
@ -147,8 +148,10 @@ public void stop() {
running.set(false); running.set(false);
super.stop(); super.stop();
try { try {
if (conn != null) {
conn.close(); conn.close();
} }
}
catch (SQLException e) { catch (SQLException e) {
LOG.error("Exception during stop", e); LOG.error("Exception during stop", e);
} }
@ -159,6 +162,7 @@ protected synchronized void recoverRecords(Consumer<HistoryRecord> records) {
lock.write(() -> { lock.write(() -> {
try { try {
if (exists()) { if (exists()) {
conn.executeWithRetry(conn -> {
Statement stmt = conn.createStatement(); Statement stmt = conn.createStatement();
ResultSet rs = stmt.executeQuery(config.getTableSelect()); ResultSet rs = stmt.executeQuery(config.getTableSelect());
@ -166,15 +170,21 @@ protected synchronized void recoverRecords(Consumer<HistoryRecord> records) {
String historyData = rs.getString("history_data"); String historyData = rs.getString("history_data");
if (historyData.isEmpty() == false) { if (historyData.isEmpty() == false) {
try {
records.accept(new HistoryRecord(reader.read(historyData))); records.accept(new HistoryRecord(reader.read(historyData)));
} }
catch (IOException e) {
throw new UncheckedIOException(e);
} }
} }
}
}, "recover history records", false);
}
else { else {
LOG.error("Storage does not exist when recovering records"); LOG.error("Storage does not exist when recovering records");
} }
} }
catch (IOException | SQLException e) { catch (UncheckedIOException | SQLException e) {
throw new SchemaHistoryException("Failed to recover records", e); throw new SchemaHistoryException("Failed to recover records", e);
} }
}); });
@ -182,20 +192,23 @@ protected synchronized void recoverRecords(Consumer<HistoryRecord> records) {
@Override @Override
public boolean storageExists() { public boolean storageExists() {
boolean sExists = false;
try { try {
return conn.executeWithRetry(conn -> {
boolean exists = false;
DatabaseMetaData dbMeta = conn.getMetaData(); DatabaseMetaData dbMeta = conn.getMetaData();
String databaseName = config.getDatabaseName(); String databaseName = config.getDatabaseName();
ResultSet tableExists = dbMeta.getTables(databaseName, ResultSet tableExists = dbMeta.getTables(databaseName,
null, config.getTableName(), null); null, config.getTableName(), null);
if (tableExists.next()) { if (tableExists.next()) {
sExists = true; exists = true;
} }
return exists;
}, "history storage exists", false);
} }
catch (SQLException e) { catch (SQLException e) {
throw new SchemaHistoryException("Failed to check database history storage", e); throw new SchemaHistoryException("Failed to check database history storage", e);
} }
return sExists;
} }
@Override @Override
@ -205,19 +218,20 @@ public boolean exists() {
return false; return false;
} }
boolean isExists = false;
try { try {
return conn.executeWithRetry(conn -> {
boolean isExists = false;
Statement stmt = conn.createStatement(); Statement stmt = conn.createStatement();
ResultSet rs = stmt.executeQuery(config.getTableDataExistsSelect()); ResultSet rs = stmt.executeQuery(config.getTableDataExistsSelect());
while (rs.next()) { while (rs.next()) {
isExists = true; isExists = true;
} }
return isExists;
}, "history records exist check", false);
} }
catch (SQLException e) { catch (SQLException e) {
throw new SchemaHistoryException("Failed to recover records", e); throw new SchemaHistoryException("Failed to recover records", e);
} }
return isExists;
} }
@VisibleForTesting @VisibleForTesting
@ -233,6 +247,7 @@ public String toString() {
@Override @Override
public void initializeStorage() { public void initializeStorage() {
try { try {
conn.executeWithRetry(conn -> {
DatabaseMetaData dbMeta = conn.getMetaData(); DatabaseMetaData dbMeta = conn.getMetaData();
ResultSet tableExists = dbMeta.getTables(null, null, config.getTableName(), null); ResultSet tableExists = dbMeta.getTables(null, null, config.getTableName(), null);
@ -242,9 +257,11 @@ public void initializeStorage() {
LOG.info("Creating table {} to store database history", config.getTableName()); LOG.info("Creating table {} to store database history", config.getTableName());
conn.prepareStatement(config.getTableCreate()).execute(); conn.prepareStatement(config.getTableCreate()).execute();
LOG.info("Created table in given database..."); LOG.info("Created table in given database...");
}, "initialize storage", false);
} }
catch (SQLException e) { catch (SQLException e) {
throw new SchemaHistoryException("Error initializing Database history storage", e); throw new SchemaHistoryException("Error initializing Database history storage", e);
} }
} }
} }

View File

@ -7,9 +7,7 @@
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets; import java.nio.charset.StandardCharsets;
import java.sql.Connection;
import java.sql.DatabaseMetaData; import java.sql.DatabaseMetaData;
import java.sql.DriverManager;
import java.sql.PreparedStatement; import java.sql.PreparedStatement;
import java.sql.ResultSet; import java.sql.ResultSet;
import java.sql.SQLException; import java.sql.SQLException;
@ -37,6 +35,7 @@
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import io.debezium.config.Configuration; import io.debezium.config.Configuration;
import io.debezium.storage.jdbc.RetriableConnection;
/** /**
* Implementation of OffsetBackingStore that saves data to database table. * Implementation of OffsetBackingStore that saves data to database table.
@ -50,7 +49,7 @@ public class JdbcOffsetBackingStore implements OffsetBackingStore {
protected ConcurrentHashMap<String, String> data = new ConcurrentHashMap<>(); protected ConcurrentHashMap<String, String> data = new ConcurrentHashMap<>();
protected ExecutorService executor; protected ExecutorService executor;
private final AtomicInteger recordInsertSeq = new AtomicInteger(0); private final AtomicInteger recordInsertSeq = new AtomicInteger(0);
private Connection conn; private RetriableConnection conn;
public JdbcOffsetBackingStore() { public JdbcOffsetBackingStore() {
} }
@ -70,8 +69,8 @@ public void configure(WorkerConfig config) {
Configuration configuration = Configuration.from(config.originalsStrings()); Configuration configuration = Configuration.from(config.originalsStrings());
this.config = new JdbcOffsetBackingStoreConfig(configuration); this.config = new JdbcOffsetBackingStoreConfig(configuration);
conn = DriverManager.getConnection(this.config.getJdbcUrl(), this.config.getUser(), this.config.getPassword()); conn = new RetriableConnection(this.config.getJdbcUrl(), this.config.getUser(), this.config.getPassword(),
conn.setAutoCommit(false); this.config.getWaitRetryDelay(), this.config.getMaxRetryCount());
} }
catch (Exception e) { catch (Exception e) {
throw new IllegalStateException("Failed to connect JDBC offset backing store: " + config.originalsStrings(), e); throw new IllegalStateException("Failed to connect JDBC offset backing store: " + config.originalsStrings(), e);
@ -95,20 +94,22 @@ public synchronized void start() {
} }
private void initializeTable() throws SQLException { private void initializeTable() throws SQLException {
conn.executeWithRetry(conn -> {
DatabaseMetaData dbMeta = conn.getMetaData(); DatabaseMetaData dbMeta = conn.getMetaData();
ResultSet tableExists = dbMeta.getTables(null, null, config.getTableName(), null); ResultSet tableExists = dbMeta.getTables(null, null, config.getTableName(), null);
if (tableExists.next()) { if (tableExists.next()) {
return; return;
} }
LOGGER.info("Creating table {} to store offset", config.getTableName()); LOGGER.info("Creating table {} to store offset", config.getTableName());
conn.prepareStatement(config.getTableCreate()).execute(); conn.prepareStatement(config.getTableCreate()).execute();
}, "checking / creating table", false);
} }
protected void save() { protected void save() {
try {
LOGGER.debug("Saving data to state table..."); LOGGER.debug("Saving data to state table...");
try {
conn.executeWithRetry((conn) -> {
try (PreparedStatement sqlDelete = conn.prepareStatement(config.getTableDelete())) { try (PreparedStatement sqlDelete = conn.prepareStatement(config.getTableDelete())) {
sqlDelete.executeUpdate(); sqlDelete.executeUpdate();
for (Map.Entry<String, String> mapEntry : data.entrySet()) { for (Map.Entry<String, String> mapEntry : data.entrySet()) {
@ -127,14 +128,9 @@ protected void save() {
} }
} }
conn.commit(); conn.commit();
}, "Saving offset", true);
} }
catch (SQLException e) { catch (SQLException e) {
try {
conn.rollback();
}
catch (SQLException ex) {
// Ignore errors on rollback
}
throw new ConnectException(e); throw new ConnectException(e);
} }
} }
@ -142,6 +138,7 @@ protected void save() {
private void load() { private void load() {
try { try {
ConcurrentHashMap<String, String> tmpData = new ConcurrentHashMap<>(); ConcurrentHashMap<String, String> tmpData = new ConcurrentHashMap<>();
conn.executeWithRetry(conn -> {
Statement stmt = conn.createStatement(); Statement stmt = conn.createStatement();
ResultSet rs = stmt.executeQuery(config.getTableSelect()); ResultSet rs = stmt.executeQuery(config.getTableSelect());
while (rs.next()) { while (rs.next()) {
@ -150,6 +147,7 @@ private void load() {
tmpData.put(key, val); tmpData.put(key, val);
} }
data = tmpData; data = tmpData;
}, "loading offset data", false);
} }
catch (SQLException e) { catch (SQLException e) {
throw new ConnectException("Failed recover records from database: " + config.getJdbcUrl(), e); throw new ConnectException("Failed recover records from database: " + config.getJdbcUrl(), e);