From fb8150e617d0db629f332419664bd623c0885d9e Mon Sep 17 00:00:00 2001 From: mfvitale Date: Fri, 12 Jul 2024 14:47:23 +0200 Subject: [PATCH] DBZ-8058 Connection will be validated and re-connect if not valid --- .../java/io/debezium/jdbc/JdbcConnection.java | 23 +++++++++++++++---- .../RelationalSnapshotChangeEventSource.java | 5 ++++ 2 files changed, 24 insertions(+), 4 deletions(-) diff --git a/debezium-core/src/main/java/io/debezium/jdbc/JdbcConnection.java b/debezium-core/src/main/java/io/debezium/jdbc/JdbcConnection.java index e35a5e8f4..9e3799df9 100644 --- a/debezium-core/src/main/java/io/debezium/jdbc/JdbcConnection.java +++ b/debezium-core/src/main/java/io/debezium/jdbc/JdbcConnection.java @@ -404,6 +404,15 @@ public JdbcConnection connect() throws SQLException { return this; } + /** + * Ensure a connection to the database is established again. + * + * @throws SQLException if there is an error connecting to the database + */ + public void reconnect() throws SQLException { + establishConnection(); + } + /** * Execute a series of SQL statements as a single transaction. * @@ -891,10 +900,9 @@ public synchronized Connection connection() throws SQLException { public synchronized Connection connection(boolean executeOnConnect) throws SQLException { if (!isConnected()) { - conn = factory.connect(JdbcConfiguration.adapt(config)); - if (!isConnected()) { - throw new SQLException("Unable to obtain a JDBC connection"); - } + + establishConnection(); + // Always run the initial operations on this new connection if (initialOps != null) { execute(initialOps); @@ -908,6 +916,13 @@ public synchronized Connection connection(boolean executeOnConnect) throws SQLEx return conn; } + private void establishConnection() throws SQLException { + conn = factory.connect(JdbcConfiguration.adapt(config)); + if (!isConnected()) { + throw new SQLException("Unable to obtain a JDBC connection"); + } + } + protected List parseSqlStatementString(final String statements) { final List splitStatements = new ArrayList<>(); final char[] statementsChars = statements.toCharArray(); diff --git a/debezium-core/src/main/java/io/debezium/relational/RelationalSnapshotChangeEventSource.java b/debezium-core/src/main/java/io/debezium/relational/RelationalSnapshotChangeEventSource.java index 9cd69d2a1..afe929237 100644 --- a/debezium-core/src/main/java/io/debezium/relational/RelationalSnapshotChangeEventSource.java +++ b/debezium-core/src/main/java/io/debezium/relational/RelationalSnapshotChangeEventSource.java @@ -238,6 +238,11 @@ private Queue createConnectionPool(final RelationalSnapshotConte } public Connection createSnapshotConnection() throws SQLException { + + if (!jdbcConnection.isValid()) { + jdbcConnection.reconnect(); + } + Connection connection = jdbcConnection.connection(); connection.setAutoCommit(false); return connection;