From d513e2c757a28dfd6ac726ca7da5fc8c65b64613 Mon Sep 17 00:00:00 2001
From: ssouris
Date: Mon, 3 Jun 2019 11:54:48 +0100
Subject: [PATCH] DBZ-1270 - Fix KafkaDatabaseHistory throwing even after creating the topic in older brokers

---
 .../history/KafkaDatabaseHistory.java | 44 +++++++++++++------
 1 file changed, 31 insertions(+), 13 deletions(-)

diff --git a/debezium-core/src/main/java/io/debezium/relational/history/KafkaDatabaseHistory.java b/debezium-core/src/main/java/io/debezium/relational/history/KafkaDatabaseHistory.java
index b345fc2f3..d9973fff9 100644
--- a/debezium-core/src/main/java/io/debezium/relational/history/KafkaDatabaseHistory.java
+++ b/debezium-core/src/main/java/io/debezium/relational/history/KafkaDatabaseHistory.java
@@ -35,6 +35,7 @@
 import org.apache.kafka.common.config.ConfigDef.Type;
 import org.apache.kafka.common.config.ConfigDef.Width;
 import org.apache.kafka.common.config.ConfigResource;
+import org.apache.kafka.common.errors.UnsupportedVersionException;
 import org.apache.kafka.common.serialization.StringDeserializer;
 import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.kafka.connect.errors.ConnectException;
@@ -201,7 +202,7 @@ protected void storeRecord(HistoryRecord record) throws DatabaseHistoryException
 
     @Override
     protected void recoverRecords(Consumer<HistoryRecord> records) {
-        try (KafkaConsumer<String, String> historyConsumer = new KafkaConsumer<>(consumerConfig.asProperties());) {
+        try (KafkaConsumer<String, String> historyConsumer = new KafkaConsumer<>(consumerConfig.asProperties())) {
             // Subscribe to the only partition for this topic, and seek to the beginning of that partition ...
             logger.debug("Subscribing to database history topic '{}'", topicName);
             historyConsumer.subscribe(Collect.arrayListOf(topicName));
@@ -335,19 +336,9 @@ public void initializeStorage() {
         super.initializeStorage();
 
         try (AdminClient admin = AdminClient.create(this.producerConfig.asProperties())) {
+
             // Find default replication factor
-            Config brokerConfig = getKafkaBrokerConfig(admin);
-            String defaultReplicationFactorValue = brokerConfig.get(DEFAULT_TOPIC_REPLICATION_FACTOR_PROP_NAME).value();
-            final short replicationFactor;
-            // Ensure that the default replication factor property was returned by the Admin Client
-            if (defaultReplicationFactorValue != null) {
-                replicationFactor = Short.parseShort(defaultReplicationFactorValue);
-            }
-            else {
-                // Otherwise warn that no property was obtained and default it to 1 - users can increase this later if desired
-                logger.warn("Unable to obtain the default replication factor from the brokers at {} - Setting value to 1 instead", producerConfig.getString(BOOTSTRAP_SERVERS));
-                replicationFactor = 1;
-            }
+            final short replicationFactor = getTopicReplicationFactor(admin);
 
             // Create topic
             final NewTopic topic = new NewTopic(topicName, (short) 1, replicationFactor);
@@ -361,6 +352,32 @@ public void initializeStorage() {
         }
     }
 
+    private short getTopicReplicationFactor(AdminClient admin) throws Exception {
+        try {
+            Config brokerConfig = getKafkaBrokerConfig(admin);
+            String defaultReplicationFactorValue = brokerConfig.get(DEFAULT_TOPIC_REPLICATION_FACTOR_PROP_NAME).value();
+
+            // Ensure that the default replication factor property was returned by the Admin Client
+            if (defaultReplicationFactorValue != null) {
+                return Short.parseShort(defaultReplicationFactorValue);
+            }
+        }
+        catch (ExecutionException ex) {
+            // ignore UnsupportedVersionException
+            logger.trace("Exception while getting the default replication factor", ex);
+            if (!(ex.getCause() instanceof UnsupportedVersionException)) {
+                throw ex;
+            }
+
+        }
+
+        // Otherwise warn that no property was obtained and default it to 1 - users can increase this later if desired
+        logger.warn(
+                "Unable to obtain the default replication factor from the brokers at {} - Setting value to 1 instead",
+                producerConfig.getString(BOOTSTRAP_SERVERS));
+        return 1;
+    }
+
     private Config getKafkaBrokerConfig(AdminClient admin) throws Exception {
         final Collection<Node> nodes = admin.describeCluster().nodes().get(KAFKA_QUERY_TIMEOUT.toMillis(), TimeUnit.MILLISECONDS);
         if (nodes.isEmpty()) {
@@ -379,4 +396,5 @@ private Config getKafkaBrokerConfig(AdminClient admin) throws Exception {
 
         return configs.values().iterator().next();
     }
+
 }
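
Note (not part of the commit): below is a minimal standalone sketch of the fallback pattern this patch introduces - asking the broker for its default replication factor via the AdminClient, unwrapping the ExecutionException that the admin future throws, and treating an UnsupportedVersionException from an older broker as "fall back to replication factor 1". The bootstrap address and broker id are assumptions for illustration only and do not come from the patch.

import java.util.Collections;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutionException;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.Config;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.common.config.ConfigResource;
import org.apache.kafka.common.errors.UnsupportedVersionException;

public class ReplicationFactorFallbackSketch {

    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        // Assumed broker address; in the connector this comes from the history producer config.
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

        try (AdminClient admin = AdminClient.create(props)) {
            short replicationFactor;
            try {
                // Brokers that predate the DescribeConfigs API fail the future with an
                // ExecutionException whose cause is UnsupportedVersionException.
                ConfigResource broker = new ConfigResource(ConfigResource.Type.BROKER, "0"); // assumed broker id
                Map<ConfigResource, Config> configs = admin.describeConfigs(Collections.singleton(broker)).all().get();
                ConfigEntry entry = configs.get(broker).get("default.replication.factor");
                replicationFactor = (entry != null && entry.value() != null)
                        ? Short.parseShort(entry.value())
                        : 1;
            }
            catch (ExecutionException ex) {
                if (!(ex.getCause() instanceof UnsupportedVersionException)) {
                    throw ex; // anything other than an old-broker rejection is still fatal
                }
                // Older broker: fall back to 1 instead of failing topic creation.
                replicationFactor = 1;
            }
            System.out.println("Creating database history topic with replication factor " + replicationFactor);
        }
    }
}

The cause check, rather than catching UnsupportedVersionException directly, is needed because the admin client's futures always wrap failures in ExecutionException; that is the same unwrapping the patch performs in getTopicReplicationFactor.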