diff --git a/debezium-storage/debezium-storage-jdbc/src/main/java/io/debezium/storage/jdbc/RetriableConnection.java b/debezium-storage/debezium-storage-jdbc/src/main/java/io/debezium/storage/jdbc/RetriableConnection.java index d1fbd3cbf..103da4e51 100644 --- a/debezium-storage/debezium-storage-jdbc/src/main/java/io/debezium/storage/jdbc/RetriableConnection.java +++ b/debezium-storage/debezium-storage-jdbc/src/main/java/io/debezium/storage/jdbc/RetriableConnection.java @@ -64,7 +64,7 @@ private void createConnection() throws SQLException { @Override public void close() throws SQLException { - if (isConnectionCreated()) { + if (isOpen()) { try { conn.close(); } @@ -75,8 +75,15 @@ public void close() throws SQLException { conn = null; } - public boolean isConnectionCreated() { - return conn != null; + public boolean isOpen() { + try { + return conn != null && !conn.isClosed(); + } + catch (SQLException e) { + LOGGER.warn("Exception while checking connection", e); + conn = null; + } + return false; } /** @@ -107,7 +114,7 @@ private synchronized T executeWithRetry(ConnectionFunction func, Connecti throws SQLException { int attempt = 1; while (true) { - if (!isConnectionCreated()) { + if (!isOpen()) { LOGGER.debug("Trying to reconnect (attempt {}).", attempt); try { createConnection(); diff --git a/debezium-storage/debezium-storage-jdbc/src/main/java/io/debezium/storage/jdbc/history/JdbcSchemaHistory.java b/debezium-storage/debezium-storage-jdbc/src/main/java/io/debezium/storage/jdbc/history/JdbcSchemaHistory.java index e11ad8100..b3da1b03d 100644 --- a/debezium-storage/debezium-storage-jdbc/src/main/java/io/debezium/storage/jdbc/history/JdbcSchemaHistory.java +++ b/debezium-storage/debezium-storage-jdbc/src/main/java/io/debezium/storage/jdbc/history/JdbcSchemaHistory.java @@ -79,7 +79,7 @@ public void start() { super.start(); lock.write(() -> { if (running.compareAndSet(false, true)) { - if (!conn.isConnectionCreated()) { + if (!conn.isOpen()) { throw new IllegalStateException("Database connection must be set before it is started"); } try { @@ -117,14 +117,15 @@ protected void storeRecord(HistoryRecord record) throws SchemaHistoryException { List substrings = split(line, 65000); int partSeq = 0; for (String dataPart : substrings) { - PreparedStatement sql = conn.prepareStatement(config.getTableInsert()); - sql.setString(1, UUID.randomUUID().toString()); - sql.setString(2, dataPart); - sql.setInt(3, partSeq); - sql.setTimestamp(4, currentTs); - sql.setInt(5, recordInsertSeq.incrementAndGet()); - sql.executeUpdate(); - partSeq++; + try (PreparedStatement sql = conn.prepareStatement(config.getTableInsert())) { + sql.setString(1, UUID.randomUUID().toString()); + sql.setString(2, dataPart); + sql.setInt(3, partSeq); + sql.setTimestamp(4, currentTs); + sql.setInt(5, recordInsertSeq.incrementAndGet()); + sql.executeUpdate(); + partSeq++; + } } conn.commit(); }, "store history record", true); @@ -163,18 +164,19 @@ protected synchronized void recoverRecords(Consumer records) { try { if (exists()) { conn.executeWithRetry(conn -> { - Statement stmt = conn.createStatement(); - ResultSet rs = stmt.executeQuery(config.getTableSelect()); + try ( + Statement stmt = conn.createStatement(); + ResultSet rs = stmt.executeQuery(config.getTableSelect())) { + while (rs.next()) { + String historyData = rs.getString("history_data"); - while (rs.next()) { - String historyData = rs.getString("history_data"); - - if (historyData.isEmpty() == false) { - try { - records.accept(new HistoryRecord(reader.read(historyData))); - } - catch (IOException e) { - throw new DebeziumException(e); + if (historyData.isEmpty() == false) { + try { + records.accept(new HistoryRecord(reader.read(historyData))); + } + catch (IOException e) { + throw new DebeziumException(e); + } } } } @@ -198,12 +200,13 @@ public boolean storageExists() { DatabaseMetaData dbMeta = conn.getMetaData(); String databaseName = config.getDatabaseName(); - ResultSet tableExists = dbMeta.getTables(databaseName, - null, config.getTableName(), null); - if (tableExists.next()) { - exists = true; + try (ResultSet tableExists = dbMeta.getTables(databaseName, + null, config.getTableName(), null)) { + if (tableExists.next()) { + exists = true; + } + return exists; } - return exists; }, "history storage exists", false); } catch (SQLException e) { @@ -221,12 +224,14 @@ public boolean exists() { try { return conn.executeWithRetry(conn -> { boolean isExists = false; - Statement stmt = conn.createStatement(); - ResultSet rs = stmt.executeQuery(config.getTableDataExistsSelect()); - while (rs.next()) { - isExists = true; + try ( + Statement stmt = conn.createStatement(); + ResultSet rs = stmt.executeQuery(config.getTableDataExistsSelect());) { + while (rs.next()) { + isExists = true; + } + return isExists; } - return isExists; }, "history records exist check", false); } catch (SQLException e) { @@ -249,14 +254,16 @@ public void initializeStorage() { try { conn.executeWithRetry(conn -> { DatabaseMetaData dbMeta = conn.getMetaData(); - ResultSet tableExists = dbMeta.getTables(null, null, config.getTableName(), null); - - if (tableExists.next()) { - return; + try (ResultSet tableExists = dbMeta.getTables(null, null, config.getTableName(), null)) { + if (tableExists.next()) { + return; + } + LOG.info("Creating table {} to store database history", config.getTableName()); + try (var ps = conn.prepareStatement(config.getTableCreate())) { + ps.execute(); + LOG.info("Created table in given database..."); + } } - LOG.info("Creating table {} to store database history", config.getTableName()); - conn.prepareStatement(config.getTableCreate()).execute(); - LOG.info("Created table in given database..."); }, "initialize storage", false); } catch (SQLException e) { diff --git a/debezium-storage/debezium-storage-jdbc/src/main/java/io/debezium/storage/jdbc/offset/JdbcOffsetBackingStore.java b/debezium-storage/debezium-storage-jdbc/src/main/java/io/debezium/storage/jdbc/offset/JdbcOffsetBackingStore.java index e67931a62..214388c46 100644 --- a/debezium-storage/debezium-storage-jdbc/src/main/java/io/debezium/storage/jdbc/offset/JdbcOffsetBackingStore.java +++ b/debezium-storage/debezium-storage-jdbc/src/main/java/io/debezium/storage/jdbc/offset/JdbcOffsetBackingStore.java @@ -96,13 +96,15 @@ public synchronized void start() { private void initializeTable() throws SQLException { conn.executeWithRetry(conn -> { DatabaseMetaData dbMeta = conn.getMetaData(); - ResultSet tableExists = dbMeta.getTables(null, null, config.getTableName(), null); - - if (tableExists.next()) { - return; + try (ResultSet tableExists = dbMeta.getTables(null, null, config.getTableName(), null)) { + if (tableExists.next()) { + return; + } } LOGGER.info("Creating table {} to store offset", config.getTableName()); - conn.prepareStatement(config.getTableCreate()).execute(); + try (var ps = conn.prepareStatement(config.getTableCreate())) { + ps.execute(); + } }, "checking / creating table", false); } @@ -139,12 +141,14 @@ private void load() { try { ConcurrentHashMap tmpData = new ConcurrentHashMap<>(); conn.executeWithRetry(conn -> { - Statement stmt = conn.createStatement(); - ResultSet rs = stmt.executeQuery(config.getTableSelect()); - while (rs.next()) { - String key = rs.getString("offset_key"); - String val = rs.getString("offset_val"); - tmpData.put(key, val); + try ( + Statement stmt = conn.createStatement(); + ResultSet rs = stmt.executeQuery(config.getTableSelect())) { + while (rs.next()) { + String key = rs.getString("offset_key"); + String val = rs.getString("offset_val"); + tmpData.put(key, val); + } } data = tmpData; }, "loading offset data", false);