DBZ-5249 KafkaDatabaseHistory without check database history topic create result caused UnknowTopicOrPartitionException

This commit is contained in:
doupengwei 2022-07-21 23:24:23 +08:00 committed by Chris Cranford
parent b5adede223
commit eb34c2cdf7
3 changed files with 32 additions and 3 deletions

View File

@ -431,3 +431,4 @@ Snigdhajyoti Ghosh
Andrei Isac
Mark Allanson
Rahul Khanna
Pengwei Dou

View File

@ -24,6 +24,7 @@
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.Config;
import org.apache.kafka.clients.admin.CreateTopicsResult;
import org.apache.kafka.clients.admin.DescribeTopicsResult;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.admin.TopicDescription;
@ -42,6 +43,7 @@
import org.apache.kafka.common.config.ConfigDef.Type;
import org.apache.kafka.common.config.ConfigDef.Width;
import org.apache.kafka.common.config.ConfigResource;
import org.apache.kafka.common.errors.TopicExistsException;
import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
@ -154,6 +156,16 @@ public class KafkaDatabaseHistory extends AbstractDatabaseHistory {
.withDefault(Duration.ofSeconds(3).toMillis())
.withValidation(Field::isPositiveInteger);
public static final Field KAFKA_CREATE_TIMEOUT_MS = Field.create(CONFIGURATION_FIELD_PREFIX_STRING + "kafka.create.timeout.ms")
.withDisplayName("Kafka admin client create timeout (ms)")
.withType(Type.LONG)
.withGroup(Field.createGroupEntry(Field.Group.CONNECTION, 33))
.withWidth(Width.SHORT)
.withImportance(Importance.LOW)
.withDescription("The number of milliseconds to wait while create kafka history topic using Kafka admin client.")
.withDefault(Duration.ofSeconds(30).toMillis())
.withValidation(Field::isPositiveInteger);
public static Field.Set ALL_FIELDS = Field.setOf(TOPIC, BOOTSTRAP_SERVERS, DatabaseHistory.NAME,
RECOVERY_POLL_INTERVAL_MS, RECOVERY_POLL_ATTEMPTS, INTERNAL_CONNECTOR_CLASS, INTERNAL_CONNECTOR_ID,
KAFKA_QUERY_TIMEOUT_MS);
@ -175,6 +187,7 @@ public class KafkaDatabaseHistory extends AbstractDatabaseHistory {
private Duration pollInterval;
private ExecutorService checkTopicSettingsExecutor;
private Duration kafkaQueryTimeout;
private Duration kafkaCreateTimeout;
private static final boolean USE_KAFKA_24_NEW_TOPIC_CONSTRUCTOR = hasNewTopicConstructorWithOptionals();
@ -198,6 +211,7 @@ public void configure(Configuration config, HistoryRecordComparator comparator,
this.pollInterval = Duration.ofMillis(config.getInteger(RECOVERY_POLL_INTERVAL_MS));
this.maxRecoveryAttempts = config.getInteger(RECOVERY_POLL_ATTEMPTS);
this.kafkaQueryTimeout = Duration.ofMillis(config.getLong(KAFKA_QUERY_TIMEOUT_MS));
this.kafkaCreateTimeout = Duration.ofMillis(config.getLong(KAFKA_CREATE_TIMEOUT_MS));
String bootstrapServers = config.getString(BOOTSTRAP_SERVERS);
// Copy the relevant portions of the configuration and add useful defaults ...
@ -532,9 +546,19 @@ public void initializeStorage() {
topic.configs(Collect.hashMapOf(CLEANUP_POLICY_NAME, CLEANUP_POLICY_VALUE, RETENTION_MS_NAME, Long.toString(RETENTION_MS_MAX), RETENTION_BYTES_NAME,
Long.toString(UNLIMITED_VALUE)));
admin.createTopics(Collections.singleton(topic));
LOGGER.info("Database history topic '{}' created", topic);
try {
CreateTopicsResult result = admin.createTopics(Collections.singleton(topic));
result.all().get(kafkaCreateTimeout.toMillis(), TimeUnit.MILLISECONDS);
LOGGER.info("Database history topic '{}' created", topic);
}
catch (ExecutionException e) {
if (e.getCause() instanceof TopicExistsException) {
LOGGER.info("Database history topic '{}' already exist", topic);
}
else {
throw e;
}
}
}
catch (Exception e) {
throw new ConnectException("Creation of database history topic failed, please create the topic manually", e);

View File

@ -22,6 +22,10 @@ The following table describes the `database.history` properties for configuring
|`3000`
|An integer value that specifies the maximum number of milliseconds the connector should wait while fetching cluster information using Kafka admin client.
|[[{context}-property-database-history-kafka-create-timeout-ms]]<<{context}-property-database-history-kafka-create-timeout-ms, `+database.history.kafka.create.timeout.ms+`>>
|`30000`
|An integer value that specifies the maximum number of milliseconds the connector should wait while create kafka history topic using Kafka admin client.
|[[{context}-property-database-history-kafka-recovery-attempts]]<<{context}-property-database-history-kafka-recovery-attempts, `+database.history.kafka.recovery.attempts+`>>
|`4`
|The maximum number of times that the connector should try to read persisted history data before the connector recovery fails with an error. The maximum amount of time to wait after receiving no data is `recovery.attempts` x `recovery.poll.interval.ms`.