From 6205f1d6ea6e82ae895e0b066f475e5691e5c2a3 Mon Sep 17 00:00:00 2001 From: Jiri Pechanec Date: Thu, 12 Dec 2019 13:09:11 +0100 Subject: [PATCH] DBZ-824 Assert serialization of every processed record --- .../io/debezium/embedded/AbstractConnectorTest.java | 10 +++++++--- .../java/io/debezium/embedded/EmbeddedEngineTest.java | 5 +++-- 2 files changed, 10 insertions(+), 5 deletions(-) diff --git a/debezium-embedded/src/test/java/io/debezium/embedded/AbstractConnectorTest.java b/debezium-embedded/src/test/java/io/debezium/embedded/AbstractConnectorTest.java index bc7531423..5a4e02846 100644 --- a/debezium-embedded/src/test/java/io/debezium/embedded/AbstractConnectorTest.java +++ b/debezium-embedded/src/test/java/io/debezium/embedded/AbstractConnectorTest.java @@ -385,10 +385,11 @@ protected int consumeRecords(int numberOfRecords) throws InterruptedException { * @param numberOfRecords the number of records that should be consumed * @param breakAfterNulls the number of allowed runs when no records are received * @param recordConsumer the function that should be called with each consumed record + * @param assertRecords true if records serialization should be verified * @return the actual number of records that were consumed * @throws InterruptedException if the thread was interrupted while waiting for a record to be returned */ - protected int consumeRecords(int numberOfRecords, int breakAfterNulls, Consumer recordConsumer) throws InterruptedException { + protected int consumeRecords(int numberOfRecords, int breakAfterNulls, Consumer recordConsumer, boolean assertRecords) throws InterruptedException { int recordsConsumed = 0; int nullReturn = 0; while (recordsConsumed < numberOfRecords) { @@ -409,6 +410,9 @@ else if (Testing.Print.isEnabled()) { + (numberOfRecords - recordsConsumed) + " more)"); print(record); } + if (assertRecords) { + VerifyRecord.isValid(record); + } } else { if (++nullReturn >= breakAfterNulls) { @@ -431,7 +435,7 @@ else if (Testing.Print.isEnabled()) { * @throws InterruptedException if the thread was interrupted while waiting for a record to be returned */ protected int consumeRecords(int numberOfRecords, Consumer recordConsumer) throws InterruptedException { - return consumeRecords(numberOfRecords, 3, recordConsumer); + return consumeRecords(numberOfRecords, 3, recordConsumer, true); } /** @@ -444,7 +448,7 @@ protected int consumeRecords(int numberOfRecords, Consumer recordC */ protected SourceRecords consumeRecordsByTopic(int numRecords, int breakAfterNulls) throws InterruptedException { SourceRecords records = new SourceRecords(); - consumeRecords(numRecords, breakAfterNulls, records::add); + consumeRecords(numRecords, breakAfterNulls, records::add, true); return records; } diff --git a/debezium-embedded/src/test/java/io/debezium/embedded/EmbeddedEngineTest.java b/debezium-embedded/src/test/java/io/debezium/embedded/EmbeddedEngineTest.java index e19c40784..968e4bab8 100644 --- a/debezium-embedded/src/test/java/io/debezium/embedded/EmbeddedEngineTest.java +++ b/debezium-embedded/src/test/java/io/debezium/embedded/EmbeddedEngineTest.java @@ -163,10 +163,11 @@ protected String generateLine(int lineNumber) { } protected void consumeLines(int numberOfLines) throws InterruptedException { - consumeRecords(numberOfLines, record -> { + consumeRecords(numberOfLines, 3, record -> { String line = record.value().toString(); assertThat(line).isEqualTo(generateLine(nextConsumedLineNumber)); ++nextConsumedLineNumber; - }); + }, + false); } }