DBZ-7698 Refactor to generic transaction block customization
This commit is contained in:
parent
efdc92a926
commit
261253d9f0
@ -38,7 +38,7 @@ protected static void assertConfigDefIsValid(Connector connector, io.debezium.co
|
||||
assertThat(key.importance).isEqualTo(expected.importance());
|
||||
assertThat(key.documentation).isEqualTo(expected.description());
|
||||
assertThat(key.type).isEqualTo(expected.type());
|
||||
if (expected.type() == Type.CLASS && expected.defaultValue() != null) {
|
||||
if (expected.type() == Type.CLASS) {
|
||||
assertThat(((Class<?>) key.defaultValue).getName()).isEqualTo((String) expected.defaultValue());
|
||||
}
|
||||
else if (expected.type() == ConfigDef.Type.LIST && key.defaultValue != null) {
|
||||
|
@ -33,9 +33,7 @@
|
||||
import io.debezium.connector.postgresql.connection.ReplicationConnection;
|
||||
import io.debezium.connector.postgresql.connection.pgoutput.PgOutputMessageDecoder;
|
||||
import io.debezium.connector.postgresql.connection.pgproto.PgProtoMessageDecoder;
|
||||
import io.debezium.connector.postgresql.pipeline.txmetadata.PostgresTransactionStructMaker;
|
||||
import io.debezium.jdbc.JdbcConfiguration;
|
||||
import io.debezium.pipeline.txmetadata.TransactionStructMaker;
|
||||
import io.debezium.relational.ColumnFilterMode;
|
||||
import io.debezium.relational.RelationalDatabaseConnectorConfig;
|
||||
import io.debezium.relational.TableId;
|
||||
@ -964,9 +962,6 @@ public static AutoCreateMode parse(String value, String defaultValue) {
|
||||
public static final Field SOURCE_INFO_STRUCT_MAKER = CommonConnectorConfig.SOURCE_INFO_STRUCT_MAKER
|
||||
.withDefault(PostgresSourceInfoStructMaker.class.getName());
|
||||
|
||||
public static final Field TRANSACTION_STRUCT_MAKER = CommonConnectorConfig.TRANSACTION_STRUCT_MAKER
|
||||
.withDefault(PostgresTransactionStructMaker.class.getName());
|
||||
|
||||
private final LogicalDecodingMessageFilter logicalDecodingMessageFilter;
|
||||
private final HStoreHandlingMode hStoreHandlingMode;
|
||||
private final IntervalHandlingMode intervalHandlingMode;
|
||||
@ -1119,11 +1114,6 @@ protected SourceInfoStructMaker<? extends AbstractSourceInfo> getSourceInfoStruc
|
||||
return getSourceInfoStructMaker(SOURCE_INFO_STRUCT_MAKER, Module.name(), Module.version(), this);
|
||||
}
|
||||
|
||||
@Override
|
||||
public TransactionStructMaker getTransactionStructMaker() {
|
||||
return getTransactionStructMaker(TRANSACTION_STRUCT_MAKER);
|
||||
}
|
||||
|
||||
private static final ConfigDefinition CONFIG_DEFINITION = RelationalDatabaseConnectorConfig.CONFIG_DEFINITION.edit()
|
||||
.name("Postgres")
|
||||
.excluding(CommonConnectorConfig.SKIPPED_OPERATIONS)
|
||||
|
@ -12,6 +12,7 @@
|
||||
|
||||
import io.debezium.connector.base.ChangeEventQueue;
|
||||
import io.debezium.connector.postgresql.connection.LogicalDecodingMessage;
|
||||
import io.debezium.connector.postgresql.pipeline.txmetadata.PostgresTransactionMonitor;
|
||||
import io.debezium.heartbeat.Heartbeat;
|
||||
import io.debezium.pipeline.DataChangeEvent;
|
||||
import io.debezium.pipeline.EventDispatcher;
|
||||
@ -42,8 +43,16 @@ public PostgresEventDispatcher(PostgresConnectorConfig connectorConfig, TopicNam
|
||||
ChangeEventCreator changeEventCreator, InconsistentSchemaHandler<PostgresPartition, T> inconsistentSchemaHandler,
|
||||
EventMetadataProvider metadataProvider, Heartbeat heartbeat, SchemaNameAdjuster schemaNameAdjuster,
|
||||
SignalProcessor<PostgresPartition, PostgresOffsetContext> signalProcessor) {
|
||||
super(connectorConfig, topicNamingStrategy, schema, queue, filter, changeEventCreator, inconsistentSchemaHandler,
|
||||
metadataProvider, heartbeat, schemaNameAdjuster, signalProcessor);
|
||||
super(connectorConfig, topicNamingStrategy, schema, queue, filter, changeEventCreator, inconsistentSchemaHandler, heartbeat, schemaNameAdjuster,
|
||||
new PostgresTransactionMonitor(
|
||||
connectorConfig,
|
||||
metadataProvider,
|
||||
schemaNameAdjuster,
|
||||
(record) -> {
|
||||
queue.enqueue(new DataChangeEvent(record));
|
||||
},
|
||||
topicNamingStrategy.transactionTopic()),
|
||||
signalProcessor);
|
||||
this.queue = queue;
|
||||
this.logicalDecodingMessageMonitor = new LogicalDecodingMessageMonitor(connectorConfig, this::enqueueLogicalDecodingMessage);
|
||||
this.messageFilter = connectorConfig.getMessageFilter();
|
||||
|
@ -0,0 +1,59 @@
|
||||
/*
|
||||
* Copyright Debezium Authors.
|
||||
*
|
||||
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
|
||||
*/
|
||||
package io.debezium.connector.postgresql.pipeline.txmetadata;
|
||||
|
||||
import java.time.Instant;
|
||||
|
||||
import org.apache.kafka.connect.data.Struct;
|
||||
import org.apache.kafka.connect.source.SourceRecord;
|
||||
|
||||
import io.debezium.config.CommonConnectorConfig;
|
||||
import io.debezium.connector.postgresql.PostgresOffsetContext;
|
||||
import io.debezium.function.BlockingConsumer;
|
||||
import io.debezium.pipeline.source.spi.EventMetadataProvider;
|
||||
import io.debezium.pipeline.spi.OffsetContext;
|
||||
import io.debezium.pipeline.txmetadata.TransactionMonitor;
|
||||
import io.debezium.schema.SchemaNameAdjuster;
|
||||
|
||||
/**
|
||||
* Postgres specific overrides for {@link io.debezium.pipeline.txmetadata.TransactionMonitor}.
|
||||
* @author vjuranek
|
||||
*/
|
||||
public class PostgresTransactionMonitor extends TransactionMonitor {
|
||||
|
||||
public PostgresTransactionMonitor(CommonConnectorConfig connectorConfig, EventMetadataProvider eventMetadataProvider,
|
||||
SchemaNameAdjuster schemaNameAdjuster, BlockingConsumer<SourceRecord> sender,
|
||||
String topicName) {
|
||||
super(connectorConfig, eventMetadataProvider, schemaNameAdjuster, sender, topicName);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Struct prepareTxKey(OffsetContext offsetContext) {
|
||||
return adjustTxId(new Struct(transactionKeySchema), offsetContext);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Struct prepareTxBeginValue(OffsetContext offsetContext, Instant timestamp) {
|
||||
return adjustTxId(super.prepareTxBeginValue(offsetContext, timestamp), offsetContext);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Struct prepareTxEndValue(OffsetContext offsetContext, Instant timestamp) {
|
||||
return adjustTxId(super.prepareTxEndValue(offsetContext, timestamp), offsetContext);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Struct prepareTxStruct(OffsetContext offsetContext, long dataCollectionEventOrder, Struct value) {
|
||||
return adjustTxId(super.prepareTxStruct(offsetContext, dataCollectionEventOrder, value), offsetContext);
|
||||
}
|
||||
|
||||
private Struct adjustTxId(Struct txStruct, OffsetContext offsetContext) {
|
||||
final String lsn = Long.toString(((PostgresOffsetContext) offsetContext).asOffsetState().lastSeenLsn().asLong());
|
||||
final String txId = offsetContext.getTransactionContext().getTransactionId();
|
||||
txStruct.put(DEBEZIUM_TRANSACTION_ID_KEY, String.format("%s:%s", txId, lsn));
|
||||
return txStruct;
|
||||
}
|
||||
}
|
@ -1,45 +0,0 @@
|
||||
/*
|
||||
* Copyright Debezium Authors.
|
||||
*
|
||||
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
|
||||
*/
|
||||
package io.debezium.connector.postgresql.pipeline.txmetadata;
|
||||
|
||||
import java.time.Instant;
|
||||
|
||||
import org.apache.kafka.connect.data.Struct;
|
||||
|
||||
import io.debezium.connector.postgresql.PostgresOffsetContext;
|
||||
import io.debezium.pipeline.spi.OffsetContext;
|
||||
import io.debezium.pipeline.txmetadata.BasicTransactionStructMaker;
|
||||
import io.debezium.spi.schema.DataCollectionId;
|
||||
|
||||
public class PostgresTransactionStructMaker extends BasicTransactionStructMaker {
|
||||
|
||||
@Override
|
||||
public Struct prepareTxKey(OffsetContext offsetContext) {
|
||||
return adjustTxId(new Struct(transactionKeySchema), offsetContext);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Struct prepareTxBeginValue(OffsetContext offsetContext, Instant timestamp) {
|
||||
return adjustTxId(super.prepareTxBeginValue(offsetContext, timestamp), offsetContext);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Struct prepareTxEndValue(OffsetContext offsetContext, Instant timestamp) {
|
||||
return adjustTxId(super.prepareTxEndValue(offsetContext, timestamp), offsetContext);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Struct prepareTxStruct(OffsetContext offsetContext, DataCollectionId source) {
|
||||
return adjustTxId(super.prepareTxStruct(offsetContext, source), offsetContext);
|
||||
}
|
||||
|
||||
private Struct adjustTxId(Struct txStruct, OffsetContext offsetContext) {
|
||||
final String lsn = Long.toString(((PostgresOffsetContext) offsetContext).asOffsetState().lastSeenLsn().asLong());
|
||||
final String txId = offsetContext.getTransactionContext().getTransactionId();
|
||||
txStruct.put(DEBEZIUM_TRANSACTION_ID_KEY, String.format("%s:%s", txId, lsn));
|
||||
return txStruct;
|
||||
}
|
||||
}
|
@ -47,7 +47,7 @@
|
||||
import io.debezium.pipeline.ErrorHandler;
|
||||
import io.debezium.pipeline.notification.channels.SinkNotificationChannel;
|
||||
import io.debezium.pipeline.txmetadata.BasicTransactionStructMaker;
|
||||
import io.debezium.pipeline.txmetadata.TransactionOrderMetadata;
|
||||
import io.debezium.pipeline.txmetadata.TransactionContext;
|
||||
import io.debezium.pipeline.txmetadata.TransactionStructMaker;
|
||||
import io.debezium.relational.CustomConverterRegistry;
|
||||
import io.debezium.relational.TableId;
|
||||
@ -719,16 +719,6 @@ public static SnapshotQueryMode parse(String value, String defaultValue) {
|
||||
.withDescription("Enables transaction metadata extraction together with event counting")
|
||||
.withDefault(Boolean.FALSE);
|
||||
|
||||
public static final Field PROVIDE_ORDERED_TRANSACTION_METADATA = Field.create("provide.ordered.transaction.metadata")
|
||||
.withDisplayName("Provide ordered transaction meatadata")
|
||||
.withType(Type.BOOLEAN)
|
||||
.withGroup(Field.createGroupEntry(Field.Group.CONNECTOR_ADVANCED, 19))
|
||||
.withWidth(Width.SHORT)
|
||||
.withDefault(Boolean.FALSE)
|
||||
.withImportance(Importance.LOW)
|
||||
.withDescription(
|
||||
"Whether to provide order metadata on transactions");
|
||||
|
||||
public static final Field TRANSACTION_STRUCT_MAKER = Field.create("transaction.struct.maker")
|
||||
.withDisplayName("Make transaction struct & schema")
|
||||
.withType(Type.CLASS)
|
||||
@ -738,11 +728,12 @@ public static SnapshotQueryMode parse(String value, String defaultValue) {
|
||||
.withDescription(
|
||||
"Class to make transaction struct & schema");
|
||||
|
||||
public static final Field TRANSACTION_ORDER_METADATA_FIELD = Field.create("transaction.ordered.metadata")
|
||||
public static final Field TRANSACTION_CONTEXT = Field.create("transaction.context")
|
||||
.withDisplayName("Class to provide ordered transaction metadata")
|
||||
.withType(Type.CLASS)
|
||||
.withWidth(Width.MEDIUM)
|
||||
.withImportance(ConfigDef.Importance.LOW)
|
||||
.withDefault(TransactionContext.class.getName())
|
||||
.withDescription(
|
||||
"Class to provide order metadata on transactions");
|
||||
|
||||
@ -1092,7 +1083,6 @@ public static SnapshotQueryMode parse(String value, String defaultValue) {
|
||||
POLL_INTERVAL_MS,
|
||||
MAX_QUEUE_SIZE_IN_BYTES,
|
||||
PROVIDE_TRANSACTION_METADATA,
|
||||
PROVIDE_ORDERED_TRANSACTION_METADATA,
|
||||
SKIPPED_OPERATIONS,
|
||||
SNAPSHOT_DELAY_MS,
|
||||
SNAPSHOT_MODE_TABLES,
|
||||
@ -1121,7 +1111,7 @@ public static SnapshotQueryMode parse(String value, String defaultValue) {
|
||||
TOPIC_NAMING_STRATEGY,
|
||||
NOTIFICATION_ENABLED_CHANNELS,
|
||||
SinkNotificationChannel.NOTIFICATION_TOPIC,
|
||||
TRANSACTION_ORDER_METADATA_FIELD,
|
||||
TRANSACTION_CONTEXT,
|
||||
TRANSACTION_STRUCT_MAKER,
|
||||
CUSTOM_METRIC_TAGS)
|
||||
.create();
|
||||
@ -1145,10 +1135,9 @@ public static SnapshotQueryMode parse(String value, String defaultValue) {
|
||||
private final String snapshotModeCustomName;
|
||||
private final Integer queryFetchSize;
|
||||
private final SourceInfoStructMaker<? extends AbstractSourceInfo> sourceInfoStructMaker;
|
||||
private final TransactionOrderMetadata transactionOrderMetadata;
|
||||
private final TransactionContext transactionContext;
|
||||
private final TransactionStructMaker transactionStructMaker;
|
||||
private final boolean shouldProvideTransactionMetadata;
|
||||
private final boolean shouldProvideOrderedTransactionMetadata;
|
||||
private final EventProcessingFailureHandlingMode eventProcessingFailureHandlingMode;
|
||||
private final CustomConverterRegistry customConverterRegistry;
|
||||
private final BinaryHandlingMode binaryHandlingMode;
|
||||
@ -1198,10 +1187,9 @@ protected CommonConnectorConfig(Configuration config, int defaultSnapshotFetchSi
|
||||
this.fieldNameAdjustmentMode = FieldNameAdjustmentMode.parse(config.getString(FIELD_NAME_ADJUSTMENT_MODE));
|
||||
this.eventConvertingFailureHandlingMode = EventConvertingFailureHandlingMode.parse(config.getString(EVENT_CONVERTING_FAILURE_HANDLING_MODE));
|
||||
this.sourceInfoStructMaker = getSourceInfoStructMaker(Version.V2);
|
||||
this.transactionOrderMetadata = getTransactionOrderMetadata();
|
||||
this.transactionContext = getTransactionContext();
|
||||
this.transactionStructMaker = getTransactionStructMaker();
|
||||
this.shouldProvideTransactionMetadata = config.getBoolean(PROVIDE_TRANSACTION_METADATA);
|
||||
this.shouldProvideOrderedTransactionMetadata = config.getBoolean(PROVIDE_ORDERED_TRANSACTION_METADATA);
|
||||
this.eventProcessingFailureHandlingMode = EventProcessingFailureHandlingMode.parse(config.getString(EVENT_PROCESSING_FAILURE_HANDLING_MODE));
|
||||
this.customConverterRegistry = new CustomConverterRegistry(getCustomConverters());
|
||||
this.binaryHandlingMode = BinaryHandlingMode.parse(config.getString(BINARY_HANDLING_MODE));
|
||||
@ -1352,10 +1340,6 @@ public boolean shouldProvideTransactionMetadata() {
|
||||
return shouldProvideTransactionMetadata;
|
||||
}
|
||||
|
||||
public boolean shouldProvideOrderedTransactionMetadata() {
|
||||
return shouldProvideOrderedTransactionMetadata;
|
||||
}
|
||||
|
||||
public boolean skipMessagesWithoutChange() {
|
||||
return skipMessagesWithoutChange;
|
||||
}
|
||||
@ -1420,8 +1404,8 @@ public <T extends AbstractSourceInfo> SourceInfoStructMaker<T> getSourceInfoStru
|
||||
return (SourceInfoStructMaker<T>) sourceInfoStructMaker;
|
||||
}
|
||||
|
||||
public TransactionOrderMetadata getTransactionOrderMetadata() {
|
||||
return getTransactionOrderMetadata(TRANSACTION_ORDER_METADATA_FIELD);
|
||||
public TransactionContext getTransactionContext() {
|
||||
return getTransactionContext(TRANSACTION_CONTEXT);
|
||||
}
|
||||
|
||||
public TransactionStructMaker getTransactionStructMaker() {
|
||||
@ -1705,13 +1689,7 @@ public <T extends AbstractSourceInfo> SourceInfoStructMaker<T> getSourceInfoStru
|
||||
|
||||
public TransactionStructMaker getTransactionStructMaker(Field transactionStructMakerField) {
|
||||
final TransactionStructMaker transactionStructMaker;
|
||||
if (!shouldProvideOrderedTransactionMetadata) {
|
||||
// for backward compatibility, return the normal one
|
||||
transactionStructMaker = config.getInstance(TRANSACTION_STRUCT_MAKER, BasicTransactionStructMaker.class);
|
||||
}
|
||||
else {
|
||||
transactionStructMaker = config.getInstance(transactionStructMakerField, TransactionStructMaker.class);
|
||||
}
|
||||
transactionStructMaker = config.getInstance(transactionStructMakerField, TransactionStructMaker.class);
|
||||
transactionStructMaker.setSchemaNameAdjuster(schemaNameAdjuster());
|
||||
if (transactionStructMaker == null) {
|
||||
throw new DebeziumException("Unable to instantiate the transaction struct maker class " + TRANSACTION_STRUCT_MAKER);
|
||||
@ -1719,15 +1697,11 @@ public TransactionStructMaker getTransactionStructMaker(Field transactionStructM
|
||||
return transactionStructMaker;
|
||||
}
|
||||
|
||||
public TransactionOrderMetadata getTransactionOrderMetadata(Field transactionOrderMetadataField) {
|
||||
if (!shouldProvideOrderedTransactionMetadata) {
|
||||
// for backward compatibility, if the setting is disabled we won't use this anyway
|
||||
return null;
|
||||
public TransactionContext getTransactionContext(Field transactionContextField) {
|
||||
final TransactionContext transactionContext = config.getInstance(transactionContextField, TransactionContext.class);
|
||||
if (transactionContext == null) {
|
||||
throw new DebeziumException("Unable to instantiate the transaction ordered metadata class " + transactionContextField);
|
||||
}
|
||||
final TransactionOrderMetadata transactionOrderMetadata = config.getInstance(transactionOrderMetadataField, TransactionOrderMetadata.class);
|
||||
if (transactionOrderMetadata == null) {
|
||||
throw new DebeziumException("Unable to instantiate the transaction ordered metadata class " + TRANSACTION_ORDER_METADATA_FIELD);
|
||||
}
|
||||
return transactionOrderMetadata;
|
||||
return transactionContext;
|
||||
}
|
||||
}
|
||||
|
@ -17,7 +17,6 @@
|
||||
import io.debezium.pipeline.spi.OffsetContext;
|
||||
import io.debezium.schema.SchemaFactory;
|
||||
import io.debezium.schema.SchemaNameAdjuster;
|
||||
import io.debezium.spi.schema.DataCollectionId;
|
||||
|
||||
public abstract class AbstractTransactionStructMaker implements TransactionStructMaker {
|
||||
|
||||
@ -32,9 +31,8 @@ public void setSchemaNameAdjuster(SchemaNameAdjuster adjuster) {
|
||||
}
|
||||
|
||||
@Override
|
||||
public Struct prepareTxStruct(OffsetContext offsetContext, DataCollectionId source) {
|
||||
public Struct prepareTxStruct(OffsetContext offsetContext, long dataCollectionEventOrder, Struct value) {
|
||||
TransactionContext transactionContext = offsetContext.getTransactionContext();
|
||||
final long dataCollectionEventOrder = transactionContext.event(source);
|
||||
final Struct txStruct = new Struct(getTransactionBlockSchema());
|
||||
txStruct.put(DEBEZIUM_TRANSACTION_ID_KEY, transactionContext.getTransactionId());
|
||||
txStruct.put(DEBEZIUM_TRANSACTION_TOTAL_ORDER_KEY, transactionContext.getTotalEventCount());
|
||||
|
@ -1,50 +0,0 @@
|
||||
/*
|
||||
* Copyright Debezium Authors.
|
||||
*
|
||||
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
|
||||
*/
|
||||
package io.debezium.pipeline.txmetadata;
|
||||
|
||||
import java.util.Map;
|
||||
|
||||
public class OrderedTransactionContext extends TransactionContext {
|
||||
|
||||
private TransactionOrderMetadata transactionOrderMetadata;
|
||||
|
||||
public OrderedTransactionContext(TransactionOrderMetadata transactionOrderMetadata) {
|
||||
super();
|
||||
this.transactionOrderMetadata = transactionOrderMetadata;
|
||||
}
|
||||
|
||||
public OrderedTransactionContext(TransactionOrderMetadata transactionOrderMetadata, TransactionContext transactionContext) {
|
||||
super();
|
||||
this.transactionOrderMetadata = transactionOrderMetadata;
|
||||
|
||||
// Copy fields
|
||||
this.transactionId = transactionContext.transactionId;
|
||||
this.totalEventCount = transactionContext.totalEventCount;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void beginTransaction(TransactionInfo transactionInfo) {
|
||||
super.beginTransaction(transactionInfo);
|
||||
transactionOrderMetadata.beginTransaction(transactionInfo);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Map<String, Object> store(Map<String, Object> offset) {
|
||||
offset = super.store(offset);
|
||||
return transactionOrderMetadata.store(offset);
|
||||
}
|
||||
|
||||
public static OrderedTransactionContext load(Map<String, ?> offsets, TransactionOrderMetadata transactionOrderMetadata) {
|
||||
TransactionContext transactionContext = TransactionContext.load(offsets);
|
||||
OrderedTransactionContext orderedTransactionContext = new OrderedTransactionContext(transactionOrderMetadata, transactionContext);
|
||||
orderedTransactionContext.transactionOrderMetadata.load(offsets);
|
||||
return orderedTransactionContext;
|
||||
}
|
||||
|
||||
public TransactionOrderMetadata getTransactionOrderMetadata() {
|
||||
return transactionOrderMetadata;
|
||||
}
|
||||
}
|
@ -35,10 +35,10 @@ public class TransactionContext {
|
||||
+ TransactionStructMaker.DEBEZIUM_TRANSACTION_DATA_COLLECTION_ORDER_KEY + "_";
|
||||
private static final int OFFSET_TABLE_COUNT_PREFIX_LENGTH = OFFSET_TABLE_COUNT_PREFIX.length();
|
||||
|
||||
protected String transactionId = null;
|
||||
protected final Map<String, Long> perTableEventCount = new HashMap<>();
|
||||
protected final Map<String, Long> viewPerTableEventCount = Collections.unmodifiableMap(perTableEventCount);
|
||||
protected long totalEventCount = 0;
|
||||
public String transactionId = null;
|
||||
public final Map<String, Long> perTableEventCount = new HashMap<>();
|
||||
public final Map<String, Long> viewPerTableEventCount = Collections.unmodifiableMap(perTableEventCount);
|
||||
public long totalEventCount = 0;
|
||||
|
||||
private void reset() {
|
||||
transactionId = null;
|
||||
|
@ -1,35 +0,0 @@
|
||||
/*
|
||||
* Copyright Debezium Authors.
|
||||
*
|
||||
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
|
||||
*/
|
||||
package io.debezium.pipeline.txmetadata;
|
||||
|
||||
import java.util.Map;
|
||||
|
||||
import io.debezium.config.CommonConnectorConfig;
|
||||
|
||||
public class TransactionContextSupplier {
|
||||
|
||||
private final CommonConnectorConfig connectorConfig;
|
||||
|
||||
public TransactionContextSupplier(CommonConnectorConfig connectorConfig) {
|
||||
this.connectorConfig = connectorConfig;
|
||||
}
|
||||
|
||||
public TransactionContext newTransactionContext() {
|
||||
if (!connectorConfig.shouldProvideOrderedTransactionMetadata()) {
|
||||
return new TransactionContext();
|
||||
}
|
||||
TransactionOrderMetadata transactionOrderMetadata = connectorConfig.getTransactionOrderMetadata();
|
||||
return new OrderedTransactionContext(transactionOrderMetadata);
|
||||
}
|
||||
|
||||
public TransactionContext loadTransactionContext(Map<String, ?> offsets) {
|
||||
if (!connectorConfig.shouldProvideOrderedTransactionMetadata()) {
|
||||
return TransactionContext.load(offsets);
|
||||
}
|
||||
TransactionOrderMetadata transactionOrderMetadata = connectorConfig.getTransactionOrderMetadata();
|
||||
return OrderedTransactionContext.load(offsets, transactionOrderMetadata);
|
||||
}
|
||||
}
|
@ -51,6 +51,10 @@ public class TransactionMonitor {
|
||||
private final BlockingConsumer<SourceRecord> sender;
|
||||
private final CommonConnectorConfig connectorConfig;
|
||||
|
||||
// Following properties are kept for backward compatibility with connectors that override TransactionMonitor
|
||||
protected final Schema transactionKeySchema;
|
||||
protected final String DEBEZIUM_TRANSACTION_ID_KEY = TransactionStructMaker.DEBEZIUM_TRANSACTION_KEY;
|
||||
|
||||
private final TransactionStructMaker transactionStructMaker;
|
||||
|
||||
public TransactionMonitor(CommonConnectorConfig connectorConfig, EventMetadataProvider eventMetadataProvider,
|
||||
@ -59,6 +63,7 @@ public TransactionMonitor(CommonConnectorConfig connectorConfig, EventMetadataPr
|
||||
Objects.requireNonNull(eventMetadataProvider);
|
||||
|
||||
transactionStructMaker = connectorConfig.getTransactionStructMaker();
|
||||
transactionKeySchema = transactionStructMaker.getTransactionKeySchema();
|
||||
|
||||
this.topicName = topicName;
|
||||
this.eventMetadataProvider = eventMetadataProvider;
|
||||
@ -118,25 +123,46 @@ public void transactionStartedEvent(Partition partition, TransactionInfo transac
|
||||
beginTransaction(partition, offset, timestamp);
|
||||
}
|
||||
|
||||
protected Struct prepareTxKey(OffsetContext offsetContext) {
|
||||
final Struct key = transactionStructMaker.prepareTxKey(offsetContext);
|
||||
return key;
|
||||
}
|
||||
|
||||
protected Struct prepareTxBeginValue(OffsetContext offsetContext, Instant timestamp) {
|
||||
final Struct value = transactionStructMaker.prepareTxBeginValue(offsetContext, timestamp);
|
||||
return value;
|
||||
}
|
||||
|
||||
protected Struct prepareTxEndValue(OffsetContext offsetContext, Instant timestamp) {
|
||||
final Struct value = transactionStructMaker.prepareTxEndValue(offsetContext, timestamp);
|
||||
return value;
|
||||
}
|
||||
|
||||
protected Struct prepareTxStruct(OffsetContext offsetContext, long dataCollectionEventOrder, Struct value) {
|
||||
final Struct txStruct = transactionStructMaker.prepareTxStruct(offsetContext, dataCollectionEventOrder, value);
|
||||
return txStruct;
|
||||
}
|
||||
|
||||
private void transactionEvent(OffsetContext offsetContext, DataCollectionId source, Struct value) {
|
||||
final long dataCollectionEventOrder = offsetContext.getTransactionContext().event(source);
|
||||
if (value == null) {
|
||||
LOGGER.debug("Event with key {} without value. Cannot enrich source block.");
|
||||
return;
|
||||
}
|
||||
final Struct txStruct = transactionStructMaker.prepareTxStruct(offsetContext, source);
|
||||
final Struct txStruct = prepareTxStruct(offsetContext, dataCollectionEventOrder, value);
|
||||
value.put(Envelope.FieldName.TRANSACTION, txStruct);
|
||||
}
|
||||
|
||||
private void beginTransaction(Partition partition, OffsetContext offsetContext, Instant timestamp) throws InterruptedException {
|
||||
final Struct key = transactionStructMaker.prepareTxKey(offsetContext);
|
||||
final Struct value = transactionStructMaker.prepareTxBeginValue(offsetContext, timestamp);
|
||||
final Struct key = prepareTxKey(offsetContext);
|
||||
final Struct value = prepareTxBeginValue(offsetContext, timestamp);
|
||||
sender.accept(new SourceRecord(partition.getSourcePartition(), offsetContext.getOffset(),
|
||||
topicName, null, key.schema(), key, value.schema(), value));
|
||||
}
|
||||
|
||||
private void endTransaction(Partition partition, OffsetContext offsetContext, Instant timestamp) throws InterruptedException {
|
||||
final Struct key = transactionStructMaker.prepareTxKey(offsetContext);
|
||||
final Struct value = transactionStructMaker.prepareTxEndValue(offsetContext, timestamp);
|
||||
final Struct key = prepareTxKey(offsetContext);
|
||||
final Struct value = prepareTxEndValue(offsetContext, timestamp);
|
||||
sender.accept(new SourceRecord(partition.getSourcePartition(), offsetContext.getOffset(),
|
||||
topicName, null, key.schema(), key, value.schema(), value));
|
||||
}
|
||||
|
@ -1,18 +0,0 @@
|
||||
/*
|
||||
* Copyright Debezium Authors.
|
||||
*
|
||||
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
|
||||
*/
|
||||
package io.debezium.pipeline.txmetadata;
|
||||
|
||||
import java.util.Map;
|
||||
|
||||
public interface TransactionOrderMetadata {
|
||||
Map<String, Object> store(Map<String, Object> offset);
|
||||
|
||||
void load(Map<String, ?> offsets);
|
||||
|
||||
void beginTransaction(TransactionInfo transactionInfo);
|
||||
|
||||
void endTransaction();
|
||||
}
|
@ -12,7 +12,6 @@
|
||||
|
||||
import io.debezium.pipeline.spi.OffsetContext;
|
||||
import io.debezium.schema.SchemaNameAdjuster;
|
||||
import io.debezium.spi.schema.DataCollectionId;
|
||||
|
||||
public interface TransactionStructMaker {
|
||||
String DEBEZIUM_TRANSACTION_KEY = "transaction";
|
||||
@ -27,7 +26,7 @@ public interface TransactionStructMaker {
|
||||
|
||||
void setSchemaNameAdjuster(SchemaNameAdjuster adjuster);
|
||||
|
||||
Struct prepareTxStruct(OffsetContext offsetContext, DataCollectionId source);
|
||||
Struct prepareTxStruct(OffsetContext offsetContext, long dataCollectionEventOrder, Struct value);
|
||||
|
||||
Struct prepareTxEndValue(OffsetContext offsetContext, Instant timestamp);
|
||||
|
||||
|
@ -1,43 +0,0 @@
|
||||
/*
|
||||
* Copyright Debezium Authors.
|
||||
*
|
||||
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
|
||||
*/
|
||||
package io.debezium.pipeline.txmetadata;
|
||||
|
||||
import java.util.Map;
|
||||
|
||||
import org.junit.Test;
|
||||
|
||||
public class OrderedTransactionContextTest {
|
||||
|
||||
public class TestTransactionOrderMetadata implements TransactionOrderMetadata {
|
||||
@Override
|
||||
public Map<String, Object> store(Map<String, Object> offset) {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void load(Map<String, ?> offsets) {
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public void beginTransaction(TransactionInfo transactionInfo) {
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public void endTransaction() {
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void beginTransaction() {
|
||||
TransactionOrderMetadata transactionOrderMetadata = new TestTransactionOrderMetadata();
|
||||
TransactionContext context = new OrderedTransactionContext(transactionOrderMetadata);
|
||||
TransactionInfo transactionInfo = new BasicTransactionInfo("foo");
|
||||
context.beginTransaction(transactionInfo);
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue
Block a user