DBZ-2240 Use 1.1 compatible API in tests
This commit is contained in:
parent
c2ed7b1230
commit
0899fe2180
@ -139,7 +139,7 @@ protected void awaitAssert(ThrowingRunnable assertion) {
|
||||
protected void assertRecordsCount(String topic, int count) {
|
||||
try (Consumer<String, String> consumer = new KafkaConsumer<>(KAFKA_CONSUMER_PROPS)) {
|
||||
consumer.subscribe(Collections.singleton(topic));
|
||||
ConsumerRecords<String, String> records = consumer.poll(Duration.of(10, ChronoUnit.SECONDS));
|
||||
ConsumerRecords<String, String> records = consumer.poll(Duration.of(10, ChronoUnit.SECONDS).toMillis());
|
||||
consumer.seekToBeginning(consumer.assignment());
|
||||
assertThat(records.count()).withFailMessage("Expecting topic '%s' to have <%d> messages but it had <%d>.", topic, count, records.count()).isEqualTo(count);
|
||||
}
|
||||
@ -148,7 +148,7 @@ protected void assertRecordsCount(String topic, int count) {
|
||||
protected void assertMinimalRecordsCount(String topic, int count) {
|
||||
try (Consumer<String, String> consumer = new KafkaConsumer<>(KAFKA_CONSUMER_PROPS)) {
|
||||
consumer.subscribe(Collections.singleton(topic));
|
||||
ConsumerRecords<String, String> records = consumer.poll(Duration.of(10, ChronoUnit.SECONDS));
|
||||
ConsumerRecords<String, String> records = consumer.poll(Duration.of(10, ChronoUnit.SECONDS).toMillis());
|
||||
consumer.seekToBeginning(consumer.assignment());
|
||||
assertThat(
|
||||
records.count()).withFailMessage("Expecting topic '%s' to have at least <%d> messages but it had <%d>.", topic, count, records.count())
|
||||
@ -160,7 +160,7 @@ protected void assertRecordsContain(String topic, String content) {
|
||||
try (Consumer<String, String> consumer = new KafkaConsumer<>(KAFKA_CONSUMER_PROPS)) {
|
||||
consumer.subscribe(Collections.singleton(topic));
|
||||
consumer.seekToBeginning(consumer.assignment());
|
||||
ConsumerRecords<String, String> records = consumer.poll(Duration.of(10, ChronoUnit.SECONDS));
|
||||
ConsumerRecords<String, String> records = consumer.poll(Duration.of(10, ChronoUnit.SECONDS).toMillis());
|
||||
long matchingCount = StreamSupport.stream(records.records(topic).spliterator(), false).filter(r -> r.value().contains(content)).count();
|
||||
assertThat(matchingCount).withFailMessage("Topic '%s' doesn't have message containing <%s>.", topic, content).isGreaterThan(0);
|
||||
|
||||
|
@ -184,7 +184,7 @@ private <T> List<ConsumerRecord<T, T>> drain(KafkaConsumer<T, T> consumer, int e
|
||||
List<ConsumerRecord<T, T>> allRecords = new ArrayList<>();
|
||||
|
||||
Unreliables.retryUntilTrue(10, TimeUnit.SECONDS, () -> {
|
||||
consumer.poll(Duration.ofMillis(50))
|
||||
consumer.poll(Duration.ofMillis(50).toMillis())
|
||||
.iterator()
|
||||
.forEachRemaining(allRecords::add);
|
||||
|
||||
|
@ -144,7 +144,7 @@ private List<ConsumerRecord<String, String>> drain(KafkaConsumer<String, String>
|
||||
List<ConsumerRecord<String, String>> allRecords = new ArrayList<>();
|
||||
|
||||
Unreliables.retryUntilTrue(10, TimeUnit.SECONDS, () -> {
|
||||
consumer.poll(Duration.ofMillis(50))
|
||||
consumer.poll(Duration.ofMillis(50).toMillis())
|
||||
.iterator()
|
||||
.forEachRemaining(allRecords::add);
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user