DBZ-2176 Open transaction test are more reliable

Checking pg_stat_activity for transactions with a state of "idle in transaction"
should be more reliable to find open abandoned transaction than looking for
a non-null backend_xmin. For example, an autovaccum process will have a
backend_xmin but will not have a stat of "idle in transaction"
This commit is contained in:
cooksey 2020-06-09 15:57:11 -04:00 committed by Jiri Pechanec
parent 7edc482c43
commit 1282403c44
4 changed files with 31 additions and 34 deletions

View File

@ -858,16 +858,7 @@ public void shouldCloseTxAfterTypeQuery() throws Exception {
final String isbn = new String(((Struct) record.value()).getStruct("after").getBytes("aa"));
Assertions.assertThat(isbn).isEqualTo("0-393-04002-X");
try (final PostgresConnection connection = TestHelper.create()) {
try {
Awaitility.await()
.atMost(TestHelper.waitTimeForRecords(), TimeUnit.SECONDS)
.until(() -> getActiveTransactions(connection).size() == 1);
}
catch (ConditionTimeoutException e) {
}
assertThat(getActiveTransactions(connection)).hasSize(1);
}
TestHelper.assertNoOpenTransactions();
stopConnector();
assertConnectorNotRunning();
}
@ -1196,18 +1187,6 @@ private String getConfirmedFlushLsn(PostgresConnection connection) throws SQLExc
});
}
private List<String> getActiveTransactions(PostgresConnection connection) throws SQLException {
return connection.queryAndMap(
"SELECT query FROM pg_stat_activity WHERE backend_xmin IS NOT NULL ORDER BY age(backend_xmin) DESC;",
rs -> {
final List<String> ret = new ArrayList<>();
while (rs.next()) {
ret.add(rs.getString(1));
}
return ret;
});
}
private void assertFieldAbsent(SourceRecord record, String fieldName) {
Struct value = (Struct) ((Struct) record.value()).get(Envelope.FieldName.AFTER);
try {

View File

@ -283,7 +283,7 @@ public void shouldCloseTransactionsAfterSnapshot() throws Exception {
waitForStreamingToStart();
TestHelper.noTransactionActive();
TestHelper.assertNoOpenTransactions();
stopConnector();
}

View File

@ -66,7 +66,6 @@
import io.debezium.data.Bits;
import io.debezium.data.Enum;
import io.debezium.data.Envelope;
import io.debezium.data.SchemaAndValueField;
import io.debezium.data.SpecialValueDecimal;
import io.debezium.data.VariableScaleDecimal;
import io.debezium.data.VerifyRecord;
@ -1318,7 +1317,7 @@ public void shouldNotRefreshSchemaOnUnchangedToastedData() throws Exception {
assertEquals(Arrays.asList("pk", "text", "not_toast"), tbl.retrieveColumnNames());
});
TestHelper.noTransactionActive();
TestHelper.assertNoOpenTransactions();
}
@Test

View File

@ -6,6 +6,7 @@
package io.debezium.connector.postgresql;
import static org.fest.assertions.Assertions.assertThat;
import static org.junit.Assert.assertNotNull;
import java.net.URL;
@ -16,12 +17,14 @@
import java.sql.ResultSet;
import java.sql.SQLException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.awaitility.Awaitility;
import org.junit.Assert;
import org.awaitility.core.ConditionTimeoutException;
import org.postgresql.jdbc.PgConnection;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -317,16 +320,32 @@ protected static void waitForDefaultReplicationSlotBeActive() {
}
}
protected static void noTransactionActive() throws SQLException {
protected static void assertNoOpenTransactions() throws SQLException {
try (PostgresConnection connection = TestHelper.create()) {
connection.setAutoCommit(true);
int connectionPID = ((PgConnection) connection.connection()).getBackendPID();
String connectionStateQuery = "SELECT state FROM pg_stat_activity WHERE pid <> " + connectionPID;
connection.query(connectionStateQuery, rs -> {
while (rs.next()) {
Assert.assertNotEquals(rs.getString(1), "idle in transaction");
}
});
try {
Awaitility.await()
.atMost(TestHelper.waitTimeForRecords(), TimeUnit.SECONDS)
.until(() -> getOpenIdleTransactions(connection).size() == 0);
}
catch (ConditionTimeoutException e) {
}
assertThat(getOpenIdleTransactions(connection)).hasSize(0);
}
}
private static List<String> getOpenIdleTransactions(PostgresConnection connection) throws SQLException {
int connectionPID = ((PgConnection) connection.connection()).getBackendPID();
return connection.queryAndMap(
"SELECT state FROM pg_stat_activity WHERE state like 'idle in transaction' AND pid <> " + connectionPID,
rs -> {
final List<String> ret = new ArrayList<>();
while (rs.next()) {
ret.add(rs.getString(1));
}
return ret;
});
}
}