DBZ-1270 Avoiding usage of deprecated method; adding Stathis Souris to authors

This commit is contained in:
Gunnar Morling 2019-06-14 10:44:29 +02:00 committed by Jiri Pechanec
parent bb360e472c
commit ee87f40f0c
3 changed files with 5 additions and 3 deletions

View File

@ -98,6 +98,7 @@ Scofield Xu
Sherafudheen PM
Shubham Rawat
Stanley Shyiko
Stathis Souris
Stephen Powis
Steven Siahetiong
Syed Muhammad Sufyian

View File

@ -30,6 +30,7 @@
public abstract class AbstractDatabaseHistory implements DatabaseHistory {
protected final Logger logger = LoggerFactory.getLogger(getClass());
protected Configuration config;
private HistoryRecordComparator comparator = HistoryRecordComparator.INSTANCE;
private boolean skipUnparseableDDL;

View File

@ -127,7 +127,7 @@ public class KafkaDatabaseHistory extends AbstractDatabaseHistory {
private Configuration producerConfig;
private volatile KafkaProducer<String, String> producer;
private int maxRecoveryAttempts;
private int pollIntervalMs = -1;
private Duration pollInterval;
@Override
public void configure(Configuration config, HistoryRecordComparator comparator) {
@ -136,7 +136,7 @@ public void configure(Configuration config, HistoryRecordComparator comparator)
throw new ConnectException("Error configuring an instance of " + getClass().getSimpleName() + "; check the logs for details");
}
this.topicName = config.getString(TOPIC);
this.pollIntervalMs = config.getInteger(RECOVERY_POLL_INTERVAL_MS);
this.pollInterval = Duration.ofMillis(config.getInteger(RECOVERY_POLL_INTERVAL_MS));
this.maxRecoveryAttempts = config.getInteger(RECOVERY_POLL_ATTEMPTS);
String bootstrapServers = config.getString(BOOTSTRAP_SERVERS);
@ -227,7 +227,7 @@ protected void recoverRecords(Consumer<HistoryRecord> records) {
endOffset = getEndOffsetOfDbHistoryTopic(endOffset, historyConsumer);
logger.debug("End offset of database history topic is {}", endOffset);
ConsumerRecords<String, String> recoveredRecords = historyConsumer.poll(this.pollIntervalMs);
ConsumerRecords<String, String> recoveredRecords = historyConsumer.poll(this.pollInterval);
int numRecordsProcessed = 0;
for (ConsumerRecord<String, String> record : recoveredRecords) {