diff --git a/debezium-connector-mongodb/src/test/resources/logback-test.xml b/debezium-connector-mongodb/src/test/resources/logback-test.xml index ec4e1853d..6edd9d89c 100644 --- a/debezium-connector-mongodb/src/test/resources/logback-test.xml +++ b/debezium-connector-mongodb/src/test/resources/logback-test.xml @@ -17,7 +17,7 @@ diff --git a/debezium-connector-mysql/src/test/resources/logback-test.xml b/debezium-connector-mysql/src/test/resources/logback-test.xml index 1be177401..93394588a 100644 --- a/debezium-connector-mysql/src/test/resources/logback-test.xml +++ b/debezium-connector-mysql/src/test/resources/logback-test.xml @@ -26,7 +26,7 @@ diff --git a/debezium-connector-oracle/src/test/resources/logback-test.xml b/debezium-connector-oracle/src/test/resources/logback-test.xml index ec074bca8..98c10eea8 100644 --- a/debezium-connector-oracle/src/test/resources/logback-test.xml +++ b/debezium-connector-oracle/src/test/resources/logback-test.xml @@ -27,7 +27,7 @@ diff --git a/debezium-connector-postgres/src/test/resources/logback-test.xml b/debezium-connector-postgres/src/test/resources/logback-test.xml index c2bf2d547..8085df4e4 100644 --- a/debezium-connector-postgres/src/test/resources/logback-test.xml +++ b/debezium-connector-postgres/src/test/resources/logback-test.xml @@ -17,7 +17,7 @@ diff --git a/debezium-connector-sqlserver/src/test/resources/logback-test.xml b/debezium-connector-sqlserver/src/test/resources/logback-test.xml index 67ff7103e..bfbb3e02e 100644 --- a/debezium-connector-sqlserver/src/test/resources/logback-test.xml +++ b/debezium-connector-sqlserver/src/test/resources/logback-test.xml @@ -17,7 +17,7 @@ diff --git a/debezium-embedded/src/main/java/io/debezium/embedded/EmbeddedEngine.java b/debezium-embedded/src/main/java/io/debezium/embedded/EmbeddedEngine.java index d3ebee04e..f74d3398b 100644 --- a/debezium-embedded/src/main/java/io/debezium/embedded/EmbeddedEngine.java +++ b/debezium-embedded/src/main/java/io/debezium/embedded/EmbeddedEngine.java @@ -27,7 +27,6 @@ import java.util.stream.Collectors; import org.apache.kafka.common.config.Config; -import org.apache.kafka.common.config.ConfigDef; import org.apache.kafka.connect.connector.ConnectorContext; import org.apache.kafka.connect.connector.Task; import org.apache.kafka.connect.errors.RetriableException; @@ -55,7 +54,6 @@ import io.debezium.annotation.ThreadSafe; import io.debezium.config.CommonConnectorConfig; import io.debezium.config.Configuration; -import io.debezium.config.Field; import io.debezium.config.Instantiator; import io.debezium.engine.DebeziumEngine; import io.debezium.engine.StopEngineException; @@ -359,7 +357,7 @@ private EmbeddedEngine(Configuration config, ClassLoader classLoader, Clock cloc Map embeddedConfig = config.asMap(EmbeddedEngineConfig.ALL_FIELDS); embeddedConfig.put(WorkerConfig.KEY_CONVERTER_CLASS_CONFIG, JsonConverter.class.getName()); embeddedConfig.put(WorkerConfig.VALUE_CONVERTER_CLASS_CONFIG, JsonConverter.class.getName()); - workerConfig = new EmbeddedConfig(embeddedConfig); + workerConfig = new EmbeddedWorkerConfig(embeddedConfig); } /** @@ -1010,23 +1008,6 @@ private DelayStrategy delayStrategy(Configuration config) { Duration.ofMillis(config.getInteger(EmbeddedEngineConfig.ERRORS_RETRY_DELAY_MAX_MS))); } - protected static class EmbeddedConfig extends WorkerConfig { - private static final ConfigDef CONFIG; - - static { - ConfigDef config = baseConfigDef(); - Field.group(config, "file", EmbeddedEngineConfig.OFFSET_STORAGE_FILE_FILENAME); - Field.group(config, "kafka", EmbeddedEngineConfig.OFFSET_STORAGE_KAFKA_TOPIC); - Field.group(config, "kafka", EmbeddedEngineConfig.OFFSET_STORAGE_KAFKA_PARTITIONS); - Field.group(config, "kafka", EmbeddedEngineConfig.OFFSET_STORAGE_KAFKA_REPLICATION_FACTOR); - CONFIG = config; - } - - protected EmbeddedConfig(Map props) { - super(CONFIG, props); - } - } - private class HandlerErrors { private Throwable handlerError; private Throwable retryError; diff --git a/debezium-embedded/src/main/java/io/debezium/embedded/EmbeddedWorkerConfig.java b/debezium-embedded/src/main/java/io/debezium/embedded/EmbeddedWorkerConfig.java new file mode 100644 index 000000000..ac593d130 --- /dev/null +++ b/debezium-embedded/src/main/java/io/debezium/embedded/EmbeddedWorkerConfig.java @@ -0,0 +1,28 @@ +package io.debezium.embedded; + +import java.util.Map; + +import io.debezium.config.Field; +import org.apache.kafka.common.config.ConfigDef; +import org.apache.kafka.connect.runtime.WorkerConfig; + +/** + * Extension to Kafka's {@link WorkerConfig} with additions needed by {@link io.debezium.engine.DebeziumEngine}. + * Should be removed once {@link io.debezium.engine.DebeziumEngine} is independent on Kafka model (DBZ-6234). + */ +class EmbeddedWorkerConfig extends WorkerConfig { + private static final ConfigDef CONFIG; + + static { + ConfigDef config = baseConfigDef(); + Field.group(config, "file", EmbeddedEngineConfig.OFFSET_STORAGE_FILE_FILENAME); + Field.group(config, "kafka", EmbeddedEngineConfig.OFFSET_STORAGE_KAFKA_TOPIC); + Field.group(config, "kafka", EmbeddedEngineConfig.OFFSET_STORAGE_KAFKA_PARTITIONS); + Field.group(config, "kafka", EmbeddedEngineConfig.OFFSET_STORAGE_KAFKA_REPLICATION_FACTOR); + CONFIG = config; + } + + protected EmbeddedWorkerConfig(Map props) { + super(CONFIG, props); + } +} diff --git a/debezium-embedded/src/test/java/io/debezium/embedded/AbstractConnectorTest.java b/debezium-embedded/src/test/java/io/debezium/embedded/AbstractConnectorTest.java index 3f12c41da..f8f270767 100644 --- a/debezium-embedded/src/test/java/io/debezium/embedded/AbstractConnectorTest.java +++ b/debezium-embedded/src/test/java/io/debezium/embedded/AbstractConnectorTest.java @@ -70,7 +70,6 @@ import io.debezium.config.Configuration; import io.debezium.config.Instantiator; import io.debezium.data.VerifyRecord; -import io.debezium.embedded.EmbeddedEngine.EmbeddedConfig; import io.debezium.engine.DebeziumEngine; import io.debezium.function.BooleanConsumer; import io.debezium.junit.SkipTestRule; @@ -1156,7 +1155,7 @@ protected Map, Map> readLastCommittedOffsets( Map embeddedConfig = config.asMap(EmbeddedEngineConfig.ALL_FIELDS); embeddedConfig.put(WorkerConfig.KEY_CONVERTER_CLASS_CONFIG, JsonConverter.class.getName()); embeddedConfig.put(WorkerConfig.VALUE_CONVERTER_CLASS_CONFIG, JsonConverter.class.getName()); - WorkerConfig workerConfig = new EmbeddedConfig(embeddedConfig); + WorkerConfig workerConfig = new EmbeddedWorkerConfig(embeddedConfig); FileOffsetBackingStore offsetStore = KafkaConnectUtil.fileOffsetBackingStore(); offsetStore.configure(workerConfig); @@ -1188,7 +1187,7 @@ protected void storeOffsets(Configuration config, Map, Map embeddedConfig = config.asMap(EmbeddedEngineConfig.ALL_FIELDS); embeddedConfig.put(WorkerConfig.KEY_CONVERTER_CLASS_CONFIG, JsonConverter.class.getName()); embeddedConfig.put(WorkerConfig.VALUE_CONVERTER_CLASS_CONFIG, JsonConverter.class.getName()); - WorkerConfig workerConfig = new EmbeddedConfig(embeddedConfig); + WorkerConfig workerConfig = new EmbeddedWorkerConfig(embeddedConfig); FileOffsetBackingStore offsetStore = KafkaConnectUtil.fileOffsetBackingStore(); offsetStore.configure(workerConfig); diff --git a/debezium-embedded/src/test/resources/logback-test.xml b/debezium-embedded/src/test/resources/logback-test.xml index 9a199d5f8..b20476126 100644 --- a/debezium-embedded/src/test/resources/logback-test.xml +++ b/debezium-embedded/src/test/resources/logback-test.xml @@ -17,7 +17,7 @@ diff --git a/debezium-microbenchmark-oracle/src/main/resources/logback-test.xml b/debezium-microbenchmark-oracle/src/main/resources/logback-test.xml index 9a199d5f8..b20476126 100644 --- a/debezium-microbenchmark-oracle/src/main/resources/logback-test.xml +++ b/debezium-microbenchmark-oracle/src/main/resources/logback-test.xml @@ -17,7 +17,7 @@ diff --git a/debezium-storage/debezium-storage-jdbc/src/test/resources/logback-test.xml b/debezium-storage/debezium-storage-jdbc/src/test/resources/logback-test.xml index a74cd6883..d7d4e38eb 100644 --- a/debezium-storage/debezium-storage-jdbc/src/test/resources/logback-test.xml +++ b/debezium-storage/debezium-storage-jdbc/src/test/resources/logback-test.xml @@ -17,7 +17,7 @@