From 853ebafd7f4854f06c5e489a5c984ecf31dd79eb Mon Sep 17 00:00:00 2001 From: Jiri Pechanec Date: Wed, 3 Jun 2020 15:05:54 +0200 Subject: [PATCH] DBZ-2118 Close TX after type query --- .travis.yml | 4 +- .../PostgresStreamingChangeEventSource.java | 2 + .../connector/postgresql/TypeRegistry.java | 3 ++ .../postgresql/PostgresConnectorIT.java | 49 +++++++++++++++++++ 4 files changed, 56 insertions(+), 2 deletions(-) diff --git a/.travis.yml b/.travis.yml index e11c0d24f..3152b0324 100644 --- a/.travis.yml +++ b/.travis.yml @@ -6,8 +6,8 @@ cache: env: - MAVEN_CLI: '"clean install -B -pl debezium-connector-sqlserver -am -Passembly -Dformat.formatter.goal=validate -Dformat.imports.goal=check -Dorg.slf4j.simpleLogger.log.org.apache.maven.cli.transfer.Slf4jMavenTransferListener=warn"' - MAVEN_CLI: '"clean install -B -pl debezium-connector-mysql -am -Passembly -Dformat.formatter.goal=validate -Dformat.imports.goal=check -Dorg.slf4j.simpleLogger.log.org.apache.maven.cli.transfer.Slf4jMavenTransferListener=warn"' - - MAVEN_CLI: '"clean install -B -pl debezium-connector-postgres -am -Passembly -Dformat.formatter.goal=validate -Dformat.imports.goal=check -Dversion.postgres.server=9.6-devel -Dorg.slf4j.simpleLogger.log.org.apache.maven.cli.transfer.Slf4jMavenTransferListener=warn"' - - MAVEN_CLI: '"clean install -B -pl debezium-connector-postgres -am -Passembly,wal2json-decoder -Dformat.formatter.goal=validate -Dformat.imports.goal=check -Dversion.postgres.server=9.6-devel -Dorg.slf4j.simpleLogger.log.org.apache.maven.cli.transfer.Slf4jMavenTransferListener=warn"' + - MAVEN_CLI: '"clean install -B -pl debezium-connector-postgres -am -Passembly -Ddebezium.test.records.waittime=5 -Dformat.formatter.goal=validate -Dformat.imports.goal=check -Dversion.postgres.server=9.6-devel -Dorg.slf4j.simpleLogger.log.org.apache.maven.cli.transfer.Slf4jMavenTransferListener=warn"' + - MAVEN_CLI: '"clean install -B -pl debezium-connector-postgres -am -Passembly,wal2json-decoder -Ddebezium.test.records.waittime=5 -Dformat.formatter.goal=validate -Dformat.imports.goal=check -Dversion.postgres.server=9.6-devel -Dorg.slf4j.simpleLogger.log.org.apache.maven.cli.transfer.Slf4jMavenTransferListener=warn"' - MAVEN_CLI: '"clean install -B -pl debezium-connector-postgres -am -Passembly,pgoutput-decoder,postgres-10 -Ddebezium.test.records.waittime=5 -Dformat.formatter.goal=validate -Dformat.imports.goal=check -Dversion.postgres.server=10-devel -Dorg.slf4j.simpleLogger.log.org.apache.maven.cli.transfer.Slf4jMavenTransferListener=warn"' - MAVEN_CLI: '"clean install -B -pl debezium-connector-mongodb -am -Passembly -Dformat.formatter.goal=validate -Dformat.imports.goal=check -Dorg.slf4j.simpleLogger.log.org.apache.maven.cli.transfer.Slf4jMavenTransferListener=warn"' - MAVEN_CLI: '"clean install -B -pl debezium-connector-mongodb -am -Passembly -Dformat.formatter.goal=validate -Dformat.imports.goal=check -Dversion.mongo.server=4.0 -Dorg.slf4j.simpleLogger.log.org.apache.maven.cli.transfer.Slf4jMavenTransferListener=warn"' diff --git a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresStreamingChangeEventSource.java b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresStreamingChangeEventSource.java index 677611544..adf8a45a6 100644 --- a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresStreamingChangeEventSource.java +++ b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresStreamingChangeEventSource.java @@ -96,6 +96,8 @@ public void execute(ChangeEventSourceContext context) throws InterruptedExceptio } try { + connection.setAutoCommit(false); + if (hasStartLsnStoredInContext) { // start streaming from the last recorded position in the offset final Long lsn = offsetContext.lastCompletelyProcessedLsn() != null ? offsetContext.lastCompletelyProcessedLsn() : offsetContext.lsn(); diff --git a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/TypeRegistry.java b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/TypeRegistry.java index 5ad32ace1..4784eef85 100644 --- a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/TypeRegistry.java +++ b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/TypeRegistry.java @@ -405,6 +405,9 @@ private PostgresType loadType(Connection connection, PreparedStatement statement while (rs.next()) { PostgresType result = createTypeBuilderFromResultSet(connection, rs, typeInfo, sqlTypeMapper).build(); addType(result); + if (!connection.getAutoCommit()) { + connection.commit(); + } return result; } } diff --git a/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/PostgresConnectorIT.java b/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/PostgresConnectorIT.java index 41e6cb7e3..d83912f5f 100644 --- a/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/PostgresConnectorIT.java +++ b/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/PostgresConnectorIT.java @@ -830,6 +830,43 @@ public void shouldRemoveWhiteSpaceChars() throws Exception { assertThat(sourceTable).isEqualTo("b"); } + @Test + @FixFor("DBZ-2118") + public void shouldCloseTxAfterTypeQuery() throws Exception { + String setupStmt = SETUP_TABLES_STMT; + + TestHelper.execute(setupStmt); + Configuration.Builder configBuilder = TestHelper.defaultConfig() + .with(PostgresConnectorConfig.SNAPSHOT_MODE, SnapshotMode.INITIAL.getValue()) + .with(PostgresConnectorConfig.DROP_SLOT_ON_STOP, Boolean.TRUE) + .with(PostgresConnectorConfig.SCHEMA_WHITELIST, "s1") + .with(PostgresConnectorConfig.TABLE_WHITELIST, "s1.b"); + + start(PostgresConnector.class, configBuilder.build()); + assertConnectorIsRunning(); + waitForSnapshotToBeCompleted(); + + TestHelper.execute("CREATE TABLE s1.b (pk SERIAL, aa isbn, PRIMARY KEY(pk));", "INSERT INTO s1.b (aa) VALUES ('978-0-393-04002-9')"); + SourceRecords actualRecords = consumeRecordsByTopic(1); + + List records = actualRecords.recordsForTopic(topicName("s1.b")); + assertThat(records.size()).isEqualTo(1); + + SourceRecord record = records.get(0); + VerifyRecord.isValidInsert(record, PK_FIELD, 1); + + try (final PostgresConnection connection = TestHelper.create()) { + try { + Awaitility.await() + .atMost(TestHelper.waitTimeForRecords(), TimeUnit.SECONDS) + .until(() -> getActiveTransactions(connection).size() == 1); + } + catch (ConditionTimeoutException e) { + } + assertThat(getActiveTransactions(connection)).hasSize(1); + } + } + @Test @FixFor("DBZ-878") public void shouldReplaceInvalidTopicNameCharacters() throws Exception { @@ -1154,6 +1191,18 @@ private String getConfirmedFlushLsn(PostgresConnection connection) throws SQLExc }); } + private List getActiveTransactions(PostgresConnection connection) throws SQLException { + return connection.queryAndMap( + "SELECT query FROM pg_stat_activity WHERE backend_xmin IS NOT NULL ORDER BY age(backend_xmin) DESC;", + rs -> { + final List ret = new ArrayList<>(); + while (rs.next()) { + ret.add(rs.getString(1)); + } + return ret; + }); + } + private void assertFieldAbsent(SourceRecord record, String fieldName) { Struct value = (Struct) ((Struct) record.value()).get(Envelope.FieldName.AFTER); try {