DBZ-8103 Add tests

This commit is contained in:
Roman Kudryashov 2024-07-31 20:17:22 +03:00 committed by Jiri Pechanec
parent 8c98c979c7
commit 851b2c7479
3 changed files with 297 additions and 3 deletions

View File

@ -6,14 +6,33 @@
package io.debezium.connector.postgresql; package io.debezium.connector.postgresql;
import static io.debezium.junit.EqualityCheck.LESS_THAN;
import static org.assertj.core.api.Assertions.assertThat;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.UUID; import java.util.UUID;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.transforms.HeaderFrom;
import org.junit.Before; import org.junit.Before;
import org.junit.Test;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
import io.debezium.config.Configuration; import io.debezium.config.Configuration;
import io.debezium.connector.postgresql.PostgresConnectorConfig.SnapshotMode; import io.debezium.connector.postgresql.PostgresConnectorConfig.SnapshotMode;
import io.debezium.connector.postgresql.junit.SkipWhenDecoderPluginNameIsNot;
import io.debezium.connector.postgresql.transforms.DecodeLogicalDecodingMessageContent;
import io.debezium.converters.AbstractCloudEventsConverterTest; import io.debezium.converters.AbstractCloudEventsConverterTest;
import io.debezium.converters.CloudEventsConverterTest;
import io.debezium.doc.FixFor;
import io.debezium.jdbc.JdbcConnection; import io.debezium.jdbc.JdbcConnection;
import io.debezium.junit.SkipWhenDatabaseVersion;
import io.debezium.transforms.outbox.EventRouter;
/** /**
* Integration test for {@link io.debezium.converters.CloudEventsConverter} with {@link PostgresConnector} * Integration test for {@link io.debezium.converters.CloudEventsConverter} with {@link PostgresConnector}
@ -89,6 +108,10 @@ protected String topicNameOutbox() {
return TestHelper.topicName("outboxsmtit.outbox"); return TestHelper.topicName("outboxsmtit.outbox");
} }
protected String topicNameMessage() {
return TestHelper.topicName("message");
}
@Override @Override
protected void createTable() throws Exception { protected void createTable() throws Exception {
TestHelper.execute(SETUP_SCHEMA); TestHelper.execute(SETUP_SCHEMA);
@ -138,8 +161,86 @@ else if (payloadJson.isEmpty()) {
return insert.toString(); return insert.toString();
} }
protected String createStatementToCallInsertToWalFunction(String eventId,
String eventType,
String aggregateType,
String aggregateId,
String payloadJson) {
StringBuilder statement = new StringBuilder();
statement.append("SELECT pg_logical_emit_message(true, 'foo', '");
ObjectMapper mapper = new ObjectMapper();
ObjectNode rootNode = mapper.createObjectNode();
rootNode.put("id", eventId);
rootNode.put("type", eventType);
rootNode.put("aggregateType", aggregateType);
rootNode.put("aggregateId", aggregateId);
rootNode.put("payload", payloadJson);
statement.append(rootNode).append("');");
return statement.toString();
}
@Override @Override
protected void waitForStreamingStarted() throws InterruptedException { protected void waitForStreamingStarted() throws InterruptedException {
waitForStreamingRunning("postgres", TestHelper.TEST_SERVER); waitForStreamingRunning("postgres", TestHelper.TEST_SERVER);
} }
@Test
@FixFor("DBZ-8103")
@SkipWhenDecoderPluginNameIsNot(value = SkipWhenDecoderPluginNameIsNot.DecoderPluginName.PGOUTPUT, reason = "Only supported on PgOutput")
@SkipWhenDatabaseVersion(check = LESS_THAN, major = 14, minor = 0, reason = "Message not supported for PG version < 14")
public void shouldConvertToCloudEventsInJsonWithDataAsJsonAndAllMetadataInHeadersAfterOutboxEventRouterAppliedToLogicalDecodingMessage() throws Exception {
HeaderFrom<SourceRecord> headerFrom = new HeaderFrom.Value<>();
Map<String, String> headerFromConfig = new LinkedHashMap<>();
headerFromConfig.put("fields", "source,op");
headerFromConfig.put("headers", "source,op");
headerFromConfig.put("operation", "copy");
headerFromConfig.put("header.converter.schemas.enable", "true");
headerFrom.configure(headerFromConfig);
DecodeLogicalDecodingMessageContent<SourceRecord> decodeLogicalDecodingMessageContent = new DecodeLogicalDecodingMessageContent<>();
Map<String, String> smtConfig = new LinkedHashMap<>();
decodeLogicalDecodingMessageContent.configure(smtConfig);
EventRouter<SourceRecord> outboxEventRouter = new EventRouter<>();
Map<String, String> outboxEventRouterConfig = new LinkedHashMap<>();
outboxEventRouterConfig.put("table.expand.json.payload", "true");
outboxEventRouterConfig.put("table.field.event.key", "aggregateId");
// this adds `type` header with value from the DB column. `id` header is added by Outbox Event Router by default
outboxEventRouterConfig.put("table.fields.additional.placement", "type:header");
outboxEventRouterConfig.put("route.by.field", "aggregateType");
outboxEventRouter.configure(outboxEventRouterConfig);
// emit non transactional logical decoding message
TestHelper.execute(createStatementToCallInsertToWalFunction("59a42efd-b015-44a9-9dde-cb36d9002425",
"UserCreated",
"User",
"10711fa5",
"{" +
"\"someField1\": \"some value 1\"," +
"\"someField2\": 7005" +
"}"));
SourceRecords streamingRecords = consumeRecordsByTopic(1);
assertThat(streamingRecords.allRecordsInOrder()).hasSize(1);
SourceRecord record = streamingRecords.recordsForTopic(topicNameMessage()).get(0);
SourceRecord recordWithMetadataHeaders = headerFrom.apply(record);
SourceRecord recordWithDecodedLogicalDecodingMessageContent = decodeLogicalDecodingMessageContent.apply(recordWithMetadataHeaders);
SourceRecord routedEvent = outboxEventRouter.apply(recordWithDecodedLogicalDecodingMessageContent);
assertThat(routedEvent).isNotNull();
assertThat(routedEvent.topic()).isEqualTo("outbox.event.User");
assertThat(routedEvent.keySchema()).isEqualTo(Schema.OPTIONAL_STRING_SCHEMA);
assertThat(routedEvent.key()).isEqualTo("10711fa5");
assertThat(routedEvent.value()).isInstanceOf(Struct.class);
CloudEventsConverterTest.shouldConvertToCloudEventsInJsonWithMetadataAndIdAndTypeInHeaders(routedEvent, getConnectorName(), getServerName());
headerFrom.close();
decodeLogicalDecodingMessageContent.close();
outboxEventRouter.close();
}
} }

View File

@ -323,7 +323,7 @@ protected static void executeDDL(String ddlFile) throws Exception {
} }
} }
protected static String topicName(String suffix) { public static String topicName(String suffix) {
return TestHelper.TEST_SERVER + "." + suffix; return TestHelper.TEST_SERVER + "." + suffix;
} }
@ -365,7 +365,7 @@ protected static SlotState getDefaultReplicationSlot() {
} }
} }
protected static void dropDefaultReplicationSlot() { public static void dropDefaultReplicationSlot() {
try { try {
execute("SELECT pg_drop_replication_slot('" + ReplicationConnection.Builder.DEFAULT_SLOT_NAME + "')"); execute("SELECT pg_drop_replication_slot('" + ReplicationConnection.Builder.DEFAULT_SLOT_NAME + "')");
} }
@ -376,7 +376,7 @@ protected static void dropDefaultReplicationSlot() {
} }
} }
protected static void dropPublication() { public static void dropPublication() {
dropPublication(ReplicationConnection.Builder.DEFAULT_PUBLICATION_NAME); dropPublication(ReplicationConnection.Builder.DEFAULT_PUBLICATION_NAME);
} }

View File

@ -0,0 +1,193 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.connector.postgresql.transforms;
import static io.debezium.connector.postgresql.TestHelper.topicName;
import static io.debezium.junit.EqualityCheck.LESS_THAN;
import static junit.framework.TestCase.assertEquals;
import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertThrows;
import static org.junit.Assert.assertTrue;
import java.sql.SQLException;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.errors.DataException;
import org.apache.kafka.connect.source.SourceRecord;
import org.junit.After;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TestRule;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import io.debezium.config.Configuration;
import io.debezium.connector.postgresql.PostgresConnector;
import io.debezium.connector.postgresql.SourceInfo;
import io.debezium.connector.postgresql.TestHelper;
import io.debezium.connector.postgresql.junit.SkipTestDependingOnDecoderPluginNameRule;
import io.debezium.connector.postgresql.junit.SkipWhenDecoderPluginNameIsNot;
import io.debezium.data.Envelope;
import io.debezium.doc.FixFor;
import io.debezium.embedded.AbstractConnectorTest;
import io.debezium.junit.SkipWhenDatabaseVersion;
/**
* Tests for {@link io.debezium.connector.postgresql.transforms.DecodeLogicalDecodingMessageContent} SMT.
*
* @author Roman Kudryashov
*/
public class DecodeLogicalDecodingMessageContentTest extends AbstractConnectorTest {
private static final Logger LOGGER = LoggerFactory.getLogger(DecodeLogicalDecodingMessageContentTest.class);
@Rule
public final TestRule skipName = new SkipTestDependingOnDecoderPluginNameRule();
@BeforeClass
public static void beforeClass() throws SQLException {
TestHelper.dropAllSchemas();
}
@Before
public void before() {
initializeConnectorTestFramework();
}
@After
public void after() {
stopConnector();
TestHelper.dropDefaultReplicationSlot();
TestHelper.dropPublication();
}
@Test
@FixFor("DBZ-8103")
@SkipWhenDecoderPluginNameIsNot(value = SkipWhenDecoderPluginNameIsNot.DecoderPluginName.PGOUTPUT, reason = "Only supported on PgOutput")
@SkipWhenDatabaseVersion(check = LESS_THAN, major = 14, minor = 0, reason = "Message not supported for PG version < 14")
public void shouldFailWhenLogicalDecodingMessageContentIsEmptyString() throws Exception {
DecodeLogicalDecodingMessageContent<SourceRecord> decodeLogicalDecodingMessageContent = new DecodeLogicalDecodingMessageContent<>();
Map<String, String> smtConfig = new LinkedHashMap<>();
decodeLogicalDecodingMessageContent.configure(smtConfig);
Configuration.Builder configBuilder = TestHelper.defaultConfig();
start(PostgresConnector.class, configBuilder.build());
assertConnectorIsRunning();
waitForStreamingRunning("postgres", TestHelper.TEST_SERVER);
// emit non transactional logical decoding message
TestHelper.execute("SELECT pg_logical_emit_message(false, 'foo', '');");
SourceRecords records = consumeRecordsByTopic(1);
List<SourceRecord> recordsForTopic = records.recordsForTopic(topicName("message"));
assertThat(recordsForTopic).hasSize(1);
Exception exception = assertThrows(DataException.class, () -> decodeLogicalDecodingMessageContent.apply(recordsForTopic.get(0)));
assertThat(exception.getMessage()).isEqualTo("Conversion of logical decoding message content failed");
decodeLogicalDecodingMessageContent.close();
stopConnector();
}
@Test
@FixFor("DBZ-8103")
@SkipWhenDecoderPluginNameIsNot(value = SkipWhenDecoderPluginNameIsNot.DecoderPluginName.PGOUTPUT, reason = "Only supported on PgOutput")
@SkipWhenDatabaseVersion(check = LESS_THAN, major = 14, minor = 0, reason = "Message not supported for PG version < 14")
public void shouldConvertRecordWithNonTransactionalLogicalDecodingMessageWithEmptyContent() throws Exception {
DecodeLogicalDecodingMessageContent<SourceRecord> decodeLogicalDecodingMessageContent = new DecodeLogicalDecodingMessageContent<>();
Map<String, String> smtConfig = new LinkedHashMap<>();
decodeLogicalDecodingMessageContent.configure(smtConfig);
Configuration.Builder configBuilder = TestHelper.defaultConfig();
start(PostgresConnector.class, configBuilder.build());
assertConnectorIsRunning();
waitForStreamingRunning("postgres", TestHelper.TEST_SERVER);
// emit non transactional logical decoding message
TestHelper.execute("SELECT pg_logical_emit_message(false, 'foo', '{}');");
SourceRecords records = consumeRecordsByTopic(1);
List<SourceRecord> recordsForTopic = records.recordsForTopic(topicName("message"));
assertThat(recordsForTopic).hasSize(1);
SourceRecord transformedRecord = decodeLogicalDecodingMessageContent.apply(recordsForTopic.get(0));
assertThat(transformedRecord).isNotNull();
Struct value = (Struct) transformedRecord.value();
String op = value.getString(Envelope.FieldName.OPERATION);
Struct source = value.getStruct(Envelope.FieldName.SOURCE);
Struct after = value.getStruct(Envelope.FieldName.AFTER);
assertNull(source.getInt64(SourceInfo.TXID_KEY));
assertNotNull(source.getInt64(SourceInfo.TIMESTAMP_KEY));
assertNotNull(source.getInt64(SourceInfo.LSN_KEY));
assertEquals("", source.getString(SourceInfo.TABLE_NAME_KEY));
assertEquals("", source.getString(SourceInfo.SCHEMA_NAME_KEY));
assertEquals(Envelope.Operation.CREATE.code(), op);
assertEquals(0, after.schema().fields().size());
List<Field> recordValueSchemaFields = value.schema().fields();
assertTrue(recordValueSchemaFields.stream().noneMatch(f -> f.name().equals("message")));
decodeLogicalDecodingMessageContent.close();
stopConnector();
}
@Test
@FixFor("DBZ-8103")
@SkipWhenDecoderPluginNameIsNot(value = SkipWhenDecoderPluginNameIsNot.DecoderPluginName.PGOUTPUT, reason = "Only supported on PgOutput")
@SkipWhenDatabaseVersion(check = LESS_THAN, major = 14, minor = 0, reason = "Message not supported for PG version < 14")
public void shouldConvertRecordWithTransactionalLogicalDecodingMessageWithContent() throws Exception {
DecodeLogicalDecodingMessageContent<SourceRecord> decodeLogicalDecodingMessageContent = new DecodeLogicalDecodingMessageContent<>();
Map<String, String> smtConfig = new LinkedHashMap<>();
decodeLogicalDecodingMessageContent.configure(smtConfig);
Configuration.Builder configBuilder = TestHelper.defaultConfig();
start(PostgresConnector.class, configBuilder.build());
assertConnectorIsRunning();
waitForStreamingRunning("postgres", TestHelper.TEST_SERVER);
// emit transactional logical decoding message
TestHelper.execute("SELECT pg_logical_emit_message(true, 'foo', '{\"bar\": \"baz\", \"qux\": 9703}');");
SourceRecords records = consumeRecordsByTopic(1);
List<SourceRecord> recordsForTopic = records.recordsForTopic(topicName("message"));
assertThat(recordsForTopic).hasSize(1);
SourceRecord transformedRecord = decodeLogicalDecodingMessageContent.apply(recordsForTopic.get(0));
assertThat(transformedRecord).isNotNull();
Struct value = (Struct) transformedRecord.value();
String op = value.getString(Envelope.FieldName.OPERATION);
Struct source = value.getStruct(Envelope.FieldName.SOURCE);
Struct after = value.getStruct(Envelope.FieldName.AFTER);
assertNotNull(source.getInt64(SourceInfo.TXID_KEY));
assertNotNull(source.getInt64(SourceInfo.TIMESTAMP_KEY));
assertNotNull(source.getInt64(SourceInfo.LSN_KEY));
assertEquals("", source.getString(SourceInfo.TABLE_NAME_KEY));
assertEquals("", source.getString(SourceInfo.SCHEMA_NAME_KEY));
assertEquals(Envelope.Operation.CREATE.code(), op);
assertEquals(2, after.schema().fields().size());
assertEquals("baz", after.get("bar"));
assertEquals(9703, after.get("qux"));
List<Field> recordValueSchemaFields = value.schema().fields();
assertTrue(recordValueSchemaFields.stream().noneMatch(f -> f.name().equals("message")));
decodeLogicalDecodingMessageContent.close();
stopConnector();
}
}