DBZ-2140 Internal configs must not be validated

This commit is contained in:
Jiri Pechanec 2020-06-04 05:49:05 +02:00 committed by Gunnar Morling
parent 8bdc7e0a71
commit 86dac40dae
2 changed files with 23 additions and 2 deletions

View File

@ -137,7 +137,8 @@ public class KafkaDatabaseHistory extends AbstractDatabaseHistory {
.withType(Type.STRING)
.withWidth(Width.LONG)
.withImportance(Importance.HIGH)
.withDescription("The class of the Debezium database connector");
.withDescription("The class of the Debezium database connector")
.withNoValidation();
// Required for unified thread creation
public static final Field INTERNAL_CONNECTOR_ID = Field.create(CONFIGURATION_FIELD_PREFIX_STRING + "connector.id")
@ -145,7 +146,8 @@ public class KafkaDatabaseHistory extends AbstractDatabaseHistory {
.withType(Type.STRING)
.withWidth(Width.SHORT)
.withImportance(Importance.HIGH)
.withDescription("The unique identifier of the Debezium connector");
.withDescription("The unique identifier of the Debezium connector")
.withNoValidation();
public static Field.Set ALL_FIELDS = Field.setOf(TOPIC, BOOTSTRAP_SERVERS, DatabaseHistory.NAME,
RECOVERY_POLL_INTERVAL_MS, RECOVERY_POLL_ATTEMPTS, INTERNAL_CONNECTOR_CLASS, INTERNAL_CONNECTOR_ID);

View File

@ -16,7 +16,9 @@
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.config.ConfigValue;
import org.apache.kafka.common.serialization.StringSerializer;
import org.fest.assertions.Assertions;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
@ -330,4 +332,21 @@ public void differentiateStorageExistsFromHistoryExists() {
assertTrue(history.exists());
assertTrue(history.storageExists());
}
@Test
@FixFor("DBZ-2144")
public void shouldValidateMandatoryValues() {
Configuration config = Configuration.create()
.build();
final Map<String, ConfigValue> issues = config.validate(KafkaDatabaseHistory.ALL_FIELDS);
Assertions.assertThat(issues.keySet()).isEqualTo(Collect.unmodifiableSet(
"database.history.name",
"database.history.connector.class",
"database.history.kafka.topic",
"database.history.kafka.bootstrap.servers",
"database.history.kafka.recovery.poll.interval.ms",
"database.history.connector.id",
"database.history.kafka.recovery.attempts"));
}
}