diff --git a/debezium-core/src/main/java/io/debezium/relational/history/KafkaDatabaseHistory.java b/debezium-core/src/main/java/io/debezium/relational/history/KafkaDatabaseHistory.java index 4cd76f0a5..14b1a7b08 100644 --- a/debezium-core/src/main/java/io/debezium/relational/history/KafkaDatabaseHistory.java +++ b/debezium-core/src/main/java/io/debezium/relational/history/KafkaDatabaseHistory.java @@ -137,7 +137,8 @@ public class KafkaDatabaseHistory extends AbstractDatabaseHistory { .withType(Type.STRING) .withWidth(Width.LONG) .withImportance(Importance.HIGH) - .withDescription("The class of the Debezium database connector"); + .withDescription("The class of the Debezium database connector") + .withNoValidation(); // Required for unified thread creation public static final Field INTERNAL_CONNECTOR_ID = Field.create(CONFIGURATION_FIELD_PREFIX_STRING + "connector.id") @@ -145,7 +146,8 @@ public class KafkaDatabaseHistory extends AbstractDatabaseHistory { .withType(Type.STRING) .withWidth(Width.SHORT) .withImportance(Importance.HIGH) - .withDescription("The unique identifier of the Debezium connector"); + .withDescription("The unique identifier of the Debezium connector") + .withNoValidation(); public static Field.Set ALL_FIELDS = Field.setOf(TOPIC, BOOTSTRAP_SERVERS, DatabaseHistory.NAME, RECOVERY_POLL_INTERVAL_MS, RECOVERY_POLL_ATTEMPTS, INTERNAL_CONNECTOR_CLASS, INTERNAL_CONNECTOR_ID); diff --git a/debezium-core/src/test/java/io/debezium/relational/history/KafkaDatabaseHistoryTest.java b/debezium-core/src/test/java/io/debezium/relational/history/KafkaDatabaseHistoryTest.java index a02177bff..903bf91e2 100644 --- a/debezium-core/src/test/java/io/debezium/relational/history/KafkaDatabaseHistoryTest.java +++ b/debezium-core/src/test/java/io/debezium/relational/history/KafkaDatabaseHistoryTest.java @@ -16,7 +16,9 @@ import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.common.config.ConfigValue; import org.apache.kafka.common.serialization.StringSerializer; +import org.fest.assertions.Assertions; import org.junit.After; import org.junit.Before; import org.junit.Test; @@ -330,4 +332,21 @@ public void differentiateStorageExistsFromHistoryExists() { assertTrue(history.exists()); assertTrue(history.storageExists()); } + + @Test + @FixFor("DBZ-2144") + public void shouldValidateMandatoryValues() { + Configuration config = Configuration.create() + .build(); + + final Map issues = config.validate(KafkaDatabaseHistory.ALL_FIELDS); + Assertions.assertThat(issues.keySet()).isEqualTo(Collect.unmodifiableSet( + "database.history.name", + "database.history.connector.class", + "database.history.kafka.topic", + "database.history.kafka.bootstrap.servers", + "database.history.kafka.recovery.poll.interval.ms", + "database.history.connector.id", + "database.history.kafka.recovery.attempts")); + } }