diff --git a/COPYRIGHT.txt b/COPYRIGHT.txt index b49eeb3fa..4cf70a186 100644 --- a/COPYRIGHT.txt +++ b/COPYRIGHT.txt @@ -98,6 +98,7 @@ Scofield Xu Sherafudheen PM Shubham Rawat Stanley Shyiko +Stathis Souris Stephen Powis Steven Siahetiong Syed Muhammad Sufyian diff --git a/debezium-core/src/main/java/io/debezium/relational/history/AbstractDatabaseHistory.java b/debezium-core/src/main/java/io/debezium/relational/history/AbstractDatabaseHistory.java index 8de959c90..67905fefa 100644 --- a/debezium-core/src/main/java/io/debezium/relational/history/AbstractDatabaseHistory.java +++ b/debezium-core/src/main/java/io/debezium/relational/history/AbstractDatabaseHistory.java @@ -30,6 +30,7 @@ public abstract class AbstractDatabaseHistory implements DatabaseHistory { protected final Logger logger = LoggerFactory.getLogger(getClass()); + protected Configuration config; private HistoryRecordComparator comparator = HistoryRecordComparator.INSTANCE; private boolean skipUnparseableDDL; diff --git a/debezium-core/src/main/java/io/debezium/relational/history/KafkaDatabaseHistory.java b/debezium-core/src/main/java/io/debezium/relational/history/KafkaDatabaseHistory.java index f16181a25..57d92bdd3 100644 --- a/debezium-core/src/main/java/io/debezium/relational/history/KafkaDatabaseHistory.java +++ b/debezium-core/src/main/java/io/debezium/relational/history/KafkaDatabaseHistory.java @@ -127,7 +127,7 @@ public class KafkaDatabaseHistory extends AbstractDatabaseHistory { private Configuration producerConfig; private volatile KafkaProducer producer; private int maxRecoveryAttempts; - private int pollIntervalMs = -1; + private Duration pollInterval; @Override public void configure(Configuration config, HistoryRecordComparator comparator) { @@ -136,7 +136,7 @@ public void configure(Configuration config, HistoryRecordComparator comparator) throw new ConnectException("Error configuring an instance of " + getClass().getSimpleName() + "; check the logs for details"); } this.topicName = config.getString(TOPIC); - this.pollIntervalMs = config.getInteger(RECOVERY_POLL_INTERVAL_MS); + this.pollInterval = Duration.ofMillis(config.getInteger(RECOVERY_POLL_INTERVAL_MS)); this.maxRecoveryAttempts = config.getInteger(RECOVERY_POLL_ATTEMPTS); String bootstrapServers = config.getString(BOOTSTRAP_SERVERS); @@ -227,7 +227,7 @@ protected void recoverRecords(Consumer records) { endOffset = getEndOffsetOfDbHistoryTopic(endOffset, historyConsumer); logger.debug("End offset of database history topic is {}", endOffset); - ConsumerRecords recoveredRecords = historyConsumer.poll(this.pollIntervalMs); + ConsumerRecords recoveredRecords = historyConsumer.poll(this.pollInterval); int numRecordsProcessed = 0; for (ConsumerRecord record : recoveredRecords) {