DBZ-1270 - Fix KafkaDatabaseHistory throwing even after creating the topic in older brokers

This commit is contained in:
ssouris 2019-06-03 11:54:48 +01:00 committed by Jiri Pechanec
parent 9b5640ddde
commit d513e2c757

View File

@ -35,6 +35,7 @@
import org.apache.kafka.common.config.ConfigDef.Type;
import org.apache.kafka.common.config.ConfigDef.Width;
import org.apache.kafka.common.config.ConfigResource;
import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.connect.errors.ConnectException;
@ -201,7 +202,7 @@ protected void storeRecord(HistoryRecord record) throws DatabaseHistoryException
@Override
protected void recoverRecords(Consumer<HistoryRecord> records) {
try (KafkaConsumer<String, String> historyConsumer = new KafkaConsumer<>(consumerConfig.asProperties());) {
try (KafkaConsumer<String, String> historyConsumer = new KafkaConsumer<>(consumerConfig.asProperties())) {
// Subscribe to the only partition for this topic, and seek to the beginning of that partition ...
logger.debug("Subscribing to database history topic '{}'", topicName);
historyConsumer.subscribe(Collect.arrayListOf(topicName));
@ -335,19 +336,9 @@ public void initializeStorage() {
super.initializeStorage();
try (AdminClient admin = AdminClient.create(this.producerConfig.asProperties())) {
// Find default replication factor
Config brokerConfig = getKafkaBrokerConfig(admin);
String defaultReplicationFactorValue = brokerConfig.get(DEFAULT_TOPIC_REPLICATION_FACTOR_PROP_NAME).value();
final short replicationFactor;
// Ensure that the default replication factor property was returned by the Admin Client
if (defaultReplicationFactorValue != null) {
replicationFactor = Short.parseShort(defaultReplicationFactorValue);
}
else {
// Otherwise warn that no property was obtained and default it to 1 - users can increase this later if desired
logger.warn("Unable to obtain the default replication factor from the brokers at {} - Setting value to 1 instead", producerConfig.getString(BOOTSTRAP_SERVERS));
replicationFactor = 1;
}
final short replicationFactor = getTopicReplicationFactor(admin);
// Create topic
final NewTopic topic = new NewTopic(topicName, (short) 1, replicationFactor);
@ -361,6 +352,32 @@ public void initializeStorage() {
}
}
private short getTopicReplicationFactor(AdminClient admin) throws Exception {
try {
Config brokerConfig = getKafkaBrokerConfig(admin);
String defaultReplicationFactorValue = brokerConfig.get(DEFAULT_TOPIC_REPLICATION_FACTOR_PROP_NAME).value();
// Ensure that the default replication factor property was returned by the Admin Client
if (defaultReplicationFactorValue != null) {
return Short.parseShort(defaultReplicationFactorValue);
}
}
catch (ExecutionException ex) {
// ignore UnsupportedVersionException
logger.trace("Exception while getting the default replication factor", ex);
if (!(ex.getCause() instanceof UnsupportedVersionException)) {
throw ex;
}
}
// Otherwise warn that no property was obtained and default it to 1 - users can increase this later if desired
logger.warn(
"Unable to obtain the default replication factor from the brokers at {} - Setting value to 1 instead",
producerConfig.getString(BOOTSTRAP_SERVERS));
return 1;
}
private Config getKafkaBrokerConfig(AdminClient admin) throws Exception {
final Collection<Node> nodes = admin.describeCluster().nodes().get(KAFKA_QUERY_TIMEOUT.toMillis(), TimeUnit.MILLISECONDS);
if (nodes.isEmpty()) {
@ -379,4 +396,5 @@ private Config getKafkaBrokerConfig(AdminClient admin) throws Exception {
return configs.values().iterator().next();
}
}