From 6300f2972f5cb77bd44a62d05555b658b585fc4f Mon Sep 17 00:00:00 2001 From: Renato Mefi Date: Sun, 24 Mar 2019 20:33:30 +0100 Subject: [PATCH] DBZ-1169 Integration tests for Outbox EventRouter Set the Snapshot Mode to NEVER provides more consistent tests --- .../postgresql/OutboxEventRouterIT.java | 107 +++++++++++++++++- 1 file changed, 102 insertions(+), 5 deletions(-) diff --git a/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/OutboxEventRouterIT.java b/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/OutboxEventRouterIT.java index 320ff38e0..19e5baa68 100644 --- a/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/OutboxEventRouterIT.java +++ b/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/OutboxEventRouterIT.java @@ -7,18 +7,28 @@ package io.debezium.connector.postgresql; import io.debezium.config.Configuration; +import io.debezium.connector.postgresql.PostgresConnectorConfig.SnapshotMode; +import io.debezium.data.Uuid; import io.debezium.embedded.AbstractConnectorTest; import io.debezium.transforms.outbox.EventRouter; +import org.apache.kafka.connect.data.SchemaBuilder; +import org.apache.kafka.connect.data.Struct; +import org.apache.kafka.connect.header.Header; +import org.apache.kafka.connect.header.Headers; import org.apache.kafka.connect.source.SourceRecord; +import org.codehaus.jackson.JsonNode; +import org.codehaus.jackson.map.ObjectMapper; import org.junit.After; import org.junit.Before; import org.junit.Test; import java.util.Collections; +import java.util.HashMap; +import java.util.Map; import java.util.UUID; -import static io.debezium.connector.postgresql.PostgresConnectorConfig.SnapshotMode.INITIAL; import static io.debezium.connector.postgresql.TestHelper.topicName; +import static org.apache.kafka.connect.transforms.util.Requirements.requireStruct; import static org.fest.assertions.Assertions.assertThat; /** @@ -48,7 +58,8 @@ private static String createEventInsert( String eventType, String aggregateType, String aggregateId, - String payloadJson + String payloadJson, + String additional ) { return String.format("INSERT INTO outboxsmtit.outbox VALUES (" + "'%s'" + @@ -56,12 +67,14 @@ private static String createEventInsert( ", '%s'" + ", '%s'" + ", '%s'::jsonb" + + "%s" + ");", eventId, aggregateType, aggregateId, eventType, - payloadJson); + payloadJson, + additional); } @Before @@ -73,7 +86,7 @@ public void beforeEach() { TestHelper.execute(SETUP_OUTOBOX_TABLE); Configuration.Builder configBuilder = TestHelper.defaultConfig() - .with(PostgresConnectorConfig.SNAPSHOT_MODE, INITIAL.getValue()) + .with(PostgresConnectorConfig.SNAPSHOT_MODE, SnapshotMode.NEVER.getValue()) .with(PostgresConnectorConfig.DROP_SLOT_ON_STOP, Boolean.FALSE) .with(PostgresConnectorConfig.SCHEMA_WHITELIST, "outboxsmtit") .with(PostgresConnectorConfig.TABLE_WHITELIST, "outboxsmtit\\.outbox"); @@ -96,7 +109,8 @@ public void shouldConsumeRecordsFromInsert() throws Exception { "UserCreated", "User", "10711fa5", - "{}" + "{}", + "" )); SourceRecords actualRecords = consumeRecordsByTopic(1); @@ -108,4 +122,87 @@ public void shouldConsumeRecordsFromInsert() throws Exception { assertThat(routedEvent).isNotNull(); assertThat(routedEvent.topic()).isEqualTo("outbox.event.user"); } + + @Test + public void shouldRespectJsonFormatAsString() throws Exception { + TestHelper.execute(createEventInsert( + UUID.fromString("f9171eb6-19f3-4579-9206-0e179d2ebad7"), + "UserCreated", + "User", + "7bdf2e9e", + "{\"email\": \"gh@mefi.in\"}", + "" + )); + + SourceRecords actualRecords = consumeRecordsByTopic(1); + assertThat(actualRecords.topics().size()).isEqualTo(1); + + SourceRecord newEventRecord = actualRecords.recordsForTopic(topicName("outboxsmtit.outbox")).get(0); + SourceRecord routedEvent = outboxEventRouter.apply(newEventRecord); + + Struct valueStruct = requireStruct(routedEvent.value(), "test payload"); + JsonNode payload = (new ObjectMapper()).readTree(valueStruct.getString("payload")); + assertThat(payload.get("email").getTextValue()).isEqualTo("gh@mefi.in"); + } + + @Test + public void shouldSupportAllFeatures() throws Exception { + outboxEventRouter = new EventRouter<>(); + final Map config = new HashMap<>(); + config.put("table.field.schema.version", "version"); + config.put("table.field.event.timestamp", "createdat"); + config.put( + "table.fields.additional.placement", + "version:envelope:eventVersion," + + "aggregatetype:envelope:aggregateType," + + "somebooltype:envelope:someBoolType," + + "somebooltype:header" + ); + outboxEventRouter.configure(config); + + TestHelper.execute("ALTER TABLE outboxsmtit.outbox add version int not null;"); + TestHelper.execute("ALTER TABLE outboxsmtit.outbox add somebooltype boolean not null;"); + TestHelper.execute("ALTER TABLE outboxsmtit.outbox add createdat timestamp without time zone not null;"); + + TestHelper.execute(createEventInsert( + UUID.fromString("f9171eb6-19f3-4579-9206-0e179d2ebad7"), + "UserUpdated", + "UserEmail", + "7bdf2e9e", + "{\"email\": \"gh@mefi.in\"}", + ", 1, true, TIMESTAMP '2019-03-24 20:52:59'" + )); + + SourceRecords actualRecords = consumeRecordsByTopic(1); + assertThat(actualRecords.topics().size()).isEqualTo(1); + + SourceRecord newEventRecord = actualRecords.recordsForTopic(topicName("outboxsmtit.outbox")).get(0); + SourceRecord eventRouted = outboxEventRouter.apply(newEventRecord); + + // Validate metadata + assertThat(eventRouted.valueSchema().version()).isEqualTo(1); + assertThat(eventRouted.timestamp()).isEqualTo(1553460779000000L); + assertThat(eventRouted.topic()).isEqualTo("outbox.event.useremail"); + + // Validate headers + Headers headers = eventRouted.headers(); + assertThat(headers.size()).isEqualTo(2); + Header headerId = headers.lastWithName("id"); + assertThat(headerId.schema()).isEqualTo(Uuid.schema()); + assertThat(headerId.value()).isEqualTo("f9171eb6-19f3-4579-9206-0e179d2ebad7"); + Header headerBool = headers.lastWithName("somebooltype"); + assertThat(headerBool.schema()).isEqualTo(SchemaBuilder.BOOLEAN_SCHEMA); + assertThat(headerBool.value()).isEqualTo(true); + + // Validate Key + assertThat(eventRouted.keySchema()).isEqualTo(SchemaBuilder.STRING_SCHEMA); + assertThat(eventRouted.key()).isEqualTo("7bdf2e9e"); + + // Validate message body + Struct valueStruct = requireStruct(eventRouted.value(), "test envelope"); + assertThat(valueStruct.getString("eventType")).isEqualTo("UserUpdated"); + assertThat(valueStruct.getString("aggregateType")).isEqualTo("UserEmail"); + assertThat(valueStruct.getInt32("eventVersion")).isEqualTo(1); + assertThat(valueStruct.getBoolean("someBoolType")).isEqualTo(true); + } }