diff --git a/debezium-connector-mongodb/src/test/java/io/debezium/connector/mongodb/MongoDbConnectorIT.java b/debezium-connector-mongodb/src/test/java/io/debezium/connector/mongodb/MongoDbConnectorIT.java
index e97d28dfe..a4b8a8417 100644
--- a/debezium-connector-mongodb/src/test/java/io/debezium/connector/mongodb/MongoDbConnectorIT.java
+++ b/debezium-connector-mongodb/src/test/java/io/debezium/connector/mongodb/MongoDbConnectorIT.java
@@ -941,9 +941,9 @@ public void shouldOutputRecordsInCloudEventsFormat() throws Exception {
         SourceRecords records = consumeRecordsByTopic(12);
         List<SourceRecord> topicRecords = records.recordsForTopic("mongo.dbit.restaurants");
         for (SourceRecord record : topicRecords) {
-            CloudEventsConverterTest.shouldConvertToCloudEventsInJson(record);
-            CloudEventsConverterTest.shouldConvertToCloudEventsInJsonWithDataAsAvro(record);
-            CloudEventsConverterTest.shouldConvertToCloudEventsInAvro(record, "mongodb", "mongo");
+            CloudEventsConverterTest.shouldConvertToCloudEventsInJson(record, false);
+            CloudEventsConverterTest.shouldConvertToCloudEventsInJsonWithDataAsAvro(record, false);
+            CloudEventsConverterTest.shouldConvertToCloudEventsInAvro(record, "mongodb", "mongo", false);
         }
 
         storeDocuments("dbit", "restaurants", "restaurants2.json");
@@ -952,9 +952,9 @@ public void shouldOutputRecordsInCloudEventsFormat() throws Exception {
         SourceRecords records2 = consumeRecordsByTopic(4);
         List<SourceRecord> topicRecords2 = records2.recordsForTopic("mongo.dbit.restaurants");
         for (SourceRecord record : topicRecords2) {
-            CloudEventsConverterTest.shouldConvertToCloudEventsInJson(record);
-            CloudEventsConverterTest.shouldConvertToCloudEventsInJsonWithDataAsAvro(record);
-            CloudEventsConverterTest.shouldConvertToCloudEventsInAvro(record, "mongodb", "mongo");
+            CloudEventsConverterTest.shouldConvertToCloudEventsInJson(record, false);
+            CloudEventsConverterTest.shouldConvertToCloudEventsInJsonWithDataAsAvro(record, false);
+            CloudEventsConverterTest.shouldConvertToCloudEventsInAvro(record, "mongodb", "mongo", false);
         }
 
         stopConnector();
diff --git a/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/MySqlConnectorIT.java b/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/MySqlConnectorIT.java
index ba7212436..1d802dc39 100644
--- a/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/MySqlConnectorIT.java
+++ b/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/MySqlConnectorIT.java
@@ -1969,9 +1969,9 @@ public void shouldOutputRecordsInCloudEventsFormat() throws Exception {
             final List<SourceRecord> table = records.recordsForTopic(DATABASE.topicForTable(tableName));
 
             for (SourceRecord record : table) {
-                CloudEventsConverterTest.shouldConvertToCloudEventsInJson(record);
-                CloudEventsConverterTest.shouldConvertToCloudEventsInJsonWithDataAsAvro(record);
-                CloudEventsConverterTest.shouldConvertToCloudEventsInAvro(record, "mysql", "myServer1");
+                CloudEventsConverterTest.shouldConvertToCloudEventsInJson(record, false);
+                CloudEventsConverterTest.shouldConvertToCloudEventsInJsonWithDataAsAvro(record, false);
+                CloudEventsConverterTest.shouldConvertToCloudEventsInAvro(record, "mysql", "myServer1", false);
             }
         }
diff --git a/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/PostgresConnectorIT.java b/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/PostgresConnectorIT.java
index 97b6af52f..eb21a4cae 100644
--- a/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/PostgresConnectorIT.java
+++ b/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/PostgresConnectorIT.java
@@ -1375,6 +1375,7 @@ public void shouldOutputRecordsInCloudEventsFormat() throws Exception {
         TestHelper.execute(SETUP_TABLES_STMT);
         Configuration.Builder configBuilder = TestHelper.defaultConfig()
                 .with(PostgresConnectorConfig.SNAPSHOT_MODE, SnapshotMode.INITIAL.getValue())
+                .with(CommonConnectorConfig.PROVIDE_TRANSACTION_METADATA, true)
                 .with(PostgresConnectorConfig.DROP_SLOT_ON_STOP, Boolean.TRUE);
 
         start(PostgresConnector.class, configBuilder.build());
@@ -1384,21 +1385,22 @@ public void shouldOutputRecordsInCloudEventsFormat() throws Exception {
         List<SourceRecord> snapshot = snapshotRecords.allRecordsInOrder();
 
         for (SourceRecord record : snapshot) {
-            CloudEventsConverterTest.shouldConvertToCloudEventsInJson(record);
-            CloudEventsConverterTest.shouldConvertToCloudEventsInJsonWithDataAsAvro(record);
-            CloudEventsConverterTest.shouldConvertToCloudEventsInAvro(record, "postgresql", "test_server");
+            CloudEventsConverterTest.shouldConvertToCloudEventsInJson(record, false);
+            CloudEventsConverterTest.shouldConvertToCloudEventsInJsonWithDataAsAvro(record, false);
+            CloudEventsConverterTest.shouldConvertToCloudEventsInAvro(record, "postgresql", "test_server", false);
         }
 
         // insert some more records and test streaming
         TestHelper.execute(INSERT_STMT);
 
-        SourceRecords streamingRecords = consumeRecordsByTopic(2);
-        List<SourceRecord> streaming = streamingRecords.allRecordsInOrder();
+        Testing.Print.enable();
+        SourceRecords streamingRecords = consumeRecordsByTopic(2 + 2);
+        List<SourceRecord> streaming = streamingRecords.allRecordsInOrder().subList(1, 3);
 
         for (SourceRecord record : streaming) {
-            CloudEventsConverterTest.shouldConvertToCloudEventsInJson(record);
-            CloudEventsConverterTest.shouldConvertToCloudEventsInJsonWithDataAsAvro(record);
-            CloudEventsConverterTest.shouldConvertToCloudEventsInAvro(record, "postgresql", "test_server");
+            CloudEventsConverterTest.shouldConvertToCloudEventsInJson(record, true);
+            CloudEventsConverterTest.shouldConvertToCloudEventsInJsonWithDataAsAvro(record, true);
+            CloudEventsConverterTest.shouldConvertToCloudEventsInAvro(record, "postgresql", "test_server", true);
         }
 
         stopConnector();
diff --git a/debezium-connector-sqlserver/src/test/java/io/debezium/connector/sqlserver/SnapshotIT.java b/debezium-connector-sqlserver/src/test/java/io/debezium/connector/sqlserver/SnapshotIT.java
index 7e254dd15..10868ec3a 100644
--- a/debezium-connector-sqlserver/src/test/java/io/debezium/connector/sqlserver/SnapshotIT.java
+++ b/debezium-connector-sqlserver/src/test/java/io/debezium/connector/sqlserver/SnapshotIT.java
@@ -496,9 +496,9 @@ public void shouldOutputRecordsInCloudEventsFormat() throws Exception {
 
         // test snapshot
         for (SourceRecord sourceRecord : snapshotTable1) {
-            CloudEventsConverterTest.shouldConvertToCloudEventsInJson(sourceRecord);
-            CloudEventsConverterTest.shouldConvertToCloudEventsInJsonWithDataAsAvro(sourceRecord);
-            CloudEventsConverterTest.shouldConvertToCloudEventsInAvro(sourceRecord, "sqlserver", "server1");
+            CloudEventsConverterTest.shouldConvertToCloudEventsInJson(sourceRecord, false);
+            CloudEventsConverterTest.shouldConvertToCloudEventsInJsonWithDataAsAvro(sourceRecord, false);
+            CloudEventsConverterTest.shouldConvertToCloudEventsInAvro(sourceRecord, "sqlserver", "server1", false);
         }
 
         for (int i = 0; i < STREAMING_RECORDS_PER_TABLE; i++) {
@@ -514,9 +514,9 @@ public void shouldOutputRecordsInCloudEventsFormat() throws Exception {
 
         // test streaming
         for (SourceRecord sourceRecord : streamingTable1) {
-            CloudEventsConverterTest.shouldConvertToCloudEventsInJson(sourceRecord);
-            CloudEventsConverterTest.shouldConvertToCloudEventsInJsonWithDataAsAvro(sourceRecord);
-            CloudEventsConverterTest.shouldConvertToCloudEventsInAvro(sourceRecord, "sqlserver", "server1");
+            CloudEventsConverterTest.shouldConvertToCloudEventsInJson(sourceRecord, false);
+            CloudEventsConverterTest.shouldConvertToCloudEventsInJsonWithDataAsAvro(sourceRecord, false);
+            CloudEventsConverterTest.shouldConvertToCloudEventsInAvro(sourceRecord, "sqlserver", "server1", false);
         }
     }
diff --git a/debezium-core/src/main/java/io/debezium/converters/CloudEventsConverter.java b/debezium-core/src/main/java/io/debezium/converters/CloudEventsConverter.java
index ce73f6c70..0faaa00aa 100644
--- a/debezium-core/src/main/java/io/debezium/converters/CloudEventsConverter.java
+++ b/debezium-core/src/main/java/io/debezium/converters/CloudEventsConverter.java
@@ -15,6 +15,7 @@
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.function.Function;
 
 import org.apache.kafka.common.errors.SerializationException;
 import org.apache.kafka.connect.data.Field;
@@ -41,6 +42,7 @@
 import io.debezium.config.Configuration;
 import io.debezium.config.Instantiator;
 import io.debezium.data.Envelope;
+import io.debezium.pipeline.txmetadata.TransactionMonitor;
 import io.debezium.util.SchemaNameAdjuster;
 
 /**
@@ -68,6 +70,7 @@ public class CloudEventsConverter implements Converter {
 
     private static final String EXTENSION_NAME_PREFIX = "iodebezium";
+    private static final String TX_ATTRIBUTE_PREFIX = "tx";
 
     /**
      * Instantiated reflectively to avoid hard dependency to Avro converter.
@@ -167,6 +170,10 @@ public byte[] fromConnectData(String topic, Schema schema, Object value) {
         if (schema == null || value == null) {
             return null;
         }
+        if (!schema.name().endsWith(".Envelope")) {
+            // TODO Handling of non-data messages like schema change or transaction metadata
+            return null;
+        }
         if (schema.type() != STRUCT) {
             throw new DataException("Mismatching schema");
         }
@@ -361,6 +368,7 @@ private SchemaAndValue convertToCloudEventsFormat(RecordParser parser, CloudEven
         SchemaNameAdjuster schemaNameAdjuster = SchemaNameAdjuster.create(LOGGER);
         Struct source = parser.source();
         Schema sourceSchema = parser.source().schema();
+        final Struct transaction = parser.transaction();
 
         // construct schema of CloudEvents envelope
         CESchemaBuilder ceSchemaBuilder = defineSchema()
@@ -378,9 +386,10 @@ private SchemaAndValue convertToCloudEventsFormat(RecordParser parser, CloudEven
 
         ceSchemaBuilder.withSchema(adjustExtensionName(Envelope.FieldName.OPERATION), Schema.STRING_SCHEMA);
 
-        for (Field field : sourceSchema.fields()) {
-            ceSchemaBuilder.withSchema(adjustExtensionName(field.name()), convertToCeExtensionSchema(field.schema()));
-        }
+        ceSchemaFromSchema(sourceSchema, ceSchemaBuilder, CloudEventsConverter::adjustExtensionName, false);
+
+        // transaction attributes
+        ceSchemaFromSchema(TransactionMonitor.TRANSACTION_BLOCK_SCHEMA, ceSchemaBuilder, CloudEventsConverter::txExtensionName, true);
 
         ceSchemaBuilder.withSchema(CloudEventsMaker.FieldName.DATA, dataSchemaType);
 
@@ -400,12 +409,10 @@ private SchemaAndValue convertToCloudEventsFormat(RecordParser parser, CloudEven
 
         ceValueBuilder.withValue(adjustExtensionName(Envelope.FieldName.OPERATION), parser.op());
 
-        for (Field field : sourceSchema.fields()) {
-            Object value = source.get(field);
-            if (field.schema().type() == Type.INT64 && value != null) {
-                value = String.valueOf((long) value);
-            }
-            ceValueBuilder.withValue(adjustExtensionName(field.name()), value);
+        ceValueFromStruct(source, sourceSchema, ceValueBuilder, CloudEventsConverter::adjustExtensionName);
+
+        if (transaction != null) {
+            ceValueFromStruct(transaction, TransactionMonitor.TRANSACTION_BLOCK_SCHEMA, ceValueBuilder, CloudEventsConverter::txExtensionName);
         }
 
         ceValueBuilder.withValue(CloudEventsMaker.FieldName.DATA, serializedData);
@@ -413,11 +420,27 @@ private SchemaAndValue convertToCloudEventsFormat(RecordParser parser, CloudEven
         return new SchemaAndValue(ceSchema, ceValueBuilder.build());
     }
 
+    private void ceValueFromStruct(Struct struct, Schema schema, CEValueBuilder ceValueBuilder, Function<String, String> nameMapper) {
+        for (Field field : schema.fields()) {
+            Object value = struct.get(field);
+            if (field.schema().type() == Type.INT64 && value != null) {
+                value = String.valueOf((long) value);
+            }
+            ceValueBuilder.withValue(nameMapper.apply(field.name()), value);
+        }
+    }
+
+    private void ceSchemaFromSchema(Schema schema, CESchemaBuilder ceSchemaBuilder, Function<String, String> nameMapper, boolean alwaysOptional) {
+        for (Field field : schema.fields()) {
+            ceSchemaBuilder.withSchema(nameMapper.apply(field.name()), convertToCeExtensionSchema(field.schema(), alwaysOptional));
+        }
+    }
+
     /**
      * Converts the given source attribute schema into a corresponding CE extension schema.
      * The types supported there are limited, e.g. int64 can only be represented as string.
      */
-    private Schema convertToCeExtensionSchema(Schema schema) {
+    private Schema convertToCeExtensionSchema(Schema schema, boolean alwaysOptional) {
         SchemaBuilder ceExtensionSchema;
 
         if (schema.type() == Type.BOOLEAN) {
@@ -437,13 +460,17 @@ else if (schema.type() == Type.STRING || schema.type() == Type.INT64) {
             throw new IllegalArgumentException("Source field of type " + schema.type() + " cannot be converted into CloudEvents extension attribute.");
         }
 
-        if (schema.isOptional()) {
+        if (alwaysOptional || schema.isOptional()) {
             ceExtensionSchema.optional();
         }
 
         return ceExtensionSchema.build();
     }
 
+    private Schema convertToCeExtensionSchema(Schema schema) {
+        return convertToCeExtensionSchema(schema, false);
+    }
+
     private static CESchemaBuilder defineSchema() {
         return new CESchemaBuilder() {
             private final SchemaBuilder builder = SchemaBuilder.struct();
@@ -540,6 +567,10 @@ static String adjustExtensionName(String original) {
         return sb.toString();
     }
 
+    private static String txExtensionName(String name) {
+        return adjustExtensionName(TX_ATTRIBUTE_PREFIX + name);
+    }
+
     private static boolean isValidExtensionNameCharacter(char c) {
         return (c >= 'A' && c <= 'Z') || (c >= 'a' && c <= 'z') || (c >= '0' && c <= '9');
     }
diff --git a/debezium-core/src/main/java/io/debezium/converters/RecordParser.java b/debezium-core/src/main/java/io/debezium/converters/RecordParser.java
index 4c97a95b8..26122342b 100644
--- a/debezium-core/src/main/java/io/debezium/converters/RecordParser.java
+++ b/debezium-core/src/main/java/io/debezium/converters/RecordParser.java
@@ -28,6 +28,7 @@ public abstract class RecordParser {
 
     private final Struct record;
     private final Struct source;
+    private final Struct transaction;
     private final String op;
     private final Schema opSchema;
     private final String ts_ms;
@@ -71,6 +72,7 @@ public static RecordParser create(Schema schema, Object value) {
     protected RecordParser(Schema schema, Struct record, String... dataFields) {
         this.record = record;
         this.source = record.getStruct(Envelope.FieldName.SOURCE);
+        this.transaction = record.schema().field(Envelope.FieldName.TRANSACTION) != null ? record.getStruct(Envelope.FieldName.TRANSACTION) : null;
         this.op = record.getString(Envelope.FieldName.OPERATION);
         this.opSchema = schema.field(Envelope.FieldName.OPERATION).schema();
         this.ts_ms = record.getInt64(Envelope.FieldName.TIMESTAMP).toString();
@@ -111,6 +113,15 @@ public Struct source() {
         return source;
     }
 
+    /**
+     * Get the value of the transaction field in the record.
+     *
+     * @return the value of the transaction field
+     */
+    public Struct transaction() {
+        return transaction;
+    }
+
     /**
      * Get the value of the op field in the record.
      *
diff --git a/debezium-core/src/test/java/io/debezium/converters/CloudEventsConverterTest.java b/debezium-core/src/test/java/io/debezium/converters/CloudEventsConverterTest.java
index 36b99560a..3cb5fd439 100644
--- a/debezium-core/src/test/java/io/debezium/converters/CloudEventsConverterTest.java
+++ b/debezium-core/src/test/java/io/debezium/converters/CloudEventsConverterTest.java
@@ -31,7 +31,7 @@
 
 public class CloudEventsConverterTest {
 
-    public static void shouldConvertToCloudEventsInJson(SourceRecord record) {
+    public static void shouldConvertToCloudEventsInJson(SourceRecord record, boolean hasTransaction) {
         Map<String, Object> config = new HashMap<>();
         config.put("serializer.type", "json");
         config.put("data.serializer.type", "json");
@@ -92,6 +92,12 @@ public static void shouldConvertToCloudEventsInJson(SourceRecord record) {
             msg = "inspecting required CloudEvents extension attributes for Debezium";
             assertThat(valueJson.get("iodebeziumop")).isNotNull();
             assertThat(valueJson.get("iodebeziumtsms")).isNotNull();
+            if (hasTransaction) {
+                msg = "inspecting transaction metadata attributes";
+                assertThat(valueJson.get("iodebeziumtxid")).isNotNull();
+                assertThat(valueJson.get("iodebeziumtxtotalorder")).isNotNull();
+                assertThat(valueJson.get("iodebeziumtxdatacollectionorder")).isNotNull();
+            }
             msg = "inspecting the data field in the value";
             dataJson = valueJson.get(CloudEventsMaker.FieldName.DATA);
             assertThat(dataJson.get(CloudEventsMaker.FieldName.SCHEMA_FIELD_NAME)).isNotNull();
@@ -113,7 +119,7 @@ public static void shouldConvertToCloudEventsInJson(SourceRecord record) {
         }
     }
 
-    public static void shouldConvertToCloudEventsInJsonWithDataAsAvro(SourceRecord record) {
+    public static void shouldConvertToCloudEventsInJsonWithDataAsAvro(SourceRecord record, boolean hasTransaction) {
         Map<String, Object> config = new HashMap<>();
         config.put("serializer.type", "json");
         config.put("data.serializer.type", "avro");
@@ -177,6 +183,12 @@ public static void shouldConvertToCloudEventsInJsonWithDataAsAvro(SourceRecord r
             msg = "inspecting required CloudEvents extension attributes for Debezium";
             assertThat(valueJson.get("iodebeziumop")).isNotNull();
             assertThat(valueJson.get("iodebeziumtsms")).isNotNull();
+            if (hasTransaction) {
+                msg = "inspecting transaction metadata attributes";
+                assertThat(valueJson.get("iodebeziumtxid")).isNotNull();
+                assertThat(valueJson.get("iodebeziumtxtotalorder")).isNotNull();
+                assertThat(valueJson.get("iodebeziumtxdatacollectionorder")).isNotNull();
+            }
             msg = "inspecting the data field in the value";
             dataJson = valueJson.get(CloudEventsMaker.FieldName.DATA);
             assertThat(dataJson).isNotNull();
@@ -200,7 +212,7 @@ public static void shouldConvertToCloudEventsInJsonWithDataAsAvro(SourceRecord r
         }
     }
 
-    public static void shouldConvertToCloudEventsInAvro(SourceRecord record, String connectorName, String serverName) {
+    public static void shouldConvertToCloudEventsInAvro(SourceRecord record, String connectorName, String serverName, boolean hasTransaction) {
         Map<String, Object> config = new HashMap<>();
         config.put("serializer.type", "avro");
         config.put("data.serializer.type", "avro");
@@ -260,6 +272,12 @@ public static void shouldConvertToCloudEventsInAvro(SourceRecord record, String
         msg = "inspecting required CloudEvents extension attributes in the value";
         assertThat(avroValue.get(CloudEventsConverter.adjustExtensionName(Envelope.FieldName.OPERATION))).isNotNull();
         assertThat(avroValue.get(CloudEventsConverter.adjustExtensionName(Envelope.FieldName.TIMESTAMP))).isNotNull();
+        if (hasTransaction) {
+            msg = "inspecting transaction metadata attributes";
+            assertThat(avroValue.get("iodebeziumtxid")).isNotNull();
+            assertThat(avroValue.get("iodebeziumtxtotalorder")).isNotNull();
+            assertThat(avroValue.get("iodebeziumtxdatacollectionorder")).isNotNull();
+        }
         msg = "inspecting the data field in the value";
         Struct avroDataField = avroValue.getStruct(CloudEventsMaker.FieldName.DATA);
         // before field may be null
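
For context on the attribute names asserted by the test helpers: the new txExtensionName() prefixes a transaction-block field name with "tx" and passes it through adjustExtensionName(), which prepends "iodebezium" and keeps only characters accepted by isValidExtensionNameCharacter(). A minimal standalone Java sketch of that mapping follows; the class name and main() are illustrative only, and the body of adjustExtensionName() is an assumption based on the constants and character filter visible in the patch, not a copy of the real implementation.

public class TxExtensionNameSketch {

    private static final String EXTENSION_NAME_PREFIX = "iodebezium";
    private static final String TX_ATTRIBUTE_PREFIX = "tx";

    // Assumed behavior of CloudEventsConverter.adjustExtensionName(): prepend the
    // "iodebezium" prefix and drop every character outside [A-Za-z0-9].
    static String adjustExtensionName(String original) {
        StringBuilder sb = new StringBuilder(EXTENSION_NAME_PREFIX);
        for (char c : original.toCharArray()) {
            if ((c >= 'A' && c <= 'Z') || (c >= 'a' && c <= 'z') || (c >= '0' && c <= '9')) {
                sb.append(c);
            }
        }
        return sb.toString();
    }

    // Mirrors the txExtensionName() helper added in the patch.
    static String txExtensionName(String name) {
        return adjustExtensionName(TX_ATTRIBUTE_PREFIX + name);
    }

    public static void main(String[] args) {
        // Transaction block fields and the extension attributes the tests assert on:
        System.out.println(txExtensionName("id"));                    // iodebeziumtxid
        System.out.println(txExtensionName("total_order"));           // iodebeziumtxtotalorder
        System.out.println(txExtensionName("data_collection_order")); // iodebeziumtxdatacollectionorder
    }
}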