DBZ-3608 Avoiding repeated closing actions

This commit is contained in:
Gunnar Morling 2021-06-09 12:32:06 +02:00 committed by Jiri Pechanec
parent 205c63a56a
commit a2f927c2db
2 changed files with 1 additions and 59 deletions

View File

@ -196,8 +196,6 @@ public void shouldNotStartWithInvalidSlotConfigAndUserRoles() throws Exception {
assertTrue("Unexpected error for \"" + configValue.name() + "\": " + configValue.errorMessages(), configValue.errorMessages().isEmpty());
}
});
stopConnector();
}
@Test
@ -340,10 +338,6 @@ public void shouldUseMicrosecondsForTransactionCommitTime() throws InterruptedEx
final long microsStream = TimeUnit.SECONDS.toMicros(inst.getEpochSecond()) + TimeUnit.NANOSECONDS.toMicros(inst.getNano());
actualRecords = consumeRecordsByTopic(2);
actualRecords.forEach(sourceRecord -> assertSourceInfoMicrosecondTransactionTimestamp(sourceRecord, microsStream, TimeUnit.MINUTES.toMicros(1L)));
// now stop the connector
stopConnector();
assertNoRecordsToConsume();
}
@Test
@ -368,10 +362,6 @@ public void shouldUseMillisecondsForTransactionCommitTime() throws InterruptedEx
final long millisStream = TimeUnit.SECONDS.toMillis(inst.getEpochSecond()) + TimeUnit.NANOSECONDS.toMillis(inst.getNano());
actualRecords = consumeRecordsByTopic(2);
actualRecords.forEach(sourceRecord -> assertSourceInfoMillisecondTransactionTimestamp(sourceRecord, millisStream, TimeUnit.MINUTES.toMillis(1L)));
// now stop the connector
stopConnector();
assertNoRecordsToConsume();
}
@Test
@ -394,8 +384,6 @@ public void shouldConsumeMessagesFromSnapshot() throws Exception {
waitForSnapshotToBeCompleted();
SourceRecords records = consumeRecordsByTopic(recordCount);
Assertions.assertThat(records.recordsForTopic("test_server.s1.a")).hasSize(recordCount);
stopConnector();
}
@Test
@ -417,8 +405,6 @@ public void shouldConsumeMessagesFromSnapshotOld() throws Exception {
waitForSnapshotToBeCompleted();
SourceRecords records = consumeRecordsByTopic(recordCount);
Assertions.assertThat(records.recordsForTopic("test_server.s1.a")).hasSize(recordCount);
stopConnector();
}
@Test
@ -826,9 +812,6 @@ public void shouldIgnoreEventsForDeletedTable() throws Exception {
SourceRecords actualRecords = consumeRecordsByTopic(1);
assertThat(actualRecords.topics()).hasSize(1);
assertThat(actualRecords.recordsForTopic(topicName("s2.a"))).hasSize(1);
stopConnector();
TestHelper.dropDefaultReplicationSlot();
}
@Test
@ -867,9 +850,6 @@ public void shouldNotIgnoreEventsForDeletedTable() throws Exception {
assertThat(actualRecords.topics()).hasSize(2);
assertThat(actualRecords.recordsForTopic(topicName("s1.a"))).hasSize(1);
assertThat(actualRecords.recordsForTopic(topicName("s2.a"))).hasSize(1);
stopConnector();
TestHelper.dropDefaultReplicationSlot();
}
@Test
@ -906,9 +886,6 @@ public void shouldIgnoreViews() throws Exception {
waitForStreamingRunning();
assertRecordsAfterInsert(2, 3, 3);
stopConnector();
TestHelper.dropDefaultReplicationSlot();
}
@Test
@ -931,9 +908,6 @@ public void shouldExecuteOnConnectStatements() throws Exception {
// so the given statements will be executed multiple times, resulting in multiple
// records; here we're interested just in the first insert for s2.a
assertValueField(actualRecords.allRecordsInOrder().get(5), "after/bb", "hello; world");
stopConnector();
TestHelper.dropDefaultReplicationSlot();
}
@Test
@ -1005,9 +979,6 @@ public void shouldProduceEventsWhenAlwaysTakingSnapshots() throws InterruptedExc
waitForSnapshotToBeCompleted();
assertRecordsFromSnapshot(4, 1, 2, 1, 2);
stopConnector();
TestHelper.dropDefaultReplicationSlot();
}
@Test
@ -1052,9 +1023,6 @@ public void shouldResumeSnapshotIfFailingMidstream() throws Exception {
// insert and verify 2 new records
TestHelper.execute(INSERT_STMT);
assertRecordsAfterInsert(2, 3, 3);
stopConnector();
TestHelper.dropDefaultReplicationSlot();
}
@Test
@ -1079,9 +1047,6 @@ public void shouldRecoverFromRetriableException() throws Exception {
TestHelper.execute("SELECT pg_terminate_backend(pid) FROM pg_stat_activity WHERE backend_type='walsender'");
TestHelper.execute(INSERT_STMT);
assertRecordsAfterInsert(2, 2, 2);
stopConnector();
TestHelper.dropDefaultReplicationSlot();
}
@Test
@ -1281,8 +1246,6 @@ public void shouldCloseTxAfterTypeQuery() throws Exception {
Assertions.assertThat(isbn).isEqualTo("0-393-04002-X");
TestHelper.assertNoOpenTransactions();
stopConnector();
assertConnectorNotRunning();
}
@Test
@ -1695,9 +1658,6 @@ public void exportedSnapshotShouldNotSkipRecordOfParallelTx() throws Exception {
s2recs = actualRecords.recordsForTopic(topicName("s2.a"));
assertThat(s1recs.size()).isEqualTo(2);
assertThat(s2recs.size()).isEqualTo(2);
stopConnector();
TestHelper.dropDefaultReplicationSlot();
}
@Test
@ -1747,10 +1707,6 @@ public void exportedSnapshotShouldNotSkipRecordOfParallelTxPgoutput() throws Exc
s2recs = actualRecords.recordsForTopic(topicName("s2.a"));
assertThat(s1recs.size()).isEqualTo(2);
assertThat(s2recs.size()).isEqualTo(2);
stopConnector();
TestHelper.dropPublication();
TestHelper.dropDefaultReplicationSlot();
}
@Test
@ -2302,8 +2258,6 @@ public void shouldNotIssueWarningForNoMonitoredTablesAfterApplyingFilters() thro
start(PostgresConnector.class, config);
waitForStreamingRunning();
assertThat(logInterceptor.containsMessage(DatabaseSchema.NO_CAPTURED_DATA_COLLECTIONS_WARNING)).isFalse();
stopConnector();
}
@Test
@ -2322,7 +2276,6 @@ public void shouldClearDatabaseWarnings() throws Exception {
waitForSnapshotToBeCompleted();
Awaitility.await().atMost(Duration.ofSeconds(TestHelper.waitTimeForRecords() * 6))
.until(() -> logInterceptor.containsMessage("Server-side message: 'Exiting startup callback'"));
stopConnector();
}
@Test
@ -2425,8 +2378,6 @@ record = recordsForTopicS2.remove(0);
if (value.getStruct("after") != null) {
assertThat(value.getStruct("after").getString("bb")).isEqualTo("*****");
}
stopConnector();
}
@Test
@ -2506,8 +2457,6 @@ record = recordsForTopicS2.remove(0);
if (value.getStruct("after") != null) {
assertThat(value.getStruct("after").getString("bb")).isEqualTo("b4d39ab0d198fb4cac8b2f023da74f670bcaf192dcc79b5d6361b7ae6b2fafdf");
}
stopConnector();
}
@Test
@ -2564,8 +2513,6 @@ record = recordsForTopicS2.remove(0);
if (value.getStruct("after") != null) {
assertThat(value.getStruct("after").getString("bb")).isEqualTo("hel");
}
stopConnector();
}
@Test
@ -2618,8 +2565,6 @@ public void shouldOutputRecordsInCloudEventsFormat() throws Exception {
CloudEventsConverterTest.shouldConvertToCloudEventsInJsonWithDataAsAvro(record, true);
CloudEventsConverterTest.shouldConvertToCloudEventsInAvro(record, "postgresql", "test_server", true);
}
stopConnector();
}
@Test
@ -2738,8 +2683,6 @@ public void shouldProduceMessagesOnlyForConfiguredTables() throws Exception {
assertThat(s2recs).hasSize(1);
VerifyRecord.isValidInsert(s2recs.get(0), PK_FIELD, 2);
stopConnector();
}
@Test
@ -2885,7 +2828,6 @@ public void shouldHaveLastCommitLsn() throws InterruptedException {
// Assert the lsn of the second transaction is less than the third.
assertTrue(second_transaction_sequence.get(1) < third_transaction_sequence.get(1));
stopConnector();
}
private Predicate<SourceRecord> stopOnPKPredicate(int pkValue) {

View File

@ -157,7 +157,7 @@ public void shouldLoadSchemaForMacaddr8PostgresType() throws Exception {
try (PostgresConnection connection = TestHelper.createWithTypeRegistry()) {
schema.refresh(connection, false);
assertTablesIncluded(tableId);
assertKeySchema(tableId, "pk", Schema.INT32_SCHEMA);
assertKeySchema(tableId, "pk", SchemaBuilder.int32().defaultValue(0).build());
assertTableSchema(tableId, "m", Schema.OPTIONAL_STRING_SCHEMA);
}
}