DBZ-4720 Validate Debezium Server configuration properties

This commit is contained in:
ggaborg 2022-11-11 12:53:47 +02:00 committed by Jiri Pechanec
parent d7afcd4fba
commit a6b2f0db3c
4 changed files with 47 additions and 5 deletions

View File

@ -100,7 +100,7 @@ public void stop() {
@Override
public ConfigDef config() {
return null;
return new ConfigDef();
}
public static class SimpleConnectorTask extends SourceTask {

View File

@ -25,6 +25,7 @@
import java.util.function.Consumer;
import java.util.stream.Collectors;
import org.apache.kafka.common.config.Config;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigDef.Importance;
import org.apache.kafka.common.config.ConfigDef.Type;
@ -34,8 +35,10 @@
import org.apache.kafka.connect.errors.RetriableException;
import org.apache.kafka.connect.json.JsonConverter;
import org.apache.kafka.connect.json.JsonConverterConfig;
import org.apache.kafka.connect.runtime.AbstractHerder;
import org.apache.kafka.connect.runtime.WorkerConfig;
import org.apache.kafka.connect.runtime.distributed.DistributedConfig;
import org.apache.kafka.connect.runtime.rest.entities.ConfigInfos;
import org.apache.kafka.connect.runtime.standalone.StandaloneConfig;
import org.apache.kafka.connect.source.SourceConnector;
import org.apache.kafka.connect.source.SourceConnectorContext;
@ -707,6 +710,17 @@ public void run() {
fail("Unable to instantiate connector class '" + connectorClassName + "'", t);
return;
}
Map<String, String> connectorConfig = workerConfig.originalsStrings();
Config validatedConnectorConfig = connector.validate(connectorConfig);
ConfigInfos configInfos = AbstractHerder.generateResult(connectorClassName, Collections.emptyMap(), validatedConnectorConfig.configValues(),
connector.config().groups());
if (configInfos.errorCount() > 0) {
String errors = configInfos.values().stream()
.flatMap(v -> v.configValue().errors().stream())
.collect(Collectors.joining(" "));
fail("Connector configuration is not valid. " + errors);
return;
}
// Instantiate the offset store ...
final String offsetStoreClassName = config.getString(OFFSET_STORAGE);
@ -773,7 +787,7 @@ public OffsetStorageReader offsetStorageReader() {
try {
// Start the connector with the given properties and get the task configurations ...
connector.start(workerConfig.originalsStrings());
connector.start(connectorConfig);
connectorCallback.ifPresent(DebeziumEngine.ConnectorCallback::connectorStarted);
List<Map<String, String>> taskConfigs = connector.taskConfigs(1);
Class<? extends Task> taskClass = connector.taskClass();

View File

@ -27,6 +27,7 @@
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.connector.Task;
@ -91,7 +92,7 @@ public SourceRecord apply(SourceRecord record) {
@Override
public ConfigDef config() {
return null;
return new ConfigDef();
}
@Override
@ -102,7 +103,7 @@ public void close() {
public static class FilterPredicate implements Predicate<SourceRecord> {
@Override
public ConfigDef config() {
return null;
return new ConfigDef();
}
@Override
@ -704,6 +705,27 @@ public void verifyBadCommitPolicyClassName() {
assertThat(exceptionCaught.get()).isTrue();
}
@FixFor("DBZ-4720")
@Test
public void validationThrowsException() throws Exception {
// Add initial content to the file ...
appendLinesToSource(NUMBER_OF_LINES);
// Start the connector ...
AtomicReference<String> errorReference = new AtomicReference<>();
start(FileStreamSourceConnector.class, Configuration.from(new Properties()), (success, message, error) -> {
if (message != null) {
errorReference.set(message);
}
});
assertNoRecordsToConsume();
assertThat(errorReference.get()).isNotNull();
assertThat(errorReference.get()).contains("Connector configuration is not valid. ");
assertThat(this.engine.isRunning()).isFalse();
}
}
class InterruptedConnector extends SimpleSourceConnector {

View File

@ -79,7 +79,13 @@ public void taskStopped() {
@Override
public void handle(boolean success, String message, Throwable error) {
LOGGER.info("Connector completed: success = '{}', message = '{}', error = '{}'", success, message, error);
String logMessage = String.format("Connector completed: success = '%s', message = '%s', error = '%s'", success, message, error);
if (success) {
LOGGER.info(logMessage);
}
else {
LOGGER.error(logMessage);
}
connectorCompletedEvent.fire(new ConnectorCompletedEvent(success, message, error));
live = false;
}