DBZ-6092 Simplify the code; change logging

This commit is contained in:
Jiri Pechanec 2023-03-16 11:50:33 +01:00
parent cbb785d67a
commit d4c3e23767
2 changed files with 8 additions and 7 deletions

View File

@ -23,6 +23,7 @@ Alfusainey Jallow
Alisa Houskova Alisa Houskova
Amit Sela Amit Sela
Aman Garg Aman Garg
Anatolii Popov
Anders Engström Anders Engström
Andrea Cosentino Andrea Cosentino
Andreas Bergmeier Andreas Bergmeier

View File

@ -295,12 +295,12 @@ private Lsn parseConfirmedFlushLsn(String slotName, String pluginName, String da
try { try {
confirmedFlushedLsn = tryParseLsn(slotName, pluginName, database, rs, "confirmed_flush_lsn"); confirmedFlushedLsn = tryParseLsn(slotName, pluginName, database, rs, "confirmed_flush_lsn");
if (confirmedFlushedLsn == null) { if (confirmedFlushedLsn == null) {
LOGGER.debug("Failed to obtain valid replication slot, confirmed flush lsn is null"); LOGGER.info("Failed to obtain valid replication slot, confirmed flush lsn is null");
AtomicBoolean hasConcurrentTransaction = new AtomicBoolean(false); AtomicBoolean hasConcurrentTransaction = new AtomicBoolean(false);
int connectionPID = ((PgConnection) connection()).getBackendPID(); int connectionPID = ((PgConnection) connection()).getBackendPID();
query("select * from pg_stat_activity where state like 'idle in transaction' AND pid <> " + connectionPID, rset -> { query("select * from pg_stat_activity where state like 'idle in transaction' AND pid <> " + connectionPID, rset -> {
if (rset.next()) { if (rset.next()) {
hasConcurrentTransaction.compareAndSet(false, true); hasConcurrentTransaction.set(true);
} }
}); });
if (!hasConcurrentTransaction.get()) { if (!hasConcurrentTransaction.get()) {
@ -321,8 +321,8 @@ private Lsn tryFallbackToRestartLsn(String slotName, String pluginName, String d
try { try {
confirmedFlushedLsn = tryParseLsn(slotName, pluginName, database, rs, "restart_lsn"); confirmedFlushedLsn = tryParseLsn(slotName, pluginName, database, rs, "restart_lsn");
} }
catch (SQLException e2) { catch (SQLException e) {
throw new ConnectException("Neither confirmed_flush_lsn nor restart_lsn could be found"); throw new DebeziumException("Neither confirmed_flush_lsn nor restart_lsn could be found", e);
} }
return confirmedFlushedLsn; return confirmedFlushedLsn;
} }
@ -333,7 +333,7 @@ private Lsn parseRestartLsn(String slotName, String pluginName, String database,
restartLsn = tryParseLsn(slotName, pluginName, database, rs, "restart_lsn"); restartLsn = tryParseLsn(slotName, pluginName, database, rs, "restart_lsn");
} }
catch (SQLException e) { catch (SQLException e) {
throw new ConnectException("restart_lsn could be found"); throw new DebeziumException("restart_lsn could be found");
} }
return restartLsn; return restartLsn;
@ -350,13 +350,13 @@ private Lsn tryParseLsn(String slotName, String pluginName, String database, Res
lsn = Lsn.valueOf(lsnStr); lsn = Lsn.valueOf(lsnStr);
} }
catch (Exception e) { catch (Exception e) {
throw new ConnectException("Value " + column + " in the pg_replication_slots table for slot = '" throw new DebeziumException("Value " + column + " in the pg_replication_slots table for slot = '"
+ slotName + "', plugin = '" + slotName + "', plugin = '"
+ pluginName + "', database = '" + pluginName + "', database = '"
+ database + "' is not valid. This is an abnormal situation and the database status should be checked."); + database + "' is not valid. This is an abnormal situation and the database status should be checked.");
} }
if (!lsn.isValid()) { if (!lsn.isValid()) {
throw new ConnectException("Invalid LSN returned from database"); throw new DebeziumException("Invalid LSN returned from database");
} }
return lsn; return lsn;
} }