DBZ-3940 Unify EventRouter tests for connectors

This commit is contained in:
Chris Cranford 2021-12-11 01:06:14 -05:00 committed by Gunnar Morling
parent f99ad7512f
commit 63731a2a09
3 changed files with 758 additions and 915 deletions

View File

@ -5,36 +5,19 @@
*/
package io.debezium.connector.oracle;
import static io.debezium.data.VerifyRecord.assertConnectSchemasAreEqual;
import static org.apache.kafka.connect.transforms.util.Requirements.requireStruct;
import static org.fest.assertions.Assertions.assertThat;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.header.Header;
import org.apache.kafka.connect.header.Headers;
import org.apache.kafka.connect.source.SourceRecord;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.debezium.config.Configuration;
import io.debezium.connector.oracle.OracleConnectorConfig.SnapshotMode;
import io.debezium.connector.oracle.converters.NumberOneToBooleanConverter;
import io.debezium.connector.oracle.util.TestHelper;
import io.debezium.doc.FixFor;
import io.debezium.embedded.AbstractConnectorTest;
import io.debezium.jdbc.JdbcConnection;
import io.debezium.transforms.outbox.AbstractEventRouterTest;
import io.debezium.transforms.outbox.EventRouter;
import io.debezium.transforms.outbox.EventRouterConfigDefinition;
import io.debezium.util.Testing;
/**
@ -42,7 +25,7 @@
*
* @author Chris Cranford
*/
public class OutboxEventRouterIT extends AbstractConnectorTest {
public class OutboxEventRouterIT extends AbstractEventRouterTest<OracleConnector> {
private static final String SETUP_OUTBOX_TABLE = "CREATE TABLE debezium.outbox (" +
"id varchar2(64) not null primary key, " +
@ -51,456 +34,43 @@ public class OutboxEventRouterIT extends AbstractConnectorTest {
"type varchar2(255) not null, " +
"payload varchar2(4000))";
private EventRouter<SourceRecord> outboxEventRouter;
private OracleConnection connection;
@Before
public void before() throws Exception {
@Override
public void beforeEach() throws Exception {
connection = TestHelper.testConnection();
setConsumeTimeout(TestHelper.defaultMessageConsumerPollTimeout(), TimeUnit.SECONDS);
initializeConnectorTestFramework();
Testing.Files.delete(TestHelper.DB_HISTORY_PATH);
TestHelper.dropTable(connection, "debezium.outbox");
connection.execute(SETUP_OUTBOX_TABLE);
TestHelper.streamTable(connection, "debezium.outbox");
outboxEventRouter = new EventRouter<>();
outboxEventRouter.configure(Collections.emptyMap()); // default values
super.beforeEach();
}
@After
public void after() throws Exception {
stopConnector();
assertNoRecordsToConsume();
outboxEventRouter.close();
@Override
public void afterEach() throws Exception {
super.afterEach();
if (connection != null && connection.isConnected()) {
TestHelper.dropTable(connection, "debezium.outbox");
TestHelper.dropTable(connection, tableName());
connection.close();
}
}
@Test
@FixFor({"DBZ-1169", "DBZ-3940"})
public void shouldConsumeRecordsFromInsert() throws Exception {
startConnectorWithInitialSnapshotRecord();
connection.execute(createEventInsert(
"59a42efd-b015-44a9-9dde-cb36d9002425",
"UserCreated",
"User",
"10711fa5",
"{}",
""));
SourceRecords streamingRecords = consumeRecordsByTopic(1);
assertThat(streamingRecords.allRecordsInOrder()).hasSize(1);
SourceRecord record = streamingRecords.recordsForTopic(topicName("OUTBOX")).get(0);
SourceRecord routedEvent = outboxEventRouter.apply(record);
assertThat(routedEvent).isNotNull();
assertThat(routedEvent.topic()).isEqualTo("outbox.event.User");
assertThat(routedEvent.keySchema()).isEqualTo(Schema.STRING_SCHEMA);
assertThat(routedEvent.key()).isEqualTo("10711fa5");
assertThat(routedEvent.value()).isInstanceOf(String.class);
JsonNode payload = (new ObjectMapper()).readTree((String) routedEvent.value());
assertThat(payload.get("email")).isNull();
@Override
protected Class<OracleConnector> getConnectorClass() {
return OracleConnector.class;
}
@Test
@FixFor({"DBZ-1385", "DBZ-3940"})
public void shouldSendEventTypeAsHeader() throws Exception {
startConnectorWithInitialSnapshotRecord();
connection.execute(createEventInsert(
"59a42efd-b015-44a9-9dde-cb36d9002425",
"UserCreated",
"User",
"10711fa5",
"{\"email\": \"gh@mefi.in\"}",
""));
final Map<String, String> config = new HashMap<>();
config.put(EventRouterConfigDefinition.FIELDS_ADDITIONAL_PLACEMENT.name(), "TYPE:header:eventType");
outboxEventRouter.configure(config);
SourceRecords streamingRecords = consumeRecordsByTopic(1);
assertThat(streamingRecords.allRecordsInOrder()).hasSize(1);
SourceRecord record = streamingRecords.recordsForTopic(topicName("OUTBOX")).get(0);
SourceRecord routedEvent = outboxEventRouter.apply(record);
assertThat(routedEvent).isNotNull();
assertThat(routedEvent.topic()).isEqualTo("outbox.event.User");
Object value = routedEvent.value();
assertThat(routedEvent.headers().lastWithName("eventType").value()).isEqualTo("UserCreated");
assertThat(value).isInstanceOf(String.class);
JsonNode payload = (new ObjectMapper()).readTree((String) value);
assertThat(payload.get("email").asText()).isEqualTo("gh@mefi.in");
@Override
protected JdbcConnection databaseConnection() {
return connection;
}
@Test
@FixFor({"DBZ-2014", "DBZ-3940"})
public void shouldSendEventTypeAsValue() throws Exception {
startConnectorWithInitialSnapshotRecord();
connection.execute(createEventInsert(
"d4da2428-8b19-11ea-bc55-0242ac130003",
"UserCreated",
"User",
"9948fcad",
"{\"email\": \"gh@mefi.in\"}",
""));
final Map<String, String> config = new HashMap<>();
config.put(EventRouterConfigDefinition.FIELDS_ADDITIONAL_PLACEMENT.name(), "TYPE:envelope:eventType");
outboxEventRouter.configure(config);
SourceRecords streamingRecords = consumeRecordsByTopic(1);
assertThat(streamingRecords.allRecordsInOrder()).hasSize(1);
SourceRecord record = streamingRecords.recordsForTopic(topicName("OUTBOX")).get(0);
SourceRecord routedEvent = outboxEventRouter.apply(record);
assertThat(routedEvent).isNotNull();
assertThat(routedEvent.topic()).isEqualTo("outbox.event.User");
Struct valueStruct = requireStruct(routedEvent.value(), "test payload");
assertThat(valueStruct.getString("eventType")).isEqualTo("UserCreated");
JsonNode payload = (new ObjectMapper()).readTree(valueStruct.getString("payload"));
assertThat(payload.get("email").asText()).isEqualTo("gh@mefi.in");
}
@Test
@FixFor({"DBZ-2014", "DBZ-3940"})
public void shouldRespectJsonFormatAsString() throws Exception {
startConnectorWithInitialSnapshotRecord();
connection.execute(createEventInsert(
"f9171eb6-19f3-4579-9206-0e179d2ebad7",
"UserCreated",
"User",
"7bdf2e9e",
"{\"email\": \"gh@mefi.in\"}",
""));
SourceRecords streamingRecords = consumeRecordsByTopic(1);
assertThat(streamingRecords.allRecordsInOrder()).hasSize(1);
SourceRecord record = streamingRecords.recordsForTopic(topicName("OUTBOX")).get(0);
SourceRecord routedEvent = outboxEventRouter.apply(record);
assertThat(routedEvent.value()).isInstanceOf(String.class);
JsonNode payload = (new ObjectMapper()).readTree((String) routedEvent.value());
assertThat(payload.get("email").asText()).isEqualTo("gh@mefi.in");
}
@Test
@FixFor({"DBZ-1169", "DBZ-3940"})
public void shouldSupportAllFeatures() throws Exception {
startConnectorWithNoSnapshot();
final Map<String, String> config = new HashMap<>();
config.put(EventRouterConfigDefinition.FIELD_SCHEMA_VERSION.name(), "VERSION");
config.put(EventRouterConfigDefinition.FIELD_EVENT_TIMESTAMP.name(), "CREATEDAT");
config.put(EventRouterConfigDefinition.FIELDS_ADDITIONAL_PLACEMENT.name(),
"VERSION:envelope:eventVersion," +
"AGGREGATETYPE:envelope:aggregateType," +
"SOMEBOOLTYPE:envelope:someBoolType," +
"SOMEBOOLTYPE:header," +
"IS_DELETED:envelope:deleted");
outboxEventRouter.configure(config);
connection.execute("ALTER TABLE debezium.outbox add version numeric(9,0) not null");
connection.execute("ALTER TABLE debezium.outbox add somebooltype numeric(1,0) not null");
connection.execute("ALTER TABLE debezium.outbox add createdat timestamp not null");
connection.execute("ALTER TABLE debezium.outbox add is_deleted numeric(1,0) default 0 not null");
connection.execute(createEventInsert(
"f9171eb6-19f3-4579-9206-0e179d2ebad7",
"UserUpdated",
"UserEmail",
"7bdf2e9e",
"{\"email\": \"gh@mefi.in\"}",
", 1, 1, TO_TIMESTAMP('2019-03-24 20:52:59', 'YYYY-MM-DD HH24:MI:SS'), 0"));
SourceRecords streamingRecords = consumeRecordsByTopic(1);
assertThat(streamingRecords.allRecordsInOrder()).hasSize(1);
SourceRecord record = streamingRecords.recordsForTopic(topicName("OUTBOX")).get(0);
SourceRecord routedEvent = outboxEventRouter.apply(record);
// Validate metadata
Schema expectedSchema = SchemaBuilder.struct()
.version(1)
.name("server1.DEBEZIUM.OUTBOX.UserEmail.Value")
.field("payload", Schema.OPTIONAL_STRING_SCHEMA)
.field("eventVersion", Schema.INT32_SCHEMA)
.field("aggregateType", Schema.STRING_SCHEMA)
.field("someBoolType", Schema.BOOLEAN_SCHEMA)
.field("deleted", SchemaBuilder.bool().defaultValue(false).build())
.build();
assertConnectSchemasAreEqual(null, routedEvent.valueSchema(), expectedSchema);
assertThat(routedEvent.timestamp()).isEqualTo(1553460779000L);
assertThat(routedEvent.topic()).isEqualTo("outbox.event.UserEmail");
// Validate headers
Headers headers = routedEvent.headers();
assertThat(headers).hasSize(2);
Header headerId = headers.lastWithName("id");
assertThat(headerId.schema()).isEqualTo(SchemaBuilder.STRING_SCHEMA);
assertThat(headerId.value()).isEqualTo("f9171eb6-19f3-4579-9206-0e179d2ebad7");
Header headerBool = headers.lastWithName("SOMEBOOLTYPE");
assertThat(headerBool.schema()).isEqualTo(SchemaBuilder.BOOLEAN_SCHEMA);
assertThat(headerBool.value()).isEqualTo(true);
// Validate Key
assertThat(routedEvent.keySchema()).isEqualTo(SchemaBuilder.STRING_SCHEMA);
assertThat(routedEvent.key()).isEqualTo("7bdf2e9e");
// Validate message body
Struct valueStruct = requireStruct(routedEvent.value(), "test envelope");
assertThat(valueStruct.getString("aggregateType")).isEqualTo("UserEmail");
assertThat(valueStruct.getInt32("eventVersion")).isEqualTo(1);
assertThat(valueStruct.get("someBoolType")).isEqualTo(true);
assertThat(valueStruct.get("deleted")).isEqualTo(false);
}
@Test
@FixFor({"DBZ-1707", "DBZ-3940"})
public void shouldConvertMicroSecondsTimestampToMilliSeconds() throws Exception {
startConnectorWithNoSnapshot();
outboxEventRouter = new EventRouter<>();
final Map<String, String> config = new HashMap<>();
config.put(EventRouterConfigDefinition.FIELD_EVENT_TIMESTAMP.name(), "CREATEDAT");
outboxEventRouter.configure(config);
connection.execute("ALTER TABLE debezium.outbox add createdat timestamp not null");
connection.execute(createEventInsert(
"f9171eb6-19f3-4579-9206-0e179d2ebad7",
"UserUpdated",
"UserEmail",
"7bdf2e9e",
"{\"email\": \"gh@mefi.in\"}",
", TO_TIMESTAMP('2019-03-24 20:52:59', 'YYYY-MM-DD HH24:MI:SS')"));
SourceRecords streamingRecords = consumeRecordsByTopic(1);
assertThat(streamingRecords.allRecordsInOrder()).hasSize(1);
SourceRecord record = streamingRecords.recordsForTopic(topicName("OUTBOX")).get(0);
SourceRecord routedEvent = outboxEventRouter.apply(record);
// expecting microseconds value emitted for TIMESTAMP column without width to be
// converted to milliseconds, as that's the standard semantics of that property
// in Kafka
assertThat(routedEvent.timestamp()).isEqualTo(1553460779000L);
}
@Test
@FixFor({"DBZ-1320", "DBZ-3940"})
public void shouldNotProduceTombstoneEventForNullPayload() throws Exception {
startConnectorWithNoSnapshot();
outboxEventRouter = new EventRouter<>();
final Map<String, String> config = new HashMap<>();
config.put(EventRouterConfigDefinition.FIELD_SCHEMA_VERSION.name(), "VERSION");
config.put(EventRouterConfigDefinition.FIELD_EVENT_TIMESTAMP.name(), "CREATEDAT");
config.put(EventRouterConfigDefinition.FIELDS_ADDITIONAL_PLACEMENT.name(),
"VERSION:envelope:eventVersion," +
"AGGREGATETYPE:envelope:aggregateType," +
"SOMEBOOLTYPE:envelope:someBoolType," +
"SOMEBOOLTYPE:header," +
"IS_DELETED:envelope:deleted");
outboxEventRouter.configure(config);
connection.execute("ALTER TABLE debezium.outbox add version numeric(9,0) not null");
connection.execute("ALTER TABLE debezium.outbox add somebooltype numeric(1,0) not null");
connection.execute("ALTER TABLE debezium.outbox add createdat timestamp not null");
connection.execute("ALTER TABLE debezium.outbox add is_deleted numeric(1,0) default 0 not null");
connection.execute(createEventInsert(
"a9d76f78-bda6-48d3-97ed-13a146163218",
"UserUpdated",
"UserEmail",
"a9d76f78",
null,
", 1, 1, TO_TIMESTAMP('2019-03-24 20:52:59', 'YYYY-MM-DD HH24:MI:SS'), 1"));
SourceRecords streamingRecords = consumeRecordsByTopic(1);
assertThat(streamingRecords.topics()).hasSize(1);
SourceRecord record = streamingRecords.recordsForTopic(topicName("OUTBOX")).get(0);
SourceRecord routedEvent = outboxEventRouter.apply(record);
// Validate metadata
assertThat(routedEvent.valueSchema()).isNotNull();
assertThat(routedEvent.timestamp()).isEqualTo(1553460779000L);
assertThat(routedEvent.topic()).isEqualTo("outbox.event.UserEmail");
// Validate headers
Headers headers = routedEvent.headers();
assertThat(headers.size()).isEqualTo(2);
Header headerId = headers.lastWithName("id");
assertThat(headerId.schema()).isEqualTo(SchemaBuilder.STRING_SCHEMA);
assertThat(headerId.value()).isEqualTo("a9d76f78-bda6-48d3-97ed-13a146163218");
Header headerBool = headers.lastWithName("SOMEBOOLTYPE");
assertThat(headerBool.schema()).isEqualTo(SchemaBuilder.BOOLEAN_SCHEMA);
assertThat(headerBool.value()).isEqualTo(true);
// Validate Key
assertThat(routedEvent.keySchema()).isEqualTo(SchemaBuilder.STRING_SCHEMA);
assertThat(routedEvent.key()).isEqualTo("a9d76f78");
// Validate message body
System.out.println(routedEvent);
assertThat(routedEvent.value()).isNotNull();
assertThat(((Struct) routedEvent.value()).get("payload")).isNull();
}
@Test
@FixFor({"DBZ-1320", "DBZ-3940"})
public void shouldProduceTombstoneEventForNullPayload() throws Exception {
startConnectorWithNoSnapshot();
outboxEventRouter = new EventRouter<>();
final Map<String, String> config = new HashMap<>();
config.put(EventRouterConfigDefinition.FIELD_SCHEMA_VERSION.name(), "VERSION");
config.put(EventRouterConfigDefinition.FIELD_EVENT_TIMESTAMP.name(), "CREATEDAT");
config.put(EventRouterConfigDefinition.ROUTE_TOMBSTONE_ON_EMPTY_PAYLOAD.name(), "true");
config.put(EventRouterConfigDefinition.FIELDS_ADDITIONAL_PLACEMENT.name(),
"VERSION:envelope:eventVersion," +
"AGGREGATETYPE:envelope:aggregateType," +
"SOMEBOOLTYPE:envelope:someBoolType," +
"SOMEBOOLTYPE:header," +
"IS_DELETED:envelope:deleted");
outboxEventRouter.configure(config);
connection.execute("ALTER TABLE debezium.outbox add version numeric(9,0) not null");
connection.execute("ALTER TABLE debezium.outbox add somebooltype numeric(1,0) not null");
connection.execute("ALTER TABLE debezium.outbox add createdat timestamp not null");
connection.execute("ALTER TABLE debezium.outbox add is_deleted numeric(1,0) default 0 not null");
connection.execute(createEventInsert(
"a9d76f78-bda6-48d3-97ed-13a146163218",
"UserUpdated",
"UserEmail",
"a9d76f78",
null,
", 1, 1, TO_TIMESTAMP('2019-03-24 20:52:59', 'YYYY-MM-DD HH24:MI:SS'), 1"));
SourceRecords streamingRecords = consumeRecordsByTopic(1);
assertThat(streamingRecords.topics()).hasSize(1);
SourceRecord record = streamingRecords.recordsForTopic(topicName("OUTBOX")).get(0);
SourceRecord routedEvent = outboxEventRouter.apply(record);
// Validate metadata
assertThat(routedEvent.valueSchema()).isNull();
assertThat(routedEvent.timestamp()).isEqualTo(1553460779000L);
assertThat(routedEvent.topic()).isEqualTo("outbox.event.UserEmail");
// Validate headers
Headers headers = routedEvent.headers();
assertThat(headers.size()).isEqualTo(2);
Header headerId = headers.lastWithName("id");
assertThat(headerId.schema()).isEqualTo(SchemaBuilder.STRING_SCHEMA);
assertThat(headerId.value()).isEqualTo("a9d76f78-bda6-48d3-97ed-13a146163218");
Header headerBool = headers.lastWithName("SOMEBOOLTYPE");
assertThat(headerBool.schema()).isEqualTo(SchemaBuilder.BOOLEAN_SCHEMA);
assertThat(headerBool.value()).isEqualTo(true);
// Validate Key
assertThat(routedEvent.keySchema()).isEqualTo(SchemaBuilder.STRING_SCHEMA);
assertThat(routedEvent.key()).isEqualTo("a9d76f78");
// Validate message body
assertThat(routedEvent.value()).isNull();
}
@Test
@FixFor({"DBZ-1320", "DBZ-3940"})
public void shouldProduceTombstoneEventForEmptyPayload() throws Exception {
startConnectorWithNoSnapshot();
outboxEventRouter = new EventRouter<>();
final Map<String, String> config = new HashMap<>();
config.put(EventRouterConfigDefinition.ROUTE_TOMBSTONE_ON_EMPTY_PAYLOAD.name(), "true");
outboxEventRouter.configure(config);
connection.execute("ALTER TABLE debezium.outbox modify (payload varchar2(1000))");
connection.execute(createEventInsert(
"a9d76f78-bda6-48d3-97ed-13a146163218",
"UserUpdated",
"UserEmail",
"a9d76f78",
"",
null));
SourceRecords streamingRecords = consumeRecordsByTopic(1);
assertThat(streamingRecords.topics()).hasSize(1);
SourceRecord record = streamingRecords.recordsForTopic(topicName("OUTBOX")).get(0);
SourceRecord routedEvent = outboxEventRouter.apply(record);
// Validate metadata
assertThat(routedEvent.valueSchema()).isNull();
assertThat(routedEvent.topic()).isEqualTo("outbox.event.UserEmail");
// Validate headers
Headers headers = routedEvent.headers();
assertThat(headers.size()).isEqualTo(1);
Header headerId = headers.lastWithName("id");
assertThat(headerId.schema()).isEqualTo(SchemaBuilder.STRING_SCHEMA);
assertThat(headerId.value()).isEqualTo("a9d76f78-bda6-48d3-97ed-13a146163218");
// Validate Key
assertThat(routedEvent.keySchema()).isEqualTo(SchemaBuilder.STRING_SCHEMA);
assertThat(routedEvent.key()).isEqualTo("a9d76f78");
// Validate message body
assertThat(routedEvent.value()).isNull();
}
private void startConnectorWithInitialSnapshotRecord() throws Exception {
connection.execute(createEventInsert(
"70f52ae3-f671-4bac-ae62-1b9be6e73700",
"UserCreated",
"User",
"10711faf",
"{}",
""));
Configuration.Builder configBuilder = getConfigurationBuilder(SnapshotMode.INITIAL);
start(OracleConnector.class, configBuilder.build());
assertConnectorIsRunning();
waitForSnapshotToBeCompleted(TestHelper.CONNECTOR_NAME, TestHelper.SERVER_NAME);
SourceRecords snapshotRecords = consumeRecordsByTopic(1);
assertThat(snapshotRecords.allRecordsInOrder()).hasSize(1);
List<SourceRecord> records = snapshotRecords.recordsForTopic(topicName("OUTBOX"));
assertThat(records).hasSize(1);
waitForStreamingRunning(TestHelper.CONNECTOR_NAME, TestHelper.SERVER_NAME);
}
private void startConnectorWithNoSnapshot() throws Exception {
Configuration.Builder configBuilder = getConfigurationBuilder(SnapshotMode.SCHEMA_ONLY);
start(OracleConnector.class, configBuilder.build());
assertConnectorIsRunning();
waitForStreamingRunning(TestHelper.CONNECTOR_NAME, TestHelper.SERVER_NAME);
assertNoRecordsToConsume();
}
private Configuration.Builder getConfigurationBuilder(SnapshotMode snapshotMode) {
@Override
protected Configuration.Builder getConfigurationBuilder(boolean initialSnapshot) {
final SnapshotMode snapshotMode = initialSnapshot ? SnapshotMode.INITIAL : SnapshotMode.SCHEMA_ONLY;
return TestHelper.defaultConfig()
.with(OracleConnectorConfig.SNAPSHOT_MODE, snapshotMode.getValue())
// this allows numeric(1) to be simulated as boolean types like other databases
@ -509,19 +79,48 @@ private Configuration.Builder getConfigurationBuilder(SnapshotMode snapshotMode)
.with(OracleConnectorConfig.TABLE_INCLUDE_LIST, "DEBEZIUM\\.OUTBOX");
}
private String topicName(String tableName) {
return TestHelper.SERVER_NAME + ".DEBEZIUM." + tableName;
@Override
protected String getSchemaNamePrefix() {
return "server1.DEBEZIUM.OUTBOX.";
}
private String createEventInsert(String id, String type, String aggregateType, String aggregateId, String payload, String additional) {
@Override
protected Schema getPayloadSchema() {
return Schema.OPTIONAL_STRING_SCHEMA;
}
@Override
protected String tableName() {
return "debezium.outbox";
}
@Override
protected String topicName() {
return TestHelper.SERVER_NAME + ".DEBEZIUM.OUTBOX";
}
@Override
protected void createTable() throws Exception {
TestHelper.dropTable(connection, tableName());
connection.execute(SETUP_OUTBOX_TABLE);
TestHelper.streamTable(connection, tableName());
}
@Override
protected String createInsert(String eventId,
String eventType,
String aggregateType,
String aggregateId,
String payloadJson,
String additional) {
StringBuilder insert = new StringBuilder();
insert.append("INSERT INTO debezium.outbox VALUES (");
insert.append("'").append(id).append("', ");
insert.append("'").append(eventId).append("', ");
insert.append("'").append(aggregateType).append("', ");
insert.append("'").append(aggregateId).append("', ");
insert.append("'").append(type).append("', ");
if (payload != null) {
insert.append("'").append(payload).append("'");
insert.append("'").append(eventType).append("', ");
if (payloadJson != null) {
insert.append("'").append(payloadJson).append("'");
}
else {
insert.append("NULL");
@ -532,4 +131,72 @@ private String createEventInsert(String id, String type, String aggregateType, S
insert.append(")");
return insert.toString();
}
@Override
protected void waitForSnapshotCompleted() throws InterruptedException {
waitForSnapshotToBeCompleted(TestHelper.CONNECTOR_NAME, TestHelper.SERVER_NAME);
}
@Override
protected void waitForStreamingStarted() throws InterruptedException {
waitForStreamingRunning(TestHelper.CONNECTOR_NAME, TestHelper.SERVER_NAME);
}
@Override
protected void alterTableWithExtra4Fields() throws Exception {
connection.execute("ALTER TABLE debezium.outbox add version numeric(9,0) not null");
connection.execute("ALTER TABLE debezium.outbox add somebooltype numeric(1,0) not null");
connection.execute("ALTER TABLE debezium.outbox add createdat timestamp not null");
connection.execute("ALTER TABLE debezium.outbox add is_deleted numeric(1,0) default 0");
}
@Override
protected void alterTableWithTimestampField() throws Exception {
connection.execute("ALTER TABLE debezium.outbox add createdat timestamp not null");
}
@Override
protected void alterTableModifyPayload() throws Exception {
connection.execute("ALTER TABLE debezium.outbox modify (payload varchar2(1000))");
}
@Override
protected String getAdditionalFieldValues(boolean deleted) {
return ", 1, 1, TO_TIMESTAMP('2019-03-24 20:52:59', 'YYYY-MM-DD HH24:MI:SS'), " + (deleted ? "1" : "0");
}
@Override
protected String getAdditionalFieldValuesTimestampOnly() {
return ", TO_TIMESTAMP('2019-03-24 20:52:59', 'YYYY-MM-DD HH24:MI:SS')";
}
@Override
protected String getFieldEventType() {
return super.getFieldEventType().toUpperCase();
}
@Override
protected String getFieldSchemaVersion() {
return super.getFieldSchemaVersion().toUpperCase();
}
@Override
protected String getFieldEventTimestamp() {
return super.getFieldEventTimestamp().toUpperCase();
}
@Override
protected String getFieldAggregateType() {
return super.getFieldAggregateType().toUpperCase();
}
@Override
protected String getSomeBoolType() {
return super.getSomeBoolType().toUpperCase();
}
@Override
protected String getIsDeleted() {
return super.getIsDeleted().toUpperCase();
}
}

View File

@ -6,49 +6,29 @@
package io.debezium.connector.postgresql;
import static io.debezium.connector.postgresql.TestHelper.topicName;
import static io.debezium.data.VerifyRecord.assertConnectSchemasAreEqual;
import static org.apache.kafka.connect.transforms.util.Requirements.requireStruct;
import static org.fest.assertions.Assertions.assertThat;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.header.Header;
import org.apache.kafka.connect.header.Headers;
import org.apache.kafka.connect.source.SourceRecord;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.debezium.config.Configuration;
import io.debezium.connector.postgresql.PostgresConnectorConfig.SnapshotMode;
import io.debezium.data.Json;
import io.debezium.data.Uuid;
import io.debezium.doc.FixFor;
import io.debezium.embedded.AbstractConnectorTest;
import io.debezium.transforms.outbox.EventRouter;
import io.debezium.jdbc.JdbcConnection;
import io.debezium.transforms.outbox.AbstractEventRouterTest;
/**
* Integration test for {@link io.debezium.transforms.outbox.EventRouter} with {@link PostgresConnector}
*
* @author Renato Mefi (gh@mefi.in)
*/
public class OutboxEventRouterIT extends AbstractConnectorTest {
public class OutboxEventRouterIT extends AbstractEventRouterTest<PostgresConnector> {
private static final String SETUP_OUTBOX_SCHEMA = "DROP SCHEMA IF EXISTS outboxsmtit CASCADE;" +
"CREATE SCHEMA outboxsmtit;";
private static final String SETUP_OUTOBOX_TABLE = "CREATE TABLE outboxsmtit.outbox " +
private static final String SETUP_OUTBOX_TABLE = "CREATE TABLE outboxsmtit.outbox " +
"(" +
" id uuid not null" +
" constraint outbox_pk primary key," +
@ -58,18 +38,75 @@ public class OutboxEventRouterIT extends AbstractConnectorTest {
" payload jsonb" +
");";
private EventRouter<SourceRecord> outboxEventRouter;
@Before
@Override
public void beforeEach() throws Exception {
TestHelper.dropDefaultReplicationSlot();
TestHelper.dropPublication();
super.beforeEach();
}
private static String createEventInsert(
UUID eventId,
String eventType,
String aggregateType,
String aggregateId,
String payloadJson,
String additional) {
@Override
protected Class<PostgresConnector> getConnectorClass() {
return PostgresConnector.class;
}
@Override
protected JdbcConnection databaseConnection() {
return TestHelper.create();
}
@Override
protected Configuration.Builder getConfigurationBuilder(boolean initialSnapshot) {
SnapshotMode snapshotMode = initialSnapshot ? SnapshotMode.INITIAL : SnapshotMode.NEVER;
return TestHelper.defaultConfig()
.with(PostgresConnectorConfig.SNAPSHOT_MODE, snapshotMode.getValue())
.with(PostgresConnectorConfig.DROP_SLOT_ON_STOP, Boolean.TRUE)
.with(PostgresConnectorConfig.SCHEMA_INCLUDE_LIST, "outboxsmtit")
.with(PostgresConnectorConfig.TABLE_INCLUDE_LIST, "outboxsmtit\\.outbox");
}
@Override
protected String getSchemaNamePrefix() {
return "test_server.outboxsmtit.outbox.";
}
@Override
protected Schema getIdSchema() {
return Uuid.builder().schema();
}
@Override
protected Schema getPayloadSchema() {
return Json.builder().optional().build();
}
@Override
protected String tableName() {
return "outboxsmtit.outbox";
}
@Override
protected String topicName() {
return TestHelper.topicName("outboxsmtit.outbox");
}
@Override
protected void createTable() throws Exception {
TestHelper.execute(SETUP_OUTBOX_SCHEMA);
TestHelper.execute(SETUP_OUTBOX_TABLE);
}
@Override
protected String createInsert(String eventId,
String eventType,
String aggregateType,
String aggregateId,
String payloadJson,
String additional) {
StringBuilder insert = new StringBuilder();
insert.append("INSERT INTO outboxsmtit.outbox VALUES (");
insert.append("'").append(eventId).append("'");
insert.append("'").append(UUID.fromString(eventId)).append("'");
insert.append(", '").append(aggregateType).append("'");
insert.append(", '").append(aggregateId).append("'");
insert.append(", '").append(eventType).append("'");
@ -92,449 +129,44 @@ else if (payloadJson.isEmpty()) {
return insert.toString();
}
@Before
public void beforeEach() throws InterruptedException {
TestHelper.dropDefaultReplicationSlot();
TestHelper.dropPublication();
outboxEventRouter = new EventRouter<>();
outboxEventRouter.configure(Collections.emptyMap());
TestHelper.execute(SETUP_OUTBOX_SCHEMA);
TestHelper.execute(SETUP_OUTOBOX_TABLE);
@Override
protected void waitForSnapshotCompleted() throws InterruptedException {
waitForSnapshotToBeCompleted("postgres", TestHelper.TEST_SERVER);
}
@After
public void afterEach() {
stopConnector();
assertNoRecordsToConsume();
outboxEventRouter.close();
@Override
protected void waitForStreamingStarted() throws InterruptedException {
waitForStreamingRunning("postgres", TestHelper.TEST_SERVER);
}
@Test
public void shouldConsumeRecordsFromInsert() throws Exception {
startConnectorWithInitialSnapshotRecord();
TestHelper.execute(createEventInsert(
UUID.fromString("59a42efd-b015-44a9-9dde-cb36d9002425"),
"UserCreated",
"User",
"10711fa5",
"{}",
""));
SourceRecords actualRecords = consumeRecordsByTopic(1);
assertThat(actualRecords.topics().size()).isEqualTo(1);
SourceRecord newEventRecord = actualRecords.recordsForTopic(topicName("outboxsmtit.outbox")).get(0);
SourceRecord routedEvent = outboxEventRouter.apply(newEventRecord);
assertThat(routedEvent).isNotNull();
assertThat(routedEvent.topic()).isEqualTo("outbox.event.User");
assertThat(routedEvent.keySchema()).isEqualTo(Schema.STRING_SCHEMA);
assertThat(routedEvent.key()).isEqualTo("10711fa5");
assertThat(routedEvent.value()).isInstanceOf(String.class);
JsonNode payload = (new ObjectMapper()).readTree((String) routedEvent.value());
assertThat(payload.get("email")).isEqualTo(null);
}
@Test
public void shouldSendEventTypeAsHeader() throws Exception {
startConnectorWithInitialSnapshotRecord();
TestHelper.execute(createEventInsert(
UUID.fromString("59a42efd-b015-44a9-9dde-cb36d9002425"),
"UserCreated",
"User",
"10711fa5",
"{\"email\": \"gh@mefi.in\"}",
""));
final Map<String, String> config = new HashMap<>();
config.put(
"table.fields.additional.placement",
"type:header:eventType");
outboxEventRouter.configure(config);
SourceRecords actualRecords = consumeRecordsByTopic(1);
assertThat(actualRecords.topics().size()).isEqualTo(1);
SourceRecord newEventRecord = actualRecords.recordsForTopic(topicName("outboxsmtit.outbox")).get(0);
SourceRecord routedEvent = outboxEventRouter.apply(newEventRecord);
assertThat(routedEvent).isNotNull();
assertThat(routedEvent.topic()).isEqualTo("outbox.event.User");
Object value = routedEvent.value();
assertThat(routedEvent.headers().lastWithName("eventType").value()).isEqualTo("UserCreated");
assertThat(value).isInstanceOf(String.class);
JsonNode payload = (new ObjectMapper()).readTree((String) value);
assertThat(payload.get("email").asText()).isEqualTo("gh@mefi.in");
}
@Test
@FixFor("DBZ-2014")
public void shouldSendEventTypeAsValue() throws Exception {
startConnectorWithInitialSnapshotRecord();
TestHelper.execute(createEventInsert(
UUID.fromString("d4da2428-8b19-11ea-bc55-0242ac130003"),
"UserCreated",
"User",
"9948fcad",
"{\"email\": \"gh@mefi.in\"}",
""));
final Map<String, String> config = new HashMap<>();
config.put(
"table.fields.additional.placement",
"type:envelope:eventType");
outboxEventRouter.configure(config);
SourceRecords actualRecords = consumeRecordsByTopic(1);
assertThat(actualRecords.topics().size()).isEqualTo(1);
SourceRecord newEventRecord = actualRecords.recordsForTopic(topicName("outboxsmtit.outbox")).get(0);
SourceRecord routedEvent = outboxEventRouter.apply(newEventRecord);
assertThat(routedEvent).isNotNull();
assertThat(routedEvent.topic()).isEqualTo("outbox.event.User");
Struct valueStruct = requireStruct(routedEvent.value(), "test payload");
assertThat(valueStruct.getString("eventType")).isEqualTo("UserCreated");
JsonNode payload = (new ObjectMapper()).readTree(valueStruct.getString("payload"));
assertThat(payload.get("email").asText()).isEqualTo("gh@mefi.in");
}
@Test
public void shouldRespectJsonFormatAsString() throws Exception {
startConnectorWithInitialSnapshotRecord();
TestHelper.execute(createEventInsert(
UUID.fromString("f9171eb6-19f3-4579-9206-0e179d2ebad7"),
"UserCreated",
"User",
"7bdf2e9e",
"{\"email\": \"gh@mefi.in\"}",
""));
SourceRecords actualRecords = consumeRecordsByTopic(1);
assertThat(actualRecords.topics().size()).isEqualTo(1);
SourceRecord newEventRecord = actualRecords.recordsForTopic(topicName("outboxsmtit.outbox")).get(0);
SourceRecord routedEvent = outboxEventRouter.apply(newEventRecord);
assertThat(routedEvent.value()).isInstanceOf(String.class);
JsonNode payload = (new ObjectMapper()).readTree((String) routedEvent.value());
assertThat(payload.get("email").asText()).isEqualTo("gh@mefi.in");
}
@Test
public void shouldSupportAllFeatures() throws Exception {
startConnectorWithNoSnapshot();
outboxEventRouter = new EventRouter<>();
final Map<String, String> config = new HashMap<>();
config.put("table.field.event.schema.version", "version");
config.put("table.field.event.timestamp", "createdat");
config.put(
"table.fields.additional.placement",
"version:envelope:eventVersion," +
"aggregatetype:envelope:aggregateType," +
"somebooltype:envelope:someBoolType," +
"somebooltype:header," +
"is_deleted:envelope:deleted");
outboxEventRouter.configure(config);
@Override
protected void alterTableWithExtra4Fields() throws Exception {
TestHelper.execute("ALTER TABLE outboxsmtit.outbox add version int not null;");
TestHelper.execute("ALTER TABLE outboxsmtit.outbox add somebooltype boolean not null;");
TestHelper.execute("ALTER TABLE outboxsmtit.outbox add createdat timestamp without time zone not null;");
TestHelper.execute("ALTER TABLE outboxsmtit.outbox add is_deleted boolean default false;");
TestHelper.execute(createEventInsert(
UUID.fromString("f9171eb6-19f3-4579-9206-0e179d2ebad7"),
"UserUpdated",
"UserEmail",
"7bdf2e9e",
"{\"email\": \"gh@mefi.in\"}",
", 1, true, TIMESTAMP(3) '2019-03-24 20:52:59'"));
SourceRecords actualRecords = consumeRecordsByTopic(1);
assertThat(actualRecords.topics().size()).isEqualTo(1);
SourceRecord newEventRecord = actualRecords.recordsForTopic(topicName("outboxsmtit.outbox")).get(0);
SourceRecord eventRouted = outboxEventRouter.apply(newEventRecord);
// Validate metadata
Schema expectedSchema = SchemaBuilder.struct()
.version(1)
.name("test_server.outboxsmtit.outbox.UserEmail.Value")
.field("payload", Json.builder().optional().build())
.field("eventVersion", Schema.INT32_SCHEMA)
.field("aggregateType", Schema.STRING_SCHEMA)
.field("someBoolType", Schema.BOOLEAN_SCHEMA)
.field("deleted", SchemaBuilder.bool().optional().defaultValue(false).build())
.build();
assertConnectSchemasAreEqual(null, eventRouted.valueSchema(), expectedSchema);
assertThat(eventRouted.timestamp()).isEqualTo(1553460779000L);
assertThat(eventRouted.topic()).isEqualTo("outbox.event.UserEmail");
// Validate headers
Headers headers = eventRouted.headers();
assertThat(headers.size()).isEqualTo(2);
Header headerId = headers.lastWithName("id");
assertThat(headerId.schema()).isEqualTo(Uuid.builder().build());
assertThat(headerId.value()).isEqualTo("f9171eb6-19f3-4579-9206-0e179d2ebad7");
Header headerBool = headers.lastWithName("somebooltype");
assertThat(headerBool.schema()).isEqualTo(SchemaBuilder.BOOLEAN_SCHEMA);
assertThat(headerBool.value()).isEqualTo(true);
// Validate Key
assertThat(eventRouted.keySchema()).isEqualTo(SchemaBuilder.STRING_SCHEMA);
assertThat(eventRouted.key()).isEqualTo("7bdf2e9e");
// Validate message body
Struct valueStruct = requireStruct(eventRouted.value(), "test envelope");
assertThat(valueStruct.getString("aggregateType")).isEqualTo("UserEmail");
assertThat(valueStruct.getInt32("eventVersion")).isEqualTo(1);
assertThat(valueStruct.getBoolean("someBoolType")).isEqualTo(true);
assertThat(valueStruct.getBoolean("deleted")).isEqualTo(false);
}
@Test
@FixFor("DBZ-1707")
public void shouldConvertMicroSecondsTimestampToMilliSeconds() throws Exception {
startConnectorWithNoSnapshot();
outboxEventRouter = new EventRouter<>();
final Map<String, String> config = new HashMap<>();
config.put("table.field.event.timestamp", "createdat");
outboxEventRouter.configure(config);
@Override
protected void alterTableWithTimestampField() throws Exception {
TestHelper.execute("ALTER TABLE outboxsmtit.outbox add createdat timestamp without time zone not null;");
TestHelper.execute(createEventInsert(
UUID.fromString("f9171eb6-19f3-4579-9206-0e179d2ebad7"),
"UserUpdated",
"UserEmail",
"7bdf2e9e",
"{\"email\": \"gh@mefi.in\"}",
", TIMESTAMP '2019-03-24 20:52:59'"));
SourceRecords actualRecords = consumeRecordsByTopic(1);
assertThat(actualRecords.topics().size()).isEqualTo(1);
SourceRecord newEventRecord = actualRecords.recordsForTopic(topicName("outboxsmtit.outbox")).get(0);
SourceRecord eventRouted = outboxEventRouter.apply(newEventRecord);
// expecting microseconds value emitted for TIMESTAMP column without width to be
// converted to milliseconds, as that's the standard semantics of that property
// in Kafka
assertThat(eventRouted.timestamp()).isEqualTo(1553460779000L);
}
@Test
@FixFor("DBZ-1320")
public void shouldNotProduceTombstoneEventForNullPayload() throws Exception {
startConnectorWithNoSnapshot();
outboxEventRouter = new EventRouter<>();
final Map<String, String> config = new HashMap<>();
config.put("table.field.event.schema.version", "version");
config.put("table.field.event.timestamp", "createdat");
config.put(
"table.fields.additional.placement",
"version:envelope:eventVersion," +
"aggregatetype:envelope:aggregateType," +
"somebooltype:envelope:someBoolType," +
"somebooltype:header," +
"is_deleted:envelope:deleted");
outboxEventRouter.configure(config);
TestHelper.execute("ALTER TABLE outboxsmtit.outbox add version int not null;");
TestHelper.execute("ALTER TABLE outboxsmtit.outbox add somebooltype boolean not null;");
TestHelper.execute("ALTER TABLE outboxsmtit.outbox add createdat timestamp without time zone not null;");
TestHelper.execute("ALTER TABLE outboxsmtit.outbox add is_deleted boolean not null default false;");
TestHelper.execute(createEventInsert(
UUID.fromString("a9d76f78-bda6-48d3-97ed-13a146163218"),
"UserUpdated",
"UserEmail",
"a9d76f78",
null,
", 1, true, TIMESTAMP '2019-03-24 20:52:59', true"));
SourceRecords actualRecords = consumeRecordsByTopic(1);
assertThat(actualRecords.topics().size()).isEqualTo(1);
SourceRecord newEventRecord = actualRecords.recordsForTopic(topicName("outboxsmtit.outbox")).get(0);
SourceRecord eventRouted = outboxEventRouter.apply(newEventRecord);
// Validate metadata
assertThat(eventRouted.valueSchema()).isNotNull();
assertThat(eventRouted.timestamp()).isEqualTo(1553460779000L);
assertThat(eventRouted.topic()).isEqualTo("outbox.event.UserEmail");
// Validate headers
Headers headers = eventRouted.headers();
assertThat(headers.size()).isEqualTo(2);
Header headerId = headers.lastWithName("id");
assertThat(headerId.schema()).isEqualTo(Uuid.schema());
assertThat(headerId.value()).isEqualTo("a9d76f78-bda6-48d3-97ed-13a146163218");
Header headerBool = headers.lastWithName("somebooltype");
assertThat(headerBool.schema()).isEqualTo(SchemaBuilder.BOOLEAN_SCHEMA);
assertThat(headerBool.value()).isEqualTo(true);
// Validate Key
assertThat(eventRouted.keySchema()).isEqualTo(SchemaBuilder.STRING_SCHEMA);
assertThat(eventRouted.key()).isEqualTo("a9d76f78");
// Validate message body
assertThat(eventRouted.value()).isNotNull();
assertThat(((Struct) eventRouted.value()).get("payload")).isNull();
}
@Test
@FixFor("DBZ-1320")
public void shouldProduceTombstoneEventForNullPayload() throws Exception {
startConnectorWithNoSnapshot();
outboxEventRouter = new EventRouter<>();
final Map<String, String> config = new HashMap<>();
config.put("table.field.event.schema.version", "version");
config.put("table.field.event.timestamp", "createdat");
config.put("route.tombstone.on.empty.payload", "true");
config.put(
"table.fields.additional.placement",
"version:envelope:eventVersion," +
"aggregatetype:envelope:aggregateType," +
"somebooltype:envelope:someBoolType," +
"somebooltype:header," +
"is_deleted:envelope:deleted");
outboxEventRouter.configure(config);
TestHelper.execute("ALTER TABLE outboxsmtit.outbox add version int not null;");
TestHelper.execute("ALTER TABLE outboxsmtit.outbox add somebooltype boolean not null;");
TestHelper.execute("ALTER TABLE outboxsmtit.outbox add createdat timestamp without time zone not null;");
TestHelper.execute("ALTER TABLE outboxsmtit.outbox add is_deleted boolean not null default false;");
TestHelper.execute(createEventInsert(
UUID.fromString("a9d76f78-bda6-48d3-97ed-13a146163218"),
"UserUpdated",
"UserEmail",
"a9d76f78",
null,
", 1, true, TIMESTAMP '2019-03-24 20:52:59', true"));
SourceRecords actualRecords = consumeRecordsByTopic(1);
assertThat(actualRecords.topics().size()).isEqualTo(1);
SourceRecord newEventRecord = actualRecords.recordsForTopic(topicName("outboxsmtit.outbox")).get(0);
SourceRecord eventRouted = outboxEventRouter.apply(newEventRecord);
// Validate metadata
assertThat(eventRouted.valueSchema()).isNull();
assertThat(eventRouted.timestamp()).isEqualTo(1553460779000L);
assertThat(eventRouted.topic()).isEqualTo("outbox.event.UserEmail");
// Validate headers
Headers headers = eventRouted.headers();
assertThat(headers.size()).isEqualTo(2);
Header headerId = headers.lastWithName("id");
assertThat(headerId.schema()).isEqualTo(Uuid.schema());
assertThat(headerId.value()).isEqualTo("a9d76f78-bda6-48d3-97ed-13a146163218");
Header headerBool = headers.lastWithName("somebooltype");
assertThat(headerBool.schema()).isEqualTo(SchemaBuilder.BOOLEAN_SCHEMA);
assertThat(headerBool.value()).isEqualTo(true);
// Validate Key
assertThat(eventRouted.keySchema()).isEqualTo(SchemaBuilder.STRING_SCHEMA);
assertThat(eventRouted.key()).isEqualTo("a9d76f78");
// Validate message body
assertThat(eventRouted.value()).isNull();
}
@Test
@FixFor("DBZ-1320")
public void shouldProduceTombstoneEventForEmptyPayload() throws Exception {
startConnectorWithNoSnapshot();
outboxEventRouter = new EventRouter<>();
final Map<String, String> config = new HashMap<>();
config.put("route.tombstone.on.empty.payload", "true");
outboxEventRouter.configure(config);
@Override
protected void alterTableModifyPayload() throws Exception {
TestHelper.execute("ALTER TABLE outboxsmtit.outbox ALTER COLUMN payload SET DATA TYPE VARCHAR(1000);");
TestHelper.execute(createEventInsert(
UUID.fromString("a9d76f78-bda6-48d3-97ed-13a146163218"),
"UserUpdated",
"UserEmail",
"a9d76f78",
"",
null));
SourceRecords actualRecords = consumeRecordsByTopic(1);
assertThat(actualRecords.topics().size()).isEqualTo(1);
SourceRecord newEventRecord = actualRecords.recordsForTopic(topicName("outboxsmtit.outbox")).get(0);
SourceRecord eventRouted = outboxEventRouter.apply(newEventRecord);
// Validate metadata
assertThat(eventRouted.valueSchema()).isNull();
assertThat(eventRouted.topic()).isEqualTo("outbox.event.UserEmail");
// Validate headers
Headers headers = eventRouted.headers();
assertThat(headers.size()).isEqualTo(1);
Header headerId = headers.lastWithName("id");
assertThat(headerId.schema()).isEqualTo(Uuid.schema());
assertThat(headerId.value()).isEqualTo("a9d76f78-bda6-48d3-97ed-13a146163218");
// Validate Key
assertThat(eventRouted.keySchema()).isEqualTo(SchemaBuilder.STRING_SCHEMA);
assertThat(eventRouted.key()).isEqualTo("a9d76f78");
// Validate message body
assertThat(eventRouted.value()).isNull();
}
private void startConnectorWithInitialSnapshotRecord() throws Exception {
TestHelper.execute(createEventInsert(
UUID.fromString("70f52ae3-f671-4bac-ae62-1b9be6e73700"),
"UserCreated",
"User",
"10711faf",
"{}",
""));
Configuration.Builder configBuilder = getConfigurationBuilder(SnapshotMode.INITIAL);
start(PostgresConnector.class, configBuilder.build());
assertConnectorIsRunning();
SourceRecords snapshotRecords = consumeRecordsByTopic(1);
assertThat(snapshotRecords.allRecordsInOrder().size()).isEqualTo(1);
List<SourceRecord> recordsFromOutbox = snapshotRecords.recordsForTopic(topicName("outboxsmtit.outbox"));
assertThat(recordsFromOutbox.size()).isEqualTo(1);
@Override
protected String getAdditionalFieldValues(boolean deleted) {
if (deleted) {
return ", 1, true, TIMESTAMP(3) '2019-03-24 20:52:59', true";
}
return ", 1, true, TIMESTAMP(3) '2019-03-24 20:52:59'";
}
private void startConnectorWithNoSnapshot() throws InterruptedException {
Configuration.Builder configBuilder = getConfigurationBuilder(SnapshotMode.NEVER);
start(PostgresConnector.class, configBuilder.build());
waitForStreamingRunning("postgres", TestHelper.TEST_SERVER);
}
private static Configuration.Builder getConfigurationBuilder(SnapshotMode snapshotMode) {
return TestHelper.defaultConfig()
.with(PostgresConnectorConfig.SNAPSHOT_MODE, snapshotMode.getValue())
.with(PostgresConnectorConfig.DROP_SLOT_ON_STOP, Boolean.TRUE)
.with(PostgresConnectorConfig.SCHEMA_INCLUDE_LIST, "outboxsmtit")
.with(PostgresConnectorConfig.TABLE_INCLUDE_LIST, "outboxsmtit\\.outbox");
@Override
protected String getAdditionalFieldValuesTimestampOnly() {
return ", TIMESTAMP '2019-03-24 20:52:59'";
}
}

View File

@ -0,0 +1,544 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.transforms.outbox;
import static io.debezium.data.VerifyRecord.assertConnectSchemasAreEqual;
import static org.apache.kafka.connect.transforms.util.Requirements.requireStruct;
import static org.fest.assertions.Assertions.assertThat;
import java.sql.SQLException;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.header.Header;
import org.apache.kafka.connect.header.Headers;
import org.apache.kafka.connect.source.SourceConnector;
import org.apache.kafka.connect.source.SourceRecord;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.debezium.config.Configuration;
import io.debezium.doc.FixFor;
import io.debezium.embedded.AbstractConnectorTest;
import io.debezium.jdbc.JdbcConnection;
/**
* A unified test of all {@link EventRouter} behavior which all connectors should extend.
*
* @author Chris Cranford
*/
public abstract class AbstractEventRouterTest<T extends SourceConnector> extends AbstractConnectorTest {
protected EventRouter<SourceRecord> outboxEventRouter;
protected abstract Class<T> getConnectorClass();
protected abstract JdbcConnection databaseConnection();
protected abstract Configuration.Builder getConfigurationBuilder(boolean initialSnapshot);
protected abstract String topicName();
protected abstract String tableName();
protected abstract String getSchemaNamePrefix();
protected abstract Schema getPayloadSchema();
protected abstract void createTable() throws Exception;
protected abstract void alterTableWithExtra4Fields() throws Exception;
protected abstract void alterTableWithTimestampField() throws Exception;
protected abstract void alterTableModifyPayload() throws Exception;
protected abstract String getAdditionalFieldValues(boolean deleted);
protected abstract String getAdditionalFieldValuesTimestampOnly();
protected abstract String createInsert(String eventId, String eventType, String aggregateType,
String aggregateId, String payloadJson, String additional);
protected abstract void waitForSnapshotCompleted() throws InterruptedException;
protected abstract void waitForStreamingStarted() throws InterruptedException;
@Before
public void beforeEach() throws Exception {
createTable();
outboxEventRouter = new EventRouter<>();
outboxEventRouter.configure(Collections.emptyMap()); // configure with defaults
}
@After
public void afterEach() throws Exception {
stopConnector();
assertNoRecordsToConsume();
outboxEventRouter.close();
}
@Test
@FixFor({ "DBZ-1169", "DBZ-3940" })
public void shouldConsumeRecordsFromInsert() throws Exception {
startConnectorWithInitialSnapshotRecord();
databaseConnection().execute(createInsert(
"59a42efd-b015-44a9-9dde-cb36d9002425",
"UserCreated",
"User",
"10711fa5",
"{}",
""));
SourceRecords streamingRecords = consumeRecordsByTopic(1);
assertThat(streamingRecords.allRecordsInOrder()).hasSize(1);
SourceRecord record = streamingRecords.recordsForTopic(topicName()).get(0);
SourceRecord routedEvent = outboxEventRouter.apply(record);
assertThat(routedEvent).isNotNull();
assertThat(routedEvent.topic()).isEqualTo("outbox.event.User");
assertThat(routedEvent.keySchema()).isEqualTo(Schema.STRING_SCHEMA);
assertThat(routedEvent.key()).isEqualTo("10711fa5");
assertThat(routedEvent.value()).isInstanceOf(String.class);
JsonNode payload = (new ObjectMapper()).readTree((String) routedEvent.value());
assertThat(payload.get("email")).isNull();
}
@Test
@FixFor({ "DBZ-1385", "DBZ-3940" })
public void shouldSendEventTypeAsHeader() throws Exception {
startConnectorWithInitialSnapshotRecord();
databaseConnection().execute(createInsert(
"59a42efd-b015-44a9-9dde-cb36d9002425",
"UserCreated",
"User",
"10711fa5",
"{\"email\": \"gh@mefi.in\"}",
""));
final Map<String, String> config = new HashMap<>();
final String placements = getFieldEventType() + ":header:eventType";
config.put(EventRouterConfigDefinition.FIELDS_ADDITIONAL_PLACEMENT.name(), placements);
outboxEventRouter.configure(config);
SourceRecords streamingRecords = consumeRecordsByTopic(1);
assertThat(streamingRecords.allRecordsInOrder()).hasSize(1);
SourceRecord record = streamingRecords.recordsForTopic(topicName()).get(0);
SourceRecord routedEvent = outboxEventRouter.apply(record);
assertThat(routedEvent).isNotNull();
assertThat(routedEvent.topic()).isEqualTo("outbox.event.User");
Object value = routedEvent.value();
assertThat(routedEvent.headers().lastWithName("eventType").value()).isEqualTo("UserCreated");
assertThat(value).isInstanceOf(String.class);
JsonNode payload = (new ObjectMapper()).readTree((String) value);
assertThat(payload.get("email").asText()).isEqualTo("gh@mefi.in");
}
@Test
@FixFor({ "DBZ-2014", "DBZ-3940" })
public void shouldSendEventTypeAsValue() throws Exception {
startConnectorWithInitialSnapshotRecord();
databaseConnection().execute(createInsert(
"d4da2428-8b19-11ea-bc55-0242ac130003",
"UserCreated",
"User",
"9948fcad",
"{\"email\": \"gh@mefi.in\"}",
""));
final Map<String, String> config = new HashMap<>();
final String placements = getFieldEventType() + ":envelope:eventType";
config.put(EventRouterConfigDefinition.FIELDS_ADDITIONAL_PLACEMENT.name(), placements);
outboxEventRouter.configure(config);
SourceRecords streamingRecords = consumeRecordsByTopic(1);
assertThat(streamingRecords.allRecordsInOrder()).hasSize(1);
SourceRecord record = streamingRecords.recordsForTopic(topicName()).get(0);
SourceRecord routedEvent = outboxEventRouter.apply(record);
assertThat(routedEvent).isNotNull();
assertThat(routedEvent.topic()).isEqualTo("outbox.event.User");
Struct valueStruct = requireStruct(routedEvent.value(), "test payload");
assertThat(valueStruct.getString("eventType")).isEqualTo("UserCreated");
JsonNode payload = (new ObjectMapper()).readTree(valueStruct.getString("payload"));
assertThat(payload.get("email").asText()).isEqualTo("gh@mefi.in");
}
@Test
@FixFor({ "DBZ-2014", "DBZ-3940" })
public void shouldRespectJsonFormatAsString() throws Exception {
startConnectorWithInitialSnapshotRecord();
databaseConnection().execute(createInsert(
"f9171eb6-19f3-4579-9206-0e179d2ebad7",
"UserCreated",
"User",
"7bdf2e9e",
"{\"email\": \"gh@mefi.in\"}",
""));
SourceRecords streamingRecords = consumeRecordsByTopic(1);
assertThat(streamingRecords.allRecordsInOrder()).hasSize(1);
SourceRecord record = streamingRecords.recordsForTopic(topicName()).get(0);
SourceRecord routedEvent = outboxEventRouter.apply(record);
assertThat(routedEvent.value()).isInstanceOf(String.class);
JsonNode payload = (new ObjectMapper()).readTree((String) routedEvent.value());
assertThat(payload.get("email").asText()).isEqualTo("gh@mefi.in");
}
@Test
@FixFor({ "DBZ-1169", "DBZ-3940" })
public void shouldSupportAllFeatures() throws Exception {
startConnectorWithNoSnapshot();
final StringBuilder placements = new StringBuilder();
placements.append(envelope(getFieldSchemaVersion(), "eventVersion")).append(",");
placements.append(envelope(getFieldAggregateType(), "aggregateType")).append(",");
placements.append(envelope(getSomeBoolType(), "someBoolType")).append(",");
placements.append(header(getSomeBoolType(), null)).append(",");
placements.append(envelope(getIsDeleted(), "deleted"));
final Map<String, String> config = new HashMap<>();
config.put(EventRouterConfigDefinition.FIELD_SCHEMA_VERSION.name(), getFieldSchemaVersion());
config.put(EventRouterConfigDefinition.FIELD_EVENT_TIMESTAMP.name(), getFieldEventTimestamp());
config.put(EventRouterConfigDefinition.FIELDS_ADDITIONAL_PLACEMENT.name(), placements.toString());
outboxEventRouter.configure(config);
alterTableWithExtra4Fields();
databaseConnection().execute(createInsert(
"f9171eb6-19f3-4579-9206-0e179d2ebad7",
"UserUpdated",
"UserEmail",
"7bdf2e9e",
"{\"email\": \"gh@mefi.in\"}",
getAdditionalFieldValues(false)));
SourceRecords streamingRecords = consumeRecordsByTopic(1);
assertThat(streamingRecords.allRecordsInOrder()).hasSize(1);
SourceRecord record = streamingRecords.recordsForTopic(topicName()).get(0);
SourceRecord routedEvent = outboxEventRouter.apply(record);
// Validate metadata
Schema expectedSchema = SchemaBuilder.struct()
.version(1)
.name(getSchemaNamePrefix() + "UserEmail.Value")
.field("payload", getPayloadSchema())
.field("eventVersion", Schema.INT32_SCHEMA)
.field("aggregateType", Schema.STRING_SCHEMA)
.field("someBoolType", Schema.BOOLEAN_SCHEMA)
.field("deleted", SchemaBuilder.bool().optional().defaultValue(false).build())
.build();
assertConnectSchemasAreEqual(null, routedEvent.valueSchema(), expectedSchema);
assertThat(routedEvent.timestamp()).isEqualTo(1553460779000L);
assertThat(routedEvent.topic()).isEqualTo("outbox.event.UserEmail");
// Validate headers
Headers headers = routedEvent.headers();
assertThat(headers).hasSize(2);
Header headerId = headers.lastWithName("id");
assertThat(headerId.schema()).isEqualTo(getIdSchema());
assertThat(headerId.value()).isEqualTo(getId("f9171eb6-19f3-4579-9206-0e179d2ebad7"));
Header headerBool = headers.lastWithName(getSomeBoolType());
assertThat(headerBool.schema()).isEqualTo(SchemaBuilder.BOOLEAN_SCHEMA);
assertThat(headerBool.value()).isEqualTo(true);
// Validate Key
assertThat(routedEvent.keySchema()).isEqualTo(SchemaBuilder.STRING_SCHEMA);
assertThat(routedEvent.key()).isEqualTo("7bdf2e9e");
// Validate message body
Struct valueStruct = requireStruct(routedEvent.value(), "test envelope");
assertThat(valueStruct.getString("aggregateType")).isEqualTo("UserEmail");
assertThat(valueStruct.getInt32("eventVersion")).isEqualTo(1);
assertThat(valueStruct.get("someBoolType")).isEqualTo(true);
assertThat(valueStruct.get("deleted")).isEqualTo(false);
}
@Test
@FixFor({ "DBZ-1707", "DBZ-3940" })
public void shouldConvertMicrosecondsTimestampToMilliseconds() throws Exception {
startConnectorWithNoSnapshot();
outboxEventRouter = new EventRouter<>();
final Map<String, String> config = new HashMap<>();
config.put(EventRouterConfigDefinition.FIELD_EVENT_TIMESTAMP.name(), getFieldEventTimestamp());
outboxEventRouter.configure(config);
alterTableWithTimestampField();
databaseConnection().execute(createInsert(
"f9171eb6-19f3-4579-9206-0e179d2ebad7",
"UserUpdated",
"UserEmail",
"7bdf2e9e",
"{\"email\": \"gh@mefi.in\"}",
getAdditionalFieldValuesTimestampOnly()));
SourceRecords streamingRecords = consumeRecordsByTopic(1);
assertThat(streamingRecords.allRecordsInOrder()).hasSize(1);
SourceRecord record = streamingRecords.recordsForTopic(topicName()).get(0);
SourceRecord routedEvent = outboxEventRouter.apply(record);
// expecting microseconds value emitted for TIMESTAMP column without width to be
// converted to milliseconds, as that's the standard semantics of that property
// in Kafka
assertThat(routedEvent.timestamp()).isEqualTo(1553460779000L);
}
@Test
@FixFor({ "DBZ-1320", "DBZ-3940" })
public void shouldNotProduceTombstoneEventForNullPayload() throws Exception {
startConnectorWithNoSnapshot();
final StringBuilder placements = new StringBuilder();
placements.append(envelope(getFieldSchemaVersion(), "eventVersion")).append(",");
placements.append(envelope(getFieldAggregateType(), "agregateType")).append(",");
placements.append(envelope(getSomeBoolType(), "someBoolType")).append(",");
placements.append(header(getSomeBoolType(), null)).append(",");
placements.append(envelope(getIsDeleted(), "deleted"));
outboxEventRouter = new EventRouter<>();
final Map<String, String> config = new HashMap<>();
config.put(EventRouterConfigDefinition.FIELD_SCHEMA_VERSION.name(), getFieldSchemaVersion());
config.put(EventRouterConfigDefinition.FIELD_EVENT_TIMESTAMP.name(), getFieldEventTimestamp());
config.put(EventRouterConfigDefinition.FIELDS_ADDITIONAL_PLACEMENT.name(), placements.toString());
outboxEventRouter.configure(config);
alterTableWithExtra4Fields();
databaseConnection().execute(createInsert(
"a9d76f78-bda6-48d3-97ed-13a146163218",
"UserUpdated",
"UserEmail",
"a9d76f78",
null,
getAdditionalFieldValues(true)));
SourceRecords streamingRecords = consumeRecordsByTopic(1);
assertThat(streamingRecords.topics()).hasSize(1);
SourceRecord record = streamingRecords.recordsForTopic(topicName()).get(0);
SourceRecord routedEvent = outboxEventRouter.apply(record);
// Validate metadata
assertThat(routedEvent.valueSchema()).isNotNull();
assertThat(routedEvent.timestamp()).isEqualTo(1553460779000L);
assertThat(routedEvent.topic()).isEqualTo("outbox.event.UserEmail");
// Validate headers
Headers headers = routedEvent.headers();
assertThat(headers.size()).isEqualTo(2);
Header headerId = headers.lastWithName("id");
assertThat(headerId.schema()).isEqualTo(getIdSchema());
assertThat(headerId.value()).isEqualTo(getId("a9d76f78-bda6-48d3-97ed-13a146163218"));
Header headerBool = headers.lastWithName(getSomeBoolType());
assertThat(headerBool.schema()).isEqualTo(SchemaBuilder.BOOLEAN_SCHEMA);
assertThat(headerBool.value()).isEqualTo(true);
// Validate Key
assertThat(routedEvent.keySchema()).isEqualTo(SchemaBuilder.STRING_SCHEMA);
assertThat(routedEvent.key()).isEqualTo("a9d76f78");
// Validate message body
System.out.println(routedEvent);
assertThat(routedEvent.value()).isNotNull();
assertThat(((Struct) routedEvent.value()).get("payload")).isNull();
}
@Test
@FixFor({ "DBZ-1320", "DBZ-3940" })
public void shouldProduceTombstoneEventForNullPayload() throws Exception {
startConnectorWithNoSnapshot();
final StringBuilder placements = new StringBuilder();
placements.append(envelope(getFieldSchemaVersion(), "eventVersion")).append(",");
placements.append(envelope(getFieldAggregateType(), "aggregateType")).append(",");
placements.append(envelope(getSomeBoolType(), "someBoolType")).append(",");
placements.append(header(getSomeBoolType(), null)).append(",");
placements.append(envelope(getIsDeleted(), "deleted"));
outboxEventRouter = new EventRouter<>();
final Map<String, String> config = new HashMap<>();
config.put(EventRouterConfigDefinition.FIELD_SCHEMA_VERSION.name(), getFieldSchemaVersion());
config.put(EventRouterConfigDefinition.FIELD_EVENT_TIMESTAMP.name(), getFieldEventTimestamp());
config.put(EventRouterConfigDefinition.ROUTE_TOMBSTONE_ON_EMPTY_PAYLOAD.name(), "true");
config.put(EventRouterConfigDefinition.FIELDS_ADDITIONAL_PLACEMENT.name(), placements.toString());
outboxEventRouter.configure(config);
alterTableWithExtra4Fields();
databaseConnection().execute(createInsert(
"a9d76f78-bda6-48d3-97ed-13a146163218",
"UserUpdated",
"UserEmail",
"a9d76f78",
null,
getAdditionalFieldValues(true)));
SourceRecords streamingRecords = consumeRecordsByTopic(1);
assertThat(streamingRecords.topics()).hasSize(1);
SourceRecord record = streamingRecords.recordsForTopic(topicName()).get(0);
SourceRecord routedEvent = outboxEventRouter.apply(record);
// Validate metadata
assertThat(routedEvent.valueSchema()).isNull();
assertThat(routedEvent.timestamp()).isEqualTo(1553460779000L);
assertThat(routedEvent.topic()).isEqualTo("outbox.event.UserEmail");
// Validate headers
Headers headers = routedEvent.headers();
assertThat(headers.size()).isEqualTo(2);
Header headerId = headers.lastWithName("id");
assertThat(headerId.schema()).isEqualTo(getIdSchema());
assertThat(headerId.value()).isEqualTo(getId("a9d76f78-bda6-48d3-97ed-13a146163218"));
Header headerBool = headers.lastWithName(getSomeBoolType());
assertThat(headerBool.schema()).isEqualTo(SchemaBuilder.BOOLEAN_SCHEMA);
assertThat(headerBool.value()).isEqualTo(true);
// Validate Key
assertThat(routedEvent.keySchema()).isEqualTo(SchemaBuilder.STRING_SCHEMA);
assertThat(routedEvent.key()).isEqualTo("a9d76f78");
// Validate message body
assertThat(routedEvent.value()).isNull();
}
@Test
@FixFor({ "DBZ-1320", "DBZ-3940" })
public void shouldProduceTombstoneEventForEmptyPayload() throws Exception {
startConnectorWithNoSnapshot();
outboxEventRouter = new EventRouter<>();
final Map<String, String> config = new HashMap<>();
config.put(EventRouterConfigDefinition.ROUTE_TOMBSTONE_ON_EMPTY_PAYLOAD.name(), "true");
outboxEventRouter.configure(config);
alterTableModifyPayload();
databaseConnection().execute(createInsert(
"a9d76f78-bda6-48d3-97ed-13a146163218",
"UserUpdated",
"UserEmail",
"a9d76f78",
"",
null));
SourceRecords streamingRecords = consumeRecordsByTopic(1);
assertThat(streamingRecords.topics()).hasSize(1);
SourceRecord record = streamingRecords.recordsForTopic(topicName()).get(0);
SourceRecord routedEvent = outboxEventRouter.apply(record);
// Validate metadata
assertThat(routedEvent.valueSchema()).isNull();
assertThat(routedEvent.topic()).isEqualTo("outbox.event.UserEmail");
// Validate headers
Headers headers = routedEvent.headers();
assertThat(headers.size()).isEqualTo(1);
Header headerId = headers.lastWithName("id");
assertThat(headerId.schema()).isEqualTo(getIdSchema());
assertThat(headerId.value()).isEqualTo(getId("a9d76f78-bda6-48d3-97ed-13a146163218"));
// Validate Key
assertThat(routedEvent.keySchema()).isEqualTo(SchemaBuilder.STRING_SCHEMA);
assertThat(routedEvent.key()).isEqualTo("a9d76f78");
// Validate message body
assertThat(routedEvent.value()).isNull();
}
protected String getFieldEventType() {
return EventRouterConfigDefinition.FIELD_EVENT_TYPE.defaultValueAsString();
}
protected String getFieldSchemaVersion() {
return "version";
}
protected String getFieldEventTimestamp() {
return "createdat";
}
protected String getFieldAggregateType() {
return "aggregatetype";
}
protected String getSomeBoolType() {
return "somebooltype";
}
protected String getIsDeleted() {
return "is_deleted";
}
protected Schema getIdSchema() {
return SchemaBuilder.STRING_SCHEMA;
}
protected Object getId(String idValue) {
return idValue;
}
private String envelope(String source, String destination) {
return source + ":envelope:" + destination;
}
private String header(String source, String destination) {
return source + ":header" + (destination != null && destination.length() > 0 ? ":" + destination : "");
}
private void startConnectorWithInitialSnapshotRecord() throws Exception {
doInsert(createInsert("70f52ae3-f671-4bac-ae62-1b9be6e73700", "UserCreated", "User", "10711faf", "{}", ""));
Configuration.Builder configBuilder = getConfigurationBuilder(true);
start(getConnectorClass(), configBuilder.build());
assertConnectorIsRunning();
waitForSnapshotCompleted();
SourceRecords snapshotRecords = consumeRecordsByTopic(1);
assertThat(snapshotRecords.allRecordsInOrder()).hasSize(1);
List<SourceRecord> records = snapshotRecords.recordsForTopic(topicName());
assertThat(records).hasSize(1);
}
private void startConnectorWithNoSnapshot() throws Exception {
Configuration.Builder configBuilder = getConfigurationBuilder(false);
start(getConnectorClass(), configBuilder.build());
assertConnectorIsRunning();
waitForStreamingStarted();
assertNoRecordsToConsume();
}
private void doInsert(String insertSql) throws SQLException {
databaseConnection().execute(insertSql);
}
}