From a6b2f0db3c2ce4da0fc536c5b2a859afd0878855 Mon Sep 17 00:00:00 2001 From: ggaborg Date: Fri, 11 Nov 2022 12:53:47 +0200 Subject: [PATCH] DBZ-4720 Validate Debezium Server configuration properties --- .../simple/SimpleSourceConnector.java | 2 +- .../io/debezium/embedded/EmbeddedEngine.java | 16 +++++++++++- .../debezium/embedded/EmbeddedEngineTest.java | 26 +++++++++++++++++-- .../debezium/server/ConnectorLifecycle.java | 8 +++++- 4 files changed, 47 insertions(+), 5 deletions(-) diff --git a/debezium-embedded/src/main/java/io/debezium/connector/simple/SimpleSourceConnector.java b/debezium-embedded/src/main/java/io/debezium/connector/simple/SimpleSourceConnector.java index b078221e7..4ae418821 100644 --- a/debezium-embedded/src/main/java/io/debezium/connector/simple/SimpleSourceConnector.java +++ b/debezium-embedded/src/main/java/io/debezium/connector/simple/SimpleSourceConnector.java @@ -100,7 +100,7 @@ public void stop() { @Override public ConfigDef config() { - return null; + return new ConfigDef(); } public static class SimpleConnectorTask extends SourceTask { diff --git a/debezium-embedded/src/main/java/io/debezium/embedded/EmbeddedEngine.java b/debezium-embedded/src/main/java/io/debezium/embedded/EmbeddedEngine.java index 12041f6d6..27e7fdd84 100644 --- a/debezium-embedded/src/main/java/io/debezium/embedded/EmbeddedEngine.java +++ b/debezium-embedded/src/main/java/io/debezium/embedded/EmbeddedEngine.java @@ -25,6 +25,7 @@ import java.util.function.Consumer; import java.util.stream.Collectors; +import org.apache.kafka.common.config.Config; import org.apache.kafka.common.config.ConfigDef; import org.apache.kafka.common.config.ConfigDef.Importance; import org.apache.kafka.common.config.ConfigDef.Type; @@ -34,8 +35,10 @@ import org.apache.kafka.connect.errors.RetriableException; import org.apache.kafka.connect.json.JsonConverter; import org.apache.kafka.connect.json.JsonConverterConfig; +import org.apache.kafka.connect.runtime.AbstractHerder; import org.apache.kafka.connect.runtime.WorkerConfig; import org.apache.kafka.connect.runtime.distributed.DistributedConfig; +import org.apache.kafka.connect.runtime.rest.entities.ConfigInfos; import org.apache.kafka.connect.runtime.standalone.StandaloneConfig; import org.apache.kafka.connect.source.SourceConnector; import org.apache.kafka.connect.source.SourceConnectorContext; @@ -707,6 +710,17 @@ public void run() { fail("Unable to instantiate connector class '" + connectorClassName + "'", t); return; } + Map connectorConfig = workerConfig.originalsStrings(); + Config validatedConnectorConfig = connector.validate(connectorConfig); + ConfigInfos configInfos = AbstractHerder.generateResult(connectorClassName, Collections.emptyMap(), validatedConnectorConfig.configValues(), + connector.config().groups()); + if (configInfos.errorCount() > 0) { + String errors = configInfos.values().stream() + .flatMap(v -> v.configValue().errors().stream()) + .collect(Collectors.joining(" ")); + fail("Connector configuration is not valid. " + errors); + return; + } // Instantiate the offset store ... final String offsetStoreClassName = config.getString(OFFSET_STORAGE); @@ -773,7 +787,7 @@ public OffsetStorageReader offsetStorageReader() { try { // Start the connector with the given properties and get the task configurations ... - connector.start(workerConfig.originalsStrings()); + connector.start(connectorConfig); connectorCallback.ifPresent(DebeziumEngine.ConnectorCallback::connectorStarted); List> taskConfigs = connector.taskConfigs(1); Class taskClass = connector.taskClass(); diff --git a/debezium-embedded/src/test/java/io/debezium/embedded/EmbeddedEngineTest.java b/debezium-embedded/src/test/java/io/debezium/embedded/EmbeddedEngineTest.java index 9101aec96..8ab2d633f 100644 --- a/debezium-embedded/src/test/java/io/debezium/embedded/EmbeddedEngineTest.java +++ b/debezium-embedded/src/test/java/io/debezium/embedded/EmbeddedEngineTest.java @@ -27,6 +27,7 @@ import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; import org.apache.kafka.common.config.ConfigDef; import org.apache.kafka.connect.connector.Task; @@ -91,7 +92,7 @@ public SourceRecord apply(SourceRecord record) { @Override public ConfigDef config() { - return null; + return new ConfigDef(); } @Override @@ -102,7 +103,7 @@ public void close() { public static class FilterPredicate implements Predicate { @Override public ConfigDef config() { - return null; + return new ConfigDef(); } @Override @@ -704,6 +705,27 @@ public void verifyBadCommitPolicyClassName() { assertThat(exceptionCaught.get()).isTrue(); } + + @FixFor("DBZ-4720") + @Test + public void validationThrowsException() throws Exception { + // Add initial content to the file ... + appendLinesToSource(NUMBER_OF_LINES); + + // Start the connector ... + AtomicReference errorReference = new AtomicReference<>(); + start(FileStreamSourceConnector.class, Configuration.from(new Properties()), (success, message, error) -> { + if (message != null) { + errorReference.set(message); + } + }); + + assertNoRecordsToConsume(); + + assertThat(errorReference.get()).isNotNull(); + assertThat(errorReference.get()).contains("Connector configuration is not valid. "); + assertThat(this.engine.isRunning()).isFalse(); + } } class InterruptedConnector extends SimpleSourceConnector { diff --git a/debezium-server/debezium-server-core/src/main/java/io/debezium/server/ConnectorLifecycle.java b/debezium-server/debezium-server-core/src/main/java/io/debezium/server/ConnectorLifecycle.java index a349f18f9..c95df800c 100644 --- a/debezium-server/debezium-server-core/src/main/java/io/debezium/server/ConnectorLifecycle.java +++ b/debezium-server/debezium-server-core/src/main/java/io/debezium/server/ConnectorLifecycle.java @@ -79,7 +79,13 @@ public void taskStopped() { @Override public void handle(boolean success, String message, Throwable error) { - LOGGER.info("Connector completed: success = '{}', message = '{}', error = '{}'", success, message, error); + String logMessage = String.format("Connector completed: success = '%s', message = '%s', error = '%s'", success, message, error); + if (success) { + LOGGER.info(logMessage); + } + else { + LOGGER.error(logMessage); + } connectorCompletedEvent.fire(new ConnectorCompletedEvent(success, message, error)); live = false; }