DBZ-2118 Close TX after type query

This commit is contained in:
Jiri Pechanec 2020-06-03 15:05:54 +02:00 committed by Gunnar Morling
parent 9b33264334
commit 853ebafd7f
4 changed files with 56 additions and 2 deletions

View File

@ -6,8 +6,8 @@ cache:
env: env:
- MAVEN_CLI: '"clean install -B -pl debezium-connector-sqlserver -am -Passembly -Dformat.formatter.goal=validate -Dformat.imports.goal=check -Dorg.slf4j.simpleLogger.log.org.apache.maven.cli.transfer.Slf4jMavenTransferListener=warn"' - MAVEN_CLI: '"clean install -B -pl debezium-connector-sqlserver -am -Passembly -Dformat.formatter.goal=validate -Dformat.imports.goal=check -Dorg.slf4j.simpleLogger.log.org.apache.maven.cli.transfer.Slf4jMavenTransferListener=warn"'
- MAVEN_CLI: '"clean install -B -pl debezium-connector-mysql -am -Passembly -Dformat.formatter.goal=validate -Dformat.imports.goal=check -Dorg.slf4j.simpleLogger.log.org.apache.maven.cli.transfer.Slf4jMavenTransferListener=warn"' - MAVEN_CLI: '"clean install -B -pl debezium-connector-mysql -am -Passembly -Dformat.formatter.goal=validate -Dformat.imports.goal=check -Dorg.slf4j.simpleLogger.log.org.apache.maven.cli.transfer.Slf4jMavenTransferListener=warn"'
- MAVEN_CLI: '"clean install -B -pl debezium-connector-postgres -am -Passembly -Dformat.formatter.goal=validate -Dformat.imports.goal=check -Dversion.postgres.server=9.6-devel -Dorg.slf4j.simpleLogger.log.org.apache.maven.cli.transfer.Slf4jMavenTransferListener=warn"' - MAVEN_CLI: '"clean install -B -pl debezium-connector-postgres -am -Passembly -Ddebezium.test.records.waittime=5 -Dformat.formatter.goal=validate -Dformat.imports.goal=check -Dversion.postgres.server=9.6-devel -Dorg.slf4j.simpleLogger.log.org.apache.maven.cli.transfer.Slf4jMavenTransferListener=warn"'
- MAVEN_CLI: '"clean install -B -pl debezium-connector-postgres -am -Passembly,wal2json-decoder -Dformat.formatter.goal=validate -Dformat.imports.goal=check -Dversion.postgres.server=9.6-devel -Dorg.slf4j.simpleLogger.log.org.apache.maven.cli.transfer.Slf4jMavenTransferListener=warn"' - MAVEN_CLI: '"clean install -B -pl debezium-connector-postgres -am -Passembly,wal2json-decoder -Ddebezium.test.records.waittime=5 -Dformat.formatter.goal=validate -Dformat.imports.goal=check -Dversion.postgres.server=9.6-devel -Dorg.slf4j.simpleLogger.log.org.apache.maven.cli.transfer.Slf4jMavenTransferListener=warn"'
- MAVEN_CLI: '"clean install -B -pl debezium-connector-postgres -am -Passembly,pgoutput-decoder,postgres-10 -Ddebezium.test.records.waittime=5 -Dformat.formatter.goal=validate -Dformat.imports.goal=check -Dversion.postgres.server=10-devel -Dorg.slf4j.simpleLogger.log.org.apache.maven.cli.transfer.Slf4jMavenTransferListener=warn"' - MAVEN_CLI: '"clean install -B -pl debezium-connector-postgres -am -Passembly,pgoutput-decoder,postgres-10 -Ddebezium.test.records.waittime=5 -Dformat.formatter.goal=validate -Dformat.imports.goal=check -Dversion.postgres.server=10-devel -Dorg.slf4j.simpleLogger.log.org.apache.maven.cli.transfer.Slf4jMavenTransferListener=warn"'
- MAVEN_CLI: '"clean install -B -pl debezium-connector-mongodb -am -Passembly -Dformat.formatter.goal=validate -Dformat.imports.goal=check -Dorg.slf4j.simpleLogger.log.org.apache.maven.cli.transfer.Slf4jMavenTransferListener=warn"' - MAVEN_CLI: '"clean install -B -pl debezium-connector-mongodb -am -Passembly -Dformat.formatter.goal=validate -Dformat.imports.goal=check -Dorg.slf4j.simpleLogger.log.org.apache.maven.cli.transfer.Slf4jMavenTransferListener=warn"'
- MAVEN_CLI: '"clean install -B -pl debezium-connector-mongodb -am -Passembly -Dformat.formatter.goal=validate -Dformat.imports.goal=check -Dversion.mongo.server=4.0 -Dorg.slf4j.simpleLogger.log.org.apache.maven.cli.transfer.Slf4jMavenTransferListener=warn"' - MAVEN_CLI: '"clean install -B -pl debezium-connector-mongodb -am -Passembly -Dformat.formatter.goal=validate -Dformat.imports.goal=check -Dversion.mongo.server=4.0 -Dorg.slf4j.simpleLogger.log.org.apache.maven.cli.transfer.Slf4jMavenTransferListener=warn"'

View File

@ -96,6 +96,8 @@ public void execute(ChangeEventSourceContext context) throws InterruptedExceptio
} }
try { try {
connection.setAutoCommit(false);
if (hasStartLsnStoredInContext) { if (hasStartLsnStoredInContext) {
// start streaming from the last recorded position in the offset // start streaming from the last recorded position in the offset
final Long lsn = offsetContext.lastCompletelyProcessedLsn() != null ? offsetContext.lastCompletelyProcessedLsn() : offsetContext.lsn(); final Long lsn = offsetContext.lastCompletelyProcessedLsn() != null ? offsetContext.lastCompletelyProcessedLsn() : offsetContext.lsn();

View File

@ -405,6 +405,9 @@ private PostgresType loadType(Connection connection, PreparedStatement statement
while (rs.next()) { while (rs.next()) {
PostgresType result = createTypeBuilderFromResultSet(connection, rs, typeInfo, sqlTypeMapper).build(); PostgresType result = createTypeBuilderFromResultSet(connection, rs, typeInfo, sqlTypeMapper).build();
addType(result); addType(result);
if (!connection.getAutoCommit()) {
connection.commit();
}
return result; return result;
} }
} }

View File

@ -830,6 +830,43 @@ public void shouldRemoveWhiteSpaceChars() throws Exception {
assertThat(sourceTable).isEqualTo("b"); assertThat(sourceTable).isEqualTo("b");
} }
@Test
@FixFor("DBZ-2118")
public void shouldCloseTxAfterTypeQuery() throws Exception {
String setupStmt = SETUP_TABLES_STMT;
TestHelper.execute(setupStmt);
Configuration.Builder configBuilder = TestHelper.defaultConfig()
.with(PostgresConnectorConfig.SNAPSHOT_MODE, SnapshotMode.INITIAL.getValue())
.with(PostgresConnectorConfig.DROP_SLOT_ON_STOP, Boolean.TRUE)
.with(PostgresConnectorConfig.SCHEMA_WHITELIST, "s1")
.with(PostgresConnectorConfig.TABLE_WHITELIST, "s1.b");
start(PostgresConnector.class, configBuilder.build());
assertConnectorIsRunning();
waitForSnapshotToBeCompleted();
TestHelper.execute("CREATE TABLE s1.b (pk SERIAL, aa isbn, PRIMARY KEY(pk));", "INSERT INTO s1.b (aa) VALUES ('978-0-393-04002-9')");
SourceRecords actualRecords = consumeRecordsByTopic(1);
List<SourceRecord> records = actualRecords.recordsForTopic(topicName("s1.b"));
assertThat(records.size()).isEqualTo(1);
SourceRecord record = records.get(0);
VerifyRecord.isValidInsert(record, PK_FIELD, 1);
try (final PostgresConnection connection = TestHelper.create()) {
try {
Awaitility.await()
.atMost(TestHelper.waitTimeForRecords(), TimeUnit.SECONDS)
.until(() -> getActiveTransactions(connection).size() == 1);
}
catch (ConditionTimeoutException e) {
}
assertThat(getActiveTransactions(connection)).hasSize(1);
}
}
@Test @Test
@FixFor("DBZ-878") @FixFor("DBZ-878")
public void shouldReplaceInvalidTopicNameCharacters() throws Exception { public void shouldReplaceInvalidTopicNameCharacters() throws Exception {
@ -1154,6 +1191,18 @@ private String getConfirmedFlushLsn(PostgresConnection connection) throws SQLExc
}); });
} }
private List<String> getActiveTransactions(PostgresConnection connection) throws SQLException {
return connection.queryAndMap(
"SELECT query FROM pg_stat_activity WHERE backend_xmin IS NOT NULL ORDER BY age(backend_xmin) DESC;",
rs -> {
final List<String> ret = new ArrayList<>();
while (rs.next()) {
ret.add(rs.getString(1));
}
return ret;
});
}
private void assertFieldAbsent(SourceRecord record, String fieldName) { private void assertFieldAbsent(SourceRecord record, String fieldName) {
Struct value = (Struct) ((Struct) record.value()).get(Envelope.FieldName.AFTER); Struct value = (Struct) ((Struct) record.value()).get(Envelope.FieldName.AFTER);
try { try {