DBZ-7007 Move embedded engine config into separate interface

This config may be re-used by other possible implementations of the
DebeziumEngine API in the embedded package. As the DebeziumEngine API
can have completely different implementations, and thus also different
configs, the interface is named `EmbeddedEngineConfig`, as it is
assumed to be used only by the embedded engine "family" of
implementations.

To keep backward compatibility, the config options are extracted into
an interface and `EmbeddedEngine` implements this interface, thus
allowing these options to be used in custom classes without any code
changes.
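
As a minimal sketch of what this buys (the engine name below is a placeholder; `Configuration` is Debezium's own config builder, used throughout the diff), code that referenced the options through `EmbeddedEngine` keeps compiling unchanged, while new code can target the interface directly:

```java
import io.debezium.config.Configuration;
import io.debezium.embedded.EmbeddedEngine;
import io.debezium.embedded.EmbeddedEngineConfig;

public class ConfigCompatibilitySketch {
    public static void main(String[] args) {
        // Pre-existing callers still compile: the constants are now
        // inherited by EmbeddedEngine from EmbeddedEngineConfig.
        Configuration legacy = Configuration.create()
                .with(EmbeddedEngine.ENGINE_NAME, "demo-connector") // placeholder name
                .with(EmbeddedEngine.OFFSET_FLUSH_INTERVAL_MS, 0)
                .build();

        // New callers can reference the interface directly, without
        // depending on the EmbeddedEngine implementation class.
        Configuration current = Configuration.create()
                .with(EmbeddedEngineConfig.ENGINE_NAME, "demo-connector")
                .with(EmbeddedEngineConfig.OFFSET_FLUSH_INTERVAL_MS, 0)
                .build();

        // Both spellings resolve to the same underlying property key.
        System.out.println(legacy.getString(EmbeddedEngineConfig.ENGINE_NAME)
                .equals(current.getString(EmbeddedEngineConfig.ENGINE_NAME))); // true
    }
}
```
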
Vojtech Juranek 2023-10-06 13:56:04 +02:00 committed by Jiri Pechanec
parent e073dbd1be
commit 9d26dd4cf4
11 changed files with 252 additions and 232 deletions

View File

@@ -90,7 +90,7 @@
import io.debezium.data.VerifyRecord;
import io.debezium.doc.FixFor;
import io.debezium.embedded.AbstractConnectorTest;
import io.debezium.embedded.EmbeddedEngine;
import io.debezium.embedded.EmbeddedEngineConfig;
import io.debezium.heartbeat.DatabaseHeartbeatImpl;
import io.debezium.heartbeat.Heartbeat;
import io.debezium.jdbc.JdbcConnection;
@@ -2971,7 +2971,7 @@ public void shouldCreateSnapshotSchemaOnlyRecovery() throws Exception {
.with(OracleConnectorConfig.SNAPSHOT_MODE, SnapshotMode.SCHEMA_ONLY)
.with(OracleConnectorConfig.TABLE_INCLUDE_LIST, "DEBEZIUM\\.DBZ3986")
.with(OracleConnectorConfig.SCHEMA_HISTORY, MemorySchemaHistory.class.getName())
.with(EmbeddedEngine.OFFSET_STORAGE, FileOffsetBackingStore.class.getName());
.with(EmbeddedEngineConfig.OFFSET_STORAGE, FileOffsetBackingStore.class.getName());
Configuration config = builder.build();
consumeRecords(config);
@@ -3004,7 +3004,7 @@ public void shouldCreateSnapshotSchemaOnlyExceptionWithoutHistory() throws Excep
.with(OracleConnectorConfig.SNAPSHOT_MODE, SnapshotMode.SCHEMA_ONLY)
.with(OracleConnectorConfig.TABLE_INCLUDE_LIST, "DEBEZIUM\\.DBZ3986")
.with(OracleConnectorConfig.SCHEMA_HISTORY, MemorySchemaHistory.class.getName())
.with(EmbeddedEngine.OFFSET_STORAGE, FileOffsetBackingStore.class.getName());
.with(EmbeddedEngineConfig.OFFSET_STORAGE, FileOffsetBackingStore.class.getName());
Configuration config = builder.build();
consumeRecords(config);
@@ -3025,7 +3025,7 @@ public void shouldSkipDataOnSnapshotSchemaOnly() throws Exception {
.with(OracleConnectorConfig.SNAPSHOT_MODE, SnapshotMode.SCHEMA_ONLY)
.with(OracleConnectorConfig.TABLE_INCLUDE_LIST, "DEBEZIUM\\.DBZ3986")
.with(OracleConnectorConfig.SCHEMA_HISTORY, MemorySchemaHistory.class.getName())
.with(EmbeddedEngine.OFFSET_STORAGE, MemoryOffsetBackingStore.class.getName());
.with(EmbeddedEngineConfig.OFFSET_STORAGE, MemoryOffsetBackingStore.class.getName());
Configuration config = builder.build();
consumeRecords(config);

View File

@@ -82,7 +82,7 @@
import io.debezium.data.VerifyRecord;
import io.debezium.data.geometry.Point;
import io.debezium.doc.FixFor;
import io.debezium.embedded.EmbeddedEngine;
import io.debezium.embedded.EmbeddedEngineConfig;
import io.debezium.heartbeat.DatabaseHeartbeatImpl;
import io.debezium.heartbeat.Heartbeat;
import io.debezium.jdbc.JdbcConnection;
@@ -2323,7 +2323,7 @@ public void shouldStartConsumingFromSlotLocation() throws Exception {
startConnector(config -> config
.with(PostgresConnectorConfig.DROP_SLOT_ON_STOP, false)
.with(EmbeddedEngine.OFFSET_STORAGE, MemoryOffsetBackingStore.class), true);
.with(EmbeddedEngineConfig.OFFSET_STORAGE, MemoryOffsetBackingStore.class), true);
waitForStreamingToStart();
consumer = testConsumer(1);
@@ -2337,7 +2337,7 @@ public void shouldStartConsumingFromSlotLocation() throws Exception {
startConnector(config -> config
.with(PostgresConnectorConfig.DROP_SLOT_ON_STOP, true)
.with(PostgresConnectorConfig.SNAPSHOT_MODE, PostgresConnectorConfig.SnapshotMode.NEVER)
.with(EmbeddedEngine.OFFSET_STORAGE, MemoryOffsetBackingStore.class), false);
.with(EmbeddedEngineConfig.OFFSET_STORAGE, MemoryOffsetBackingStore.class), false);
consumer.expects(3);
consumer.await(TestHelper.waitTimeForRecords() * 5, TimeUnit.SECONDS);

View File

@@ -28,9 +28,6 @@
import org.apache.kafka.common.config.Config;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigDef.Importance;
import org.apache.kafka.common.config.ConfigDef.Type;
import org.apache.kafka.common.config.ConfigDef.Width;
import org.apache.kafka.connect.connector.ConnectorContext;
import org.apache.kafka.connect.connector.Task;
import org.apache.kafka.connect.errors.RetriableException;
@@ -38,9 +35,7 @@
import org.apache.kafka.connect.json.JsonConverterConfig;
import org.apache.kafka.connect.runtime.AbstractHerder;
import org.apache.kafka.connect.runtime.WorkerConfig;
import org.apache.kafka.connect.runtime.distributed.DistributedConfig;
import org.apache.kafka.connect.runtime.rest.entities.ConfigInfos;
import org.apache.kafka.connect.runtime.standalone.StandaloneConfig;
import org.apache.kafka.connect.source.SourceConnector;
import org.apache.kafka.connect.source.SourceConnectorContext;
import org.apache.kafka.connect.source.SourceRecord;
@@ -65,7 +60,6 @@
import io.debezium.engine.DebeziumEngine;
import io.debezium.engine.StopEngineException;
import io.debezium.engine.spi.OffsetCommitPolicy;
import io.debezium.pipeline.ChangeEventSourceCoordinator;
import io.debezium.util.Clock;
import io.debezium.util.DelayStrategy;
import io.debezium.util.VariableLatch;
@@ -88,174 +82,7 @@
* @author Randall Hauch
*/
@ThreadSafe
public final class EmbeddedEngine implements DebeziumEngine<SourceRecord> {
/**
* A required field for an embedded connector that specifies the unique name for the connector instance.
*/
public static final Field ENGINE_NAME = Field.create("name")
.withDescription("Unique name for this connector instance.")
.required();
/**
* A required field for an embedded connector that specifies the name of the normal Debezium connector's Java class.
*/
public static final Field CONNECTOR_CLASS = Field.create("connector.class")
.withDescription("The Java class for the connector")
.required();
/**
* An optional field that specifies the name of the class that implements the {@link OffsetBackingStore} interface,
* and that will be used to store offsets recorded by the connector.
*/
public static final Field OFFSET_STORAGE = Field.create("offset.storage")
.withDescription("The Java class that implements the `OffsetBackingStore` "
+ "interface, used to periodically store offsets so that, upon "
+ "restart, the connector can resume where it last left off.")
.withDefault(FileOffsetBackingStore.class.getName());
/**
* An optional field that specifies the file location for the {@link FileOffsetBackingStore}.
*
* @see #OFFSET_STORAGE
*/
public static final Field OFFSET_STORAGE_FILE_FILENAME = Field.create(StandaloneConfig.OFFSET_STORAGE_FILE_FILENAME_CONFIG)
.withDescription("The file where offsets are to be stored. Required when "
+ "'offset.storage' is set to the " +
FileOffsetBackingStore.class.getName() + " class.")
.withDefault("");
/**
* An optional field that specifies the topic name for the {@link KafkaOffsetBackingStore}.
*
* @see #OFFSET_STORAGE
*/
public static final Field OFFSET_STORAGE_KAFKA_TOPIC = Field.create(DistributedConfig.OFFSET_STORAGE_TOPIC_CONFIG)
.withDescription("The name of the Kafka topic where offsets are to be stored. "
+ "Required with other properties when 'offset.storage' is set to the "
+ KafkaOffsetBackingStore.class.getName() + " class.")
.withDefault("");
/**
* An optional field that specifies the number of partitions for the {@link KafkaOffsetBackingStore}.
*
* @see #OFFSET_STORAGE
*/
public static final Field OFFSET_STORAGE_KAFKA_PARTITIONS = Field.create(DistributedConfig.OFFSET_STORAGE_PARTITIONS_CONFIG)
.withType(ConfigDef.Type.INT)
.withDescription("The number of partitions used when creating the offset storage topic. "
+ "Required with other properties when 'offset.storage' is set to the "
+ KafkaOffsetBackingStore.class.getName() + " class.");
/**
* An optional field that specifies the replication factor for the {@link KafkaOffsetBackingStore}.
*
* @see #OFFSET_STORAGE
*/
public static final Field OFFSET_STORAGE_KAFKA_REPLICATION_FACTOR = Field.create(DistributedConfig.OFFSET_STORAGE_REPLICATION_FACTOR_CONFIG)
.withType(ConfigDef.Type.SHORT)
.withDescription("Replication factor used when creating the offset storage topic. "
+ "Required with other properties when 'offset.storage' is set to the "
+ KafkaOffsetBackingStore.class.getName() + " class.");
/**
* An optional advanced field that specifies the maximum amount of time that the embedded connector should wait
* for an offset commit to complete.
*/
public static final Field OFFSET_FLUSH_INTERVAL_MS = Field.create("offset.flush.interval.ms")
.withDescription("Interval at which to try committing offsets, given in milliseconds. Defaults to 1 minute (60,000 ms).")
.withDefault(60000L)
.withValidation(Field::isNonNegativeInteger);
/**
* An optional advanced field that specifies the maximum amount of time that the embedded connector should wait
* for an offset commit to complete.
*/
public static final Field OFFSET_COMMIT_TIMEOUT_MS = Field.create("offset.flush.timeout.ms")
.withDescription("Time to wait for records to flush and partition offset data to be"
+ " committed to offset storage before cancelling the process and restoring the offset "
+ "data to be committed in a future attempt, given in milliseconds. Defaults to 5 seconds (5000 ms).")
.withDefault(5000L)
.withValidation(Field::isPositiveInteger);
public static final Field OFFSET_COMMIT_POLICY = Field.create("offset.commit.policy")
.withDescription("The fully-qualified class name of the commit policy type. This class must implement the interface "
+ OffsetCommitPolicy.class.getName()
+ ". The default is a periodic commit policy based upon time intervals.")
.withDefault(OffsetCommitPolicy.PeriodicCommitOffsetPolicy.class.getName())
.withValidation(Field::isClassName);
/**
* A list of Predicates that can be assigned to transformations.
*/
public static final Field PREDICATES = Field.create("predicates")
.withDisplayName("List of prefixes defining predicates.")
.withType(Type.STRING)
.withWidth(Width.MEDIUM)
.withImportance(Importance.LOW)
.withDescription("Optional list of predicates that can be assigned to transformations. "
+ "The predicates are defined using '<predicate.prefix>.type' config option and configured using options '<predicate.prefix>.<option>'");
/**
* A list of SMTs to be applied on the messages generated by the engine.
*/
public static final Field TRANSFORMS = Field.create("transforms")
.withDisplayName("List of prefixes defining transformations.")
.withType(Type.STRING)
.withWidth(Width.MEDIUM)
.withImportance(Importance.LOW)
.withDescription("Optional list of single message transformations applied on the messages. "
+ "The transforms are defined using '<transform.prefix>.type' config option and configured using options '<transform.prefix>.<option>'");
private static final int DEFAULT_ERROR_MAX_RETRIES = -1;
public static final Field ERRORS_MAX_RETRIES = Field.create("errors.max.retries")
.withDisplayName("The maximum number of retries")
.withType(Type.INT)
.withWidth(Width.SHORT)
.withImportance(Importance.MEDIUM)
.withDefault(DEFAULT_ERROR_MAX_RETRIES)
.withValidation(Field::isInteger)
.withDescription("The maximum number of retries on connection errors before failing (-1 = no limit, 0 = disabled, > 0 = num of retries).");
public static final Field ERRORS_RETRY_DELAY_INITIAL_MS = Field.create("errors.retry.delay.initial.ms")
.withDisplayName("Initial delay for retries")
.withType(Type.INT)
.withWidth(Width.SHORT)
.withImportance(Importance.MEDIUM)
.withDefault(300)
.withValidation(Field::isPositiveInteger)
.withDescription("Initial delay (in ms) for retries when encountering connection errors."
+ " This value will be doubled upon every retry but won't exceed 'errors.retry.delay.max.ms'.");
public static final Field ERRORS_RETRY_DELAY_MAX_MS = Field.create("errors.retry.delay.max.ms")
.withDisplayName("Max delay between retries")
.withType(Type.INT)
.withWidth(Width.SHORT)
.withImportance(Importance.MEDIUM)
.withDefault(10000)
.withValidation(Field::isPositiveInteger)
.withDescription("Max delay (in ms) between retries when encountering connection errors.");
public static final Field WAIT_FOR_COMPLETION_BEFORE_INTERRUPT_MS = Field.create("debezium.embedded.shutdown.pause.before.interrupt.ms")
.withDisplayName("Time to wait to engine completion before interrupt")
.withType(Type.LONG)
.withDefault(Duration.ofMinutes(5).toMillis())
.withValidation(Field::isPositiveInteger)
.withDescription(String.format("How long we wait before forcefully stopping the connector thread when shutting down. " +
"Must be bigger than the time it takes two polling loops to finish ({} ms)", ChangeEventSourceCoordinator.SHUTDOWN_WAIT_TIMEOUT.toMillis() * 2));
/**
* The array of fields that are required by each connectors.
*/
public static final Field.Set CONNECTOR_FIELDS = Field.setOf(ENGINE_NAME, CONNECTOR_CLASS);
/**
* The array of all exposed fields.
*/
protected static final Field.Set ALL_FIELDS = CONNECTOR_FIELDS.with(OFFSET_STORAGE, OFFSET_STORAGE_FILE_FILENAME,
OFFSET_FLUSH_INTERVAL_MS, OFFSET_COMMIT_TIMEOUT_MS,
ERRORS_MAX_RETRIES, ERRORS_RETRY_DELAY_INITIAL_MS, ERRORS_RETRY_DELAY_MAX_MS);
public final class EmbeddedEngine implements DebeziumEngine<SourceRecord>, EmbeddedEngineConfig {
public static final class BuilderImpl implements Builder {
private Configuration config;
@@ -636,7 +463,7 @@ private EmbeddedEngine(Configuration config, ClassLoader classLoader, Clock cloc
// Create the worker config, adding extra fields that are required for validation of a worker config
// but that are not used within the embedded engine (since the source records are never serialized) ...
Map<String, String> embeddedConfig = config.asMap(ALL_FIELDS);
Map<String, String> embeddedConfig = config.asMap(EmbeddedEngineConfig.ALL_FIELDS);
embeddedConfig.put(WorkerConfig.KEY_CONVERTER_CLASS_CONFIG, JsonConverter.class.getName());
embeddedConfig.put(WorkerConfig.VALUE_CONVERTER_CLASS_CONFIG, JsonConverter.class.getName());
workerConfig = new EmbeddedConfig(embeddedConfig);
@@ -696,13 +523,13 @@ private void succeed(String msg) {
public void run() {
if (runningThread.compareAndSet(null, Thread.currentThread())) {
final String engineName = config.getString(ENGINE_NAME);
final String connectorClassName = config.getString(CONNECTOR_CLASS);
final String engineName = config.getString(EmbeddedEngineConfig.ENGINE_NAME);
final String connectorClassName = config.getString(EmbeddedEngineConfig.CONNECTOR_CLASS);
final Optional<DebeziumEngine.ConnectorCallback> connectorCallback = Optional.ofNullable(this.connectorCallback);
// Only one thread can be in this part of the method at a time ...
latch.countUp();
try {
if (!config.validateAndRecord(CONNECTOR_FIELDS, LOGGER::error)) {
if (!config.validateAndRecord(EmbeddedEngineConfig.CONNECTOR_FIELDS, LOGGER::error)) {
failAndThrow("Failed to start connector with invalid configuration (see logs for actual errors)", null);
}
@@ -717,7 +544,7 @@ public void run() {
setOffsetCommitPolicy();
// Set up offset reader and writer
final Duration commitTimeout = Duration.ofMillis(config.getLong(OFFSET_COMMIT_TIMEOUT_MS));
final Duration commitTimeout = Duration.ofMillis(config.getLong(EmbeddedEngineConfig.OFFSET_COMMIT_TIMEOUT_MS));
final OffsetStorageReader offsetReader = new OffsetStorageReaderImpl(offsetStore, engineName, keyConverter, valueConverter);
final OffsetStorageWriter offsetWriter = new OffsetStorageWriter(offsetStore, engineName, keyConverter, valueConverter);
initializeConnector(connector, offsetReader);
@@ -812,7 +639,7 @@ private Map<String, String> getConnectorConfig(final SourceConnector connector,
* Determines, which offset backing store should be used, instantiate it and start the offset store.
*/
private OffsetBackingStore initializeOffsetStore(final Map<String, String> connectorConfig) throws EmbeddedEngineRuntimeException {
final String offsetStoreClassName = config.getString(OFFSET_STORAGE);
final String offsetStoreClassName = config.getString(EmbeddedEngineConfig.OFFSET_STORAGE);
OffsetBackingStore offsetStore = null;
try {
// Kafka 3.5 no longer provides offset stores with non-parametric constructors
@@ -852,11 +679,11 @@ else if (offsetStoreClassName.equals(KafkaOffsetBackingStore.class.getName())) {
private void setOffsetCommitPolicy() throws EmbeddedEngineRuntimeException {
if (offsetCommitPolicy == null) {
try {
offsetCommitPolicy = Instantiator.getInstanceWithProperties(config.getString(EmbeddedEngine.OFFSET_COMMIT_POLICY),
offsetCommitPolicy = Instantiator.getInstanceWithProperties(config.getString(EmbeddedEngineConfig.OFFSET_COMMIT_POLICY),
config.asProperties());
}
catch (Throwable t) {
failAndThrow("Unable to instantiate OffsetCommitPolicy class '" + config.getString(OFFSET_STORAGE) + "'", t);
failAndThrow("Unable to instantiate OffsetCommitPolicy class '" + config.getString(EmbeddedEngineConfig.OFFSET_STORAGE) + "'", t);
}
}
}
@@ -935,9 +762,9 @@ private Throwable handleRetries(final RetriableException e, final List<Map<Strin
if (maxRetries == 0) {
retryError = e;
}
else if (maxRetries < DEFAULT_ERROR_MAX_RETRIES) {
LOGGER.warn("Setting {}={} is deprecated. To disable retries on connection errors, set {}=0", ERRORS_MAX_RETRIES.name(), maxRetries,
ERRORS_MAX_RETRIES.name());
else if (maxRetries < EmbeddedEngineConfig.DEFAULT_ERROR_MAX_RETRIES) {
LOGGER.warn("Setting {}={} is deprecated. To disable retries on connection errors, set {}=0", EmbeddedEngineConfig.ERRORS_MAX_RETRIES.name(), maxRetries,
EmbeddedEngineConfig.ERRORS_MAX_RETRIES.name());
retryError = e;
}
else {
@@ -1080,7 +907,7 @@ private void stopOffsetStoreAndConnector(final SourceConnector connector,
}
private int getErrorsMaxRetries() {
int maxRetries = config.getInteger(ERRORS_MAX_RETRIES);
int maxRetries = config.getInteger(EmbeddedEngineConfig.ERRORS_MAX_RETRIES);
return maxRetries;
}
@@ -1241,7 +1068,7 @@ public boolean stop() {
if (thread != null) {
try {
// Making sure the event source coordinator has enough time to shut down before forcefully stopping it
Duration timeout = Duration.ofMillis(config.getLong(WAIT_FOR_COMPLETION_BEFORE_INTERRUPT_MS));
Duration timeout = Duration.ofMillis(config.getLong(EmbeddedEngineConfig.WAIT_FOR_COMPLETION_BEFORE_INTERRUPT_MS));
LOGGER.info("Waiting for {} for connector to stop", timeout);
latch.await(timeout.toMillis(), TimeUnit.MILLISECONDS);
}
@@ -1277,7 +1104,7 @@ public boolean await(long timeout, TimeUnit unit) throws InterruptedException {
@Override
public String toString() {
return "EmbeddedEngine{id=" + config.getString(ENGINE_NAME) + '}';
return "EmbeddedEngine{id=" + config.getString(EmbeddedEngineConfig.ENGINE_NAME) + '}';
}
public void runWithTask(Consumer<SourceTask> consumer) {
@@ -1285,8 +1112,8 @@ public void runWithTask(Consumer<SourceTask> consumer) {
}
private DelayStrategy delayStrategy(Configuration config) {
return DelayStrategy.exponential(Duration.ofMillis(config.getInteger(ERRORS_RETRY_DELAY_INITIAL_MS)),
Duration.ofMillis(config.getInteger(ERRORS_RETRY_DELAY_MAX_MS)));
return DelayStrategy.exponential(Duration.ofMillis(config.getInteger(EmbeddedEngineConfig.ERRORS_RETRY_DELAY_INITIAL_MS)),
Duration.ofMillis(config.getInteger(EmbeddedEngineConfig.ERRORS_RETRY_DELAY_MAX_MS)));
}
protected static class EmbeddedConfig extends WorkerConfig {
@@ -1294,10 +1121,10 @@ protected static class EmbeddedConfig extends WorkerConfig {
static {
ConfigDef config = baseConfigDef();
Field.group(config, "file", OFFSET_STORAGE_FILE_FILENAME);
Field.group(config, "kafka", OFFSET_STORAGE_KAFKA_TOPIC);
Field.group(config, "kafka", OFFSET_STORAGE_KAFKA_PARTITIONS);
Field.group(config, "kafka", OFFSET_STORAGE_KAFKA_REPLICATION_FACTOR);
Field.group(config, "file", EmbeddedEngineConfig.OFFSET_STORAGE_FILE_FILENAME);
Field.group(config, "kafka", EmbeddedEngineConfig.OFFSET_STORAGE_KAFKA_TOPIC);
Field.group(config, "kafka", EmbeddedEngineConfig.OFFSET_STORAGE_KAFKA_PARTITIONS);
Field.group(config, "kafka", EmbeddedEngineConfig.OFFSET_STORAGE_KAFKA_REPLICATION_FACTOR);
CONFIG = config;
}

View File

@@ -0,0 +1,192 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.embedded;
import java.time.Duration;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.runtime.distributed.DistributedConfig;
import org.apache.kafka.connect.runtime.standalone.StandaloneConfig;
import org.apache.kafka.connect.storage.FileOffsetBackingStore;
import org.apache.kafka.connect.storage.KafkaOffsetBackingStore;
import org.apache.kafka.connect.storage.OffsetBackingStore;
import io.debezium.config.Field;
import io.debezium.engine.spi.OffsetCommitPolicy;
import io.debezium.pipeline.ChangeEventSourceCoordinator;
/**
* Common configuration options used in embedded implementations of {@link io.debezium.engine.DebeziumEngine}.
*/
public interface EmbeddedEngineConfig {
/**
* A required field for an embedded connector that specifies the unique name for the connector instance.
*/
Field ENGINE_NAME = Field.create("name")
.withDescription("Unique name for this connector instance.")
.required();
/**
* A required field for an embedded connector that specifies the name of the normal Debezium connector's Java class.
*/
Field CONNECTOR_CLASS = Field.create("connector.class")
.withDescription("The Java class for the connector")
.required();
/**
* The array of fields that are required by each connectors.
*/
Field.Set CONNECTOR_FIELDS = Field.setOf(ENGINE_NAME, CONNECTOR_CLASS);
/**
* An optional field that specifies the name of the class that implements the {@link OffsetBackingStore} interface,
* and that will be used to store offsets recorded by the connector.
*/
Field OFFSET_STORAGE = Field.create("offset.storage")
.withDescription("The Java class that implements the `OffsetBackingStore` "
+ "interface, used to periodically store offsets so that, upon "
+ "restart, the connector can resume where it last left off.")
.withDefault(FileOffsetBackingStore.class.getName());
/**
* An optional field that specifies the file location for the {@link FileOffsetBackingStore}.
*
* @see #OFFSET_STORAGE
*/
Field OFFSET_STORAGE_FILE_FILENAME = Field.create(StandaloneConfig.OFFSET_STORAGE_FILE_FILENAME_CONFIG)
.withDescription("The file where offsets are to be stored. Required when "
+ "'offset.storage' is set to the " +
FileOffsetBackingStore.class.getName() + " class.")
.withDefault("");
/**
* An optional field that specifies the topic name for the {@link KafkaOffsetBackingStore}.
*
* @see #OFFSET_STORAGE
*/
Field OFFSET_STORAGE_KAFKA_TOPIC = Field.create(DistributedConfig.OFFSET_STORAGE_TOPIC_CONFIG)
.withDescription("The name of the Kafka topic where offsets are to be stored. "
+ "Required with other properties when 'offset.storage' is set to the "
+ KafkaOffsetBackingStore.class.getName() + " class.")
.withDefault("");
/**
* An optional field that specifies the number of partitions for the {@link KafkaOffsetBackingStore}.
*
* @see #OFFSET_STORAGE
*/
Field OFFSET_STORAGE_KAFKA_PARTITIONS = Field.create(DistributedConfig.OFFSET_STORAGE_PARTITIONS_CONFIG)
.withType(ConfigDef.Type.INT)
.withDescription("The number of partitions used when creating the offset storage topic. "
+ "Required with other properties when 'offset.storage' is set to the "
+ KafkaOffsetBackingStore.class.getName() + " class.");
/**
* An optional field that specifies the replication factor for the {@link KafkaOffsetBackingStore}.
*
* @see #OFFSET_STORAGE
*/
Field OFFSET_STORAGE_KAFKA_REPLICATION_FACTOR = Field.create(DistributedConfig.OFFSET_STORAGE_REPLICATION_FACTOR_CONFIG)
.withType(ConfigDef.Type.SHORT)
.withDescription("Replication factor used when creating the offset storage topic. "
+ "Required with other properties when 'offset.storage' is set to the "
+ KafkaOffsetBackingStore.class.getName() + " class.");
/**
* An optional advanced field that specifies the maximum amount of time that the embedded connector should wait
* for an offset commit to complete.
*/
Field OFFSET_FLUSH_INTERVAL_MS = Field.create("offset.flush.interval.ms")
.withDescription("Interval at which to try committing offsets, given in milliseconds. Defaults to 1 minute (60,000 ms).")
.withDefault(60000L)
.withValidation(Field::isNonNegativeInteger);
/**
* An optional advanced field that specifies the maximum amount of time that the embedded connector should wait
* for an offset commit to complete.
*/
Field OFFSET_COMMIT_TIMEOUT_MS = Field.create("offset.flush.timeout.ms")
.withDescription("Time to wait for records to flush and partition offset data to be"
+ " committed to offset storage before cancelling the process and restoring the offset "
+ "data to be committed in a future attempt, given in milliseconds. Defaults to 5 seconds (5000 ms).")
.withDefault(5000L)
.withValidation(Field::isPositiveInteger);
Field OFFSET_COMMIT_POLICY = Field.create("offset.commit.policy")
.withDescription("The fully-qualified class name of the commit policy type. This class must implement the interface "
+ OffsetCommitPolicy.class.getName()
+ ". The default is a periodic commit policy based upon time intervals.")
.withDefault(OffsetCommitPolicy.PeriodicCommitOffsetPolicy.class.getName())
.withValidation(Field::isClassName);
/**
* A list of Predicates that can be assigned to transformations.
*/
Field PREDICATES = Field.create("predicates")
.withDisplayName("List of prefixes defining predicates.")
.withType(ConfigDef.Type.STRING)
.withWidth(ConfigDef.Width.MEDIUM)
.withImportance(ConfigDef.Importance.LOW)
.withDescription("Optional list of predicates that can be assigned to transformations. "
+ "The predicates are defined using '<predicate.prefix>.type' config option and configured using options '<predicate.prefix>.<option>'");
/**
* A list of SMTs to be applied on the messages generated by the engine.
*/
Field TRANSFORMS = Field.create("transforms")
.withDisplayName("List of prefixes defining transformations.")
.withType(ConfigDef.Type.STRING)
.withWidth(ConfigDef.Width.MEDIUM)
.withImportance(ConfigDef.Importance.LOW)
.withDescription("Optional list of single message transformations applied on the messages. "
+ "The transforms are defined using '<transform.prefix>.type' config option and configured using options '<transform.prefix>.<option>'");
Field ERRORS_RETRY_DELAY_INITIAL_MS = Field.create("errors.retry.delay.initial.ms")
.withDisplayName("Initial delay for retries")
.withType(ConfigDef.Type.INT)
.withWidth(ConfigDef.Width.SHORT)
.withImportance(ConfigDef.Importance.MEDIUM)
.withDefault(300)
.withValidation(Field::isPositiveInteger)
.withDescription("Initial delay (in ms) for retries when encountering connection errors."
+ " This value will be doubled upon every retry but won't exceed 'errors.retry.delay.max.ms'.");
Field ERRORS_RETRY_DELAY_MAX_MS = Field.create("errors.retry.delay.max.ms")
.withDisplayName("Max delay between retries")
.withType(ConfigDef.Type.INT)
.withWidth(ConfigDef.Width.SHORT)
.withImportance(ConfigDef.Importance.MEDIUM)
.withDefault(10000)
.withValidation(Field::isPositiveInteger)
.withDescription("Max delay (in ms) between retries when encountering connection errors.");
Field WAIT_FOR_COMPLETION_BEFORE_INTERRUPT_MS = Field.create("debezium.embedded.shutdown.pause.before.interrupt.ms")
.withDisplayName("Time to wait to engine completion before interrupt")
.withType(ConfigDef.Type.LONG)
.withDefault(Duration.ofMinutes(5).toMillis())
.withValidation(Field::isPositiveInteger)
.withDescription(String.format("How long we wait before forcefully stopping the connector thread when shutting down. " +
"Must be bigger than the time it takes two polling loops to finish ({} ms)", ChangeEventSourceCoordinator.SHUTDOWN_WAIT_TIMEOUT.toMillis() * 2));
int DEFAULT_ERROR_MAX_RETRIES = -1;
Field ERRORS_MAX_RETRIES = Field.create("errors.max.retries")
.withDisplayName("The maximum number of retries")
.withType(ConfigDef.Type.INT)
.withWidth(ConfigDef.Width.SHORT)
.withImportance(ConfigDef.Importance.MEDIUM)
.withDefault(DEFAULT_ERROR_MAX_RETRIES)
.withValidation(Field::isInteger)
.withDescription("The maximum number of retries on connection errors before failing (-1 = no limit, 0 = disabled, > 0 = num of retries).");
/**
* The array of all exposed fields.
*/
Field.Set ALL_FIELDS = CONNECTOR_FIELDS.with(OFFSET_STORAGE, OFFSET_STORAGE_FILE_FILENAME,
OFFSET_FLUSH_INTERVAL_MS, OFFSET_COMMIT_TIMEOUT_MS,
ERRORS_MAX_RETRIES, ERRORS_RETRY_DELAY_INITIAL_MS, ERRORS_RETRY_DELAY_MAX_MS);
}
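
For orientation, a hypothetical sketch of how the offset-storage options collected in this interface fit together; the connector class and file path below are placeholder values, not part of this commit:

```java
import org.apache.kafka.connect.storage.FileOffsetBackingStore;

import io.debezium.config.Configuration;
import io.debezium.embedded.EmbeddedEngineConfig;

public class OffsetStorageConfigSketch {
    public static Configuration fileBackedOffsets() {
        // OFFSET_STORAGE names the OffsetBackingStore implementation;
        // OFFSET_STORAGE_FILE_FILENAME tells the file-backed store where
        // to persist offsets between restarts.
        return Configuration.create()
                .with(EmbeddedEngineConfig.ENGINE_NAME, "example-engine") // placeholder
                .with(EmbeddedEngineConfig.CONNECTOR_CLASS,
                        "io.debezium.connector.postgresql.PostgresConnector") // placeholder
                .with(EmbeddedEngineConfig.OFFSET_STORAGE,
                        FileOffsetBackingStore.class.getName())
                .with(EmbeddedEngineConfig.OFFSET_STORAGE_FILE_FILENAME,
                        "/tmp/offsets.dat") // placeholder path
                .with(EmbeddedEngineConfig.OFFSET_FLUSH_INTERVAL_MS, 60_000)
                .build();
    }
}
```
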

View File

@@ -32,7 +32,7 @@ public class Predicates implements Closeable {
private final Map<String, Predicate<SourceRecord>> predicates = new HashMap<>();
public Predicates(Configuration config) {
final String predicateList = config.getString(EmbeddedEngine.PREDICATES);
final String predicateList = config.getString(EmbeddedEngineConfig.PREDICATES);
if (predicateList == null) {
return;
}
@@ -70,7 +70,7 @@ private static Predicate<SourceRecord> createPredicate(Configuration config, Str
}
private static String predicateConfigNamespace(final String name) {
return EmbeddedEngine.PREDICATES.name() + "." + name;
return EmbeddedEngineConfig.PREDICATES.name() + "." + name;
}
@Override

View File

@@ -43,7 +43,7 @@ public class Transformations implements Closeable {
public Transformations(Configuration config) {
this.config = config;
this.predicates = new Predicates(config);
final String transformationList = config.getString(EmbeddedEngine.TRANSFORMS);
final String transformationList = config.getString(EmbeddedEngineConfig.TRANSFORMS);
if (transformationList == null) {
return;
}
@@ -55,7 +55,7 @@ public Transformations(Configuration config) {
}
private static String transformationConfigNamespace(final String name) {
return EmbeddedEngine.TRANSFORMS.name() + "." + name;
return EmbeddedEngineConfig.TRANSFORMS.name() + "." + name;
}
@VisibleForTesting

View File

@@ -28,7 +28,7 @@
import io.debezium.document.ArrayWriter;
import io.debezium.document.Document;
import io.debezium.embedded.ConnectorOutputTest;
import io.debezium.embedded.EmbeddedEngine;
import io.debezium.embedded.EmbeddedEngineConfig;
import io.debezium.util.IoUtil;
import io.debezium.util.Testing;
@@ -66,7 +66,7 @@ public void shouldGenerateExpected() throws Exception {
config.put(SimpleSourceConnector.BATCH_COUNT, Integer.toString(numBatches));
config.put(SimpleSourceConnector.RECORD_COUNT_PER_BATCH, Integer.toString(numRecordsPerBatch));
config.put(SimpleSourceConnector.TOPIC_NAME, TOPIC_NAME);
config.put(EmbeddedEngine.WAIT_FOR_COMPLETION_BEFORE_INTERRUPT_MS.name(), String.valueOf(Duration.ofSeconds(1).toMillis()));
config.put(EmbeddedEngineConfig.WAIT_FOR_COMPLETION_BEFORE_INTERRUPT_MS.name(), String.valueOf(Duration.ofSeconds(1).toMillis()));
writeConfigurationFileWithDefaultName(dir, config);
Properties env = new Properties();

View File

@@ -360,10 +360,10 @@ protected void start(Class<? extends SourceConnector> connectorClass, Configurat
DebeziumEngine.CompletionCallback callback, Predicate<SourceRecord> isStopRecord,
Consumer<SourceRecord> recordArrivedListener, boolean ignoreRecordsAfterStop, DebeziumEngine.ChangeConsumer changeConsumer) {
Configuration config = Configuration.copy(connectorConfig)
.with(EmbeddedEngine.ENGINE_NAME, "testing-connector")
.with(EmbeddedEngine.CONNECTOR_CLASS, connectorClass.getName())
.with(EmbeddedEngineConfig.ENGINE_NAME, "testing-connector")
.with(EmbeddedEngineConfig.CONNECTOR_CLASS, connectorClass.getName())
.with(StandaloneConfig.OFFSET_STORAGE_FILE_FILENAME_CONFIG, OFFSET_STORE_PATH)
.with(EmbeddedEngine.OFFSET_FLUSH_INTERVAL_MS, 0)
.with(EmbeddedEngineConfig.OFFSET_FLUSH_INTERVAL_MS, 0)
.build();
latch = new CountDownLatch(1);
CompletionCallback wrapperCallback = (success, msg, error) -> {
@@ -1148,12 +1148,12 @@ protected <T> Map<String, Object> readLastCommittedOffset(Configuration config,
*/
protected <T> Map<Map<String, T>, Map<String, Object>> readLastCommittedOffsets(Configuration config,
Collection<Map<String, T>> partitions) {
config = config.edit().with(EmbeddedEngine.ENGINE_NAME, "testing-connector")
config = config.edit().with(EmbeddedEngineConfig.ENGINE_NAME, "testing-connector")
.with(StandaloneConfig.OFFSET_STORAGE_FILE_FILENAME_CONFIG, OFFSET_STORE_PATH)
.with(EmbeddedEngine.OFFSET_FLUSH_INTERVAL_MS, 0)
.with(EmbeddedEngineConfig.OFFSET_FLUSH_INTERVAL_MS, 0)
.build();
final String engineName = config.getString(EmbeddedEngine.ENGINE_NAME);
final String engineName = config.getString(EmbeddedEngineConfig.ENGINE_NAME);
Map<String, String> internalConverterConfig = Collections.singletonMap(JsonConverterConfig.SCHEMAS_ENABLE_CONFIG, "false");
Converter keyConverter = Instantiator.getInstance(JsonConverter.class.getName());
keyConverter.configure(internalConverterConfig, true);
@@ -1162,7 +1162,7 @@ protected <T> Map<Map<String, T>, Map<String, Object>> readLastCommittedOffsets(
// Create the worker config, adding extra fields that are required for validation of a worker config
// but that are not used within the embedded engine (since the source records are never serialized) ...
Map<String, String> embeddedConfig = config.asMap(EmbeddedEngine.ALL_FIELDS);
Map<String, String> embeddedConfig = config.asMap(EmbeddedEngineConfig.ALL_FIELDS);
embeddedConfig.put(WorkerConfig.KEY_CONVERTER_CLASS_CONFIG, JsonConverter.class.getName());
embeddedConfig.put(WorkerConfig.VALUE_CONVERTER_CLASS_CONFIG, JsonConverter.class.getName());
WorkerConfig workerConfig = new EmbeddedConfig(embeddedConfig);

View File

@@ -921,9 +921,9 @@ else if (isCommand(expected)) {
// the environment and engine parameters ...
Configuration engineConfig = Configuration.copy(connectorConfig)
.withDefault(environmentConfig)
.withDefault(EmbeddedEngine.ENGINE_NAME, spec.name())
.withDefault(EmbeddedEngineConfig.ENGINE_NAME, spec.name())
.withDefault(StandaloneConfig.OFFSET_STORAGE_FILE_FILENAME_CONFIG, OFFSET_STORE_PATH)
.withDefault(EmbeddedEngine.OFFSET_FLUSH_INTERVAL_MS, 0)
.withDefault(EmbeddedEngineConfig.OFFSET_FLUSH_INTERVAL_MS, 0)
.build();
// Create the engine ...

View File

@@ -192,11 +192,11 @@ public void verifyNonAsciiContentHandledCorrectly() throws Exception {
public void interruptedTaskShutsDown() throws Exception {
Configuration config = Configuration.create()
.with(EmbeddedEngine.ENGINE_NAME, "testing-connector")
.with(EmbeddedEngine.CONNECTOR_CLASS, InterruptedConnector.class)
.with(EmbeddedEngineConfig.ENGINE_NAME, "testing-connector")
.with(EmbeddedEngineConfig.CONNECTOR_CLASS, InterruptedConnector.class)
.with(StandaloneConfig.OFFSET_STORAGE_FILE_FILENAME_CONFIG, OFFSET_STORE_PATH)
.with(EmbeddedEngine.OFFSET_FLUSH_INTERVAL_MS, 0)
.with(EmbeddedEngine.OFFSET_STORAGE, InterruptingOffsetStore.class)
.with(EmbeddedEngineConfig.OFFSET_FLUSH_INTERVAL_MS, 0)
.with(EmbeddedEngineConfig.OFFSET_STORAGE, InterruptingOffsetStore.class)
.build();
CountDownLatch firstLatch = new CountDownLatch(1);
@@ -229,10 +229,10 @@ public void interruptedOffsetCommitShutsDown() throws Exception {
Configuration config = Configuration.create()
.with(SimpleSourceConnector.BATCH_COUNT, 1)
.with(EmbeddedEngine.ENGINE_NAME, "testing-connector")
.with(EmbeddedEngine.CONNECTOR_CLASS, SimpleSourceConnector.class)
.with(EmbeddedEngineConfig.ENGINE_NAME, "testing-connector")
.with(EmbeddedEngineConfig.CONNECTOR_CLASS, SimpleSourceConnector.class)
.with(StandaloneConfig.OFFSET_STORAGE_FILE_FILENAME_CONFIG, OFFSET_STORE_PATH)
.with(EmbeddedEngine.OFFSET_STORAGE, InterruptingOffsetStore.class)
.with(EmbeddedEngineConfig.OFFSET_STORAGE, InterruptingOffsetStore.class)
.build();
CountDownLatch firstLatch = new CountDownLatch(1);
@@ -307,10 +307,10 @@ public void shouldWorkToUseCustomChangeConsumer() throws Exception {
appendLinesToSource(NUMBER_OF_LINES);
Configuration config = Configuration.copy(connectorConfig)
.with(EmbeddedEngine.ENGINE_NAME, "testing-connector")
.with(EmbeddedEngine.CONNECTOR_CLASS, FileStreamSourceConnector.class)
.with(EmbeddedEngineConfig.ENGINE_NAME, "testing-connector")
.with(EmbeddedEngineConfig.CONNECTOR_CLASS, FileStreamSourceConnector.class)
.with(StandaloneConfig.OFFSET_STORAGE_FILE_FILENAME_CONFIG, OFFSET_STORE_PATH)
.with(EmbeddedEngine.OFFSET_FLUSH_INTERVAL_MS, 0)
.with(EmbeddedEngineConfig.OFFSET_FLUSH_INTERVAL_MS, 0)
.build();
CountDownLatch firstLatch = new CountDownLatch(1);
@@ -734,10 +734,10 @@ public void shouldRunDebeziumEngineWithMismatchedTypes() throws Exception {
public void verifyBadCommitPolicyClassName() {
Configuration config = Configuration.create()
.with(EmbeddedEngine.ENGINE_NAME, "testing-connector")
.with(EmbeddedEngine.CONNECTOR_CLASS, SimpleSourceConnector.class)
.with(EmbeddedEngineConfig.ENGINE_NAME, "testing-connector")
.with(EmbeddedEngineConfig.CONNECTOR_CLASS, SimpleSourceConnector.class)
.with(StandaloneConfig.OFFSET_STORAGE_FILE_FILENAME_CONFIG, OFFSET_STORE_PATH)
.with(EmbeddedEngine.OFFSET_COMMIT_POLICY, "badclassname") // force ClassNotFoundException
.with(EmbeddedEngineConfig.OFFSET_COMMIT_POLICY, "badclassname") // force ClassNotFoundException
.build();
final AtomicBoolean exceptionCaught = new AtomicBoolean(false);

View File

@@ -52,6 +52,7 @@
import io.debezium.connector.oracle.OracleConnectorConfig.LogMiningStrategy;
import io.debezium.connector.oracle.OracleConnectorConfig.SnapshotMode;
import io.debezium.embedded.EmbeddedEngine;
import io.debezium.embedded.EmbeddedEngineConfig;
import io.debezium.jdbc.JdbcConfiguration;
import io.debezium.storage.file.history.FileSchemaHistory;
import io.debezium.util.IoUtil;
@@ -118,10 +119,10 @@ public void doSetup() {
.build();
Configuration config = Configuration.copy(connectorConfig)
.with(EmbeddedEngine.ENGINE_NAME, "benchmark")
.with(EmbeddedEngine.CONNECTOR_CLASS, OracleConnector.class)
.with(EmbeddedEngineConfig.ENGINE_NAME, "benchmark")
.with(EmbeddedEngineConfig.CONNECTOR_CLASS, OracleConnector.class)
.with(StandaloneConfig.OFFSET_STORAGE_FILE_FILENAME_CONFIG, getPath("offsets.txt").toAbsolutePath())
.with(EmbeddedEngine.OFFSET_FLUSH_INTERVAL_MS, 0)
.with(EmbeddedEngineConfig.OFFSET_FLUSH_INTERVAL_MS, 0)
.build();
Consumer<SourceRecord> recordArrivedListener = this::processRecord;