DBZ-7698 Refactor to single factory interface/class/config

This commit is contained in:
twthorn 2024-04-16 12:20:24 -05:00 committed by Jiri Pechanec
parent 0b80389f92
commit 3a9ea7ac4e
4 changed files with 70 additions and 43 deletions

View File

@ -46,9 +46,8 @@
import io.debezium.heartbeat.HeartbeatImpl;
import io.debezium.pipeline.ErrorHandler;
import io.debezium.pipeline.notification.channels.SinkNotificationChannel;
import io.debezium.pipeline.txmetadata.DefaultTransactionStructMaker;
import io.debezium.pipeline.txmetadata.TransactionContext;
import io.debezium.pipeline.txmetadata.TransactionStructMaker;
import io.debezium.pipeline.txmetadata.DefaultTransactionMetadataFactory;
import io.debezium.pipeline.txmetadata.spi.TransactionMetadataFactory;
import io.debezium.relational.CustomConverterRegistry;
import io.debezium.relational.TableId;
import io.debezium.schema.SchemaNameAdjuster;
@ -719,23 +718,14 @@ public static SnapshotQueryMode parse(String value, String defaultValue) {
.withDescription("Enables transaction metadata extraction together with event counting")
.withDefault(Boolean.FALSE);
public static final Field TRANSACTION_STRUCT_MAKER = Field.create("transaction.struct.maker")
.withDisplayName("Make transaction struct & schema")
public static final Field TRANSACTION_METADATA_FACTORY = Field.create("transaction.metadata.factory")
.withDisplayName("Factory class to create transaction context & transaction struct maker classes")
.withType(Type.CLASS)
.withWidth(Width.MEDIUM)
.withImportance(Importance.LOW)
.withDefault(DefaultTransactionStructMaker.class.getName())
.withDefault(DefaultTransactionMetadataFactory.class.getName())
.withDescription(
"Class to make transaction struct & schema");
public static final Field TRANSACTION_CONTEXT = Field.create("transaction.context")
.withDisplayName("Provides metadata on transactions")
.withType(Type.CLASS)
.withWidth(Width.MEDIUM)
.withImportance(ConfigDef.Importance.LOW)
.withDefault(TransactionContext.class.getName())
.withDescription(
"Class to provide transaction metadata");
"Class to make transaction context & transaction struct/schemas");
public static final Field EVENT_PROCESSING_FAILURE_HANDLING_MODE = Field.create("event.processing.failure.handling.mode")
.withDisplayName("Event deserialization failure handling")
@ -1111,8 +1101,7 @@ public static SnapshotQueryMode parse(String value, String defaultValue) {
TOPIC_NAMING_STRATEGY,
NOTIFICATION_ENABLED_CHANNELS,
SinkNotificationChannel.NOTIFICATION_TOPIC,
TRANSACTION_CONTEXT,
TRANSACTION_STRUCT_MAKER,
TRANSACTION_METADATA_FACTORY,
CUSTOM_METRIC_TAGS)
.create();
@ -1135,8 +1124,7 @@ public static SnapshotQueryMode parse(String value, String defaultValue) {
private final String snapshotModeCustomName;
private final Integer queryFetchSize;
private final SourceInfoStructMaker<? extends AbstractSourceInfo> sourceInfoStructMaker;
private final TransactionContext transactionContext;
private final TransactionStructMaker transactionStructMaker;
private final TransactionMetadataFactory transactionMetadataFactory;
private final boolean shouldProvideTransactionMetadata;
private final EventProcessingFailureHandlingMode eventProcessingFailureHandlingMode;
private final CustomConverterRegistry customConverterRegistry;
@ -1187,8 +1175,7 @@ protected CommonConnectorConfig(Configuration config, int defaultSnapshotFetchSi
this.fieldNameAdjustmentMode = FieldNameAdjustmentMode.parse(config.getString(FIELD_NAME_ADJUSTMENT_MODE));
this.eventConvertingFailureHandlingMode = EventConvertingFailureHandlingMode.parse(config.getString(EVENT_CONVERTING_FAILURE_HANDLING_MODE));
this.sourceInfoStructMaker = getSourceInfoStructMaker(Version.V2);
this.transactionContext = getTransactionContext();
this.transactionStructMaker = getTransactionStructMaker();
this.transactionMetadataFactory = getTransactionMetadataFactory();
this.shouldProvideTransactionMetadata = config.getBoolean(PROVIDE_TRANSACTION_METADATA);
this.eventProcessingFailureHandlingMode = EventProcessingFailureHandlingMode.parse(config.getString(EVENT_PROCESSING_FAILURE_HANDLING_MODE));
this.customConverterRegistry = new CustomConverterRegistry(getCustomConverters());
@ -1404,12 +1391,8 @@ public <T extends AbstractSourceInfo> SourceInfoStructMaker<T> getSourceInfoStru
return (SourceInfoStructMaker<T>) sourceInfoStructMaker;
}
public TransactionContext getTransactionContext() {
return getTransactionContext(TRANSACTION_CONTEXT);
}
public TransactionStructMaker getTransactionStructMaker() {
return getTransactionStructMaker(TRANSACTION_STRUCT_MAKER);
public TransactionMetadataFactory getTransactionMetadataFactory() {
return getTransactionMetadataFactory(TRANSACTION_METADATA_FACTORY);
}
public EnumSet<Envelope.Operation> getSkippedOperations() {
@ -1687,20 +1670,11 @@ public <T extends AbstractSourceInfo> SourceInfoStructMaker<T> getSourceInfoStru
return sourceInfoStructMaker;
}
public TransactionStructMaker getTransactionStructMaker(Field transactionStructMakerField) {
final TransactionStructMaker transactionStructMaker;
transactionStructMaker = config.getInstance(transactionStructMakerField, TransactionStructMaker.class, config);
if (transactionStructMaker == null) {
throw new DebeziumException("Unable to instantiate the transaction struct maker class " + TRANSACTION_STRUCT_MAKER);
public TransactionMetadataFactory getTransactionMetadataFactory(Field transactionMetadataFactoryField) {
final TransactionMetadataFactory factory = config.getInstance(transactionMetadataFactoryField, TransactionMetadataFactory.class, config);
if (factory == null) {
throw new DebeziumException("Unable to instantiate the transaction struct maker class " + TRANSACTION_METADATA_FACTORY);
}
return transactionStructMaker;
}
public TransactionContext getTransactionContext(Field transactionContextField) {
final TransactionContext transactionContext = config.getInstance(transactionContextField, TransactionContext.class);
if (transactionContext == null) {
throw new DebeziumException("Unable to instantiate the transaction context class " + transactionContextField);
}
return transactionContext;
return factory;
}
}

View File

@ -0,0 +1,28 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.pipeline.txmetadata;
import io.debezium.config.Configuration;
import io.debezium.pipeline.txmetadata.spi.TransactionMetadataFactory;
public class DefaultTransactionMetadataFactory implements TransactionMetadataFactory {
private final Configuration configuration;
public DefaultTransactionMetadataFactory(Configuration configuration) {
this.configuration = configuration;
}
@Override
public TransactionContext getTransactionContext() {
return new TransactionContext();
}
@Override
public TransactionStructMaker getTransactionStructMaker() {
return new DefaultTransactionStructMaker(configuration);
}
}

View File

@ -61,7 +61,7 @@ public TransactionMonitor(CommonConnectorConfig connectorConfig, EventMetadataPr
String topicName) {
Objects.requireNonNull(eventMetadataProvider);
transactionStructMaker = connectorConfig.getTransactionStructMaker();
transactionStructMaker = connectorConfig.getTransactionMetadataFactory().getTransactionStructMaker();
transactionKeySchema = transactionStructMaker.getTransactionKeySchema();
this.topicName = topicName;

View File

@ -0,0 +1,25 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.pipeline.txmetadata.spi;
import io.debezium.pipeline.txmetadata.TransactionContext;
import io.debezium.pipeline.txmetadata.TransactionStructMaker;
public interface TransactionMetadataFactory {
/**
* Provide a {@link TransactionContext} that is used for tracking/processing transaction metadata
*
* @return
*/
TransactionContext getTransactionContext();
/**
* Provide a {@link TransactionStructMaker} that is used to build the structs that stores transaction metadata
*
* @return
*/
TransactionStructMaker getTransactionStructMaker();
}