DBZ-3642 CloudEventsConverter can retrieve metadata info from headers
This commit is contained in:
parent
fcbff074f4
commit
98c426dec4
@ -400,6 +400,7 @@ Renato Mefi
|
||||
René Kerner
|
||||
Robert Hana
|
||||
Roman Kuchar
|
||||
Roman Kudryashov
|
||||
Rotem Adhoh
|
||||
Sagar Rao
|
||||
Sahan Dilshan
|
||||
|
@ -5,10 +5,8 @@
|
||||
*/
|
||||
package io.debezium.connector.mongodb.converters;
|
||||
|
||||
import org.apache.kafka.connect.data.Schema;
|
||||
import org.apache.kafka.connect.data.Struct;
|
||||
|
||||
import io.debezium.connector.mongodb.Module;
|
||||
import io.debezium.converters.recordandmetadata.RecordAndMetadata;
|
||||
import io.debezium.converters.spi.CloudEventsMaker;
|
||||
import io.debezium.converters.spi.CloudEventsProvider;
|
||||
import io.debezium.converters.spi.RecordParser;
|
||||
@ -26,8 +24,8 @@ public String getName() {
|
||||
}
|
||||
|
||||
@Override
|
||||
public RecordParser createParser(Schema schema, Struct record) {
|
||||
return new MongoDbRecordParser(schema, record);
|
||||
public RecordParser createParser(RecordAndMetadata recordAndMetadata) {
|
||||
return new MongoDbRecordParser(recordAndMetadata);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -7,11 +7,10 @@
|
||||
|
||||
import java.util.Set;
|
||||
|
||||
import org.apache.kafka.connect.data.Schema;
|
||||
import org.apache.kafka.connect.data.Struct;
|
||||
import org.apache.kafka.connect.errors.DataException;
|
||||
|
||||
import io.debezium.connector.mongodb.MongoDbFieldName;
|
||||
import io.debezium.converters.recordandmetadata.RecordAndMetadata;
|
||||
import io.debezium.converters.spi.RecordParser;
|
||||
import io.debezium.data.Envelope;
|
||||
import io.debezium.util.Collect;
|
||||
@ -34,8 +33,8 @@ public class MongoDbRecordParser extends RecordParser {
|
||||
COLLECTION,
|
||||
WALL_TIME);
|
||||
|
||||
public MongoDbRecordParser(Schema schema, Struct record) {
|
||||
super(schema, record, Envelope.FieldName.BEFORE, Envelope.FieldName.AFTER, MongoDbFieldName.UPDATE_DESCRIPTION);
|
||||
public MongoDbRecordParser(RecordAndMetadata recordAndMetadata) {
|
||||
super(recordAndMetadata, Envelope.FieldName.BEFORE, Envelope.FieldName.AFTER, MongoDbFieldName.UPDATE_DESCRIPTION);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -7,15 +7,26 @@
|
||||
|
||||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
|
||||
import java.util.LinkedHashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
import org.apache.kafka.connect.data.Schema;
|
||||
import org.apache.kafka.connect.data.Struct;
|
||||
import org.apache.kafka.connect.source.SourceRecord;
|
||||
import org.apache.kafka.connect.transforms.HeaderFrom;
|
||||
import org.bson.Document;
|
||||
import org.bson.types.ObjectId;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
|
||||
import io.debezium.config.CommonConnectorConfig;
|
||||
import io.debezium.config.Configuration;
|
||||
import io.debezium.connector.mongodb.MongoDbConnectorConfig.SnapshotMode;
|
||||
import io.debezium.connector.mongodb.transforms.outbox.MongoEventRouter;
|
||||
import io.debezium.converters.CloudEventsConverterTest;
|
||||
import io.debezium.data.Envelope;
|
||||
import io.debezium.doc.FixFor;
|
||||
import io.debezium.util.Testing;
|
||||
|
||||
/**
|
||||
@ -25,34 +36,34 @@
|
||||
*/
|
||||
public class CloudEventsConverterIT extends AbstractMongoConnectorIT {
|
||||
|
||||
@Test
|
||||
public void testCorrectFormat() throws Exception {
|
||||
protected static final String SERVER_NAME = "mongo1";
|
||||
protected static final String DB_NAME = "dbA";
|
||||
protected static final String COLLECTION_NAME = "c1";
|
||||
|
||||
@Before
|
||||
public void beforeEach() {
|
||||
Testing.Print.enable();
|
||||
config = TestHelper.getConfiguration(mongo)
|
||||
.edit()
|
||||
.with(MongoDbConnectorConfig.COLLECTION_INCLUDE_LIST, "dbA.c1")
|
||||
.with(MongoDbConnectorConfig.SNAPSHOT_MODE, SnapshotMode.INITIAL)
|
||||
.build();
|
||||
|
||||
config = getConfiguration();
|
||||
context = new MongoDbTaskContext(config);
|
||||
|
||||
TestHelper.cleanDatabase(mongo, "dbA");
|
||||
|
||||
TestHelper.cleanDatabase(mongo, DB_NAME);
|
||||
start(MongoDbConnector.class, config);
|
||||
assertConnectorIsRunning();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testCorrectFormat() throws Exception {
|
||||
// Wait for snapshot completion
|
||||
waitForSnapshotToBeCompleted("mongodb", "mongo1");
|
||||
|
||||
List<Document> documentsToInsert = loadTestDocuments("restaurants1.json");
|
||||
insertDocuments("dbA", "c1", documentsToInsert.toArray(new Document[0]));
|
||||
insertDocuments(DB_NAME, COLLECTION_NAME, documentsToInsert.toArray(new Document[0]));
|
||||
Document updateObj = new Document()
|
||||
.append("$set", new Document()
|
||||
.append("name", "Closed"));
|
||||
updateDocument("dbA", "c1", Document.parse("{\"restaurant_id\": \"30075445\"}"), updateObj);
|
||||
updateDocument(DB_NAME, COLLECTION_NAME, Document.parse("{\"restaurant_id\": \"30075445\"}"), updateObj);
|
||||
// Pause is necessary to make sure that fullDocument fro change streams is caputred before delete
|
||||
Thread.sleep(1000);
|
||||
deleteDocuments("dbA", "c1", Document.parse("{\"restaurant_id\": \"30075445\"}"));
|
||||
deleteDocuments(DB_NAME, COLLECTION_NAME, Document.parse("{\"restaurant_id\": \"30075445\"}"));
|
||||
|
||||
// 6 INSERTs + 1 UPDATE + 1 DELETE
|
||||
final int recCount = 8;
|
||||
@ -78,7 +89,62 @@ public void testCorrectFormat() throws Exception {
|
||||
CloudEventsConverterTest.shouldConvertToCloudEventsInJsonWithDataAsAvro(updateRecord, MongoDbFieldName.UPDATE_DESCRIPTION, false);
|
||||
CloudEventsConverterTest.shouldConvertToCloudEventsInJsonWithDataAsAvro(updateRecord, Envelope.FieldName.AFTER, false);
|
||||
CloudEventsConverterTest.shouldConvertToCloudEventsInAvro(updateRecord, "mongodb", "mongo1", false);
|
||||
}
|
||||
|
||||
stopConnector();
|
||||
@Test
|
||||
@FixFor({ "DBZ-3642" })
|
||||
public void shouldConvertToCloudEventsInJsonWithMetadataInHeadersAfterOutboxEventRouter() throws Exception {
|
||||
HeaderFrom<SourceRecord> headerFrom = new HeaderFrom.Value<>();
|
||||
Map<String, String> headerFromConfig = new LinkedHashMap<>();
|
||||
headerFromConfig.put("fields", "source,op,transaction");
|
||||
headerFromConfig.put("headers", "source,op,transaction");
|
||||
headerFromConfig.put("operation", "copy");
|
||||
headerFromConfig.put("header.converter.schemas.enable", "true");
|
||||
headerFrom.configure(headerFromConfig);
|
||||
|
||||
MongoEventRouter<SourceRecord> outboxEventRouter = new MongoEventRouter<>();
|
||||
Map<String, String> outboxEventRouterConfig = new LinkedHashMap<>();
|
||||
outboxEventRouterConfig.put("collection.expand.json.payload", "true");
|
||||
outboxEventRouter.configure(outboxEventRouterConfig);
|
||||
|
||||
try (var client = connect()) {
|
||||
client.getDatabase(DB_NAME).getCollection(COLLECTION_NAME)
|
||||
.insertOne(new Document()
|
||||
.append("aggregateid", "10711fa5")
|
||||
.append("aggregatetype", "User")
|
||||
.append("type", "UserCreated")
|
||||
.append("payload", new Document()
|
||||
.append("_id", new ObjectId("000000000000000000000000"))
|
||||
.append("someField1", "some value 1")
|
||||
.append("someField2", 7005L)));
|
||||
}
|
||||
|
||||
SourceRecords streamingRecords = consumeRecordsByTopic(1);
|
||||
assertThat(streamingRecords.allRecordsInOrder()).hasSize(1);
|
||||
|
||||
SourceRecord record = streamingRecords.recordsForTopic("mongo1.dbA.c1").get(0);
|
||||
SourceRecord recordWithMetadataHeaders = headerFrom.apply(record);
|
||||
SourceRecord routedEvent = outboxEventRouter.apply(recordWithMetadataHeaders);
|
||||
|
||||
assertThat(routedEvent).isNotNull();
|
||||
assertThat(routedEvent.topic()).isEqualTo("outbox.event.User");
|
||||
assertThat(routedEvent.keySchema().type()).isEqualTo(Schema.Type.STRING);
|
||||
assertThat(routedEvent.key()).isEqualTo("10711fa5");
|
||||
assertThat(routedEvent.value()).isInstanceOf(Struct.class);
|
||||
|
||||
CloudEventsConverterTest.shouldConvertToCloudEventsInJsonWithMetadataInHeaders(routedEvent, "mongodb", "mongo1");
|
||||
|
||||
headerFrom.close();
|
||||
outboxEventRouter.close();
|
||||
}
|
||||
|
||||
private Configuration getConfiguration() {
|
||||
return TestHelper.getConfiguration(mongo)
|
||||
.edit()
|
||||
.with(MongoDbConnectorConfig.SNAPSHOT_MODE, SnapshotMode.INITIAL)
|
||||
.with(MongoDbConnectorConfig.POLL_INTERVAL_MS, 10)
|
||||
.with(MongoDbConnectorConfig.COLLECTION_INCLUDE_LIST, DB_NAME + "." + COLLECTION_NAME)
|
||||
.with(CommonConnectorConfig.TOPIC_PREFIX, SERVER_NAME)
|
||||
.build();
|
||||
}
|
||||
}
|
||||
|
@ -5,10 +5,8 @@
|
||||
*/
|
||||
package io.debezium.connector.mysql.converters;
|
||||
|
||||
import org.apache.kafka.connect.data.Schema;
|
||||
import org.apache.kafka.connect.data.Struct;
|
||||
|
||||
import io.debezium.connector.mysql.Module;
|
||||
import io.debezium.converters.recordandmetadata.RecordAndMetadata;
|
||||
import io.debezium.converters.spi.CloudEventsMaker;
|
||||
import io.debezium.converters.spi.CloudEventsProvider;
|
||||
import io.debezium.converters.spi.RecordParser;
|
||||
@ -26,8 +24,8 @@ public String getName() {
|
||||
}
|
||||
|
||||
@Override
|
||||
public RecordParser createParser(Schema schema, Struct record) {
|
||||
return new MySqlRecordParser(schema, record);
|
||||
public RecordParser createParser(RecordAndMetadata recordAndMetadata) {
|
||||
return new MySqlRecordParser(recordAndMetadata);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -7,10 +7,9 @@
|
||||
|
||||
import java.util.Set;
|
||||
|
||||
import org.apache.kafka.connect.data.Schema;
|
||||
import org.apache.kafka.connect.data.Struct;
|
||||
import org.apache.kafka.connect.errors.DataException;
|
||||
|
||||
import io.debezium.converters.recordandmetadata.RecordAndMetadata;
|
||||
import io.debezium.converters.spi.RecordParser;
|
||||
import io.debezium.data.Envelope;
|
||||
import io.debezium.util.Collect;
|
||||
@ -41,8 +40,8 @@ public class MySqlRecordParser extends RecordParser {
|
||||
THREAD_KEY,
|
||||
QUERY_KEY);
|
||||
|
||||
public MySqlRecordParser(Schema schema, Struct record) {
|
||||
super(schema, record, Envelope.FieldName.BEFORE, Envelope.FieldName.AFTER);
|
||||
public MySqlRecordParser(RecordAndMetadata recordAndMetadata) {
|
||||
super(recordAndMetadata, Envelope.FieldName.BEFORE, Envelope.FieldName.AFTER);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -0,0 +1,154 @@
|
||||
/*
|
||||
* Copyright Debezium Authors.
|
||||
*
|
||||
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
|
||||
*/
|
||||
|
||||
package io.debezium.connector.mysql;
|
||||
|
||||
import java.nio.file.Path;
|
||||
import java.util.UUID;
|
||||
|
||||
import org.junit.After;
|
||||
import org.junit.Before;
|
||||
|
||||
import io.debezium.config.Configuration;
|
||||
import io.debezium.connector.mysql.MySqlConnectorConfig.SnapshotMode;
|
||||
import io.debezium.converters.AbstractCloudEventsConverterTest;
|
||||
import io.debezium.jdbc.JdbcConnection;
|
||||
import io.debezium.relational.TableId;
|
||||
import io.debezium.util.Testing;
|
||||
|
||||
/**
|
||||
* Integration test for {@link io.debezium.converters.CloudEventsConverter} with {@link MySqlConnector}
|
||||
*
|
||||
* @author Roman Kudryashov
|
||||
*/
|
||||
public class CloudEventsConverterIT extends AbstractCloudEventsConverterTest<MySqlConnector> {
|
||||
|
||||
private static final Path SCHEMA_HISTORY_PATH = Testing.Files.createTestingPath("file-schema-history-connect.txt").toAbsolutePath();
|
||||
|
||||
private final UniqueDatabase DATABASE = new UniqueDatabase("myServer1", "empty")
|
||||
.withDbHistoryPath(SCHEMA_HISTORY_PATH);
|
||||
|
||||
private JdbcConnection connection;
|
||||
|
||||
private static final String SETUP_OUTBOX_TABLE = "CREATE TABLE outbox " +
|
||||
"(" +
|
||||
" id varchar(36) not null," +
|
||||
" aggregatetype varchar(255) not null," +
|
||||
" aggregateid varchar(255) not null," +
|
||||
" type varchar(255) not null," +
|
||||
" payload json," +
|
||||
" CONSTRAINT outbox_pk PRIMARY KEY (id));";
|
||||
|
||||
@Before
|
||||
@Override
|
||||
public void beforeEach() throws Exception {
|
||||
stopConnector();
|
||||
DATABASE.createAndInitialize();
|
||||
initializeConnectorTestFramework();
|
||||
Testing.Files.delete(SCHEMA_HISTORY_PATH);
|
||||
try (MySqlTestConnection db = MySqlTestConnection.forTestDatabase(DATABASE.getDatabaseName());) {
|
||||
this.connection = db.connect();
|
||||
}
|
||||
super.beforeEach();
|
||||
}
|
||||
|
||||
@After
|
||||
public void afterEach() throws Exception {
|
||||
try {
|
||||
stopConnector();
|
||||
}
|
||||
finally {
|
||||
Testing.Files.delete(SCHEMA_HISTORY_PATH);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Class<MySqlConnector> getConnectorClass() {
|
||||
return MySqlConnector.class;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected String getConnectorName() {
|
||||
return "mysql";
|
||||
}
|
||||
|
||||
@Override
|
||||
protected String getServerName() {
|
||||
return DATABASE.getServerName();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected JdbcConnection databaseConnection() {
|
||||
return this.connection;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Configuration.Builder getConfigurationBuilder() {
|
||||
return DATABASE.defaultConfig()
|
||||
.with(MySqlConnectorConfig.SNAPSHOT_MODE, SnapshotMode.NEVER)
|
||||
.with(MySqlConnectorConfig.INCLUDE_SCHEMA_CHANGES, false);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected String tableName() {
|
||||
return tableNameId().toQuotedString('`');
|
||||
}
|
||||
|
||||
@Override
|
||||
protected String topicName() {
|
||||
return DATABASE.topicForTable("outbox");
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void createTable() throws Exception {
|
||||
this.connection.execute(SETUP_OUTBOX_TABLE);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected String createInsert(String eventId,
|
||||
String eventType,
|
||||
String aggregateType,
|
||||
String aggregateId,
|
||||
String payloadJson,
|
||||
String additional) {
|
||||
StringBuilder insert = new StringBuilder();
|
||||
insert.append("INSERT INTO outbox VALUES (");
|
||||
insert.append("'").append(UUID.fromString(eventId)).append("'");
|
||||
insert.append(", '").append(aggregateType).append("'");
|
||||
insert.append(", '").append(aggregateId).append("'");
|
||||
insert.append(", '").append(eventType).append("'");
|
||||
|
||||
if (payloadJson == null) {
|
||||
insert.append(", null");
|
||||
}
|
||||
else if (payloadJson.isEmpty()) {
|
||||
insert.append(", ''");
|
||||
}
|
||||
else {
|
||||
insert.append(", '").append(payloadJson).append("'");
|
||||
}
|
||||
|
||||
if (additional != null) {
|
||||
insert.append(additional);
|
||||
}
|
||||
insert.append(")");
|
||||
|
||||
return insert.toString();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void waitForStreamingStarted() throws InterruptedException {
|
||||
waitForStreamingRunning("mysql", DATABASE.getServerName());
|
||||
}
|
||||
|
||||
private TableId tableNameId() {
|
||||
return tableNameId("a");
|
||||
}
|
||||
|
||||
private TableId tableNameId(String table) {
|
||||
return TableId.parse(DATABASE.qualifiedTableName(table));
|
||||
}
|
||||
}
|
@ -5,10 +5,8 @@
|
||||
*/
|
||||
package io.debezium.connector.oracle.converters;
|
||||
|
||||
import org.apache.kafka.connect.data.Schema;
|
||||
import org.apache.kafka.connect.data.Struct;
|
||||
|
||||
import io.debezium.connector.oracle.Module;
|
||||
import io.debezium.converters.recordandmetadata.RecordAndMetadata;
|
||||
import io.debezium.converters.spi.CloudEventsMaker;
|
||||
import io.debezium.converters.spi.CloudEventsProvider;
|
||||
import io.debezium.converters.spi.RecordParser;
|
||||
@ -26,8 +24,8 @@ public String getName() {
|
||||
}
|
||||
|
||||
@Override
|
||||
public RecordParser createParser(Schema schema, Struct record) {
|
||||
return new OracleRecordParser(schema, record);
|
||||
public RecordParser createParser(RecordAndMetadata recordAndMetadata) {
|
||||
return new OracleRecordParser(recordAndMetadata);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -7,10 +7,9 @@
|
||||
|
||||
import java.util.Set;
|
||||
|
||||
import org.apache.kafka.connect.data.Schema;
|
||||
import org.apache.kafka.connect.data.Struct;
|
||||
import org.apache.kafka.connect.errors.DataException;
|
||||
|
||||
import io.debezium.converters.recordandmetadata.RecordAndMetadata;
|
||||
import io.debezium.converters.spi.RecordParser;
|
||||
import io.debezium.data.Envelope;
|
||||
import io.debezium.util.Collect;
|
||||
@ -29,8 +28,8 @@ public class OracleRecordParser extends RecordParser {
|
||||
COMMIT_SCN_KEY,
|
||||
LCR_POSITION_KEY);
|
||||
|
||||
public OracleRecordParser(Schema schema, Struct record) {
|
||||
super(schema, record, Envelope.FieldName.BEFORE, Envelope.FieldName.AFTER);
|
||||
public OracleRecordParser(RecordAndMetadata recordAndMetadata) {
|
||||
super(recordAndMetadata, Envelope.FieldName.BEFORE, Envelope.FieldName.AFTER);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -5,10 +5,8 @@
|
||||
*/
|
||||
package io.debezium.connector.postgresql.converters;
|
||||
|
||||
import org.apache.kafka.connect.data.Schema;
|
||||
import org.apache.kafka.connect.data.Struct;
|
||||
|
||||
import io.debezium.connector.postgresql.Module;
|
||||
import io.debezium.converters.recordandmetadata.RecordAndMetadata;
|
||||
import io.debezium.converters.spi.CloudEventsMaker;
|
||||
import io.debezium.converters.spi.CloudEventsProvider;
|
||||
import io.debezium.converters.spi.RecordParser;
|
||||
@ -26,8 +24,8 @@ public String getName() {
|
||||
}
|
||||
|
||||
@Override
|
||||
public RecordParser createParser(Schema schema, Struct record) {
|
||||
return new PostgresRecordParser(schema, record);
|
||||
public RecordParser createParser(RecordAndMetadata recordAndMetadata) {
|
||||
return new PostgresRecordParser(recordAndMetadata);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -7,10 +7,9 @@
|
||||
|
||||
import java.util.Set;
|
||||
|
||||
import org.apache.kafka.connect.data.Schema;
|
||||
import org.apache.kafka.connect.data.Struct;
|
||||
import org.apache.kafka.connect.errors.DataException;
|
||||
|
||||
import io.debezium.converters.recordandmetadata.RecordAndMetadata;
|
||||
import io.debezium.converters.spi.RecordParser;
|
||||
import io.debezium.data.Envelope;
|
||||
import io.debezium.util.Collect;
|
||||
@ -33,8 +32,8 @@ public class PostgresRecordParser extends RecordParser {
|
||||
LSN_KEY,
|
||||
SEQUENCE_KEY);
|
||||
|
||||
public PostgresRecordParser(Schema schema, Struct record) {
|
||||
super(schema, record, Envelope.FieldName.BEFORE, Envelope.FieldName.AFTER);
|
||||
public PostgresRecordParser(RecordAndMetadata recordAndMetadata) {
|
||||
super(recordAndMetadata, Envelope.FieldName.BEFORE, Envelope.FieldName.AFTER);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -0,0 +1,127 @@
|
||||
/*
|
||||
* Copyright Debezium Authors.
|
||||
*
|
||||
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
|
||||
*/
|
||||
|
||||
package io.debezium.connector.postgresql;
|
||||
|
||||
import java.util.UUID;
|
||||
|
||||
import org.junit.Before;
|
||||
|
||||
import io.debezium.config.Configuration;
|
||||
import io.debezium.connector.postgresql.PostgresConnectorConfig.SnapshotMode;
|
||||
import io.debezium.converters.AbstractCloudEventsConverterTest;
|
||||
import io.debezium.jdbc.JdbcConnection;
|
||||
|
||||
/**
|
||||
* Integration test for {@link io.debezium.converters.CloudEventsConverter} with {@link PostgresConnector}
|
||||
*
|
||||
* @author Roman Kudryashov
|
||||
*/
|
||||
public class CloudEventsConverterIT extends AbstractCloudEventsConverterTest<PostgresConnector> {
|
||||
|
||||
private static final String SETUP_OUTBOX_SCHEMA = "DROP SCHEMA IF EXISTS outboxsmtit CASCADE;" +
|
||||
"CREATE SCHEMA outboxsmtit;";
|
||||
|
||||
private static final String SETUP_OUTBOX_TABLE = "CREATE TABLE outboxsmtit.outbox " +
|
||||
"(" +
|
||||
" id uuid not null" +
|
||||
" constraint outbox_pk primary key," +
|
||||
" aggregatetype varchar(255) not null," +
|
||||
" aggregateid varchar(255) not null," +
|
||||
" type varchar(255) not null," +
|
||||
" payload jsonb" +
|
||||
");";
|
||||
|
||||
@Before
|
||||
@Override
|
||||
public void beforeEach() throws Exception {
|
||||
TestHelper.dropDefaultReplicationSlot();
|
||||
TestHelper.dropPublication();
|
||||
super.beforeEach();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Class<PostgresConnector> getConnectorClass() {
|
||||
return PostgresConnector.class;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected String getConnectorName() {
|
||||
return "postgresql";
|
||||
}
|
||||
|
||||
@Override
|
||||
protected String getServerName() {
|
||||
return TestHelper.TEST_SERVER;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected JdbcConnection databaseConnection() {
|
||||
return TestHelper.create();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Configuration.Builder getConfigurationBuilder() {
|
||||
return TestHelper.defaultConfig()
|
||||
.with(PostgresConnectorConfig.SNAPSHOT_MODE, SnapshotMode.NEVER)
|
||||
.with(PostgresConnectorConfig.DROP_SLOT_ON_STOP, Boolean.TRUE)
|
||||
.with(PostgresConnectorConfig.SCHEMA_INCLUDE_LIST, "outboxsmtit")
|
||||
.with(PostgresConnectorConfig.TABLE_INCLUDE_LIST, "outboxsmtit\\.outbox");
|
||||
}
|
||||
|
||||
@Override
|
||||
protected String tableName() {
|
||||
return "outboxsmtit.outbox";
|
||||
}
|
||||
|
||||
@Override
|
||||
protected String topicName() {
|
||||
return TestHelper.topicName("outboxsmtit.outbox");
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void createTable() throws Exception {
|
||||
TestHelper.execute(SETUP_OUTBOX_SCHEMA);
|
||||
TestHelper.execute(SETUP_OUTBOX_TABLE);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected String createInsert(String eventId,
|
||||
String eventType,
|
||||
String aggregateType,
|
||||
String aggregateId,
|
||||
String payloadJson,
|
||||
String additional) {
|
||||
StringBuilder insert = new StringBuilder();
|
||||
insert.append("INSERT INTO outboxsmtit.outbox VALUES (");
|
||||
insert.append("'").append(UUID.fromString(eventId)).append("'");
|
||||
insert.append(", '").append(aggregateType).append("'");
|
||||
insert.append(", '").append(aggregateId).append("'");
|
||||
insert.append(", '").append(eventType).append("'");
|
||||
|
||||
if (payloadJson == null) {
|
||||
insert.append(", null::jsonb");
|
||||
}
|
||||
else if (payloadJson.isEmpty()) {
|
||||
insert.append(", ''");
|
||||
}
|
||||
else {
|
||||
insert.append(", '").append(payloadJson).append("'::jsonb");
|
||||
}
|
||||
|
||||
if (additional != null) {
|
||||
insert.append(additional);
|
||||
}
|
||||
insert.append(")");
|
||||
|
||||
return insert.toString();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void waitForStreamingStarted() throws InterruptedException {
|
||||
waitForStreamingRunning("postgres", TestHelper.TEST_SERVER);
|
||||
}
|
||||
}
|
@ -5,10 +5,8 @@
|
||||
*/
|
||||
package io.debezium.connector.sqlserver.converters;
|
||||
|
||||
import org.apache.kafka.connect.data.Schema;
|
||||
import org.apache.kafka.connect.data.Struct;
|
||||
|
||||
import io.debezium.connector.sqlserver.Module;
|
||||
import io.debezium.converters.recordandmetadata.RecordAndMetadata;
|
||||
import io.debezium.converters.spi.CloudEventsMaker;
|
||||
import io.debezium.converters.spi.CloudEventsProvider;
|
||||
import io.debezium.converters.spi.RecordParser;
|
||||
@ -26,8 +24,8 @@ public String getName() {
|
||||
}
|
||||
|
||||
@Override
|
||||
public RecordParser createParser(Schema schema, Struct record) {
|
||||
return new SqlServerRecordParser(schema, record);
|
||||
public RecordParser createParser(RecordAndMetadata recordAndMetadata) {
|
||||
return new SqlServerRecordParser(recordAndMetadata);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -7,10 +7,9 @@
|
||||
|
||||
import java.util.Set;
|
||||
|
||||
import org.apache.kafka.connect.data.Schema;
|
||||
import org.apache.kafka.connect.data.Struct;
|
||||
import org.apache.kafka.connect.errors.DataException;
|
||||
|
||||
import io.debezium.converters.recordandmetadata.RecordAndMetadata;
|
||||
import io.debezium.converters.spi.RecordParser;
|
||||
import io.debezium.data.Envelope;
|
||||
import io.debezium.util.Collect;
|
||||
@ -31,8 +30,8 @@ public class SqlServerRecordParser extends RecordParser {
|
||||
COMMIT_LSN_KEY,
|
||||
EVENT_SERIAL_NO_KEY);
|
||||
|
||||
public SqlServerRecordParser(Schema schema, Struct record) {
|
||||
super(schema, record, Envelope.FieldName.BEFORE, Envelope.FieldName.AFTER);
|
||||
public SqlServerRecordParser(RecordAndMetadata recordAndMetadata) {
|
||||
super(recordAndMetadata, Envelope.FieldName.BEFORE, Envelope.FieldName.AFTER);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -21,6 +21,8 @@
|
||||
import java.util.function.Function;
|
||||
|
||||
import org.apache.kafka.common.errors.SerializationException;
|
||||
import org.apache.kafka.common.header.Header;
|
||||
import org.apache.kafka.common.header.Headers;
|
||||
import org.apache.kafka.connect.data.Field;
|
||||
import org.apache.kafka.connect.data.Schema;
|
||||
import org.apache.kafka.connect.data.Schema.Type;
|
||||
@ -46,6 +48,10 @@
|
||||
import io.debezium.config.Configuration;
|
||||
import io.debezium.config.Instantiator;
|
||||
import io.debezium.connector.AbstractSourceInfo;
|
||||
import io.debezium.converters.CloudEventsConverterConfig.MetadataLocation;
|
||||
import io.debezium.converters.recordandmetadata.RecordAndMetadata;
|
||||
import io.debezium.converters.recordandmetadata.RecordAndMetadataBaseImpl;
|
||||
import io.debezium.converters.recordandmetadata.RecordAndMetadataHeaderImpl;
|
||||
import io.debezium.converters.spi.CloudEventsMaker;
|
||||
import io.debezium.converters.spi.CloudEventsProvider;
|
||||
import io.debezium.converters.spi.RecordParser;
|
||||
@ -135,6 +141,8 @@ public class CloudEventsConverter implements Converter {
|
||||
private final JsonConverter jsonCloudEventsConverter = new JsonConverter();
|
||||
private JsonConverterConfig jsonCloudEventsConverterConfig = null;
|
||||
|
||||
private JsonConverter jsonHeaderConverter = new JsonConverter();
|
||||
|
||||
private final JsonConverter jsonDataConverter = new JsonConverter();
|
||||
|
||||
private boolean enableJsonSchemas;
|
||||
@ -144,6 +152,8 @@ public class CloudEventsConverter implements Converter {
|
||||
private List<String> schemaRegistryUrls;
|
||||
private SchemaNameAdjuster schemaNameAdjuster;
|
||||
|
||||
private MetadataLocation metadataLocation;
|
||||
|
||||
public CloudEventsConverter() {
|
||||
this(null);
|
||||
}
|
||||
@ -163,6 +173,12 @@ public void configure(Map<String, ?> configs, boolean isKey) {
|
||||
ceSerializerType = ceConfig.cloudeventsSerializerType();
|
||||
dataSerializerType = ceConfig.cloudeventsDataSerializerTypeConfig();
|
||||
schemaNameAdjuster = ceConfig.schemaNameAdjustmentMode().createAdjuster();
|
||||
metadataLocation = ceConfig.metadataLocation();
|
||||
|
||||
Map<String, Object> jsonHeaderConverterConfig = new HashMap<>();
|
||||
jsonHeaderConverterConfig.put(JsonConverterConfig.SCHEMAS_ENABLE_CONFIG, true);
|
||||
jsonHeaderConverterConfig.put(JsonConverterConfig.TYPE_CONFIG, "header");
|
||||
jsonHeaderConverter.configure(jsonHeaderConverterConfig);
|
||||
|
||||
boolean usingAvro = false;
|
||||
|
||||
@ -220,21 +236,45 @@ protected Map<String, String> configureConverterType(boolean isKey, Map<String,
|
||||
|
||||
@Override
|
||||
public byte[] fromConnectData(String topic, Schema schema, Object value) {
|
||||
return this.fromConnectData(topic, null, schema, value);
|
||||
}
|
||||
|
||||
@Override
|
||||
public byte[] fromConnectData(String topic, Headers headers, Schema schema, Object value) {
|
||||
if (schema == null || value == null) {
|
||||
return null;
|
||||
}
|
||||
if (!Envelope.isEnvelopeSchema(schema)) {
|
||||
// TODO Handling of non-data messages like schema change or transaction metadata
|
||||
return null;
|
||||
|
||||
if (this.metadataLocation == MetadataLocation.VALUE) {
|
||||
if (!Envelope.isEnvelopeSchema(schema)) {
|
||||
// TODO Handling of non-data messages like schema change or transaction metadata
|
||||
return null;
|
||||
}
|
||||
}
|
||||
else {
|
||||
if (headers.lastHeader(Envelope.FieldName.SOURCE) == null || headers.lastHeader(Envelope.FieldName.OPERATION) == null) {
|
||||
return null;
|
||||
}
|
||||
}
|
||||
if (schema.type() != STRUCT) {
|
||||
throw new DataException("Mismatching schema");
|
||||
}
|
||||
|
||||
Struct record = requireStruct(value, "CloudEvents converter");
|
||||
CloudEventsProvider provider = lookupCloudEventsProvider(record);
|
||||
Struct source = getSource(record, headers);
|
||||
|
||||
CloudEventsProvider provider = lookupCloudEventsProvider(source);
|
||||
|
||||
RecordAndMetadata recordAndMetadata;
|
||||
if (this.metadataLocation == MetadataLocation.VALUE) {
|
||||
recordAndMetadata = new RecordAndMetadataBaseImpl(record, schema);
|
||||
}
|
||||
else {
|
||||
recordAndMetadata = new RecordAndMetadataHeaderImpl(record, schema, headers, jsonHeaderConverter);
|
||||
}
|
||||
|
||||
RecordParser parser = provider.createParser(recordAndMetadata);
|
||||
|
||||
RecordParser parser = provider.createParser(schema, record);
|
||||
CloudEventsMaker maker = provider.createMaker(parser, dataSerializerType,
|
||||
(schemaRegistryUrls == null) ? null : String.join(",", schemaRegistryUrls));
|
||||
|
||||
@ -280,8 +320,8 @@ public byte[] fromConnectData(String topic, Schema schema, Object value) {
|
||||
/**
|
||||
* Lookup the CloudEventsProvider implementation for the source connector.
|
||||
*/
|
||||
private static CloudEventsProvider lookupCloudEventsProvider(Struct record) {
|
||||
String connectorType = record.getStruct(Envelope.FieldName.SOURCE).getString(AbstractSourceInfo.DEBEZIUM_CONNECTOR_KEY);
|
||||
private static CloudEventsProvider lookupCloudEventsProvider(Struct source) {
|
||||
String connectorType = source.getString(AbstractSourceInfo.DEBEZIUM_CONNECTOR_KEY);
|
||||
CloudEventsProvider provider = providers.get(connectorType);
|
||||
if (provider != null) {
|
||||
return provider;
|
||||
@ -289,6 +329,17 @@ private static CloudEventsProvider lookupCloudEventsProvider(Struct record) {
|
||||
throw new DataException("No usable CloudEvents converters for connector type \"" + connectorType + "\"");
|
||||
}
|
||||
|
||||
private Struct getSource(Struct record, Headers headers) {
|
||||
if (this.metadataLocation == MetadataLocation.VALUE) {
|
||||
return record.getStruct(Envelope.FieldName.SOURCE);
|
||||
}
|
||||
else {
|
||||
Header header = headers.lastHeader(Envelope.FieldName.SOURCE);
|
||||
SchemaAndValue sav = jsonHeaderConverter.toConnectData(null, header.value());
|
||||
return (Struct) sav.value();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Creates a CloudEvents wrapper, converting the "data" to Avro.
|
||||
*/
|
||||
|
@ -11,6 +11,7 @@
|
||||
import org.apache.kafka.connect.storage.ConverterConfig;
|
||||
|
||||
import io.debezium.config.CommonConnectorConfig.SchemaNameAdjustmentMode;
|
||||
import io.debezium.config.EnumeratedValue;
|
||||
import io.debezium.converters.spi.SerializerType;
|
||||
|
||||
/**
|
||||
@ -32,6 +33,10 @@ public class CloudEventsConverterConfig extends ConverterConfig {
|
||||
+ "'avro' replaces the characters that cannot be used in the Avro type name with underscore (default)"
|
||||
+ "'none' does not apply any adjustment";
|
||||
|
||||
public static final String CLOUDEVENTS_METADATA_LOCATION_CONFIG = "metadata.location";
|
||||
public static final String CLOUDEVENTS_METADATA_LOCATION_DEFAULT = "value";
|
||||
private static final String CLOUDEVENTS_METADATA_LOCATION_DOC = "Specify from where to retrieve metadata";
|
||||
|
||||
private static final ConfigDef CONFIG;
|
||||
|
||||
static {
|
||||
@ -43,6 +48,8 @@ public class CloudEventsConverterConfig extends ConverterConfig {
|
||||
CLOUDEVENTS_DATA_SERIALIZER_TYPE_DOC);
|
||||
CONFIG.define(CLOUDEVENTS_SCHEMA_NAME_ADJUSTMENT_MODE_CONFIG, ConfigDef.Type.STRING, CLOUDEVENTS_SCHEMA_NAME_ADJUSTMENT_MODE_DEFAULT, ConfigDef.Importance.LOW,
|
||||
CLOUDEVENTS_SCHEMA_NAME_ADJUSTMENT_MODE_DOC);
|
||||
CONFIG.define(CLOUDEVENTS_METADATA_LOCATION_CONFIG, ConfigDef.Type.STRING, CLOUDEVENTS_METADATA_LOCATION_DEFAULT, ConfigDef.Importance.HIGH,
|
||||
CLOUDEVENTS_METADATA_LOCATION_DOC);
|
||||
}
|
||||
|
||||
public static ConfigDef configDef() {
|
||||
@ -79,4 +86,59 @@ public SerializerType cloudeventsDataSerializerTypeConfig() {
|
||||
public SchemaNameAdjustmentMode schemaNameAdjustmentMode() {
|
||||
return SchemaNameAdjustmentMode.parse(getString(CLOUDEVENTS_SCHEMA_NAME_ADJUSTMENT_MODE_CONFIG));
|
||||
}
|
||||
|
||||
/**
|
||||
* Return from where to retrieve metadata
|
||||
*
|
||||
* @return metadata location
|
||||
*/
|
||||
public MetadataLocation metadataLocation() {
|
||||
return MetadataLocation.parse(getString(CLOUDEVENTS_METADATA_LOCATION_CONFIG));
|
||||
}
|
||||
|
||||
/**
|
||||
* The set of predefined MetadataLocation options
|
||||
*/
|
||||
public enum MetadataLocation implements EnumeratedValue {
|
||||
|
||||
/**
|
||||
* Get metadata from the value
|
||||
*/
|
||||
VALUE("value"),
|
||||
|
||||
/**
|
||||
* Get metadata from the header
|
||||
*/
|
||||
HEADER("header");
|
||||
|
||||
private final String value;
|
||||
|
||||
MetadataLocation(String value) {
|
||||
this.value = value;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getValue() {
|
||||
return value;
|
||||
}
|
||||
|
||||
/**
|
||||
* Determine if the supplied values is one of the predefined options
|
||||
*
|
||||
* @param value the configuration property value ; may not be null
|
||||
* @return the matching option, or null if the match is not found
|
||||
*/
|
||||
public static MetadataLocation parse(String value) {
|
||||
if (value == null) {
|
||||
return null;
|
||||
}
|
||||
value = value.trim();
|
||||
for (MetadataLocation option : MetadataLocation.values()) {
|
||||
if (option.getValue().equalsIgnoreCase(value)) {
|
||||
return option;
|
||||
}
|
||||
}
|
||||
return null;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -0,0 +1,31 @@
|
||||
/*
|
||||
* Copyright Debezium Authors.
|
||||
*
|
||||
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
|
||||
*/
|
||||
|
||||
package io.debezium.converters.recordandmetadata;
|
||||
|
||||
import org.apache.kafka.connect.data.Schema;
|
||||
import org.apache.kafka.connect.data.SchemaAndValue;
|
||||
import org.apache.kafka.connect.data.Struct;
|
||||
|
||||
/**
|
||||
* Common interface for a structure containing a record and its metadata
|
||||
*
|
||||
* @author Roman Kudryashov
|
||||
*/
|
||||
public interface RecordAndMetadata {
|
||||
|
||||
Struct record();
|
||||
|
||||
Schema dataSchema(String... dataFields);
|
||||
|
||||
Struct source();
|
||||
|
||||
String operation();
|
||||
|
||||
Struct transaction();
|
||||
|
||||
SchemaAndValue ts_ms();
|
||||
}
|
@ -0,0 +1,72 @@
|
||||
/*
|
||||
* Copyright Debezium Authors.
|
||||
*
|
||||
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
|
||||
*/
|
||||
|
||||
package io.debezium.converters.recordandmetadata;
|
||||
|
||||
import org.apache.kafka.connect.data.Schema;
|
||||
import org.apache.kafka.connect.data.SchemaAndValue;
|
||||
import org.apache.kafka.connect.data.SchemaBuilder;
|
||||
import org.apache.kafka.connect.data.Struct;
|
||||
|
||||
import io.debezium.connector.AbstractSourceInfo;
|
||||
import io.debezium.data.Envelope;
|
||||
|
||||
public class RecordAndMetadataBaseImpl implements RecordAndMetadata {
|
||||
|
||||
private final Struct record;
|
||||
|
||||
private final Schema dataSchema;
|
||||
|
||||
public RecordAndMetadataBaseImpl(Struct record, Schema dataSchema) {
|
||||
this.record = record;
|
||||
this.dataSchema = dataSchema;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Struct record() {
|
||||
return record;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Schema dataSchema(String... dataFields) {
|
||||
Struct source = record.getStruct(Envelope.FieldName.SOURCE);
|
||||
String connectorType = source.getString(AbstractSourceInfo.DEBEZIUM_CONNECTOR_KEY);
|
||||
return getDataSchema(this.dataSchema, connectorType, dataFields);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Struct source() {
|
||||
return record.getStruct(Envelope.FieldName.SOURCE);
|
||||
}
|
||||
|
||||
@Override
|
||||
public String operation() {
|
||||
return record.getString(Envelope.FieldName.OPERATION);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Struct transaction() {
|
||||
return record.schema().field(Envelope.FieldName.TRANSACTION) != null ? record.getStruct(Envelope.FieldName.TRANSACTION) : null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public SchemaAndValue ts_ms() {
|
||||
String ts_ms = record.getInt64(Envelope.FieldName.TIMESTAMP).toString();
|
||||
Schema ts_msSchema = dataSchema.field(Envelope.FieldName.TIMESTAMP).schema();
|
||||
return new SchemaAndValue(ts_msSchema, ts_ms);
|
||||
}
|
||||
|
||||
private static Schema getDataSchema(Schema schema, String connectorType, String... fields) {
|
||||
String dataSchemaName = "io.debezium.connector." + connectorType + ".Data";
|
||||
SchemaBuilder builder = SchemaBuilder.struct().name(dataSchemaName);
|
||||
|
||||
for (String field : fields) {
|
||||
builder.field(field, schema.field(field).schema());
|
||||
}
|
||||
|
||||
return builder.build();
|
||||
}
|
||||
}
|
@ -0,0 +1,72 @@
|
||||
/*
|
||||
* Copyright Debezium Authors.
|
||||
*
|
||||
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
|
||||
*/
|
||||
|
||||
package io.debezium.converters.recordandmetadata;
|
||||
|
||||
import org.apache.kafka.common.header.Header;
|
||||
import org.apache.kafka.common.header.Headers;
|
||||
import org.apache.kafka.connect.data.Schema;
|
||||
import org.apache.kafka.connect.data.SchemaAndValue;
|
||||
import org.apache.kafka.connect.data.Struct;
|
||||
import org.apache.kafka.connect.json.JsonConverter;
|
||||
|
||||
import io.debezium.data.Envelope;
|
||||
|
||||
public class RecordAndMetadataHeaderImpl implements RecordAndMetadata {
|
||||
|
||||
private final Struct record;
|
||||
private final Schema dataSchema;
|
||||
private final Struct source;
|
||||
private final String operation;
|
||||
private final Struct transaction;
|
||||
private final SchemaAndValue ts_ms;
|
||||
|
||||
public RecordAndMetadataHeaderImpl(Struct record, Schema dataSchema, Headers headers, JsonConverter jsonHeaderConverter) {
|
||||
this.record = record;
|
||||
this.dataSchema = dataSchema;
|
||||
this.source = (Struct) getHeaderSchemaAndValue(headers, Envelope.FieldName.SOURCE, jsonHeaderConverter).value();
|
||||
this.operation = (String) getHeaderSchemaAndValue(headers, Envelope.FieldName.OPERATION, jsonHeaderConverter).value();
|
||||
this.transaction = (Struct) getHeaderSchemaAndValue(headers, Envelope.FieldName.TRANSACTION, jsonHeaderConverter).value();
|
||||
String ts_ms = source.getInt64(Envelope.FieldName.TIMESTAMP).toString();
|
||||
Schema ts_msSchema = source.schema().field(Envelope.FieldName.TIMESTAMP).schema();
|
||||
this.ts_ms = new SchemaAndValue(ts_msSchema, ts_ms);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Struct record() {
|
||||
return this.record;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Schema dataSchema(String... dataFields) {
|
||||
return this.dataSchema;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Struct source() {
|
||||
return this.source;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String operation() {
|
||||
return this.operation;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Struct transaction() {
|
||||
return this.transaction;
|
||||
}
|
||||
|
||||
@Override
|
||||
public SchemaAndValue ts_ms() {
|
||||
return this.ts_ms;
|
||||
}
|
||||
|
||||
private static SchemaAndValue getHeaderSchemaAndValue(Headers headers, String headerName, JsonConverter jsonHeaderConverter) {
|
||||
Header header = headers.lastHeader(headerName);
|
||||
return jsonHeaderConverter.toConnectData(null, header.value());
|
||||
}
|
||||
}
|
@ -5,8 +5,7 @@
|
||||
*/
|
||||
package io.debezium.converters.spi;
|
||||
|
||||
import org.apache.kafka.connect.data.Schema;
|
||||
import org.apache.kafka.connect.data.Struct;
|
||||
import io.debezium.converters.recordandmetadata.RecordAndMetadata;
|
||||
|
||||
/**
|
||||
* A {@link java.util.ServiceLoader} interface that connectors should implement if they wish to provide
|
||||
@ -25,11 +24,10 @@ public interface CloudEventsProvider {
|
||||
/**
|
||||
* Create a concrete parser of a change record for the connector.
|
||||
*
|
||||
* @param schema the schema of the record
|
||||
* @param record the value of the record
|
||||
* @param recordAndMetadata record and its metadata
|
||||
* @return a concrete parser
|
||||
*/
|
||||
RecordParser createParser(Schema schema, Struct record);
|
||||
RecordParser createParser(RecordAndMetadata recordAndMetadata);
|
||||
|
||||
/**
|
||||
* Create a concrete CloudEvents maker using the outputs of a record parser. Also need to specify the data content
|
||||
|
@ -13,7 +13,7 @@
|
||||
import org.apache.kafka.connect.data.Struct;
|
||||
|
||||
import io.debezium.connector.AbstractSourceInfo;
|
||||
import io.debezium.data.Envelope;
|
||||
import io.debezium.converters.recordandmetadata.RecordAndMetadata;
|
||||
import io.debezium.util.Collect;
|
||||
|
||||
/**
|
||||
@ -39,20 +39,21 @@ public abstract class RecordParser {
|
||||
AbstractSourceInfo.SNAPSHOT_KEY,
|
||||
AbstractSourceInfo.DATABASE_NAME_KEY);
|
||||
|
||||
protected RecordParser(Schema schema, Struct record, String... dataFields) {
|
||||
this.record = record;
|
||||
this.source = record.getStruct(Envelope.FieldName.SOURCE);
|
||||
this.transaction = record.schema().field(Envelope.FieldName.TRANSACTION) != null ? record.getStruct(Envelope.FieldName.TRANSACTION) : null;
|
||||
this.op = record.getString(Envelope.FieldName.OPERATION);
|
||||
this.opSchema = schema.field(Envelope.FieldName.OPERATION).schema();
|
||||
this.ts_ms = record.getInt64(Envelope.FieldName.TIMESTAMP).toString();
|
||||
this.ts_msSchema = schema.field(Envelope.FieldName.TIMESTAMP).schema();
|
||||
protected RecordParser(RecordAndMetadata recordAndMetadata, String... dataFields) {
|
||||
this.record = recordAndMetadata.record();
|
||||
this.source = recordAndMetadata.source();
|
||||
this.transaction = recordAndMetadata.transaction();
|
||||
this.op = recordAndMetadata.operation();
|
||||
this.opSchema = Schema.STRING_SCHEMA;
|
||||
this.ts_ms = (String) recordAndMetadata.ts_ms().value();
|
||||
this.ts_msSchema = recordAndMetadata.ts_ms().schema();
|
||||
this.connectorType = source.getString(AbstractSourceInfo.DEBEZIUM_CONNECTOR_KEY);
|
||||
this.dataSchema = getDataSchema(schema, connectorType, dataFields);
|
||||
this.dataSchema = recordAndMetadata.dataSchema(dataFields);
|
||||
}
|
||||
|
||||
private static Schema getDataSchema(Schema schema, String connectorType, String... fields) {
|
||||
SchemaBuilder builder = SchemaBuilder.struct().name("io.debezium.connector.mysql.Data");
|
||||
String dataSchemaName = "io.debezium.connector." + connectorType + ".Data";
|
||||
SchemaBuilder builder = SchemaBuilder.struct().name(dataSchemaName);
|
||||
|
||||
for (String field : fields) {
|
||||
builder.field(field, schema.field(field).schema());
|
||||
|
@ -8,17 +8,24 @@
|
||||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
import static org.junit.Assert.fail;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Base64;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
import java.util.function.Consumer;
|
||||
|
||||
import org.apache.kafka.common.header.internals.RecordHeaders;
|
||||
import org.apache.kafka.connect.data.SchemaAndValue;
|
||||
import org.apache.kafka.connect.data.Struct;
|
||||
import org.apache.kafka.connect.header.Header;
|
||||
import org.apache.kafka.connect.header.Headers;
|
||||
import org.apache.kafka.connect.json.JsonConverter;
|
||||
import org.apache.kafka.connect.json.JsonConverterConfig;
|
||||
import org.apache.kafka.connect.json.JsonDeserializer;
|
||||
import org.apache.kafka.connect.source.SourceRecord;
|
||||
import org.apache.kafka.connect.storage.Converter;
|
||||
import org.apache.kafka.connect.storage.HeaderConverter;
|
||||
|
||||
import com.fasterxml.jackson.databind.JsonNode;
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
@ -317,6 +324,90 @@ public static void shouldConvertToCloudEventsInAvro(SourceRecord record, String
|
||||
}
|
||||
}
|
||||
|
||||
public static void shouldConvertToCloudEventsInJsonWithMetadataInHeaders(SourceRecord record, String connectorName, String serverName) throws Exception {
|
||||
Map<String, Object> config = new HashMap<>();
|
||||
config.put("serializer.type", "json");
|
||||
config.put("data.serializer.type", "json");
|
||||
config.put("metadata.location", "header");
|
||||
|
||||
CloudEventsConverter cloudEventsConverter = new CloudEventsConverter();
|
||||
cloudEventsConverter.configure(config, false);
|
||||
|
||||
JsonNode valueJson = null;
|
||||
JsonNode dataJson;
|
||||
String msg = null;
|
||||
|
||||
try {
|
||||
// Convert the value and inspect it ...
|
||||
msg = "converting value using CloudEvents JSON converter";
|
||||
byte[] valueBytes = cloudEventsConverter.fromConnectData(record.topic(), convertHeadersFor(record), record.valueSchema(), record.value());
|
||||
msg = "deserializing value using CE deserializer";
|
||||
final SchemaAndValue ceValue = cloudEventsConverter.toConnectData(record.topic(), valueBytes);
|
||||
msg = "deserializing value using JSON deserializer";
|
||||
|
||||
try (JsonDeserializer jsonDeserializer = new JsonDeserializer()) {
|
||||
jsonDeserializer.configure(Collections.emptyMap(), false);
|
||||
valueJson = jsonDeserializer.deserialize(record.topic(), valueBytes);
|
||||
}
|
||||
|
||||
msg = "inspecting all required CloudEvents fields in the value";
|
||||
assertThat(valueJson.get(CloudEventsMaker.FieldName.ID)).isNotNull();
|
||||
assertThat(valueJson.get(CloudEventsMaker.FieldName.SOURCE).asText()).isEqualTo("/debezium/" + connectorName + "/" + serverName);
|
||||
assertThat(valueJson.get(CloudEventsMaker.FieldName.SPECVERSION).asText()).isEqualTo("1.0");
|
||||
assertThat(valueJson.get(CloudEventsMaker.FieldName.DATASCHEMA)).isNull();
|
||||
assertThat(valueJson.get(CloudEventsMaker.FieldName.TYPE).asText()).isEqualTo("io.debezium." + connectorName + ".datachangeevent");
|
||||
assertThat(valueJson.get(CloudEventsMaker.FieldName.DATACONTENTTYPE).asText()).isEqualTo("application/json");
|
||||
assertThat(valueJson.get(CloudEventsMaker.FieldName.TIME)).isNotNull();
|
||||
assertThat(valueJson.get(CloudEventsMaker.FieldName.DATA)).isNotNull();
|
||||
msg = "inspecting required CloudEvents extension attributes for Debezium";
|
||||
assertThat(valueJson.get("iodebeziumop")).isNotNull();
|
||||
assertThat(valueJson.get("iodebeziumtsms")).isNotNull();
|
||||
msg = "inspecting transaction metadata attributes";
|
||||
assertThat(valueJson.get("iodebeziumtxid")).isNotNull();
|
||||
assertThat(valueJson.get("iodebeziumtxtotalorder")).isNotNull();
|
||||
assertThat(valueJson.get("iodebeziumtxdatacollectionorder")).isNotNull();
|
||||
msg = "inspecting the data field in the value";
|
||||
dataJson = valueJson.get(CloudEventsMaker.FieldName.DATA);
|
||||
assertThat(dataJson.get(CloudEventsMaker.FieldName.SCHEMA_FIELD_NAME)).isNotNull();
|
||||
assertThat(dataJson.get(CloudEventsMaker.FieldName.PAYLOAD_FIELD_NAME)).isNotNull();
|
||||
assertThat(dataJson.get(CloudEventsMaker.FieldName.PAYLOAD_FIELD_NAME).get("someField1").textValue()).isEqualTo("some value 1");
|
||||
assertThat(dataJson.get(CloudEventsMaker.FieldName.PAYLOAD_FIELD_NAME).get("someField2").intValue()).isEqualTo(7005);
|
||||
}
|
||||
catch (Throwable t) {
|
||||
Testing.Print.enable();
|
||||
Testing.print("Problem with message on topic '" + record.topic() + "':");
|
||||
Testing.printError(t);
|
||||
Testing.print("error " + msg);
|
||||
Testing.print(" value: " + SchemaUtil.asString(record.value()));
|
||||
Testing.print(" value deserialized from CloudEvents in JSON: " + prettyJson(valueJson));
|
||||
if (t instanceof AssertionError) {
|
||||
throw t;
|
||||
}
|
||||
fail("error " + msg + ": " + t.getMessage());
|
||||
}
|
||||
}
|
||||
|
||||
private static RecordHeaders convertHeadersFor(SourceRecord record) throws IOException {
|
||||
try (HeaderConverter jsonHeaderConverter = new JsonConverter()) {
|
||||
Map<String, Object> jsonHeaderConverterConfig = new HashMap<>();
|
||||
jsonHeaderConverterConfig.put(JsonConverterConfig.SCHEMAS_ENABLE_CONFIG, true);
|
||||
jsonHeaderConverterConfig.put(JsonConverterConfig.TYPE_CONFIG, "header");
|
||||
jsonHeaderConverter.configure(jsonHeaderConverterConfig);
|
||||
|
||||
Headers headers = record.headers();
|
||||
RecordHeaders result = new RecordHeaders();
|
||||
if (headers != null) {
|
||||
String topic = record.topic();
|
||||
for (Header header : headers) {
|
||||
String key = header.key();
|
||||
byte[] rawHeader = jsonHeaderConverter.fromConnectHeader(topic, key, header.schema(), header.value());
|
||||
result.add(key, rawHeader);
|
||||
}
|
||||
}
|
||||
return result;
|
||||
}
|
||||
}
|
||||
|
||||
private static String prettyJson(JsonNode json) {
|
||||
try {
|
||||
return new ObjectMapper().writerWithDefaultPrettyPrinter().writeValueAsString(json);
|
||||
|
@ -0,0 +1,126 @@
|
||||
/*
|
||||
* Copyright Debezium Authors.
|
||||
*
|
||||
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
|
||||
*/
|
||||
package io.debezium.converters;
|
||||
|
||||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
|
||||
import java.util.LinkedHashMap;
|
||||
import java.util.Map;
|
||||
|
||||
import org.apache.kafka.connect.data.Schema;
|
||||
import org.apache.kafka.connect.data.Struct;
|
||||
import org.apache.kafka.connect.source.SourceConnector;
|
||||
import org.apache.kafka.connect.source.SourceRecord;
|
||||
import org.apache.kafka.connect.transforms.HeaderFrom;
|
||||
import org.junit.After;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
|
||||
import io.debezium.config.Configuration;
|
||||
import io.debezium.doc.FixFor;
|
||||
import io.debezium.embedded.AbstractConnectorTest;
|
||||
import io.debezium.jdbc.JdbcConnection;
|
||||
import io.debezium.transforms.outbox.EventRouter;
|
||||
|
||||
/**
|
||||
* A unified test of all {@link CloudEventsConverter} behavior which all connectors should extend.
|
||||
*
|
||||
* @author Roman Kudryashov
|
||||
*/
|
||||
public abstract class AbstractCloudEventsConverterTest<T extends SourceConnector> extends AbstractConnectorTest {
|
||||
|
||||
protected EventRouter<SourceRecord> outboxEventRouter;
|
||||
|
||||
protected HeaderFrom<SourceRecord> headerFrom;
|
||||
|
||||
protected abstract Class<T> getConnectorClass();
|
||||
|
||||
protected abstract String getConnectorName();
|
||||
|
||||
protected abstract String getServerName();
|
||||
|
||||
protected abstract JdbcConnection databaseConnection();
|
||||
|
||||
protected abstract Configuration.Builder getConfigurationBuilder();
|
||||
|
||||
protected abstract String topicName();
|
||||
|
||||
protected abstract String tableName();
|
||||
|
||||
protected abstract void createTable() throws Exception;
|
||||
|
||||
protected abstract String createInsert(String eventId, String eventType, String aggregateType,
|
||||
String aggregateId, String payloadJson, String additional);
|
||||
|
||||
protected abstract void waitForStreamingStarted() throws InterruptedException;
|
||||
|
||||
@Before
|
||||
public void beforeEach() throws Exception {
|
||||
createTable();
|
||||
|
||||
headerFrom = new HeaderFrom.Value<>();
|
||||
Map<String, String> headerFromConfig = new LinkedHashMap<>();
|
||||
headerFromConfig.put("fields", "source,op,transaction");
|
||||
headerFromConfig.put("headers", "source,op,transaction");
|
||||
headerFromConfig.put("operation", "copy");
|
||||
headerFromConfig.put("header.converter.schemas.enable", "true");
|
||||
headerFrom.configure(headerFromConfig);
|
||||
|
||||
outboxEventRouter = new EventRouter<>();
|
||||
Map<String, String> outboxEventRouterConfig = new LinkedHashMap<>();
|
||||
outboxEventRouterConfig.put("table.expand.json.payload", "true");
|
||||
outboxEventRouter.configure(outboxEventRouterConfig);
|
||||
|
||||
startConnector();
|
||||
}
|
||||
|
||||
@After
|
||||
public void afterEach() throws Exception {
|
||||
stopConnector();
|
||||
assertNoRecordsToConsume();
|
||||
databaseConnection().close();
|
||||
headerFrom.close();
|
||||
outboxEventRouter.close();
|
||||
}
|
||||
|
||||
@Test
|
||||
@FixFor({ "DBZ-3642" })
|
||||
public void shouldConvertToCloudEventsInJsonWithMetadataInHeadersAfterOutboxEventRouter() throws Exception {
|
||||
databaseConnection().execute(createInsert(
|
||||
"59a42efd-b015-44a9-9dde-cb36d9002425",
|
||||
"UserCreated",
|
||||
"User",
|
||||
"10711fa5",
|
||||
"{" +
|
||||
"\"someField1\": \"some value 1\"," +
|
||||
"\"someField2\": 7005" +
|
||||
"}",
|
||||
""));
|
||||
|
||||
SourceRecords streamingRecords = consumeRecordsByTopic(1);
|
||||
assertThat(streamingRecords.allRecordsInOrder()).hasSize(1);
|
||||
|
||||
SourceRecord record = streamingRecords.recordsForTopic(topicName()).get(0);
|
||||
SourceRecord recordWithMetadataHeaders = headerFrom.apply(record);
|
||||
SourceRecord routedEvent = outboxEventRouter.apply(recordWithMetadataHeaders);
|
||||
|
||||
assertThat(routedEvent).isNotNull();
|
||||
assertThat(routedEvent.topic()).isEqualTo("outbox.event.User");
|
||||
assertThat(routedEvent.keySchema()).isEqualTo(Schema.STRING_SCHEMA);
|
||||
assertThat(routedEvent.key()).isEqualTo("10711fa5");
|
||||
assertThat(routedEvent.value()).isInstanceOf(Struct.class);
|
||||
|
||||
CloudEventsConverterTest.shouldConvertToCloudEventsInJsonWithMetadataInHeaders(routedEvent, getConnectorName(), getServerName());
|
||||
}
|
||||
|
||||
private void startConnector() throws Exception {
|
||||
Configuration.Builder configBuilder = getConfigurationBuilder();
|
||||
start(getConnectorClass(), configBuilder.build());
|
||||
assertConnectorIsRunning();
|
||||
waitForStreamingStarted();
|
||||
assertNoRecordsToConsume();
|
||||
}
|
||||
}
|
@ -231,4 +231,5 @@ vsantona,Vincenzo Santoynastaso
|
||||
“vsantonastaso”,Vincenzo Santoynastaso
|
||||
rolevinks,Stein Rolevink
|
||||
matan-cohen,Matan Cohen
|
||||
BigGillyStyle,Andy Pickler
|
||||
BigGillyStyle,Andy Pickler
|
||||
rkudryashov,Roman Kudryashov
|
Loading…
Reference in New Issue
Block a user