DBZ-3951 Adding a safety check, isValid, for connection

This commit is contained in:
Vivek Wassan 2021-08-31 13:27:56 -04:00 committed by Jiri Pechanec
parent ff4d487d44
commit 5f37e6630f
3 changed files with 16 additions and 2 deletions

View File

@ -16,6 +16,7 @@
import io.debezium.jdbc.JdbcConnection;
import io.debezium.pipeline.EventDispatcher;
import io.debezium.pipeline.source.snapshot.incremental.AbstractIncrementalSnapshotChangeEventSource;
import io.debezium.pipeline.source.snapshot.incremental.IncrementalSnapshotContext;
import io.debezium.pipeline.source.spi.DataChangeEventListener;
import io.debezium.pipeline.source.spi.SnapshotProgressListener;
import io.debezium.pipeline.spi.OffsetContext;
@ -190,4 +191,16 @@ public void rereadChunk() throws InterruptedException {
private MySqlReadOnlyIncrementalSnapshotContext<T> getContext() {
return (MySqlReadOnlyIncrementalSnapshotContext<T>) context;
}
@Override
protected void preReadChunk(IncrementalSnapshotContext<T> context) {
try {
if (!jdbcConnection.isConnected()) {
jdbcConnection.connect();
}
}
catch (SQLException e) {
throw new DebeziumException(String.format("Database error while checking jdbcConnection in preReadChunk"), e);
}
}
}

View File

@ -76,6 +76,7 @@ public class JdbcConnection implements AutoCloseable {
private static final char STATEMENT_DELIMITER = ';';
private static final int STATEMENT_CACHE_CAPACITY = 10_000;
private final static Logger LOGGER = LoggerFactory.getLogger(JdbcConnection.class);
private static final int CONNECTION_VALID_CHECK_TIMEOUT_IN_SEC = 3;
private final Map<String, PreparedStatement> statementCache = new BoundedConcurrentHashMap<>(STATEMENT_CACHE_CAPACITY, 16, Eviction.LIRS,
new EvictionListener<String, PreparedStatement>() {
@ -874,7 +875,7 @@ public synchronized boolean isConnected() throws SQLException {
if (conn == null) {
return false;
}
return !conn.isClosed();
return !conn.isClosed() && conn.isValid(CONNECTION_VALID_CHECK_TIMEOUT_IN_SEC);
}
public synchronized Connection connection() throws SQLException {

View File

@ -295,7 +295,7 @@ public SQLXML createSQLXML() throws SQLException {
@Override
public boolean isValid(int timeout) throws SQLException {
return false;
return true;
}
@Override