diff --git a/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/PostgresConnectorIT.java b/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/PostgresConnectorIT.java index 7b1a13297..5c948538b 100644 --- a/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/PostgresConnectorIT.java +++ b/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/PostgresConnectorIT.java @@ -15,6 +15,7 @@ import java.sql.SQLException; import java.time.Instant; +import java.util.ArrayList; import java.util.HashSet; import java.util.List; import java.util.Random; @@ -1397,8 +1398,22 @@ public void shouldOutputRecordsInCloudEventsFormat() throws Exception { TestHelper.execute(INSERT_STMT); Testing.Print.enable(); - SourceRecords streamingRecords = consumeRecordsByTopic(2 + 2); - List streaming = streamingRecords.allRecordsInOrder().subList(1, 3); + + final List streaming = new ArrayList(); + Awaitility.await().atMost(TestHelper.waitTimeForRecords(), TimeUnit.SECONDS).until(() -> { + // Should be BEGIN + END in case of empty tx or BEGIN + data in case of our TX + final SourceRecords streamingRecords = consumeRecordsByTopic(2); + final SourceRecord second = streamingRecords.allRecordsInOrder().get(1); + if (!second.topic().endsWith(".transaction")) { + streaming.add(second); + return true; + } + return false; + }); + + // Should be DATA + END for the rest of TX + SourceRecords streamingRecords = consumeRecordsByTopic(2); + streaming.add(streamingRecords.allRecordsInOrder().get(0)); for (SourceRecord record : streaming) { CloudEventsConverterTest.shouldConvertToCloudEventsInJson(record, true);