DBZ-7258 Added try-wit-resources; changed connection check method
This commit is contained in:
parent
54e6c855a2
commit
15e795547d
@ -64,7 +64,7 @@ private void createConnection() throws SQLException {
|
|||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void close() throws SQLException {
|
public void close() throws SQLException {
|
||||||
if (isConnectionCreated()) {
|
if (isOpen()) {
|
||||||
try {
|
try {
|
||||||
conn.close();
|
conn.close();
|
||||||
}
|
}
|
||||||
@ -75,8 +75,15 @@ public void close() throws SQLException {
|
|||||||
conn = null;
|
conn = null;
|
||||||
}
|
}
|
||||||
|
|
||||||
public boolean isConnectionCreated() {
|
public boolean isOpen() {
|
||||||
return conn != null;
|
try {
|
||||||
|
return conn != null && !conn.isClosed();
|
||||||
|
}
|
||||||
|
catch (SQLException e) {
|
||||||
|
LOGGER.warn("Exception while checking connection", e);
|
||||||
|
conn = null;
|
||||||
|
}
|
||||||
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -107,7 +114,7 @@ private synchronized <T> T executeWithRetry(ConnectionFunction<T> func, Connecti
|
|||||||
throws SQLException {
|
throws SQLException {
|
||||||
int attempt = 1;
|
int attempt = 1;
|
||||||
while (true) {
|
while (true) {
|
||||||
if (!isConnectionCreated()) {
|
if (!isOpen()) {
|
||||||
LOGGER.debug("Trying to reconnect (attempt {}).", attempt);
|
LOGGER.debug("Trying to reconnect (attempt {}).", attempt);
|
||||||
try {
|
try {
|
||||||
createConnection();
|
createConnection();
|
||||||
|
@ -79,7 +79,7 @@ public void start() {
|
|||||||
super.start();
|
super.start();
|
||||||
lock.write(() -> {
|
lock.write(() -> {
|
||||||
if (running.compareAndSet(false, true)) {
|
if (running.compareAndSet(false, true)) {
|
||||||
if (!conn.isConnectionCreated()) {
|
if (!conn.isOpen()) {
|
||||||
throw new IllegalStateException("Database connection must be set before it is started");
|
throw new IllegalStateException("Database connection must be set before it is started");
|
||||||
}
|
}
|
||||||
try {
|
try {
|
||||||
@ -117,14 +117,15 @@ protected void storeRecord(HistoryRecord record) throws SchemaHistoryException {
|
|||||||
List<String> substrings = split(line, 65000);
|
List<String> substrings = split(line, 65000);
|
||||||
int partSeq = 0;
|
int partSeq = 0;
|
||||||
for (String dataPart : substrings) {
|
for (String dataPart : substrings) {
|
||||||
PreparedStatement sql = conn.prepareStatement(config.getTableInsert());
|
try (PreparedStatement sql = conn.prepareStatement(config.getTableInsert())) {
|
||||||
sql.setString(1, UUID.randomUUID().toString());
|
sql.setString(1, UUID.randomUUID().toString());
|
||||||
sql.setString(2, dataPart);
|
sql.setString(2, dataPart);
|
||||||
sql.setInt(3, partSeq);
|
sql.setInt(3, partSeq);
|
||||||
sql.setTimestamp(4, currentTs);
|
sql.setTimestamp(4, currentTs);
|
||||||
sql.setInt(5, recordInsertSeq.incrementAndGet());
|
sql.setInt(5, recordInsertSeq.incrementAndGet());
|
||||||
sql.executeUpdate();
|
sql.executeUpdate();
|
||||||
partSeq++;
|
partSeq++;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
conn.commit();
|
conn.commit();
|
||||||
}, "store history record", true);
|
}, "store history record", true);
|
||||||
@ -163,18 +164,19 @@ protected synchronized void recoverRecords(Consumer<HistoryRecord> records) {
|
|||||||
try {
|
try {
|
||||||
if (exists()) {
|
if (exists()) {
|
||||||
conn.executeWithRetry(conn -> {
|
conn.executeWithRetry(conn -> {
|
||||||
Statement stmt = conn.createStatement();
|
try (
|
||||||
ResultSet rs = stmt.executeQuery(config.getTableSelect());
|
Statement stmt = conn.createStatement();
|
||||||
|
ResultSet rs = stmt.executeQuery(config.getTableSelect())) {
|
||||||
|
while (rs.next()) {
|
||||||
|
String historyData = rs.getString("history_data");
|
||||||
|
|
||||||
while (rs.next()) {
|
if (historyData.isEmpty() == false) {
|
||||||
String historyData = rs.getString("history_data");
|
try {
|
||||||
|
records.accept(new HistoryRecord(reader.read(historyData)));
|
||||||
if (historyData.isEmpty() == false) {
|
}
|
||||||
try {
|
catch (IOException e) {
|
||||||
records.accept(new HistoryRecord(reader.read(historyData)));
|
throw new DebeziumException(e);
|
||||||
}
|
}
|
||||||
catch (IOException e) {
|
|
||||||
throw new DebeziumException(e);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -198,12 +200,13 @@ public boolean storageExists() {
|
|||||||
DatabaseMetaData dbMeta = conn.getMetaData();
|
DatabaseMetaData dbMeta = conn.getMetaData();
|
||||||
|
|
||||||
String databaseName = config.getDatabaseName();
|
String databaseName = config.getDatabaseName();
|
||||||
ResultSet tableExists = dbMeta.getTables(databaseName,
|
try (ResultSet tableExists = dbMeta.getTables(databaseName,
|
||||||
null, config.getTableName(), null);
|
null, config.getTableName(), null)) {
|
||||||
if (tableExists.next()) {
|
if (tableExists.next()) {
|
||||||
exists = true;
|
exists = true;
|
||||||
|
}
|
||||||
|
return exists;
|
||||||
}
|
}
|
||||||
return exists;
|
|
||||||
}, "history storage exists", false);
|
}, "history storage exists", false);
|
||||||
}
|
}
|
||||||
catch (SQLException e) {
|
catch (SQLException e) {
|
||||||
@ -221,12 +224,14 @@ public boolean exists() {
|
|||||||
try {
|
try {
|
||||||
return conn.executeWithRetry(conn -> {
|
return conn.executeWithRetry(conn -> {
|
||||||
boolean isExists = false;
|
boolean isExists = false;
|
||||||
Statement stmt = conn.createStatement();
|
try (
|
||||||
ResultSet rs = stmt.executeQuery(config.getTableDataExistsSelect());
|
Statement stmt = conn.createStatement();
|
||||||
while (rs.next()) {
|
ResultSet rs = stmt.executeQuery(config.getTableDataExistsSelect());) {
|
||||||
isExists = true;
|
while (rs.next()) {
|
||||||
|
isExists = true;
|
||||||
|
}
|
||||||
|
return isExists;
|
||||||
}
|
}
|
||||||
return isExists;
|
|
||||||
}, "history records exist check", false);
|
}, "history records exist check", false);
|
||||||
}
|
}
|
||||||
catch (SQLException e) {
|
catch (SQLException e) {
|
||||||
@ -249,14 +254,16 @@ public void initializeStorage() {
|
|||||||
try {
|
try {
|
||||||
conn.executeWithRetry(conn -> {
|
conn.executeWithRetry(conn -> {
|
||||||
DatabaseMetaData dbMeta = conn.getMetaData();
|
DatabaseMetaData dbMeta = conn.getMetaData();
|
||||||
ResultSet tableExists = dbMeta.getTables(null, null, config.getTableName(), null);
|
try (ResultSet tableExists = dbMeta.getTables(null, null, config.getTableName(), null)) {
|
||||||
|
if (tableExists.next()) {
|
||||||
if (tableExists.next()) {
|
return;
|
||||||
return;
|
}
|
||||||
|
LOG.info("Creating table {} to store database history", config.getTableName());
|
||||||
|
try (var ps = conn.prepareStatement(config.getTableCreate())) {
|
||||||
|
ps.execute();
|
||||||
|
LOG.info("Created table in given database...");
|
||||||
|
}
|
||||||
}
|
}
|
||||||
LOG.info("Creating table {} to store database history", config.getTableName());
|
|
||||||
conn.prepareStatement(config.getTableCreate()).execute();
|
|
||||||
LOG.info("Created table in given database...");
|
|
||||||
}, "initialize storage", false);
|
}, "initialize storage", false);
|
||||||
}
|
}
|
||||||
catch (SQLException e) {
|
catch (SQLException e) {
|
||||||
|
@ -96,13 +96,15 @@ public synchronized void start() {
|
|||||||
private void initializeTable() throws SQLException {
|
private void initializeTable() throws SQLException {
|
||||||
conn.executeWithRetry(conn -> {
|
conn.executeWithRetry(conn -> {
|
||||||
DatabaseMetaData dbMeta = conn.getMetaData();
|
DatabaseMetaData dbMeta = conn.getMetaData();
|
||||||
ResultSet tableExists = dbMeta.getTables(null, null, config.getTableName(), null);
|
try (ResultSet tableExists = dbMeta.getTables(null, null, config.getTableName(), null)) {
|
||||||
|
if (tableExists.next()) {
|
||||||
if (tableExists.next()) {
|
return;
|
||||||
return;
|
}
|
||||||
}
|
}
|
||||||
LOGGER.info("Creating table {} to store offset", config.getTableName());
|
LOGGER.info("Creating table {} to store offset", config.getTableName());
|
||||||
conn.prepareStatement(config.getTableCreate()).execute();
|
try (var ps = conn.prepareStatement(config.getTableCreate())) {
|
||||||
|
ps.execute();
|
||||||
|
}
|
||||||
}, "checking / creating table", false);
|
}, "checking / creating table", false);
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -139,12 +141,14 @@ private void load() {
|
|||||||
try {
|
try {
|
||||||
ConcurrentHashMap<String, String> tmpData = new ConcurrentHashMap<>();
|
ConcurrentHashMap<String, String> tmpData = new ConcurrentHashMap<>();
|
||||||
conn.executeWithRetry(conn -> {
|
conn.executeWithRetry(conn -> {
|
||||||
Statement stmt = conn.createStatement();
|
try (
|
||||||
ResultSet rs = stmt.executeQuery(config.getTableSelect());
|
Statement stmt = conn.createStatement();
|
||||||
while (rs.next()) {
|
ResultSet rs = stmt.executeQuery(config.getTableSelect())) {
|
||||||
String key = rs.getString("offset_key");
|
while (rs.next()) {
|
||||||
String val = rs.getString("offset_val");
|
String key = rs.getString("offset_key");
|
||||||
tmpData.put(key, val);
|
String val = rs.getString("offset_val");
|
||||||
|
tmpData.put(key, val);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
data = tmpData;
|
data = tmpData;
|
||||||
}, "loading offset data", false);
|
}, "loading offset data", false);
|
||||||
|
Loading…
Reference in New Issue
Block a user