DBZ-1564 Close open transaction during postgres streaming

After the schema refresh, we do not want to hold a long-running open transaction,
since keeping one open is expensive.
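
A minimal sketch of the idea behind the change, using plain JDBC (the class and method names below are illustrative, not Debezium code): once read-only work such as a schema refresh has finished, commit any transaction that was opened implicitly so the backend session does not linger in the "idle in transaction" state, where it would keep holding its snapshot and locks.

import java.sql.Connection;
import java.sql.SQLException;

// Hypothetical helper, for illustration only: close out any transaction
// that earlier read-only work left open on a JDBC connection.
final class TransactionHygiene {

    private TransactionHygiene() {
    }

    static void finishReadOnlyWork(Connection connection) throws SQLException {
        // With autoCommit disabled, the earlier queries started a transaction
        // implicitly; committing ends it and releases its snapshot and locks,
        // so the session is no longer reported as "idle in transaction".
        if (!connection.getAutoCommit()) {
            connection.commit();
        }
    }
}

The guard on getAutoCommit() matters because calling commit() on a connection in auto-commit mode throws a SQLException; committing rather than rolling back matches the change below, and either would end the open transaction.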
Grant Cooksey 2019-10-17 09:32:49 -04:00 committed by Jiri Pechanec
parent 80e6ccfb2f
commit 5ab5b4063d
2 changed files with 49 additions and 0 deletions


@@ -66,6 +66,10 @@ protected PostgresConnectorConfig config() {
     protected void refreshSchema(PostgresConnection connection, boolean printReplicaIdentityInfo) throws SQLException {
         schema.refresh(connection, printReplicaIdentityInfo);
+        // Open transaction unnecessary during task execution
+        if (!connection.connection().getAutoCommit()) {
+            connection.connection().commit();
+        }
     }
 
     Long getSlotXmin(PostgresConnection connection) throws SQLException {


@@ -11,6 +11,7 @@
 import static io.debezium.connector.postgresql.junit.SkipWhenDatabaseVersionLessThan.PostgresVersion.POSTGRES_10;
 import static org.fest.assertions.Assertions.assertThat;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
 import static org.junit.Assert.assertNotNull;
 
 import java.util.Arrays;
@@ -22,6 +23,7 @@
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.stream.Collectors;
 
+import io.debezium.connector.postgresql.connection.PostgresConnection;
 import org.apache.kafka.connect.data.Schema;
 import org.apache.kafka.connect.data.Struct;
 import org.apache.kafka.connect.source.SourceRecord;
@@ -43,6 +45,7 @@
 import io.debezium.relational.RelationalDatabaseConnectorConfig.DecimalHandlingMode;
 import io.debezium.util.Collect;
 import io.debezium.util.Testing;
+import org.postgresql.jdbc.PgConnection;
 
 /**
  * Integration test for {@link RecordsSnapshotProducerIT}
@@ -184,6 +187,48 @@ public void shouldGenerateSnapshotAndContinueStreaming() throws Exception {
         assertSourceInfo(second, TestHelper.TEST_DATABASE, "s2", "a");
     }
 
+    @Test
+    @FixFor("DBZ-1564")
+    public void shouldCloseTransactionsAfterSnapshot() throws Exception {
+        // PostGIS must not be used
+        TestHelper.dropAllSchemas();
+        TestHelper.executeDDL("postgres_create_tables.ddl");
+        String insertStmt = "INSERT INTO s1.a (aa) VALUES (1);" +
+                "INSERT INTO s2.a (aa) VALUES (1);";
+        String statements = "CREATE SCHEMA s1; " +
+                "CREATE SCHEMA s2; " +
+                "CREATE TABLE s1.a (pk SERIAL, aa integer, PRIMARY KEY(pk));" +
+                "CREATE TABLE s2.a (pk SERIAL, aa integer, PRIMARY KEY(pk));" +
+                insertStmt;
+        TestHelper.execute(statements);
+
+        buildWithStreamProducer(TestHelper.defaultConfig());
+
+        TestConsumer consumer = testConsumer(2, "s1", "s2");
+        waitForSnapshotToBeCompleted();
+
+        // first make sure we get the initial records from both schemas...
+        consumer.await(TestHelper.waitTimeForRecords(), TimeUnit.SECONDS);
+        consumer.clear();
+
+        waitForStreamingToStart();
+
+        try (PostgresConnection connection = TestHelper.create()) {
+            connection.setAutoCommit(true);
+            int connectionPID = ((PgConnection) connection.connection()).getBackendPID();
+            String connectionStateQuery = "SELECT state FROM pg_stat_activity WHERE pid <> " + connectionPID;
+            connection.query(connectionStateQuery, rs -> {
+                while (rs.next()) {
+                    assertNotEquals(rs.getString(1), "idle in transaction");
+                }
+            });
+        }
+
+        stopConnector();
+    }
+
     @Test
     @FixFor("DBZ-859")
     public void shouldGenerateSnapshotAndSendHeartBeat() throws Exception {