diff --git a/debezium-core/src/main/java/io/debezium/config/CommonConnectorConfig.java b/debezium-core/src/main/java/io/debezium/config/CommonConnectorConfig.java index 644219f99..c6b252e53 100644 --- a/debezium-core/src/main/java/io/debezium/config/CommonConnectorConfig.java +++ b/debezium-core/src/main/java/io/debezium/config/CommonConnectorConfig.java @@ -46,9 +46,8 @@ import io.debezium.heartbeat.HeartbeatImpl; import io.debezium.pipeline.ErrorHandler; import io.debezium.pipeline.notification.channels.SinkNotificationChannel; -import io.debezium.pipeline.txmetadata.DefaultTransactionStructMaker; -import io.debezium.pipeline.txmetadata.TransactionContext; -import io.debezium.pipeline.txmetadata.TransactionStructMaker; +import io.debezium.pipeline.txmetadata.DefaultTransactionMetadataFactory; +import io.debezium.pipeline.txmetadata.spi.TransactionMetadataFactory; import io.debezium.relational.CustomConverterRegistry; import io.debezium.relational.TableId; import io.debezium.schema.SchemaNameAdjuster; @@ -719,23 +718,14 @@ public static SnapshotQueryMode parse(String value, String defaultValue) { .withDescription("Enables transaction metadata extraction together with event counting") .withDefault(Boolean.FALSE); - public static final Field TRANSACTION_STRUCT_MAKER = Field.create("transaction.struct.maker") - .withDisplayName("Make transaction struct & schema") + public static final Field TRANSACTION_METADATA_FACTORY = Field.create("transaction.metadata.factory") + .withDisplayName("Factory class to create transaction context & transaction struct maker classes") .withType(Type.CLASS) .withWidth(Width.MEDIUM) .withImportance(Importance.LOW) - .withDefault(DefaultTransactionStructMaker.class.getName()) + .withDefault(DefaultTransactionMetadataFactory.class.getName()) .withDescription( - "Class to make transaction struct & schema"); - - public static final Field TRANSACTION_CONTEXT = Field.create("transaction.context") - .withDisplayName("Provides metadata on transactions") - .withType(Type.CLASS) - .withWidth(Width.MEDIUM) - .withImportance(ConfigDef.Importance.LOW) - .withDefault(TransactionContext.class.getName()) - .withDescription( - "Class to provide transaction metadata"); + "Class to make transaction context & transaction struct/schemas"); public static final Field EVENT_PROCESSING_FAILURE_HANDLING_MODE = Field.create("event.processing.failure.handling.mode") .withDisplayName("Event deserialization failure handling") @@ -1111,8 +1101,7 @@ public static SnapshotQueryMode parse(String value, String defaultValue) { TOPIC_NAMING_STRATEGY, NOTIFICATION_ENABLED_CHANNELS, SinkNotificationChannel.NOTIFICATION_TOPIC, - TRANSACTION_CONTEXT, - TRANSACTION_STRUCT_MAKER, + TRANSACTION_METADATA_FACTORY, CUSTOM_METRIC_TAGS) .create(); @@ -1135,8 +1124,7 @@ public static SnapshotQueryMode parse(String value, String defaultValue) { private final String snapshotModeCustomName; private final Integer queryFetchSize; private final SourceInfoStructMaker sourceInfoStructMaker; - private final TransactionContext transactionContext; - private final TransactionStructMaker transactionStructMaker; + private final TransactionMetadataFactory transactionMetadataFactory; private final boolean shouldProvideTransactionMetadata; private final EventProcessingFailureHandlingMode eventProcessingFailureHandlingMode; private final CustomConverterRegistry customConverterRegistry; @@ -1187,8 +1175,7 @@ protected CommonConnectorConfig(Configuration config, int defaultSnapshotFetchSi this.fieldNameAdjustmentMode = FieldNameAdjustmentMode.parse(config.getString(FIELD_NAME_ADJUSTMENT_MODE)); this.eventConvertingFailureHandlingMode = EventConvertingFailureHandlingMode.parse(config.getString(EVENT_CONVERTING_FAILURE_HANDLING_MODE)); this.sourceInfoStructMaker = getSourceInfoStructMaker(Version.V2); - this.transactionContext = getTransactionContext(); - this.transactionStructMaker = getTransactionStructMaker(); + this.transactionMetadataFactory = getTransactionMetadataFactory(); this.shouldProvideTransactionMetadata = config.getBoolean(PROVIDE_TRANSACTION_METADATA); this.eventProcessingFailureHandlingMode = EventProcessingFailureHandlingMode.parse(config.getString(EVENT_PROCESSING_FAILURE_HANDLING_MODE)); this.customConverterRegistry = new CustomConverterRegistry(getCustomConverters()); @@ -1404,12 +1391,8 @@ public SourceInfoStructMaker getSourceInfoStru return (SourceInfoStructMaker) sourceInfoStructMaker; } - public TransactionContext getTransactionContext() { - return getTransactionContext(TRANSACTION_CONTEXT); - } - - public TransactionStructMaker getTransactionStructMaker() { - return getTransactionStructMaker(TRANSACTION_STRUCT_MAKER); + public TransactionMetadataFactory getTransactionMetadataFactory() { + return getTransactionMetadataFactory(TRANSACTION_METADATA_FACTORY); } public EnumSet getSkippedOperations() { @@ -1687,20 +1670,11 @@ public SourceInfoStructMaker getSourceInfoStru return sourceInfoStructMaker; } - public TransactionStructMaker getTransactionStructMaker(Field transactionStructMakerField) { - final TransactionStructMaker transactionStructMaker; - transactionStructMaker = config.getInstance(transactionStructMakerField, TransactionStructMaker.class, config); - if (transactionStructMaker == null) { - throw new DebeziumException("Unable to instantiate the transaction struct maker class " + TRANSACTION_STRUCT_MAKER); + public TransactionMetadataFactory getTransactionMetadataFactory(Field transactionMetadataFactoryField) { + final TransactionMetadataFactory factory = config.getInstance(transactionMetadataFactoryField, TransactionMetadataFactory.class, config); + if (factory == null) { + throw new DebeziumException("Unable to instantiate the transaction struct maker class " + TRANSACTION_METADATA_FACTORY); } - return transactionStructMaker; - } - - public TransactionContext getTransactionContext(Field transactionContextField) { - final TransactionContext transactionContext = config.getInstance(transactionContextField, TransactionContext.class); - if (transactionContext == null) { - throw new DebeziumException("Unable to instantiate the transaction context class " + transactionContextField); - } - return transactionContext; + return factory; } } diff --git a/debezium-core/src/main/java/io/debezium/pipeline/txmetadata/DefaultTransactionMetadataFactory.java b/debezium-core/src/main/java/io/debezium/pipeline/txmetadata/DefaultTransactionMetadataFactory.java new file mode 100644 index 000000000..724403f0b --- /dev/null +++ b/debezium-core/src/main/java/io/debezium/pipeline/txmetadata/DefaultTransactionMetadataFactory.java @@ -0,0 +1,28 @@ +/* + * Copyright Debezium Authors. + * + * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 + */ +package io.debezium.pipeline.txmetadata; + +import io.debezium.config.Configuration; +import io.debezium.pipeline.txmetadata.spi.TransactionMetadataFactory; + +public class DefaultTransactionMetadataFactory implements TransactionMetadataFactory { + + private final Configuration configuration; + + public DefaultTransactionMetadataFactory(Configuration configuration) { + this.configuration = configuration; + } + + @Override + public TransactionContext getTransactionContext() { + return new TransactionContext(); + } + + @Override + public TransactionStructMaker getTransactionStructMaker() { + return new DefaultTransactionStructMaker(configuration); + } +} diff --git a/debezium-core/src/main/java/io/debezium/pipeline/txmetadata/TransactionMonitor.java b/debezium-core/src/main/java/io/debezium/pipeline/txmetadata/TransactionMonitor.java index 04d231829..7bcce13ee 100644 --- a/debezium-core/src/main/java/io/debezium/pipeline/txmetadata/TransactionMonitor.java +++ b/debezium-core/src/main/java/io/debezium/pipeline/txmetadata/TransactionMonitor.java @@ -61,7 +61,7 @@ public TransactionMonitor(CommonConnectorConfig connectorConfig, EventMetadataPr String topicName) { Objects.requireNonNull(eventMetadataProvider); - transactionStructMaker = connectorConfig.getTransactionStructMaker(); + transactionStructMaker = connectorConfig.getTransactionMetadataFactory().getTransactionStructMaker(); transactionKeySchema = transactionStructMaker.getTransactionKeySchema(); this.topicName = topicName; diff --git a/debezium-core/src/main/java/io/debezium/pipeline/txmetadata/spi/TransactionMetadataFactory.java b/debezium-core/src/main/java/io/debezium/pipeline/txmetadata/spi/TransactionMetadataFactory.java new file mode 100644 index 000000000..a78c8342a --- /dev/null +++ b/debezium-core/src/main/java/io/debezium/pipeline/txmetadata/spi/TransactionMetadataFactory.java @@ -0,0 +1,25 @@ +/* + * Copyright Debezium Authors. + * + * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 + */ +package io.debezium.pipeline.txmetadata.spi; + +import io.debezium.pipeline.txmetadata.TransactionContext; +import io.debezium.pipeline.txmetadata.TransactionStructMaker; + +public interface TransactionMetadataFactory { + /** + * Provide a {@link TransactionContext} that is used for tracking/processing transaction metadata + * + * @return + */ + TransactionContext getTransactionContext(); + + /** + * Provide a {@link TransactionStructMaker} that is used to build the structs that stores transaction metadata + * + * @return + */ + TransactionStructMaker getTransactionStructMaker(); +}