DBZ-1169 Integration tests for Outbox EventRouter

Set the Snapshot Mode to NEVER provides more consistent tests
This commit is contained in:
Renato Mefi 2019-03-24 20:33:30 +01:00 committed by Jiri Pechanec
parent 0847870d21
commit 6300f2972f

View File

@ -7,18 +7,28 @@
package io.debezium.connector.postgresql;
import io.debezium.config.Configuration;
import io.debezium.connector.postgresql.PostgresConnectorConfig.SnapshotMode;
import io.debezium.data.Uuid;
import io.debezium.embedded.AbstractConnectorTest;
import io.debezium.transforms.outbox.EventRouter;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.header.Header;
import org.apache.kafka.connect.header.Headers;
import org.apache.kafka.connect.source.SourceRecord;
import org.codehaus.jackson.JsonNode;
import org.codehaus.jackson.map.ObjectMapper;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
import static io.debezium.connector.postgresql.PostgresConnectorConfig.SnapshotMode.INITIAL;
import static io.debezium.connector.postgresql.TestHelper.topicName;
import static org.apache.kafka.connect.transforms.util.Requirements.requireStruct;
import static org.fest.assertions.Assertions.assertThat;
/**
@ -48,7 +58,8 @@ private static String createEventInsert(
String eventType,
String aggregateType,
String aggregateId,
String payloadJson
String payloadJson,
String additional
) {
return String.format("INSERT INTO outboxsmtit.outbox VALUES (" +
"'%s'" +
@ -56,12 +67,14 @@ private static String createEventInsert(
", '%s'" +
", '%s'" +
", '%s'::jsonb" +
"%s" +
");",
eventId,
aggregateType,
aggregateId,
eventType,
payloadJson);
payloadJson,
additional);
}
@Before
@ -73,7 +86,7 @@ public void beforeEach() {
TestHelper.execute(SETUP_OUTOBOX_TABLE);
Configuration.Builder configBuilder = TestHelper.defaultConfig()
.with(PostgresConnectorConfig.SNAPSHOT_MODE, INITIAL.getValue())
.with(PostgresConnectorConfig.SNAPSHOT_MODE, SnapshotMode.NEVER.getValue())
.with(PostgresConnectorConfig.DROP_SLOT_ON_STOP, Boolean.FALSE)
.with(PostgresConnectorConfig.SCHEMA_WHITELIST, "outboxsmtit")
.with(PostgresConnectorConfig.TABLE_WHITELIST, "outboxsmtit\\.outbox");
@ -96,7 +109,8 @@ public void shouldConsumeRecordsFromInsert() throws Exception {
"UserCreated",
"User",
"10711fa5",
"{}"
"{}",
""
));
SourceRecords actualRecords = consumeRecordsByTopic(1);
@ -108,4 +122,87 @@ public void shouldConsumeRecordsFromInsert() throws Exception {
assertThat(routedEvent).isNotNull();
assertThat(routedEvent.topic()).isEqualTo("outbox.event.user");
}
@Test
public void shouldRespectJsonFormatAsString() throws Exception {
TestHelper.execute(createEventInsert(
UUID.fromString("f9171eb6-19f3-4579-9206-0e179d2ebad7"),
"UserCreated",
"User",
"7bdf2e9e",
"{\"email\": \"gh@mefi.in\"}",
""
));
SourceRecords actualRecords = consumeRecordsByTopic(1);
assertThat(actualRecords.topics().size()).isEqualTo(1);
SourceRecord newEventRecord = actualRecords.recordsForTopic(topicName("outboxsmtit.outbox")).get(0);
SourceRecord routedEvent = outboxEventRouter.apply(newEventRecord);
Struct valueStruct = requireStruct(routedEvent.value(), "test payload");
JsonNode payload = (new ObjectMapper()).readTree(valueStruct.getString("payload"));
assertThat(payload.get("email").getTextValue()).isEqualTo("gh@mefi.in");
}
@Test
public void shouldSupportAllFeatures() throws Exception {
outboxEventRouter = new EventRouter<>();
final Map<String, String> config = new HashMap<>();
config.put("table.field.schema.version", "version");
config.put("table.field.event.timestamp", "createdat");
config.put(
"table.fields.additional.placement",
"version:envelope:eventVersion," +
"aggregatetype:envelope:aggregateType," +
"somebooltype:envelope:someBoolType," +
"somebooltype:header"
);
outboxEventRouter.configure(config);
TestHelper.execute("ALTER TABLE outboxsmtit.outbox add version int not null;");
TestHelper.execute("ALTER TABLE outboxsmtit.outbox add somebooltype boolean not null;");
TestHelper.execute("ALTER TABLE outboxsmtit.outbox add createdat timestamp without time zone not null;");
TestHelper.execute(createEventInsert(
UUID.fromString("f9171eb6-19f3-4579-9206-0e179d2ebad7"),
"UserUpdated",
"UserEmail",
"7bdf2e9e",
"{\"email\": \"gh@mefi.in\"}",
", 1, true, TIMESTAMP '2019-03-24 20:52:59'"
));
SourceRecords actualRecords = consumeRecordsByTopic(1);
assertThat(actualRecords.topics().size()).isEqualTo(1);
SourceRecord newEventRecord = actualRecords.recordsForTopic(topicName("outboxsmtit.outbox")).get(0);
SourceRecord eventRouted = outboxEventRouter.apply(newEventRecord);
// Validate metadata
assertThat(eventRouted.valueSchema().version()).isEqualTo(1);
assertThat(eventRouted.timestamp()).isEqualTo(1553460779000000L);
assertThat(eventRouted.topic()).isEqualTo("outbox.event.useremail");
// Validate headers
Headers headers = eventRouted.headers();
assertThat(headers.size()).isEqualTo(2);
Header headerId = headers.lastWithName("id");
assertThat(headerId.schema()).isEqualTo(Uuid.schema());
assertThat(headerId.value()).isEqualTo("f9171eb6-19f3-4579-9206-0e179d2ebad7");
Header headerBool = headers.lastWithName("somebooltype");
assertThat(headerBool.schema()).isEqualTo(SchemaBuilder.BOOLEAN_SCHEMA);
assertThat(headerBool.value()).isEqualTo(true);
// Validate Key
assertThat(eventRouted.keySchema()).isEqualTo(SchemaBuilder.STRING_SCHEMA);
assertThat(eventRouted.key()).isEqualTo("7bdf2e9e");
// Validate message body
Struct valueStruct = requireStruct(eventRouted.value(), "test envelope");
assertThat(valueStruct.getString("eventType")).isEqualTo("UserUpdated");
assertThat(valueStruct.getString("aggregateType")).isEqualTo("UserEmail");
assertThat(valueStruct.getInt32("eventVersion")).isEqualTo(1);
assertThat(valueStruct.getBoolean("someBoolType")).isEqualTo(true);
}
}