DBZ-173 Additional fixes to KafkaDatabaseHistory class for Kafka 0.10.1.0

The KafkaDatabaseHistory class was not behaving well in tests run against my local development environment. When restoring from the persisted Kafka topic, the class would set up a Kafka consumer and see repeated messages. It is unclear whether the repeats were caused by the test environment and its very short poll timeouts. Regardless, the restore logic was refactored to track the offsets already processed, so that each message is handled only once.
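For context, the sketch below illustrates the de-duplication idea in isolation: remember the highest offset already processed for each partition, and skip any redelivered record at or below that offset. This is a minimal, broker-free illustration and not code from this commit; the OffsetDedupSketch and PolledRecord names and the simulated polls are hypothetical stand-ins for the real Kafka consumer records.

import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * Minimal, broker-free sketch of the offset-tracking idea: remember the highest
 * offset already processed per partition and skip anything at or below it, so
 * that redelivered records are handled only once.
 */
public class OffsetDedupSketch {

    // Hypothetical stand-in for a consumed Kafka record: (partition, offset, payload).
    static final class PolledRecord {
        final int partition;
        final long offset;
        final String value;
        PolledRecord(int partition, long offset, String value) {
            this.partition = partition;
            this.offset = offset;
            this.value = value;
        }
    }

    public static void main(String[] args) {
        // Highest offset already processed, keyed by partition.
        Map<Integer, Long> lastOffsetByPartition = new HashMap<>();

        // Two simulated polls; the second redelivers offsets 0 and 1 of partition 0.
        List<List<PolledRecord>> polls = Arrays.asList(
            Arrays.asList(new PolledRecord(0, 0, "CREATE TABLE foo (...)"),
                          new PolledRecord(0, 1, "CREATE TABLE bar (...)")),
            Arrays.asList(new PolledRecord(0, 0, "CREATE TABLE foo (...)"),
                          new PolledRecord(0, 1, "CREATE TABLE bar (...)"),
                          new PolledRecord(0, 2, "DROP TABLE foo")));

        for (List<PolledRecord> poll : polls) {
            int numRecordsProcessed = 0;
            for (PolledRecord rec : poll) {
                Long lastOffset = lastOffsetByPartition.get(rec.partition);
                if (lastOffset == null || lastOffset < rec.offset) {
                    // Only records beyond the last-seen offset reach the recovery logic.
                    System.out.println("processing offset " + rec.offset + ": " + rec.value);
                    lastOffsetByPartition.put(rec.partition, rec.offset);
                    ++numRecordsProcessed;
                }
            }
            System.out.println("poll handled " + numRecordsProcessed + " new record(s)");
        }
    }
}

Running this prints each DDL payload once, even though the second simulated poll redelivers the first two offsets.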
Randall Hauch 2017-02-01 14:47:41 -06:00
parent c380305e9b
commit 972cfbe2c4
3 changed files with 50 additions and 25 deletions

KafkaDatabaseHistory.java

@@ -6,6 +6,9 @@
package io.debezium.relational.history;
import java.io.IOException;
+ import java.util.Collections;
+ import java.util.HashMap;
+ import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
@@ -20,7 +23,6 @@
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
- import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.config.ConfigDef.Importance;
import org.apache.kafka.common.config.ConfigDef.Type;
import org.apache.kafka.common.config.ConfigDef.Width;
@@ -73,7 +75,7 @@ public class KafkaDatabaseHistory extends AbstractDatabaseHistory {
.withImportance(Importance.LOW)
.withDescription("The number of milliseconds to wait while polling for persisted data during recovery.")
.withDefault(100)
- .withValidation(Field::isInteger);
+ .withValidation(Field::isNonNegativeInteger);
public static final Field RECOVERY_POLL_ATTEMPTS = Field.create(CONFIGURATION_FIELD_PREFIX_STRING + "kafka.recovery.attempts")
.withDisplayName("Max attempts to recovery database history")
@@ -142,7 +144,7 @@ public void configure(Configuration config, HistoryRecordComparator comparator)
}
@Override
- public void start() {
+ public synchronized void start() {
super.start();
if (this.producer == null) {
this.producer = new KafkaProducer<>(this.producerConfig.asProperties());
@@ -178,36 +180,47 @@ protected void storeRecord(HistoryRecord record) throws DatabaseHistoryException
protected void recoverRecords(Tables schema, DdlParser ddlParser, Consumer<HistoryRecord> records) {
try (KafkaConsumer<String, String> historyConsumer = new KafkaConsumer<>(consumerConfig.asProperties());) {
// Subscribe to the only partition for this topic, and seek to the beginning of that partition ...
- TopicPartition topicPartition = new TopicPartition(topicName, partition);
- logger.debug("Subscribing to database history topic '{}' partition {} at offset 0", topicPartition.topic(),
- topicPartition.partition());
+ logger.debug("Subscribing to database history topic '{}'", topicName);
historyConsumer.subscribe(Collect.arrayListOf(topicName));
+ // Explicitly seek to the beginning of all assigned topic partitions ...
+ logger.debug("Seeking Kafka consumer to beginning of all assigned topic partitions");
+ historyConsumer.seekToBeginning(Collections.emptyList());
// Read all messages in the topic ...
int remainingEmptyPollResults = this.recoveryAttempts;
+ Map<Long, Long> offsetsByPartition = new HashMap<>();
while (remainingEmptyPollResults > 0) {
ConsumerRecords<String, String> recoveredRecords = historyConsumer.poll(this.pollIntervalMs);
logger.debug("Read {} records from database history", recoveredRecords.count());
- if (recoveredRecords.isEmpty()) {
- --remainingEmptyPollResults;
- } else {
- remainingEmptyPollResults = this.recoveryAttempts;
- for (ConsumerRecord<String, String> record : recoveredRecords) {
- try {
+ int numRecordsProcessed = 0;
+ for (ConsumerRecord<String, String> record : recoveredRecords) {
+ try {
+ Long partition = new Long(record.partition());
+ Long lastOffset = offsetsByPartition.get(partition);
+ if (lastOffset == null || lastOffset.longValue() < record.offset()) {
HistoryRecord recordObj = new HistoryRecord(reader.read(record.value()));
records.accept(recordObj);
logger.trace("Recovered database history: {}" + recordObj);
- } catch (IOException e) {
- logger.error("Error while deserializing history record", e);
+ offsetsByPartition.put(partition, new Long(record.offset()));
+ ++numRecordsProcessed;
+ }
+ } catch (IOException e) {
+ logger.error("Error while deserializing history record", e);
}
}
+ if (numRecordsProcessed == 0) {
+ --remainingEmptyPollResults;
+ logger.debug("No new records found in the database history; will retry {} more times", remainingEmptyPollResults);
+ } else {
+ remainingEmptyPollResults = this.recoveryAttempts;
+ logger.debug("Processed {} records from database history", numRecordsProcessed);
+ }
}
}
}
@Override
- public void stop() {
+ public synchronized void stop() {
try {
if (this.producer != null) {
try {

KafkaDatabaseHistoryTest.java

@@ -5,15 +5,16 @@
*/
package io.debezium.relational.history;
+ import static org.fest.assertions.Assertions.assertThat;
import java.io.File;
import java.util.Map;
+ import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
- import static org.fest.assertions.Assertions.assertThat;
import io.debezium.config.Configuration;
import io.debezium.kafka.KafkaCluster;
import io.debezium.relational.Tables;
@@ -26,7 +27,7 @@
* @author Randall Hauch
*/
public class KafkaDatabaseHistoryTest {
private KafkaDatabaseHistory history;
private KafkaCluster kafka;
private Map<String, String> source;
@@ -39,10 +40,11 @@ public void beforeEach() throws Exception {
source = Collect.hashMapOf("server", "my-server");
setLogPosition(0);
topicName = "schema-changes-topic";
File dataDir = Testing.Files.createTestingDirectory("history_cluster");
Testing.Files.delete(dataDir);
// Configure the extra properties to
kafka = new KafkaCluster().usingDirectory(dataDir)
.deleteDataPriorToStartup(true)
.deleteDataUponShutdown(true)
@@ -75,14 +77,20 @@ public void shouldStartWithEmptyTopicAndStoreDataAndRecoverAllState() throws Exc
.with(KafkaDatabaseHistory.BOOTSTRAP_SERVERS, kafka.brokerList())
.with(KafkaDatabaseHistory.TOPIC, topicName)
.with(DatabaseHistory.NAME, "my-db-history")
- // new since 0.10.1.0 - we want a low value because we're running everything locally in this test
.with(KafkaDatabaseHistory.RECOVERY_POLL_INTERVAL_MS, 1000)
+ // new since 0.10.1.0 - we want a low value because we're running everything locally
+ // in this test. However, it can't be so low that the broker returns the same
+ // messages more than once.
.with(KafkaDatabaseHistory.consumerConfigPropertyName(
- ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG),
+ ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG),
100)
.with(KafkaDatabaseHistory.consumerConfigPropertyName(
ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG),
50000)
.build();
history.configure(config, null);
history.start();
// Should be able to call start more than once ...
history.start();
@@ -106,6 +114,8 @@ public void shouldStartWithEmptyTopicAndStoreDataAndRecoverAllState() throws Exc
"CREATE TABLE customers ( id INTEGER NOT NULL PRIMARY KEY, name VARCHAR(100) NOT NULL ); \n" +
"CREATE TABLE products ( productId INTEGER NOT NULL PRIMARY KEY, desc VARCHAR(255) NOT NULL); \n";
history.record(source, position, "db1", tables1, ddl);
+ // Parse the DDL statement 3x and each time update a different Tables object ...
ddlParser.parse(ddl, tables1);
assertThat(tables1.size()).isEqualTo(3);
ddlParser.parse(ddl, tables2);
@@ -113,6 +123,7 @@ public void shouldStartWithEmptyTopicAndStoreDataAndRecoverAllState() throws Exc
ddlParser.parse(ddl, tables3);
assertThat(tables3.size()).isEqualTo(3);
+ // Record a drop statement and parse it for 2 of our 3 Tables...
setLogPosition(39);
ddl = "DROP TABLE foo;";
history.record(source, position, "db1", tables2, ddl);
@@ -121,6 +132,7 @@ public void shouldStartWithEmptyTopicAndStoreDataAndRecoverAllState() throws Exc
ddlParser.parse(ddl, tables3);
assertThat(tables3.size()).isEqualTo(2);
+ // Record another DDL statement and parse it for 1 of our 3 Tables...
setLogPosition(10003);
ddl = "CREATE TABLE suppliers ( supplierId INTEGER NOT NULL PRIMARY KEY, name VARCHAR(255) NOT NULL);";
history.record(source, position, "db1", tables3, ddl);

pom.xml

@@ -62,7 +62,7 @@
<version.postgresql.driver>42.0.0-SNAPSHOT</version.postgresql.driver>
<version.mysql.server>5.7</version.mysql.server>
<version.mysql.driver>5.1.40</version.mysql.driver>
- <version.mysql.binlog>0.5.1</version.mysql.binlog>
+ <version.mysql.binlog>0.8.0</version.mysql.binlog>
<version.mongo.server>3.2.6</version.mongo.server>
<version.mongo.driver>3.2.2</version.mongo.driver>