DBZ-6251 Ignore Debezium internal transactions during fallback to restart_lsn in Postgres connector if confirmed_flush_lsn is NULL

This commit is contained in:
Anatolii Popov 2023-03-24 11:58:47 +02:00 committed by Jiri Pechanec
parent 735933ce6b
commit 930d9b0971
2 changed files with 24 additions and 14 deletions

View File

@ -16,7 +16,6 @@
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.regex.Pattern;
@ -296,14 +295,7 @@ private Lsn parseConfirmedFlushLsn(String slotName, String pluginName, String da
confirmedFlushedLsn = tryParseLsn(slotName, pluginName, database, rs, "confirmed_flush_lsn");
if (confirmedFlushedLsn == null) {
LOGGER.info("Failed to obtain valid replication slot, confirmed flush lsn is null");
AtomicBoolean hasConcurrentTransaction = new AtomicBoolean(false);
int connectionPID = ((PgConnection) connection()).getBackendPID();
query("select * from pg_stat_activity where state like 'idle in transaction' AND pid <> " + connectionPID, rset -> {
if (rset.next()) {
hasConcurrentTransaction.set(true);
}
});
if (!hasConcurrentTransaction.get()) {
if (!hasIdleTransactions()) {
confirmedFlushedLsn = tryFallbackToRestartLsn(slotName, pluginName, database, rs);
}
}
@ -315,6 +307,20 @@ private Lsn parseConfirmedFlushLsn(String slotName, String pluginName, String da
return confirmedFlushedLsn;
}
private boolean hasIdleTransactions() throws SQLException {
return queryAndMap(
"select * from pg_stat_activity where state like 'idle in transaction' AND application_name != '" + CONNECTION_GENERAL + "' AND pid <> pg_backend_pid()",
rs -> {
if (rs.next()) {
LOGGER.debug("Found at least one idle transaction with pid " + rs.getInt("pid") + " for application" + rs.getString("application_name"));
return true;
}
else {
return false;
}
});
}
private Lsn tryFallbackToRestartLsn(String slotName, String pluginName, String database, ResultSet rs) {
Lsn confirmedFlushedLsn;
LOGGER.info("Unable to find confirmed_flushed_lsn, falling back to restart_lsn");

View File

@ -231,20 +231,24 @@ public void shouldSupportPG95RestartLsn() throws Exception {
}
@Test(timeout = 60000)
@Test
public void shouldSupportFallbackToRestartLsn() throws Exception {
String slotName = "emptyconfirmed";
try (ReplicationConnection replConnection = TestHelper.createForReplication(slotName, false)) {
replConnection.initConnection();
assertTrue(replConnection.isConnected());
}
try (PostgresConnection conn = buildConnectionWithEmptyConfirmedFlushLSN(slotName)) {
ServerInfo.ReplicationSlot slotInfo = conn.readReplicationSlotInfo(slotName, TestHelper.decoderPlugin().getPostgresPluginName());
try (PostgresConnection withIdleTransaction = new PostgresConnection(JdbcConfiguration.adapt(TestHelper.defaultJdbcConfig()),
PostgresConnection.CONNECTION_GENERAL);
PostgresConnection withEmptyConfirmedFlushLSN = buildConnectionWithEmptyConfirmedFlushLSN(slotName)) {
withIdleTransaction.setAutoCommit(false);
withIdleTransaction.query("select 1", connection -> {
});
ServerInfo.ReplicationSlot slotInfo = withEmptyConfirmedFlushLSN.readReplicationSlotInfo(slotName, TestHelper.decoderPlugin().getPostgresPluginName());
assertNotNull(slotInfo);
assertNotEquals(ServerInfo.ReplicationSlot.INVALID, slotInfo);
conn.dropReplicationSlot(slotName);
withEmptyConfirmedFlushLSN.dropReplicationSlot(slotName);
}
}
// "fake" a pg95 response by not returning confirmed_flushed_lsn