diff --git a/debezium-connector-mongodb/src/test/java/io/debezium/connector/mongodb/MongoDbConnectorIT.java b/debezium-connector-mongodb/src/test/java/io/debezium/connector/mongodb/MongoDbConnectorIT.java index 2c2348cb2..e97d28dfe 100644 --- a/debezium-connector-mongodb/src/test/java/io/debezium/connector/mongodb/MongoDbConnectorIT.java +++ b/debezium-connector-mongodb/src/test/java/io/debezium/connector/mongodb/MongoDbConnectorIT.java @@ -943,7 +943,7 @@ public void shouldOutputRecordsInCloudEventsFormat() throws Exception { for (SourceRecord record : topicRecords) { CloudEventsConverterTest.shouldConvertToCloudEventsInJson(record); CloudEventsConverterTest.shouldConvertToCloudEventsInJsonWithDataAsAvro(record); - CloudEventsConverterTest.shouldConvertToCloudEventsInAvro(record); + CloudEventsConverterTest.shouldConvertToCloudEventsInAvro(record, "mongodb", "mongo"); } storeDocuments("dbit", "restaurants", "restaurants2.json"); @@ -954,7 +954,7 @@ public void shouldOutputRecordsInCloudEventsFormat() throws Exception { for (SourceRecord record : topicRecords2) { CloudEventsConverterTest.shouldConvertToCloudEventsInJson(record); CloudEventsConverterTest.shouldConvertToCloudEventsInJsonWithDataAsAvro(record); - CloudEventsConverterTest.shouldConvertToCloudEventsInAvro(record); + CloudEventsConverterTest.shouldConvertToCloudEventsInAvro(record, "mongodb", "mongo"); } stopConnector(); diff --git a/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/MySqlConnectorIT.java b/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/MySqlConnectorIT.java index 913893cae..ba7212436 100644 --- a/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/MySqlConnectorIT.java +++ b/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/MySqlConnectorIT.java @@ -1971,7 +1971,7 @@ public void shouldOutputRecordsInCloudEventsFormat() throws Exception { for (SourceRecord record : table) { CloudEventsConverterTest.shouldConvertToCloudEventsInJson(record); CloudEventsConverterTest.shouldConvertToCloudEventsInJsonWithDataAsAvro(record); - CloudEventsConverterTest.shouldConvertToCloudEventsInAvro(record); + CloudEventsConverterTest.shouldConvertToCloudEventsInAvro(record, "mysql", "myServer1"); } } diff --git a/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/PostgresConnectorIT.java b/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/PostgresConnectorIT.java index 85c7f3d54..5e8db0f40 100644 --- a/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/PostgresConnectorIT.java +++ b/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/PostgresConnectorIT.java @@ -1385,7 +1385,7 @@ public void shouldOutputRecordsInCloudEventsFormat() throws Exception { for (SourceRecord record : snapshot) { CloudEventsConverterTest.shouldConvertToCloudEventsInJson(record); CloudEventsConverterTest.shouldConvertToCloudEventsInJsonWithDataAsAvro(record); - CloudEventsConverterTest.shouldConvertToCloudEventsInAvro(record); + CloudEventsConverterTest.shouldConvertToCloudEventsInAvro(record, "postgresql", "test_server"); } // insert some more records and test streaming @@ -1397,7 +1397,7 @@ public void shouldOutputRecordsInCloudEventsFormat() throws Exception { for (SourceRecord record : streaming) { CloudEventsConverterTest.shouldConvertToCloudEventsInJson(record); CloudEventsConverterTest.shouldConvertToCloudEventsInJsonWithDataAsAvro(record); - CloudEventsConverterTest.shouldConvertToCloudEventsInAvro(record); + CloudEventsConverterTest.shouldConvertToCloudEventsInAvro(record, "postgresql", "test_server"); } stopConnector(); diff --git a/debezium-connector-sqlserver/src/test/java/io/debezium/connector/sqlserver/SnapshotIT.java b/debezium-connector-sqlserver/src/test/java/io/debezium/connector/sqlserver/SnapshotIT.java index db6915a40..ba0462024 100644 --- a/debezium-connector-sqlserver/src/test/java/io/debezium/connector/sqlserver/SnapshotIT.java +++ b/debezium-connector-sqlserver/src/test/java/io/debezium/connector/sqlserver/SnapshotIT.java @@ -498,7 +498,7 @@ public void shouldOutputRecordsInCloudEventsFormat() throws Exception { for (SourceRecord sourceRecord : snapshotTable1) { CloudEventsConverterTest.shouldConvertToCloudEventsInJson(sourceRecord); CloudEventsConverterTest.shouldConvertToCloudEventsInJsonWithDataAsAvro(sourceRecord); - CloudEventsConverterTest.shouldConvertToCloudEventsInAvro(sourceRecord); + CloudEventsConverterTest.shouldConvertToCloudEventsInAvro(sourceRecord, "sqlserver", "server1"); } for (int i = 0; i < STREAMING_RECORDS_PER_TABLE; i++) { @@ -516,7 +516,7 @@ public void shouldOutputRecordsInCloudEventsFormat() throws Exception { for (SourceRecord sourceRecord : streamingTable1) { CloudEventsConverterTest.shouldConvertToCloudEventsInJson(sourceRecord); CloudEventsConverterTest.shouldConvertToCloudEventsInJsonWithDataAsAvro(sourceRecord); - CloudEventsConverterTest.shouldConvertToCloudEventsInAvro(sourceRecord); + CloudEventsConverterTest.shouldConvertToCloudEventsInAvro(sourceRecord, "sqlserver", "server1"); } } diff --git a/debezium-connector-sqlserver/src/test/resources/log4j.properties b/debezium-connector-sqlserver/src/test/resources/log4j.properties index a69d4ed09..84d35cb9b 100644 --- a/debezium-connector-sqlserver/src/test/resources/log4j.properties +++ b/debezium-connector-sqlserver/src/test/resources/log4j.properties @@ -13,3 +13,6 @@ log4j.logger.io.debezium.embedded.EmbeddedEngine$EmbeddedConfig=WARN #log4j.logger.io.debezium.embedded.EmbeddedEngine=DEBUG log4j.logger.io.debezium.core=DEBUG log4j.logger.io.debezium.connector.sqlserver=DEBUG +log4j.logger.io.confluent=WARN +log4j.logger.io.debezium.converters.CloudEventsConverterConfig=WARN +log4j.logger.org.apache.kafka.connect.json.JsonConverterConfig=WARN diff --git a/debezium-core/src/main/java/io/debezium/converters/CloudEventsConverter.java b/debezium-core/src/main/java/io/debezium/converters/CloudEventsConverter.java index 60e4e3755..449849a89 100644 --- a/debezium-core/src/main/java/io/debezium/converters/CloudEventsConverter.java +++ b/debezium-core/src/main/java/io/debezium/converters/CloudEventsConverter.java @@ -287,7 +287,7 @@ public SchemaAndValue toConnectData(String topic, byte[] value) { Schema incompleteSchema = ceSchemaAndValue.schema(); Struct ceValue = (Struct) ceSchemaAndValue.value(); byte[] data = ceValue.getBytes(CloudEventsMaker.FieldName.DATA); - SchemaAndValue dataSchemaAndValue = avroConverter.toConnectData(topic, data); + SchemaAndValue dataSchemaAndValue = avroConverter.toConnectData(topic + DATA_SCHEMA_SUFFIX, data); SchemaBuilder builder = SchemaBuilder.struct(); for (Field ceField : incompleteSchema.fields()) { @@ -427,7 +427,7 @@ private Schema convertToCeExtensionSchema(Schema schema) { ceExtensionSchema = SchemaBuilder.bool(); } // all numbers up to int32 go as int32 - else if (schema.type() == Type.INT8 || schema.type() == Type.INT16 || schema.type() == Type.INT16) { + else if (schema.type() == Type.INT8 || schema.type() == Type.INT16 || schema.type() == Type.INT16 || schema.type() == Type.INT32) { ceExtensionSchema = SchemaBuilder.int32(); } // int64 isn't supported as per CE spec diff --git a/debezium-core/src/main/java/io/debezium/converters/CloudEventsMaker.java b/debezium-core/src/main/java/io/debezium/converters/CloudEventsMaker.java index 61b6e5f31..444e0c890 100644 --- a/debezium-core/src/main/java/io/debezium/converters/CloudEventsMaker.java +++ b/debezium-core/src/main/java/io/debezium/converters/CloudEventsMaker.java @@ -10,12 +10,10 @@ import java.util.TimeZone; import org.apache.kafka.connect.data.Schema; -import org.apache.kafka.connect.data.SchemaBuilder; import org.apache.kafka.connect.data.Struct; import org.apache.kafka.connect.errors.DataException; import io.debezium.connector.AbstractSourceInfo; -import io.debezium.data.Envelope; import io.debezium.util.Collect; /** @@ -46,7 +44,7 @@ public static final class FieldName { public static final String DATACONTENTTYPE = "datacontenttype"; public static final String DATASCHEMA = "dataschema"; - // TODO not used + // TODO DBZ-1701 not used public static final String SUBJECT = "subject"; public static final String TIME = "time"; @@ -68,7 +66,7 @@ public static final class FieldName { private final String dataSchemaUriBase; private final Schema ceDataAttributeSchema; - RecordParser recordParser; + protected final RecordParser recordParser; static final Map CONTENT_TYPE_NAME_MAP = Collect.hashMapOf( SerializerType.JSON, "application/json", @@ -116,16 +114,7 @@ private CloudEventsMaker(RecordParser parser, SerializerType contentType, String this.recordParser = parser; this.dataContentType = contentType; this.dataSchemaUriBase = dataSchemaUriBase; - this.ceDataAttributeSchema = getDataSchema(recordParser); - } - - private static Schema getDataSchema(RecordParser recordParser) { - SchemaBuilder builder = SchemaBuilder.struct().name(ceDataAttributeSchemaName(recordParser.connectorType())); - - builder.field(Envelope.FieldName.BEFORE, recordParser.beforeSchema()); - builder.field(Envelope.FieldName.AFTER, recordParser.afterSchema()); - - return builder.build(); + this.ceDataAttributeSchema = recordParser.dataSchema(); } /** @@ -209,14 +198,7 @@ public Schema ceDataAttributeSchema() { * @return the value of the data attribute of CloudEvents */ public Struct ceDataAttribute() { - Struct data = new Struct(ceDataAttributeSchema()); - if (recordParser.before() != null) { - data.put(Envelope.FieldName.BEFORE, recordParser.before()); - } - if (recordParser.after() != null) { - data.put(Envelope.FieldName.AFTER, recordParser.after()); - } - return data; + return recordParser.data(); } /** @@ -230,15 +212,6 @@ public String ceEnvelopeSchemaName() { + "CloudEvents.Envelope"; } - /** - * Construct the name of the schema of the data attribute of CloudEvents. - * - * @return the name of the schema of the data attribute of CloudEvents - */ - private static String ceDataAttributeSchemaName(String connectorType) { - return "io.debezium.connector." + connectorType + ".Data"; - } - /** * CloudEvents maker for records produced by MySQL connector. */ diff --git a/debezium-core/src/main/java/io/debezium/converters/RecordParser.java b/debezium-core/src/main/java/io/debezium/converters/RecordParser.java index 7372b7497..4c97a95b8 100644 --- a/debezium-core/src/main/java/io/debezium/converters/RecordParser.java +++ b/debezium-core/src/main/java/io/debezium/converters/RecordParser.java @@ -9,7 +9,9 @@ import java.util.Set; +import org.apache.kafka.connect.data.Field; import org.apache.kafka.connect.data.Schema; +import org.apache.kafka.connect.data.SchemaBuilder; import org.apache.kafka.connect.data.Struct; import org.apache.kafka.connect.errors.DataException; @@ -24,16 +26,14 @@ */ public abstract class RecordParser { - private Object before; - private Object after; - private Struct source; - private String op; - private Schema opSchema; - private String ts_ms; - private Schema ts_msSchema; - private Schema beforeSchema; - private Schema afterSchema; - private String connectorType; + private final Struct record; + private final Struct source; + private final String op; + private final Schema opSchema; + private final String ts_ms; + private final Schema ts_msSchema; + private final Schema dataSchema; + private final String connectorType; static final Set SOURCE_FIELDS = Collect.unmodifiableSet( AbstractSourceInfo.DEBEZIUM_VERSION_KEY, @@ -68,39 +68,38 @@ public static RecordParser create(Schema schema, Object value) { } } - RecordParser(Schema schema, Struct record) { - parse(schema, record); + protected RecordParser(Schema schema, Struct record, String... dataFields) { + this.record = record; + this.source = record.getStruct(Envelope.FieldName.SOURCE); + this.op = record.getString(Envelope.FieldName.OPERATION); + this.opSchema = schema.field(Envelope.FieldName.OPERATION).schema(); + this.ts_ms = record.getInt64(Envelope.FieldName.TIMESTAMP).toString(); + this.ts_msSchema = schema.field(Envelope.FieldName.TIMESTAMP).schema(); + this.connectorType = source.getString(AbstractSourceInfo.DEBEZIUM_CONNECTOR_KEY); + this.dataSchema = getDataSchema(schema, connectorType, dataFields); } - private void parse(Schema schema, Struct record) { - before = schema.field(Envelope.FieldName.BEFORE) == null ? null : record.get(Envelope.FieldName.BEFORE); - after = schema.field(Envelope.FieldName.AFTER) == null ? null : record.get(Envelope.FieldName.AFTER); - source = record.getStruct(Envelope.FieldName.SOURCE); - op = record.getString(Envelope.FieldName.OPERATION); - opSchema = schema.field(Envelope.FieldName.OPERATION).schema(); - ts_ms = record.getInt64(Envelope.FieldName.TIMESTAMP).toString(); - ts_msSchema = schema.field(Envelope.FieldName.TIMESTAMP).schema(); - beforeSchema = schema.field(Envelope.FieldName.BEFORE).schema(); - afterSchema = schema.field(Envelope.FieldName.AFTER).schema(); - connectorType = source.getString(AbstractSourceInfo.DEBEZIUM_CONNECTOR_KEY); + private static Schema getDataSchema(Schema schema, String connectorType, String... fields) { + SchemaBuilder builder = SchemaBuilder.struct().name("io.debezium.connector.mysql.Data"); + + for (String field : fields) { + builder.field(field, schema.field(field).schema()); + } + + return builder.build(); } /** - * Get the value of the before field in the record; may be null. - * - * @return the value of the before field + * Get the value of the data field in the record; may not be null. */ - public Object before() { - return before; - } + public Struct data() { + Struct data = new Struct(dataSchema()); - /** - * Get the value of the after field in the record; may be null. - * - * @return the value of the after field - */ - public Object after() { - return after; + for (Field field : dataSchema.fields()) { + data.put(field, record.get(field)); + } + + return data; } /** @@ -149,21 +148,10 @@ public Schema ts_msSchema() { } /** - * Get the schema of the before field in the record; may be null. - * - * @return the schema of the before field + * Get the schema of the data field in the record; may be not be null. */ - public Schema beforeSchema() { - return beforeSchema; - } - - /** - * Get the schema of the after field in the record; may be null. - * - * @return the schema of the after field - */ - public Schema afterSchema() { - return afterSchema; + public Schema dataSchema() { + return dataSchema; } /** @@ -207,7 +195,7 @@ public static final class MysqlRecordParser extends RecordParser { QUERY_KEY); MysqlRecordParser(Schema schema, Struct record) { - super(schema, record); + super(schema, record, Envelope.FieldName.BEFORE, Envelope.FieldName.AFTER); } @Override @@ -237,7 +225,7 @@ public static final class PostgresRecordParser extends RecordParser { LSN_KEY); PostgresRecordParser(Schema schema, Struct record) { - super(schema, record); + super(schema, record, Envelope.FieldName.BEFORE, Envelope.FieldName.AFTER); } @Override @@ -269,7 +257,7 @@ public static final class MongodbRecordParser extends RecordParser { COLLECTION); MongodbRecordParser(Schema schema, Struct record) { - super(schema, record); + super(schema, record, Envelope.FieldName.AFTER, "patch"); } @Override @@ -299,7 +287,7 @@ public static final class SqlserverRecordParser extends RecordParser { EVENT_SERIAL_NO_KEY); SqlserverRecordParser(Schema schema, Struct record) { - super(schema, record); + super(schema, record, Envelope.FieldName.BEFORE, Envelope.FieldName.AFTER); } @Override diff --git a/debezium-core/src/test/java/io/debezium/converters/CloudEventsConverterTest.java b/debezium-core/src/test/java/io/debezium/converters/CloudEventsConverterTest.java index b0934ace7..36b99560a 100644 --- a/debezium-core/src/test/java/io/debezium/converters/CloudEventsConverterTest.java +++ b/debezium-core/src/test/java/io/debezium/converters/CloudEventsConverterTest.java @@ -200,7 +200,7 @@ public static void shouldConvertToCloudEventsInJsonWithDataAsAvro(SourceRecord r } } - public static void shouldConvertToCloudEventsInAvro(SourceRecord record) { + public static void shouldConvertToCloudEventsInAvro(SourceRecord record, String connectorName, String serverName) { Map config = new HashMap<>(); config.put("serializer.type", "avro"); config.put("data.serializer.type", "avro"); @@ -250,9 +250,9 @@ public static void shouldConvertToCloudEventsInAvro(SourceRecord record) { msg = "inspecting all required CloudEvents fields in the value"; avroValue = (Struct) avroSchemaAndValue.value(); assertThat(avroValue.get(CloudEventsMaker.FieldName.ID)).isNotNull(); - assertThat(avroValue.getString(CloudEventsMaker.FieldName.SOURCE)).isEqualTo("/debezium/postgresql/test_server"); + assertThat(avroValue.getString(CloudEventsMaker.FieldName.SOURCE)).isEqualTo("/debezium/" + connectorName + "/" + serverName); assertThat(avroValue.get(CloudEventsMaker.FieldName.SPECVERSION)).isEqualTo("1.0"); - assertThat(avroValue.get(CloudEventsMaker.FieldName.TYPE)).isEqualTo("io.debezium.postgresql.datachangeevent"); + assertThat(avroValue.get(CloudEventsMaker.FieldName.TYPE)).isEqualTo("io.debezium." + connectorName + ".datachangeevent"); assertThat(avroValue.get(CloudEventsMaker.FieldName.DATACONTENTTYPE)).isEqualTo("avro/binary"); assertThat(avroValue.getString(CloudEventsMaker.FieldName.DATASCHEMA)).startsWith("http://fake-url/schemas/ids/"); assertThat(avroValue.get(CloudEventsMaker.FieldName.TIME)).isNotNull();