DBZ-7024 Move EmbeddedConfig into separate class

`EmbeddedConfig` needs to be shared with other implmentations of
`DebeziumEngine` as long as Debezium embedded depends on the Kafka
model.
This commit is contained in:
Vojtech Juranek 2023-11-20 16:23:15 +01:00 committed by Jiri Pechanec
parent 7f3210214c
commit 4452e3d095
11 changed files with 39 additions and 31 deletions

View File

@ -17,7 +17,7 @@
<appender-ref ref="CONSOLE" /> <appender-ref ref="CONSOLE" />
</logger> </logger>
<logger <logger
name="io.debezium.embedded.EmbeddedEngine$EmbeddedConfig" name="io.debezium.embedded.EmbeddedWorkerConfig"
level="warn" additivity="false"> level="warn" additivity="false">
<appender-ref ref="CONSOLE" /> <appender-ref ref="CONSOLE" />
</logger> </logger>

View File

@ -26,7 +26,7 @@
</logger> </logger>
<logger <logger
name="io.debezium.embedded.EmbeddedEngine$EmbeddedConfig" name="io.debezium.embedded.EmbeddedWorkerConfig"
level="warn" additivity="false"> level="warn" additivity="false">
<appender-ref ref="CONSOLE" /> <appender-ref ref="CONSOLE" />
</logger> </logger>

View File

@ -27,7 +27,7 @@
</logger> </logger>
<logger <logger
name="io.debezium.embedded.EmbeddedEngine$EmbeddedConfig" name="io.debezium.embedded.EmbeddedWorkerConfig"
level="warn" additivity="false"> level="warn" additivity="false">
<appender-ref ref="CONSOLE" /> <appender-ref ref="CONSOLE" />
</logger> </logger>

View File

@ -17,7 +17,7 @@
<appender-ref ref="CONSOLE" /> <appender-ref ref="CONSOLE" />
</logger> </logger>
<logger <logger
name="io.debezium.embedded.EmbeddedEngine$EmbeddedConfig" name="io.debezium.embedded.EmbeddedWorkerConfig"
level="warn" additivity="false"> level="warn" additivity="false">
<appender-ref ref="CONSOLE" /> <appender-ref ref="CONSOLE" />
</logger> </logger>

View File

@ -17,7 +17,7 @@
<appender-ref ref="CONSOLE" /> <appender-ref ref="CONSOLE" />
</logger> </logger>
<logger <logger
name="io.debezium.embedded.EmbeddedEngine$EmbeddedConfig" name="io.debezium.embedded.EmbeddedWorkerConfig"
level="warn" additivity="false"> level="warn" additivity="false">
<appender-ref ref="CONSOLE" /> <appender-ref ref="CONSOLE" />
</logger> </logger>

View File

@ -27,7 +27,6 @@
import java.util.stream.Collectors; import java.util.stream.Collectors;
import org.apache.kafka.common.config.Config; import org.apache.kafka.common.config.Config;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.connector.ConnectorContext; import org.apache.kafka.connect.connector.ConnectorContext;
import org.apache.kafka.connect.connector.Task; import org.apache.kafka.connect.connector.Task;
import org.apache.kafka.connect.errors.RetriableException; import org.apache.kafka.connect.errors.RetriableException;
@ -55,7 +54,6 @@
import io.debezium.annotation.ThreadSafe; import io.debezium.annotation.ThreadSafe;
import io.debezium.config.CommonConnectorConfig; import io.debezium.config.CommonConnectorConfig;
import io.debezium.config.Configuration; import io.debezium.config.Configuration;
import io.debezium.config.Field;
import io.debezium.config.Instantiator; import io.debezium.config.Instantiator;
import io.debezium.engine.DebeziumEngine; import io.debezium.engine.DebeziumEngine;
import io.debezium.engine.StopEngineException; import io.debezium.engine.StopEngineException;
@ -359,7 +357,7 @@ private EmbeddedEngine(Configuration config, ClassLoader classLoader, Clock cloc
Map<String, String> embeddedConfig = config.asMap(EmbeddedEngineConfig.ALL_FIELDS); Map<String, String> embeddedConfig = config.asMap(EmbeddedEngineConfig.ALL_FIELDS);
embeddedConfig.put(WorkerConfig.KEY_CONVERTER_CLASS_CONFIG, JsonConverter.class.getName()); embeddedConfig.put(WorkerConfig.KEY_CONVERTER_CLASS_CONFIG, JsonConverter.class.getName());
embeddedConfig.put(WorkerConfig.VALUE_CONVERTER_CLASS_CONFIG, JsonConverter.class.getName()); embeddedConfig.put(WorkerConfig.VALUE_CONVERTER_CLASS_CONFIG, JsonConverter.class.getName());
workerConfig = new EmbeddedConfig(embeddedConfig); workerConfig = new EmbeddedWorkerConfig(embeddedConfig);
} }
/** /**
@ -1010,23 +1008,6 @@ private DelayStrategy delayStrategy(Configuration config) {
Duration.ofMillis(config.getInteger(EmbeddedEngineConfig.ERRORS_RETRY_DELAY_MAX_MS))); Duration.ofMillis(config.getInteger(EmbeddedEngineConfig.ERRORS_RETRY_DELAY_MAX_MS)));
} }
protected static class EmbeddedConfig extends WorkerConfig {
private static final ConfigDef CONFIG;
static {
ConfigDef config = baseConfigDef();
Field.group(config, "file", EmbeddedEngineConfig.OFFSET_STORAGE_FILE_FILENAME);
Field.group(config, "kafka", EmbeddedEngineConfig.OFFSET_STORAGE_KAFKA_TOPIC);
Field.group(config, "kafka", EmbeddedEngineConfig.OFFSET_STORAGE_KAFKA_PARTITIONS);
Field.group(config, "kafka", EmbeddedEngineConfig.OFFSET_STORAGE_KAFKA_REPLICATION_FACTOR);
CONFIG = config;
}
protected EmbeddedConfig(Map<String, String> props) {
super(CONFIG, props);
}
}
private class HandlerErrors { private class HandlerErrors {
private Throwable handlerError; private Throwable handlerError;
private Throwable retryError; private Throwable retryError;

View File

@ -0,0 +1,28 @@
package io.debezium.embedded;
import java.util.Map;
import io.debezium.config.Field;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.runtime.WorkerConfig;
/**
* Extension to Kafka's {@link WorkerConfig} with additions needed by {@link io.debezium.engine.DebeziumEngine}.
* Should be removed once {@link io.debezium.engine.DebeziumEngine} is independent on Kafka model (DBZ-6234).
*/
class EmbeddedWorkerConfig extends WorkerConfig {
private static final ConfigDef CONFIG;
static {
ConfigDef config = baseConfigDef();
Field.group(config, "file", EmbeddedEngineConfig.OFFSET_STORAGE_FILE_FILENAME);
Field.group(config, "kafka", EmbeddedEngineConfig.OFFSET_STORAGE_KAFKA_TOPIC);
Field.group(config, "kafka", EmbeddedEngineConfig.OFFSET_STORAGE_KAFKA_PARTITIONS);
Field.group(config, "kafka", EmbeddedEngineConfig.OFFSET_STORAGE_KAFKA_REPLICATION_FACTOR);
CONFIG = config;
}
protected EmbeddedWorkerConfig(Map<String, String> props) {
super(CONFIG, props);
}
}

View File

@ -70,7 +70,6 @@
import io.debezium.config.Configuration; import io.debezium.config.Configuration;
import io.debezium.config.Instantiator; import io.debezium.config.Instantiator;
import io.debezium.data.VerifyRecord; import io.debezium.data.VerifyRecord;
import io.debezium.embedded.EmbeddedEngine.EmbeddedConfig;
import io.debezium.engine.DebeziumEngine; import io.debezium.engine.DebeziumEngine;
import io.debezium.function.BooleanConsumer; import io.debezium.function.BooleanConsumer;
import io.debezium.junit.SkipTestRule; import io.debezium.junit.SkipTestRule;
@ -1156,7 +1155,7 @@ protected <T> Map<Map<String, T>, Map<String, Object>> readLastCommittedOffsets(
Map<String, String> embeddedConfig = config.asMap(EmbeddedEngineConfig.ALL_FIELDS); Map<String, String> embeddedConfig = config.asMap(EmbeddedEngineConfig.ALL_FIELDS);
embeddedConfig.put(WorkerConfig.KEY_CONVERTER_CLASS_CONFIG, JsonConverter.class.getName()); embeddedConfig.put(WorkerConfig.KEY_CONVERTER_CLASS_CONFIG, JsonConverter.class.getName());
embeddedConfig.put(WorkerConfig.VALUE_CONVERTER_CLASS_CONFIG, JsonConverter.class.getName()); embeddedConfig.put(WorkerConfig.VALUE_CONVERTER_CLASS_CONFIG, JsonConverter.class.getName());
WorkerConfig workerConfig = new EmbeddedConfig(embeddedConfig); WorkerConfig workerConfig = new EmbeddedWorkerConfig(embeddedConfig);
FileOffsetBackingStore offsetStore = KafkaConnectUtil.fileOffsetBackingStore(); FileOffsetBackingStore offsetStore = KafkaConnectUtil.fileOffsetBackingStore();
offsetStore.configure(workerConfig); offsetStore.configure(workerConfig);
@ -1188,7 +1187,7 @@ protected void storeOffsets(Configuration config, Map<Map<String, ?>, Map<String
Map<String, String> embeddedConfig = config.asMap(EmbeddedEngineConfig.ALL_FIELDS); Map<String, String> embeddedConfig = config.asMap(EmbeddedEngineConfig.ALL_FIELDS);
embeddedConfig.put(WorkerConfig.KEY_CONVERTER_CLASS_CONFIG, JsonConverter.class.getName()); embeddedConfig.put(WorkerConfig.KEY_CONVERTER_CLASS_CONFIG, JsonConverter.class.getName());
embeddedConfig.put(WorkerConfig.VALUE_CONVERTER_CLASS_CONFIG, JsonConverter.class.getName()); embeddedConfig.put(WorkerConfig.VALUE_CONVERTER_CLASS_CONFIG, JsonConverter.class.getName());
WorkerConfig workerConfig = new EmbeddedConfig(embeddedConfig); WorkerConfig workerConfig = new EmbeddedWorkerConfig(embeddedConfig);
FileOffsetBackingStore offsetStore = KafkaConnectUtil.fileOffsetBackingStore(); FileOffsetBackingStore offsetStore = KafkaConnectUtil.fileOffsetBackingStore();
offsetStore.configure(workerConfig); offsetStore.configure(workerConfig);

View File

@ -17,7 +17,7 @@
<appender-ref ref="CONSOLE" /> <appender-ref ref="CONSOLE" />
</logger> </logger>
<logger <logger
name="io.debezium.embedded.EmbeddedEngine$EmbeddedConfig" name="io.debezium.embedded.EmbeddedWorkerConfig"
level="warn" additivity="false"> level="warn" additivity="false">
<appender-ref ref="CONSOLE" /> <appender-ref ref="CONSOLE" />
</logger> </logger>

View File

@ -17,7 +17,7 @@
<appender-ref ref="CONSOLE" /> <appender-ref ref="CONSOLE" />
</logger> </logger>
<logger <logger
name="io.debezium.embedded.EmbeddedEngine$EmbeddedConfig" name="io.debezium.embedded.EmbeddedWorkerConfig"
level="warn" additivity="false"> level="warn" additivity="false">
<appender-ref ref="CONSOLE" /> <appender-ref ref="CONSOLE" />
</logger> </logger>

View File

@ -17,7 +17,7 @@
<appender-ref ref="CONSOLE" /> <appender-ref ref="CONSOLE" />
</logger> </logger>
<logger <logger
name="io.debezium.embedded.EmbeddedEngine$EmbeddedConfig" name="io.debezium.embedded.EmbeddedWorkerConfig"
level="warn" additivity="false"> level="warn" additivity="false">
<appender-ref ref="CONSOLE" /> <appender-ref ref="CONSOLE" />
</logger> </logger>