DBZ-8185 Add transaction field to change event of a logical decoding message

This commit is contained in:
Roman Kudryashov 2024-08-29 13:43:12 +03:00 committed by Jiri Pechanec
parent 4b5cf5e56c
commit b672e24d0e
5 changed files with 86 additions and 4 deletions

View File

@ -10,6 +10,7 @@
import java.nio.charset.StandardCharsets;
import java.util.Base64;
import java.util.Base64.Encoder;
import java.util.List;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.Struct;
@ -21,7 +22,10 @@
import io.debezium.function.BlockingConsumer;
import io.debezium.pipeline.spi.OffsetContext;
import io.debezium.pipeline.spi.Partition;
import io.debezium.pipeline.txmetadata.TransactionMonitor;
import io.debezium.schema.SchemaNameAdjuster;
import io.debezium.spi.schema.DataCollectionId;
import io.debezium.util.Collect;
import io.debezium.util.HexConverter;
/**
@ -84,7 +88,7 @@ public LogicalDecodingMessageMonitor(PostgresConnectorConfig connectorConfig, Bl
}
public void logicalDecodingMessageEvent(Partition partition, OffsetContext offsetContext, Long timestamp,
LogicalDecodingMessage message)
LogicalDecodingMessage message, TransactionMonitor transactionMonitor)
throws InterruptedException {
final Struct logicalMsgStruct = new Struct(blockSchema);
logicalMsgStruct.put(DEBEZIUM_LOGICAL_DECODING_MESSAGE_PREFIX_KEY, message.getPrefix());
@ -99,6 +103,8 @@ public void logicalDecodingMessageEvent(Partition partition, OffsetContext offse
value.put(DEBEZIUM_LOGICAL_DECODING_MESSAGE_KEY, logicalMsgStruct);
value.put(Envelope.FieldName.SOURCE, offsetContext.getSourceInfo());
transactionMonitor.dataEvent(partition, new LogicalDecodingMessageId(), offsetContext, key, value);
sender.accept(new SourceRecord(partition.getSourcePartition(), offsetContext.getOffset(), topicName,
keySchema, key, value.schema(), value));
@ -121,4 +127,29 @@ private Object convertContent(byte[] content) {
return ByteBuffer.wrap(content);
}
}
public class LogicalDecodingMessageId implements DataCollectionId {
private final static String LOGICAL_DECODING_MESSAGE_ID = "LOGICAL_DECODING_MESSAGE";
@Override
public String identifier() {
return LOGICAL_DECODING_MESSAGE_ID;
}
@Override
public List<String> parts() {
return Collect.arrayListOf(LOGICAL_DECODING_MESSAGE_ID);
}
@Override
public List<String> databaseParts() {
return null;
}
@Override
public List<String> schemaParts() {
return null;
}
}
}

View File

@ -62,7 +62,7 @@ public void dispatchLogicalDecodingMessage(Partition partition, OffsetContext of
LogicalDecodingMessage message)
throws InterruptedException {
if (messageFilter.isIncluded(message.getPrefix())) {
logicalDecodingMessageMonitor.logicalDecodingMessageEvent(partition, offset, decodeTimestamp, message);
logicalDecodingMessageMonitor.logicalDecodingMessageEvent(partition, offset, decodeTimestamp, message, transactionMonitor);
}
else {
LOGGER.trace("Filtered data change event for logical decoding message with prefix{}", message.getPrefix());

View File

@ -64,6 +64,7 @@ public Schema logicalDecodingMessageMonitorValueSchema(SchemaNameAdjuster adjust
.field(Envelope.FieldName.OPERATION, Schema.STRING_SCHEMA)
.field(Envelope.FieldName.TIMESTAMP, Schema.OPTIONAL_INT64_SCHEMA)
.field(Envelope.FieldName.SOURCE, config.getSourceInfoStructMaker().schema())
.field(Envelope.FieldName.TRANSACTION, config.getTransactionMetadataFactory().getTransactionStructMaker().getTransactionBlockSchema())
.field(LogicalDecodingMessageMonitor.DEBEZIUM_LOGICAL_DECODING_MESSAGE_KEY, logicalDecodingMessageMonitorBlockSchema(adjuster, binaryHandlingMode))
.build();
}

View File

@ -114,6 +114,7 @@ public void shouldConsumeNonTransactionalLogicalDecodingMessages() throws Except
Struct value = (Struct) record.value();
String op = value.getString(Envelope.FieldName.OPERATION);
Struct source = value.getStruct(Envelope.FieldName.SOURCE);
Struct transaction = value.getStruct(Envelope.FieldName.TRANSACTION);
Struct message = value.getStruct(LogicalDecodingMessageMonitor.DEBEZIUM_LOGICAL_DECODING_MESSAGE_KEY);
assertNull(source.getInt64(SourceInfo.TXID_KEY));
@ -122,6 +123,8 @@ public void shouldConsumeNonTransactionalLogicalDecodingMessages() throws Except
assertEquals("", source.getString(SourceInfo.TABLE_NAME_KEY));
assertEquals("", source.getString(SourceInfo.SCHEMA_NAME_KEY));
assertNull(transaction);
assertEquals(Envelope.Operation.MESSAGE.code(), op);
assertEquals("foo",
message.getString(LogicalDecodingMessageMonitor.DEBEZIUM_LOGICAL_DECODING_MESSAGE_PREFIX_KEY));
@ -144,7 +147,7 @@ public void shouldConsumeTransactionalLogicalDecodingMessages() throws Exception
// emit transactional logical decoding message with text
TestHelper.execute("SELECT pg_logical_emit_message(true, 'txn_foo', 'txn_bar');");
// emit transactional logical decoding message with binary
// emit non transactional logical decoding message with binary
TestHelper.execute("SELECT pg_logical_emit_message(false, 'foo', E'txn_bar'::bytea);");
SourceRecords txnRecords = consumeRecordsByTopic(1);
@ -155,6 +158,7 @@ public void shouldConsumeTransactionalLogicalDecodingMessages() throws Exception
Struct value = (Struct) record.value();
String op = value.getString(Envelope.FieldName.OPERATION);
Struct source = value.getStruct(Envelope.FieldName.SOURCE);
Struct transaction = value.getStruct(Envelope.FieldName.TRANSACTION);
Struct message = value.getStruct(LogicalDecodingMessageMonitor.DEBEZIUM_LOGICAL_DECODING_MESSAGE_KEY);
assertNotNull(source.getInt64(SourceInfo.TXID_KEY));
@ -163,6 +167,52 @@ public void shouldConsumeTransactionalLogicalDecodingMessages() throws Exception
assertEquals("", source.getString(SourceInfo.TABLE_NAME_KEY));
assertEquals("", source.getString(SourceInfo.SCHEMA_NAME_KEY));
assertNull(transaction);
assertEquals(Envelope.Operation.MESSAGE.code(), op);
assertEquals("txn_foo",
message.getString(LogicalDecodingMessageMonitor.DEBEZIUM_LOGICAL_DECODING_MESSAGE_PREFIX_KEY));
assertArrayEquals("txn_bar".getBytes(),
message.getBytes(LogicalDecodingMessageMonitor.DEBEZIUM_LOGICAL_DECODING_MESSAGE_CONTENT_KEY));
});
}
@Test
@FixFor("DBZ-8185")
@SkipWhenDecoderPluginNameIsNot(value = SkipWhenDecoderPluginNameIsNot.DecoderPluginName.PGOUTPUT, reason = "Only supported on PgOutput")
@SkipWhenDatabaseVersion(check = LESS_THAN, major = 14, minor = 0, reason = "Message not supported for PG version < 14")
public void shouldConsumeTransactionalLogicalDecodingMessagesThatContainTransactionData() throws Exception {
Configuration.Builder configBuilder = TestHelper.defaultConfig()
.with(PostgresConnectorConfig.PROVIDE_TRANSACTION_METADATA, true);
start(PostgresConnector.class, configBuilder.build());
assertConnectorIsRunning();
waitForSnapshotToBeCompleted();
// emit transactional logical decoding message with text
TestHelper.execute("SELECT pg_logical_emit_message(true, 'txn_foo', 'txn_bar');");
SourceRecords txnRecords = consumeRecordsByTopic(2);
List<SourceRecord> txnRecordsForTopic = txnRecords.recordsForTopic(topicName("message"));
assertThat(txnRecordsForTopic).hasSize(1);
txnRecordsForTopic.forEach(record -> {
Struct value = (Struct) record.value();
String op = value.getString(Envelope.FieldName.OPERATION);
Struct source = value.getStruct(Envelope.FieldName.SOURCE);
Struct transaction = value.getStruct(Envelope.FieldName.TRANSACTION);
Struct message = value.getStruct(LogicalDecodingMessageMonitor.DEBEZIUM_LOGICAL_DECODING_MESSAGE_KEY);
assertNotNull(source.getInt64(SourceInfo.TXID_KEY));
assertNotNull(source.getInt64(SourceInfo.TIMESTAMP_KEY));
assertNotNull(source.getInt64(SourceInfo.LSN_KEY));
assertEquals("", source.getString(SourceInfo.TABLE_NAME_KEY));
assertEquals("", source.getString(SourceInfo.SCHEMA_NAME_KEY));
assertNotNull(transaction);
assertThat(transaction.getString("id")).isNotBlank();
assertEquals(1, transaction.getInt64("total_order").longValue());
assertEquals(1, transaction.getInt64("data_collection_order").longValue());
assertEquals(Envelope.Operation.MESSAGE.code(), op);
assertEquals("txn_foo",
message.getString(LogicalDecodingMessageMonitor.DEBEZIUM_LOGICAL_DECODING_MESSAGE_PREFIX_KEY));

View File

@ -71,6 +71,7 @@ public class EventDispatcher<P extends Partition, T extends DataCollectionId> im
private static final Logger LOGGER = LoggerFactory.getLogger(EventDispatcher.class);
protected final TransactionMonitor transactionMonitor;
private final TopicNamingStrategy<T> topicNamingStrategy;
private final DatabaseSchema<T> schema;
private final HistorizedDatabaseSchema<T> historizedSchema;
@ -81,7 +82,6 @@ public class EventDispatcher<P extends Partition, T extends DataCollectionId> im
private DataChangeEventListener<P> eventListener = DataChangeEventListener.NO_OP();
private final boolean emitTombstonesOnDelete;
private final InconsistentSchemaHandler<P, T> inconsistentSchemaHandler;
private final TransactionMonitor transactionMonitor;
private final CommonConnectorConfig connectorConfig;
private final EnumSet<Operation> skippedOperations;
private final boolean neverSkip;