DBZ-824 Assert serialization of every processed record

This commit is contained in:
Jiri Pechanec 2019-12-12 13:09:11 +01:00 committed by Gunnar Morling
parent 52a2946aa5
commit 6205f1d6ea
2 changed files with 10 additions and 5 deletions

View File

@ -385,10 +385,11 @@ protected int consumeRecords(int numberOfRecords) throws InterruptedException {
* @param numberOfRecords the number of records that should be consumed
* @param breakAfterNulls the number of allowed runs when no records are received
* @param recordConsumer the function that should be called with each consumed record
* @param assertRecords true if records serialization should be verified
* @return the actual number of records that were consumed
* @throws InterruptedException if the thread was interrupted while waiting for a record to be returned
*/
protected int consumeRecords(int numberOfRecords, int breakAfterNulls, Consumer<SourceRecord> recordConsumer) throws InterruptedException {
protected int consumeRecords(int numberOfRecords, int breakAfterNulls, Consumer<SourceRecord> recordConsumer, boolean assertRecords) throws InterruptedException {
int recordsConsumed = 0;
int nullReturn = 0;
while (recordsConsumed < numberOfRecords) {
@ -409,6 +410,9 @@ else if (Testing.Print.isEnabled()) {
+ (numberOfRecords - recordsConsumed) + " more)");
print(record);
}
if (assertRecords) {
VerifyRecord.isValid(record);
}
}
else {
if (++nullReturn >= breakAfterNulls) {
@ -431,7 +435,7 @@ else if (Testing.Print.isEnabled()) {
* @throws InterruptedException if the thread was interrupted while waiting for a record to be returned
*/
protected int consumeRecords(int numberOfRecords, Consumer<SourceRecord> recordConsumer) throws InterruptedException {
return consumeRecords(numberOfRecords, 3, recordConsumer);
return consumeRecords(numberOfRecords, 3, recordConsumer, true);
}
/**
@ -444,7 +448,7 @@ protected int consumeRecords(int numberOfRecords, Consumer<SourceRecord> recordC
*/
protected SourceRecords consumeRecordsByTopic(int numRecords, int breakAfterNulls) throws InterruptedException {
SourceRecords records = new SourceRecords();
consumeRecords(numRecords, breakAfterNulls, records::add);
consumeRecords(numRecords, breakAfterNulls, records::add, true);
return records;
}

View File

@ -163,10 +163,11 @@ protected String generateLine(int lineNumber) {
}
protected void consumeLines(int numberOfLines) throws InterruptedException {
consumeRecords(numberOfLines, record -> {
consumeRecords(numberOfLines, 3, record -> {
String line = record.value().toString();
assertThat(line).isEqualTo(generateLine(nextConsumedLineNumber));
++nextConsumedLineNumber;
});
},
false);
}
}