From 851b2c747937e576f6251593903aa36c51d52084 Mon Sep 17 00:00:00 2001 From: Roman Kudryashov Date: Wed, 31 Jul 2024 20:17:22 +0300 Subject: [PATCH] DBZ-8103 Add tests --- .../postgresql/CloudEventsConverterIT.java | 101 +++++++++ .../connector/postgresql/TestHelper.java | 6 +- ...codeLogicalDecodingMessageContentTest.java | 193 ++++++++++++++++++ 3 files changed, 297 insertions(+), 3 deletions(-) create mode 100644 debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/transforms/DecodeLogicalDecodingMessageContentTest.java diff --git a/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/CloudEventsConverterIT.java b/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/CloudEventsConverterIT.java index d7084f272..da9264df0 100644 --- a/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/CloudEventsConverterIT.java +++ b/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/CloudEventsConverterIT.java @@ -6,14 +6,33 @@ package io.debezium.connector.postgresql; +import static io.debezium.junit.EqualityCheck.LESS_THAN; +import static org.assertj.core.api.Assertions.assertThat; + +import java.util.LinkedHashMap; +import java.util.Map; import java.util.UUID; +import org.apache.kafka.connect.data.Schema; +import org.apache.kafka.connect.data.Struct; +import org.apache.kafka.connect.source.SourceRecord; +import org.apache.kafka.connect.transforms.HeaderFrom; import org.junit.Before; +import org.junit.Test; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.node.ObjectNode; import io.debezium.config.Configuration; import io.debezium.connector.postgresql.PostgresConnectorConfig.SnapshotMode; +import io.debezium.connector.postgresql.junit.SkipWhenDecoderPluginNameIsNot; +import io.debezium.connector.postgresql.transforms.DecodeLogicalDecodingMessageContent; import io.debezium.converters.AbstractCloudEventsConverterTest; +import io.debezium.converters.CloudEventsConverterTest; +import io.debezium.doc.FixFor; import io.debezium.jdbc.JdbcConnection; +import io.debezium.junit.SkipWhenDatabaseVersion; +import io.debezium.transforms.outbox.EventRouter; /** * Integration test for {@link io.debezium.converters.CloudEventsConverter} with {@link PostgresConnector} @@ -89,6 +108,10 @@ protected String topicNameOutbox() { return TestHelper.topicName("outboxsmtit.outbox"); } + protected String topicNameMessage() { + return TestHelper.topicName("message"); + } + @Override protected void createTable() throws Exception { TestHelper.execute(SETUP_SCHEMA); @@ -138,8 +161,86 @@ else if (payloadJson.isEmpty()) { return insert.toString(); } + protected String createStatementToCallInsertToWalFunction(String eventId, + String eventType, + String aggregateType, + String aggregateId, + String payloadJson) { + StringBuilder statement = new StringBuilder(); + statement.append("SELECT pg_logical_emit_message(true, 'foo', '"); + + ObjectMapper mapper = new ObjectMapper(); + ObjectNode rootNode = mapper.createObjectNode(); + rootNode.put("id", eventId); + rootNode.put("type", eventType); + rootNode.put("aggregateType", aggregateType); + rootNode.put("aggregateId", aggregateId); + rootNode.put("payload", payloadJson); + + statement.append(rootNode).append("');"); + + return statement.toString(); + } + @Override protected void waitForStreamingStarted() throws InterruptedException { waitForStreamingRunning("postgres", TestHelper.TEST_SERVER); } + + @Test + @FixFor("DBZ-8103") + @SkipWhenDecoderPluginNameIsNot(value = SkipWhenDecoderPluginNameIsNot.DecoderPluginName.PGOUTPUT, reason = "Only supported on PgOutput") + @SkipWhenDatabaseVersion(check = LESS_THAN, major = 14, minor = 0, reason = "Message not supported for PG version < 14") + public void shouldConvertToCloudEventsInJsonWithDataAsJsonAndAllMetadataInHeadersAfterOutboxEventRouterAppliedToLogicalDecodingMessage() throws Exception { + HeaderFrom headerFrom = new HeaderFrom.Value<>(); + Map headerFromConfig = new LinkedHashMap<>(); + headerFromConfig.put("fields", "source,op"); + headerFromConfig.put("headers", "source,op"); + headerFromConfig.put("operation", "copy"); + headerFromConfig.put("header.converter.schemas.enable", "true"); + headerFrom.configure(headerFromConfig); + + DecodeLogicalDecodingMessageContent decodeLogicalDecodingMessageContent = new DecodeLogicalDecodingMessageContent<>(); + Map smtConfig = new LinkedHashMap<>(); + decodeLogicalDecodingMessageContent.configure(smtConfig); + + EventRouter outboxEventRouter = new EventRouter<>(); + Map outboxEventRouterConfig = new LinkedHashMap<>(); + outboxEventRouterConfig.put("table.expand.json.payload", "true"); + outboxEventRouterConfig.put("table.field.event.key", "aggregateId"); + // this adds `type` header with value from the DB column. `id` header is added by Outbox Event Router by default + outboxEventRouterConfig.put("table.fields.additional.placement", "type:header"); + outboxEventRouterConfig.put("route.by.field", "aggregateType"); + outboxEventRouter.configure(outboxEventRouterConfig); + + // emit non transactional logical decoding message + TestHelper.execute(createStatementToCallInsertToWalFunction("59a42efd-b015-44a9-9dde-cb36d9002425", + "UserCreated", + "User", + "10711fa5", + "{" + + "\"someField1\": \"some value 1\"," + + "\"someField2\": 7005" + + "}")); + + SourceRecords streamingRecords = consumeRecordsByTopic(1); + assertThat(streamingRecords.allRecordsInOrder()).hasSize(1); + + SourceRecord record = streamingRecords.recordsForTopic(topicNameMessage()).get(0); + SourceRecord recordWithMetadataHeaders = headerFrom.apply(record); + SourceRecord recordWithDecodedLogicalDecodingMessageContent = decodeLogicalDecodingMessageContent.apply(recordWithMetadataHeaders); + SourceRecord routedEvent = outboxEventRouter.apply(recordWithDecodedLogicalDecodingMessageContent); + + assertThat(routedEvent).isNotNull(); + assertThat(routedEvent.topic()).isEqualTo("outbox.event.User"); + assertThat(routedEvent.keySchema()).isEqualTo(Schema.OPTIONAL_STRING_SCHEMA); + assertThat(routedEvent.key()).isEqualTo("10711fa5"); + assertThat(routedEvent.value()).isInstanceOf(Struct.class); + + CloudEventsConverterTest.shouldConvertToCloudEventsInJsonWithMetadataAndIdAndTypeInHeaders(routedEvent, getConnectorName(), getServerName()); + + headerFrom.close(); + decodeLogicalDecodingMessageContent.close(); + outboxEventRouter.close(); + } } diff --git a/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/TestHelper.java b/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/TestHelper.java index 79a669431..546b18702 100644 --- a/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/TestHelper.java +++ b/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/TestHelper.java @@ -323,7 +323,7 @@ protected static void executeDDL(String ddlFile) throws Exception { } } - protected static String topicName(String suffix) { + public static String topicName(String suffix) { return TestHelper.TEST_SERVER + "." + suffix; } @@ -365,7 +365,7 @@ protected static SlotState getDefaultReplicationSlot() { } } - protected static void dropDefaultReplicationSlot() { + public static void dropDefaultReplicationSlot() { try { execute("SELECT pg_drop_replication_slot('" + ReplicationConnection.Builder.DEFAULT_SLOT_NAME + "')"); } @@ -376,7 +376,7 @@ protected static void dropDefaultReplicationSlot() { } } - protected static void dropPublication() { + public static void dropPublication() { dropPublication(ReplicationConnection.Builder.DEFAULT_PUBLICATION_NAME); } diff --git a/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/transforms/DecodeLogicalDecodingMessageContentTest.java b/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/transforms/DecodeLogicalDecodingMessageContentTest.java new file mode 100644 index 000000000..33a641524 --- /dev/null +++ b/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/transforms/DecodeLogicalDecodingMessageContentTest.java @@ -0,0 +1,193 @@ +/* + * Copyright Debezium Authors. + * + * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 + */ +package io.debezium.connector.postgresql.transforms; + +import static io.debezium.connector.postgresql.TestHelper.topicName; +import static io.debezium.junit.EqualityCheck.LESS_THAN; +import static junit.framework.TestCase.assertEquals; +import static org.assertj.core.api.Assertions.assertThat; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertThrows; +import static org.junit.Assert.assertTrue; + +import java.sql.SQLException; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; + +import org.apache.kafka.connect.data.Field; +import org.apache.kafka.connect.data.Struct; +import org.apache.kafka.connect.errors.DataException; +import org.apache.kafka.connect.source.SourceRecord; +import org.junit.After; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TestRule; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import io.debezium.config.Configuration; +import io.debezium.connector.postgresql.PostgresConnector; +import io.debezium.connector.postgresql.SourceInfo; +import io.debezium.connector.postgresql.TestHelper; +import io.debezium.connector.postgresql.junit.SkipTestDependingOnDecoderPluginNameRule; +import io.debezium.connector.postgresql.junit.SkipWhenDecoderPluginNameIsNot; +import io.debezium.data.Envelope; +import io.debezium.doc.FixFor; +import io.debezium.embedded.AbstractConnectorTest; +import io.debezium.junit.SkipWhenDatabaseVersion; + +/** + * Tests for {@link io.debezium.connector.postgresql.transforms.DecodeLogicalDecodingMessageContent} SMT. + * + * @author Roman Kudryashov + */ +public class DecodeLogicalDecodingMessageContentTest extends AbstractConnectorTest { + + private static final Logger LOGGER = LoggerFactory.getLogger(DecodeLogicalDecodingMessageContentTest.class); + + @Rule + public final TestRule skipName = new SkipTestDependingOnDecoderPluginNameRule(); + + @BeforeClass + public static void beforeClass() throws SQLException { + TestHelper.dropAllSchemas(); + } + + @Before + public void before() { + initializeConnectorTestFramework(); + } + + @After + public void after() { + stopConnector(); + TestHelper.dropDefaultReplicationSlot(); + TestHelper.dropPublication(); + } + + @Test + @FixFor("DBZ-8103") + @SkipWhenDecoderPluginNameIsNot(value = SkipWhenDecoderPluginNameIsNot.DecoderPluginName.PGOUTPUT, reason = "Only supported on PgOutput") + @SkipWhenDatabaseVersion(check = LESS_THAN, major = 14, minor = 0, reason = "Message not supported for PG version < 14") + public void shouldFailWhenLogicalDecodingMessageContentIsEmptyString() throws Exception { + DecodeLogicalDecodingMessageContent decodeLogicalDecodingMessageContent = new DecodeLogicalDecodingMessageContent<>(); + Map smtConfig = new LinkedHashMap<>(); + decodeLogicalDecodingMessageContent.configure(smtConfig); + + Configuration.Builder configBuilder = TestHelper.defaultConfig(); + start(PostgresConnector.class, configBuilder.build()); + assertConnectorIsRunning(); + waitForStreamingRunning("postgres", TestHelper.TEST_SERVER); + + // emit non transactional logical decoding message + TestHelper.execute("SELECT pg_logical_emit_message(false, 'foo', '');"); + + SourceRecords records = consumeRecordsByTopic(1); + List recordsForTopic = records.recordsForTopic(topicName("message")); + assertThat(recordsForTopic).hasSize(1); + + Exception exception = assertThrows(DataException.class, () -> decodeLogicalDecodingMessageContent.apply(recordsForTopic.get(0))); + + assertThat(exception.getMessage()).isEqualTo("Conversion of logical decoding message content failed"); + + decodeLogicalDecodingMessageContent.close(); + stopConnector(); + } + + @Test + @FixFor("DBZ-8103") + @SkipWhenDecoderPluginNameIsNot(value = SkipWhenDecoderPluginNameIsNot.DecoderPluginName.PGOUTPUT, reason = "Only supported on PgOutput") + @SkipWhenDatabaseVersion(check = LESS_THAN, major = 14, minor = 0, reason = "Message not supported for PG version < 14") + public void shouldConvertRecordWithNonTransactionalLogicalDecodingMessageWithEmptyContent() throws Exception { + DecodeLogicalDecodingMessageContent decodeLogicalDecodingMessageContent = new DecodeLogicalDecodingMessageContent<>(); + Map smtConfig = new LinkedHashMap<>(); + decodeLogicalDecodingMessageContent.configure(smtConfig); + + Configuration.Builder configBuilder = TestHelper.defaultConfig(); + start(PostgresConnector.class, configBuilder.build()); + assertConnectorIsRunning(); + waitForStreamingRunning("postgres", TestHelper.TEST_SERVER); + + // emit non transactional logical decoding message + TestHelper.execute("SELECT pg_logical_emit_message(false, 'foo', '{}');"); + + SourceRecords records = consumeRecordsByTopic(1); + List recordsForTopic = records.recordsForTopic(topicName("message")); + assertThat(recordsForTopic).hasSize(1); + + SourceRecord transformedRecord = decodeLogicalDecodingMessageContent.apply(recordsForTopic.get(0)); + assertThat(transformedRecord).isNotNull(); + + Struct value = (Struct) transformedRecord.value(); + String op = value.getString(Envelope.FieldName.OPERATION); + Struct source = value.getStruct(Envelope.FieldName.SOURCE); + Struct after = value.getStruct(Envelope.FieldName.AFTER); + + assertNull(source.getInt64(SourceInfo.TXID_KEY)); + assertNotNull(source.getInt64(SourceInfo.TIMESTAMP_KEY)); + assertNotNull(source.getInt64(SourceInfo.LSN_KEY)); + assertEquals("", source.getString(SourceInfo.TABLE_NAME_KEY)); + assertEquals("", source.getString(SourceInfo.SCHEMA_NAME_KEY)); + + assertEquals(Envelope.Operation.CREATE.code(), op); + assertEquals(0, after.schema().fields().size()); + List recordValueSchemaFields = value.schema().fields(); + assertTrue(recordValueSchemaFields.stream().noneMatch(f -> f.name().equals("message"))); + + decodeLogicalDecodingMessageContent.close(); + stopConnector(); + } + + @Test + @FixFor("DBZ-8103") + @SkipWhenDecoderPluginNameIsNot(value = SkipWhenDecoderPluginNameIsNot.DecoderPluginName.PGOUTPUT, reason = "Only supported on PgOutput") + @SkipWhenDatabaseVersion(check = LESS_THAN, major = 14, minor = 0, reason = "Message not supported for PG version < 14") + public void shouldConvertRecordWithTransactionalLogicalDecodingMessageWithContent() throws Exception { + DecodeLogicalDecodingMessageContent decodeLogicalDecodingMessageContent = new DecodeLogicalDecodingMessageContent<>(); + Map smtConfig = new LinkedHashMap<>(); + decodeLogicalDecodingMessageContent.configure(smtConfig); + + Configuration.Builder configBuilder = TestHelper.defaultConfig(); + start(PostgresConnector.class, configBuilder.build()); + assertConnectorIsRunning(); + waitForStreamingRunning("postgres", TestHelper.TEST_SERVER); + + // emit transactional logical decoding message + TestHelper.execute("SELECT pg_logical_emit_message(true, 'foo', '{\"bar\": \"baz\", \"qux\": 9703}');"); + + SourceRecords records = consumeRecordsByTopic(1); + List recordsForTopic = records.recordsForTopic(topicName("message")); + assertThat(recordsForTopic).hasSize(1); + + SourceRecord transformedRecord = decodeLogicalDecodingMessageContent.apply(recordsForTopic.get(0)); + assertThat(transformedRecord).isNotNull(); + + Struct value = (Struct) transformedRecord.value(); + String op = value.getString(Envelope.FieldName.OPERATION); + Struct source = value.getStruct(Envelope.FieldName.SOURCE); + Struct after = value.getStruct(Envelope.FieldName.AFTER); + + assertNotNull(source.getInt64(SourceInfo.TXID_KEY)); + assertNotNull(source.getInt64(SourceInfo.TIMESTAMP_KEY)); + assertNotNull(source.getInt64(SourceInfo.LSN_KEY)); + assertEquals("", source.getString(SourceInfo.TABLE_NAME_KEY)); + assertEquals("", source.getString(SourceInfo.SCHEMA_NAME_KEY)); + + assertEquals(Envelope.Operation.CREATE.code(), op); + assertEquals(2, after.schema().fields().size()); + assertEquals("baz", after.get("bar")); + assertEquals(9703, after.get("qux")); + List recordValueSchemaFields = value.schema().fields(); + assertTrue(recordValueSchemaFields.stream().noneMatch(f -> f.name().equals("message"))); + + decodeLogicalDecodingMessageContent.close(); + stopConnector(); + } +}