From b672e24d0e4487f00b2efe3fad96e16efd4ea42c Mon Sep 17 00:00:00 2001 From: Roman Kudryashov Date: Thu, 29 Aug 2024 13:43:12 +0300 Subject: [PATCH] DBZ-8185 Add `transaction` field to change event of a logical decoding message --- .../LogicalDecodingMessageMonitor.java | 33 +++++++++++- .../postgresql/PostgresEventDispatcher.java | 2 +- .../postgresql/PostgresSchemaFactory.java | 1 + .../postgresql/LogicalDecodingMessageIT.java | 52 ++++++++++++++++++- .../io/debezium/pipeline/EventDispatcher.java | 2 +- 5 files changed, 86 insertions(+), 4 deletions(-) diff --git a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/LogicalDecodingMessageMonitor.java b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/LogicalDecodingMessageMonitor.java index c201ffb72..d602f9ae3 100644 --- a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/LogicalDecodingMessageMonitor.java +++ b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/LogicalDecodingMessageMonitor.java @@ -10,6 +10,7 @@ import java.nio.charset.StandardCharsets; import java.util.Base64; import java.util.Base64.Encoder; +import java.util.List; import org.apache.kafka.connect.data.Schema; import org.apache.kafka.connect.data.Struct; @@ -21,7 +22,10 @@ import io.debezium.function.BlockingConsumer; import io.debezium.pipeline.spi.OffsetContext; import io.debezium.pipeline.spi.Partition; +import io.debezium.pipeline.txmetadata.TransactionMonitor; import io.debezium.schema.SchemaNameAdjuster; +import io.debezium.spi.schema.DataCollectionId; +import io.debezium.util.Collect; import io.debezium.util.HexConverter; /** @@ -84,7 +88,7 @@ public LogicalDecodingMessageMonitor(PostgresConnectorConfig connectorConfig, Bl } public void logicalDecodingMessageEvent(Partition partition, OffsetContext offsetContext, Long timestamp, - LogicalDecodingMessage message) + LogicalDecodingMessage message, TransactionMonitor transactionMonitor) throws InterruptedException { final Struct logicalMsgStruct = new Struct(blockSchema); logicalMsgStruct.put(DEBEZIUM_LOGICAL_DECODING_MESSAGE_PREFIX_KEY, message.getPrefix()); @@ -99,6 +103,8 @@ public void logicalDecodingMessageEvent(Partition partition, OffsetContext offse value.put(DEBEZIUM_LOGICAL_DECODING_MESSAGE_KEY, logicalMsgStruct); value.put(Envelope.FieldName.SOURCE, offsetContext.getSourceInfo()); + transactionMonitor.dataEvent(partition, new LogicalDecodingMessageId(), offsetContext, key, value); + sender.accept(new SourceRecord(partition.getSourcePartition(), offsetContext.getOffset(), topicName, keySchema, key, value.schema(), value)); @@ -121,4 +127,29 @@ private Object convertContent(byte[] content) { return ByteBuffer.wrap(content); } } + + public class LogicalDecodingMessageId implements DataCollectionId { + + private final static String LOGICAL_DECODING_MESSAGE_ID = "LOGICAL_DECODING_MESSAGE"; + + @Override + public String identifier() { + return LOGICAL_DECODING_MESSAGE_ID; + } + + @Override + public List parts() { + return Collect.arrayListOf(LOGICAL_DECODING_MESSAGE_ID); + } + + @Override + public List databaseParts() { + return null; + } + + @Override + public List schemaParts() { + return null; + } + } } diff --git a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresEventDispatcher.java b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresEventDispatcher.java index d8bbf5eac..3bca80b51 100644 --- a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresEventDispatcher.java +++ b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresEventDispatcher.java @@ -62,7 +62,7 @@ public void dispatchLogicalDecodingMessage(Partition partition, OffsetContext of LogicalDecodingMessage message) throws InterruptedException { if (messageFilter.isIncluded(message.getPrefix())) { - logicalDecodingMessageMonitor.logicalDecodingMessageEvent(partition, offset, decodeTimestamp, message); + logicalDecodingMessageMonitor.logicalDecodingMessageEvent(partition, offset, decodeTimestamp, message, transactionMonitor); } else { LOGGER.trace("Filtered data change event for logical decoding message with prefix{}", message.getPrefix()); diff --git a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresSchemaFactory.java b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresSchemaFactory.java index 701891f93..233760827 100644 --- a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresSchemaFactory.java +++ b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresSchemaFactory.java @@ -64,6 +64,7 @@ public Schema logicalDecodingMessageMonitorValueSchema(SchemaNameAdjuster adjust .field(Envelope.FieldName.OPERATION, Schema.STRING_SCHEMA) .field(Envelope.FieldName.TIMESTAMP, Schema.OPTIONAL_INT64_SCHEMA) .field(Envelope.FieldName.SOURCE, config.getSourceInfoStructMaker().schema()) + .field(Envelope.FieldName.TRANSACTION, config.getTransactionMetadataFactory().getTransactionStructMaker().getTransactionBlockSchema()) .field(LogicalDecodingMessageMonitor.DEBEZIUM_LOGICAL_DECODING_MESSAGE_KEY, logicalDecodingMessageMonitorBlockSchema(adjuster, binaryHandlingMode)) .build(); } diff --git a/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/LogicalDecodingMessageIT.java b/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/LogicalDecodingMessageIT.java index e719503f4..2a403040e 100644 --- a/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/LogicalDecodingMessageIT.java +++ b/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/LogicalDecodingMessageIT.java @@ -114,6 +114,7 @@ public void shouldConsumeNonTransactionalLogicalDecodingMessages() throws Except Struct value = (Struct) record.value(); String op = value.getString(Envelope.FieldName.OPERATION); Struct source = value.getStruct(Envelope.FieldName.SOURCE); + Struct transaction = value.getStruct(Envelope.FieldName.TRANSACTION); Struct message = value.getStruct(LogicalDecodingMessageMonitor.DEBEZIUM_LOGICAL_DECODING_MESSAGE_KEY); assertNull(source.getInt64(SourceInfo.TXID_KEY)); @@ -122,6 +123,8 @@ public void shouldConsumeNonTransactionalLogicalDecodingMessages() throws Except assertEquals("", source.getString(SourceInfo.TABLE_NAME_KEY)); assertEquals("", source.getString(SourceInfo.SCHEMA_NAME_KEY)); + assertNull(transaction); + assertEquals(Envelope.Operation.MESSAGE.code(), op); assertEquals("foo", message.getString(LogicalDecodingMessageMonitor.DEBEZIUM_LOGICAL_DECODING_MESSAGE_PREFIX_KEY)); @@ -144,7 +147,7 @@ public void shouldConsumeTransactionalLogicalDecodingMessages() throws Exception // emit transactional logical decoding message with text TestHelper.execute("SELECT pg_logical_emit_message(true, 'txn_foo', 'txn_bar');"); - // emit transactional logical decoding message with binary + // emit non transactional logical decoding message with binary TestHelper.execute("SELECT pg_logical_emit_message(false, 'foo', E'txn_bar'::bytea);"); SourceRecords txnRecords = consumeRecordsByTopic(1); @@ -155,6 +158,7 @@ public void shouldConsumeTransactionalLogicalDecodingMessages() throws Exception Struct value = (Struct) record.value(); String op = value.getString(Envelope.FieldName.OPERATION); Struct source = value.getStruct(Envelope.FieldName.SOURCE); + Struct transaction = value.getStruct(Envelope.FieldName.TRANSACTION); Struct message = value.getStruct(LogicalDecodingMessageMonitor.DEBEZIUM_LOGICAL_DECODING_MESSAGE_KEY); assertNotNull(source.getInt64(SourceInfo.TXID_KEY)); @@ -163,6 +167,52 @@ public void shouldConsumeTransactionalLogicalDecodingMessages() throws Exception assertEquals("", source.getString(SourceInfo.TABLE_NAME_KEY)); assertEquals("", source.getString(SourceInfo.SCHEMA_NAME_KEY)); + assertNull(transaction); + + assertEquals(Envelope.Operation.MESSAGE.code(), op); + assertEquals("txn_foo", + message.getString(LogicalDecodingMessageMonitor.DEBEZIUM_LOGICAL_DECODING_MESSAGE_PREFIX_KEY)); + assertArrayEquals("txn_bar".getBytes(), + message.getBytes(LogicalDecodingMessageMonitor.DEBEZIUM_LOGICAL_DECODING_MESSAGE_CONTENT_KEY)); + }); + } + + @Test + @FixFor("DBZ-8185") + @SkipWhenDecoderPluginNameIsNot(value = SkipWhenDecoderPluginNameIsNot.DecoderPluginName.PGOUTPUT, reason = "Only supported on PgOutput") + @SkipWhenDatabaseVersion(check = LESS_THAN, major = 14, minor = 0, reason = "Message not supported for PG version < 14") + public void shouldConsumeTransactionalLogicalDecodingMessagesThatContainTransactionData() throws Exception { + Configuration.Builder configBuilder = TestHelper.defaultConfig() + .with(PostgresConnectorConfig.PROVIDE_TRANSACTION_METADATA, true); + start(PostgresConnector.class, configBuilder.build()); + assertConnectorIsRunning(); + waitForSnapshotToBeCompleted(); + + // emit transactional logical decoding message with text + TestHelper.execute("SELECT pg_logical_emit_message(true, 'txn_foo', 'txn_bar');"); + + SourceRecords txnRecords = consumeRecordsByTopic(2); + List txnRecordsForTopic = txnRecords.recordsForTopic(topicName("message")); + assertThat(txnRecordsForTopic).hasSize(1); + + txnRecordsForTopic.forEach(record -> { + Struct value = (Struct) record.value(); + String op = value.getString(Envelope.FieldName.OPERATION); + Struct source = value.getStruct(Envelope.FieldName.SOURCE); + Struct transaction = value.getStruct(Envelope.FieldName.TRANSACTION); + Struct message = value.getStruct(LogicalDecodingMessageMonitor.DEBEZIUM_LOGICAL_DECODING_MESSAGE_KEY); + + assertNotNull(source.getInt64(SourceInfo.TXID_KEY)); + assertNotNull(source.getInt64(SourceInfo.TIMESTAMP_KEY)); + assertNotNull(source.getInt64(SourceInfo.LSN_KEY)); + assertEquals("", source.getString(SourceInfo.TABLE_NAME_KEY)); + assertEquals("", source.getString(SourceInfo.SCHEMA_NAME_KEY)); + + assertNotNull(transaction); + assertThat(transaction.getString("id")).isNotBlank(); + assertEquals(1, transaction.getInt64("total_order").longValue()); + assertEquals(1, transaction.getInt64("data_collection_order").longValue()); + assertEquals(Envelope.Operation.MESSAGE.code(), op); assertEquals("txn_foo", message.getString(LogicalDecodingMessageMonitor.DEBEZIUM_LOGICAL_DECODING_MESSAGE_PREFIX_KEY)); diff --git a/debezium-core/src/main/java/io/debezium/pipeline/EventDispatcher.java b/debezium-core/src/main/java/io/debezium/pipeline/EventDispatcher.java index 324b638c9..30f31e312 100644 --- a/debezium-core/src/main/java/io/debezium/pipeline/EventDispatcher.java +++ b/debezium-core/src/main/java/io/debezium/pipeline/EventDispatcher.java @@ -71,6 +71,7 @@ public class EventDispatcher

im private static final Logger LOGGER = LoggerFactory.getLogger(EventDispatcher.class); + protected final TransactionMonitor transactionMonitor; private final TopicNamingStrategy topicNamingStrategy; private final DatabaseSchema schema; private final HistorizedDatabaseSchema historizedSchema; @@ -81,7 +82,6 @@ public class EventDispatcher

im private DataChangeEventListener

eventListener = DataChangeEventListener.NO_OP(); private final boolean emitTombstonesOnDelete; private final InconsistentSchemaHandler inconsistentSchemaHandler; - private final TransactionMonitor transactionMonitor; private final CommonConnectorConfig connectorConfig; private final EnumSet skippedOperations; private final boolean neverSkip;