DBZ-1783 Ignore empty transactions
This commit is contained in:
parent
40506d5352
commit
d4d38dcbec
@ -15,6 +15,7 @@
|
||||
|
||||
import java.sql.SQLException;
|
||||
import java.time.Instant;
|
||||
import java.util.ArrayList;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Random;
|
||||
@ -1397,8 +1398,22 @@ public void shouldOutputRecordsInCloudEventsFormat() throws Exception {
|
||||
TestHelper.execute(INSERT_STMT);
|
||||
|
||||
Testing.Print.enable();
|
||||
SourceRecords streamingRecords = consumeRecordsByTopic(2 + 2);
|
||||
List<SourceRecord> streaming = streamingRecords.allRecordsInOrder().subList(1, 3);
|
||||
|
||||
final List<SourceRecord> streaming = new ArrayList<SourceRecord>();
|
||||
Awaitility.await().atMost(TestHelper.waitTimeForRecords(), TimeUnit.SECONDS).until(() -> {
|
||||
// Should be BEGIN + END in case of empty tx or BEGIN + data in case of our TX
|
||||
final SourceRecords streamingRecords = consumeRecordsByTopic(2);
|
||||
final SourceRecord second = streamingRecords.allRecordsInOrder().get(1);
|
||||
if (!second.topic().endsWith(".transaction")) {
|
||||
streaming.add(second);
|
||||
return true;
|
||||
}
|
||||
return false;
|
||||
});
|
||||
|
||||
// Should be DATA + END for the rest of TX
|
||||
SourceRecords streamingRecords = consumeRecordsByTopic(2);
|
||||
streaming.add(streamingRecords.allRecordsInOrder().get(0));
|
||||
|
||||
for (SourceRecord record : streaming) {
|
||||
CloudEventsConverterTest.shouldConvertToCloudEventsInJson(record, true);
|
||||
|
Loading…
Reference in New Issue
Block a user