DBZ-1292 Catering for the differences in data schema

* MongoDB change events don't carry a "before" field, but a "patch" field
* Also removing some redundancies and making RecordParser immutable
* Reducing log verbosity during tests
Gunnar Morling 2020-01-13 13:17:14 +01:00
parent 8531adbefd
commit 8c33a4e147
9 changed files with 61 additions and 97 deletions

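The central idea of the change: the CloudEvents "data" attribute is no longer hard-wired to a before/after pair; each connector's RecordParser now declares which envelope fields make up the data, so relational connectors expose "before"/"after" while MongoDB exposes "after"/"patch". A minimal sketch of that mechanism for orientation before the diffs (simplified, class and schema names invented, not the actual Debezium classes):

import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;

class DataAttributeSketch {

    // Build the CloudEvents data schema from the declared envelope fields
    // (e.g. "before"/"after" for relational connectors, "after"/"patch" for MongoDB).
    static Schema dataSchemaOf(Schema envelopeSchema, String... dataFields) {
        SchemaBuilder builder = SchemaBuilder.struct().name("io.debezium.connector.example.Data");
        for (String field : dataFields) {
            builder.field(field, envelopeSchema.field(field).schema());
        }
        return builder.build();
    }

    // Copy only the declared fields from the envelope value into the data struct.
    static Struct dataOf(Struct envelope, Schema dataSchema) {
        Struct data = new Struct(dataSchema);
        for (Field field : dataSchema.fields()) {
            data.put(field, envelope.get(field.name()));
        }
        return data;
    }
}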
@@ -943,7 +943,7 @@ public void shouldOutputRecordsInCloudEventsFormat() throws Exception {
         for (SourceRecord record : topicRecords) {
             CloudEventsConverterTest.shouldConvertToCloudEventsInJson(record);
             CloudEventsConverterTest.shouldConvertToCloudEventsInJsonWithDataAsAvro(record);
-            CloudEventsConverterTest.shouldConvertToCloudEventsInAvro(record);
+            CloudEventsConverterTest.shouldConvertToCloudEventsInAvro(record, "mongodb", "mongo");
         }
 
         storeDocuments("dbit", "restaurants", "restaurants2.json");
@@ -954,7 +954,7 @@ public void shouldOutputRecordsInCloudEventsFormat() throws Exception {
         for (SourceRecord record : topicRecords2) {
             CloudEventsConverterTest.shouldConvertToCloudEventsInJson(record);
             CloudEventsConverterTest.shouldConvertToCloudEventsInJsonWithDataAsAvro(record);
-            CloudEventsConverterTest.shouldConvertToCloudEventsInAvro(record);
+            CloudEventsConverterTest.shouldConvertToCloudEventsInAvro(record, "mongodb", "mongo");
         }
 
         stopConnector();

@@ -1971,7 +1971,7 @@ public void shouldOutputRecordsInCloudEventsFormat() throws Exception {
         for (SourceRecord record : table) {
             CloudEventsConverterTest.shouldConvertToCloudEventsInJson(record);
             CloudEventsConverterTest.shouldConvertToCloudEventsInJsonWithDataAsAvro(record);
-            CloudEventsConverterTest.shouldConvertToCloudEventsInAvro(record);
+            CloudEventsConverterTest.shouldConvertToCloudEventsInAvro(record, "mysql", "myServer1");
         }
     }

@@ -1385,7 +1385,7 @@ public void shouldOutputRecordsInCloudEventsFormat() throws Exception {
         for (SourceRecord record : snapshot) {
             CloudEventsConverterTest.shouldConvertToCloudEventsInJson(record);
             CloudEventsConverterTest.shouldConvertToCloudEventsInJsonWithDataAsAvro(record);
-            CloudEventsConverterTest.shouldConvertToCloudEventsInAvro(record);
+            CloudEventsConverterTest.shouldConvertToCloudEventsInAvro(record, "postgresql", "test_server");
         }
 
         // insert some more records and test streaming
@@ -1397,7 +1397,7 @@ public void shouldOutputRecordsInCloudEventsFormat() throws Exception {
         for (SourceRecord record : streaming) {
             CloudEventsConverterTest.shouldConvertToCloudEventsInJson(record);
             CloudEventsConverterTest.shouldConvertToCloudEventsInJsonWithDataAsAvro(record);
-            CloudEventsConverterTest.shouldConvertToCloudEventsInAvro(record);
+            CloudEventsConverterTest.shouldConvertToCloudEventsInAvro(record, "postgresql", "test_server");
         }
 
         stopConnector();

@@ -498,7 +498,7 @@ public void shouldOutputRecordsInCloudEventsFormat() throws Exception {
         for (SourceRecord sourceRecord : snapshotTable1) {
             CloudEventsConverterTest.shouldConvertToCloudEventsInJson(sourceRecord);
             CloudEventsConverterTest.shouldConvertToCloudEventsInJsonWithDataAsAvro(sourceRecord);
-            CloudEventsConverterTest.shouldConvertToCloudEventsInAvro(sourceRecord);
+            CloudEventsConverterTest.shouldConvertToCloudEventsInAvro(sourceRecord, "sqlserver", "server1");
         }
 
         for (int i = 0; i < STREAMING_RECORDS_PER_TABLE; i++) {
@@ -516,7 +516,7 @@ public void shouldOutputRecordsInCloudEventsFormat() throws Exception {
         for (SourceRecord sourceRecord : streamingTable1) {
             CloudEventsConverterTest.shouldConvertToCloudEventsInJson(sourceRecord);
             CloudEventsConverterTest.shouldConvertToCloudEventsInJsonWithDataAsAvro(sourceRecord);
-            CloudEventsConverterTest.shouldConvertToCloudEventsInAvro(sourceRecord);
+            CloudEventsConverterTest.shouldConvertToCloudEventsInAvro(sourceRecord, "sqlserver", "server1");
         }
     }

@@ -13,3 +13,6 @@ log4j.logger.io.debezium.embedded.EmbeddedEngine$EmbeddedConfig=WARN
 #log4j.logger.io.debezium.embedded.EmbeddedEngine=DEBUG
 log4j.logger.io.debezium.core=DEBUG
 log4j.logger.io.debezium.connector.sqlserver=DEBUG
+log4j.logger.io.confluent=WARN
+log4j.logger.io.debezium.converters.CloudEventsConverterConfig=WARN
+log4j.logger.org.apache.kafka.connect.json.JsonConverterConfig=WARN

@@ -287,7 +287,7 @@ public SchemaAndValue toConnectData(String topic, byte[] value) {
             Schema incompleteSchema = ceSchemaAndValue.schema();
             Struct ceValue = (Struct) ceSchemaAndValue.value();
             byte[] data = ceValue.getBytes(CloudEventsMaker.FieldName.DATA);
-            SchemaAndValue dataSchemaAndValue = avroConverter.toConnectData(topic, data);
+            SchemaAndValue dataSchemaAndValue = avroConverter.toConnectData(topic + DATA_SCHEMA_SUFFIX, data);
 
             SchemaBuilder builder = SchemaBuilder.struct();
             for (Field ceField : incompleteSchema.fields()) {
@@ -427,7 +427,7 @@ private Schema convertToCeExtensionSchema(Schema schema) {
             ceExtensionSchema = SchemaBuilder.bool();
         }
         // all numbers up to int32 go as int32
-        else if (schema.type() == Type.INT8 || schema.type() == Type.INT16 || schema.type() == Type.INT16) {
+        else if (schema.type() == Type.INT8 || schema.type() == Type.INT16 || schema.type() == Type.INT16 || schema.type() == Type.INT32) {
            ceExtensionSchema = SchemaBuilder.int32();
        }
        // int64 isn't supported as per CE spec

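Two things change in the converter above: the inner Avro "data" attribute is now deserialized under a dedicated schema-registry subject derived from the topic plus DATA_SCHEMA_SUFFIX, so the data schema doesn't collide with the CloudEvents envelope schema registered for the plain topic; and INT32 is added to the integer types that map to an int32 CloudEvents extension attribute. A rough illustration of the subject naming; the suffix value below is only an assumption, since the constant's definition is not part of this hunk:

class SubjectNamingSketch {
    static final String DATA_SCHEMA_SUFFIX = "-data"; // assumed placeholder, not the verified constant

    static String envelopeSubject(String topic) {
        return topic;                      // CloudEvents envelope (Avro) schema registered here
    }

    static String dataSubject(String topic) {
        return topic + DATA_SCHEMA_SUFFIX; // inner "data" attribute schema registered separately
    }
}

Serializer and deserializer must derive the same subject, which is why toConnectData now appends the suffix symmetrically to the serialization path.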
@@ -10,12 +10,10 @@
 import java.util.TimeZone;
 
 import org.apache.kafka.connect.data.Schema;
-import org.apache.kafka.connect.data.SchemaBuilder;
 import org.apache.kafka.connect.data.Struct;
 import org.apache.kafka.connect.errors.DataException;
 
 import io.debezium.connector.AbstractSourceInfo;
-import io.debezium.data.Envelope;
 import io.debezium.util.Collect;
 
 /**
@@ -46,7 +44,7 @@ public static final class FieldName {
         public static final String DATACONTENTTYPE = "datacontenttype";
         public static final String DATASCHEMA = "dataschema";
 
-        // TODO not used
+        // TODO DBZ-1701 not used
        public static final String SUBJECT = "subject";
 
        public static final String TIME = "time";
@@ -68,7 +66,7 @@ public static final class FieldName {
     private final String dataSchemaUriBase;
     private final Schema ceDataAttributeSchema;
 
-    RecordParser recordParser;
+    protected final RecordParser recordParser;
 
     static final Map<SerializerType, String> CONTENT_TYPE_NAME_MAP = Collect.hashMapOf(
             SerializerType.JSON, "application/json",
@@ -116,16 +114,7 @@ private CloudEventsMaker(RecordParser parser, SerializerType contentType, String
         this.recordParser = parser;
         this.dataContentType = contentType;
         this.dataSchemaUriBase = dataSchemaUriBase;
-        this.ceDataAttributeSchema = getDataSchema(recordParser);
-    }
-
-    private static Schema getDataSchema(RecordParser recordParser) {
-        SchemaBuilder builder = SchemaBuilder.struct().name(ceDataAttributeSchemaName(recordParser.connectorType()));
-
-        builder.field(Envelope.FieldName.BEFORE, recordParser.beforeSchema());
-        builder.field(Envelope.FieldName.AFTER, recordParser.afterSchema());
-
-        return builder.build();
+        this.ceDataAttributeSchema = recordParser.dataSchema();
     }
 
     /**
@@ -209,14 +198,7 @@ public Schema ceDataAttributeSchema() {
      * @return the value of the data attribute of CloudEvents
      */
     public Struct ceDataAttribute() {
-        Struct data = new Struct(ceDataAttributeSchema());
-        if (recordParser.before() != null) {
-            data.put(Envelope.FieldName.BEFORE, recordParser.before());
-        }
-        if (recordParser.after() != null) {
-            data.put(Envelope.FieldName.AFTER, recordParser.after());
-        }
-        return data;
+        return recordParser.data();
     }
 
     /**
@@ -230,15 +212,6 @@ public String ceEnvelopeSchemaName() {
                 + "CloudEvents.Envelope";
     }
 
-    /**
-     * Construct the name of the schema of the data attribute of CloudEvents.
-     *
-     * @return the name of the schema of the data attribute of CloudEvents
-     */
-    private static String ceDataAttributeSchemaName(String connectorType) {
-        return "io.debezium.connector." + connectorType + ".Data";
-    }
-
     /**
      * CloudEvents maker for records produced by MySQL connector.
     */

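With the data schema and value now coming from RecordParser, CloudEventsMaker no longer assumes a before/after shape, which is what makes MongoDB events representable: an update typically carries a "patch" document and no full "after" image, so every declared data field has to be optional. A hedged illustration; the schema and payload below are invented examples, not taken from the diff:

import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;

class MongoDataExample {
    static Struct exampleUpdateData() {
        // Invented example of a MongoDB-style data attribute: both fields optional JSON strings.
        Schema mongoDataSchema = SchemaBuilder.struct()
                .name("io.debezium.connector.mongodb.Data")
                .field("after", Schema.OPTIONAL_STRING_SCHEMA)
                .field("patch", Schema.OPTIONAL_STRING_SCHEMA)
                .build();

        Struct data = new Struct(mongoDataSchema);
        data.put("patch", "{\"$set\": {\"quantity\": 5}}"); // idempotent update document (example payload)
        // "after" stays unset: an update event may carry no full document image.
        return data;
    }
}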
@@ -9,7 +9,9 @@
 import java.util.Set;
 
+import org.apache.kafka.connect.data.Field;
 import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.data.SchemaBuilder;
 import org.apache.kafka.connect.data.Struct;
 import org.apache.kafka.connect.errors.DataException;
@@ -24,16 +26,14 @@
  */
 public abstract class RecordParser {
 
-    private Object before;
-    private Object after;
-    private Struct source;
-    private String op;
-    private Schema opSchema;
-    private String ts_ms;
-    private Schema ts_msSchema;
-    private Schema beforeSchema;
-    private Schema afterSchema;
-    private String connectorType;
+    private final Struct record;
+    private final Struct source;
+    private final String op;
+    private final Schema opSchema;
+    private final String ts_ms;
+    private final Schema ts_msSchema;
+    private final Schema dataSchema;
+    private final String connectorType;
 
     static final Set<String> SOURCE_FIELDS = Collect.unmodifiableSet(
             AbstractSourceInfo.DEBEZIUM_VERSION_KEY,
@@ -68,39 +68,38 @@ public static RecordParser create(Schema schema, Object value) {
         }
     }
 
-    RecordParser(Schema schema, Struct record) {
-        parse(schema, record);
-    }
-
-    private void parse(Schema schema, Struct record) {
-        before = schema.field(Envelope.FieldName.BEFORE) == null ? null : record.get(Envelope.FieldName.BEFORE);
-        after = schema.field(Envelope.FieldName.AFTER) == null ? null : record.get(Envelope.FieldName.AFTER);
-        source = record.getStruct(Envelope.FieldName.SOURCE);
-        op = record.getString(Envelope.FieldName.OPERATION);
-        opSchema = schema.field(Envelope.FieldName.OPERATION).schema();
-        ts_ms = record.getInt64(Envelope.FieldName.TIMESTAMP).toString();
-        ts_msSchema = schema.field(Envelope.FieldName.TIMESTAMP).schema();
-        beforeSchema = schema.field(Envelope.FieldName.BEFORE).schema();
-        afterSchema = schema.field(Envelope.FieldName.AFTER).schema();
-        connectorType = source.getString(AbstractSourceInfo.DEBEZIUM_CONNECTOR_KEY);
-    }
+    protected RecordParser(Schema schema, Struct record, String... dataFields) {
+        this.record = record;
+        this.source = record.getStruct(Envelope.FieldName.SOURCE);
+        this.op = record.getString(Envelope.FieldName.OPERATION);
+        this.opSchema = schema.field(Envelope.FieldName.OPERATION).schema();
+        this.ts_ms = record.getInt64(Envelope.FieldName.TIMESTAMP).toString();
+        this.ts_msSchema = schema.field(Envelope.FieldName.TIMESTAMP).schema();
+        this.connectorType = source.getString(AbstractSourceInfo.DEBEZIUM_CONNECTOR_KEY);
+        this.dataSchema = getDataSchema(schema, connectorType, dataFields);
+    }
+
+    private static Schema getDataSchema(Schema schema, String connectorType, String... fields) {
+        SchemaBuilder builder = SchemaBuilder.struct().name("io.debezium.connector.mysql.Data");
+
+        for (String field : fields) {
+            builder.field(field, schema.field(field).schema());
+        }
+
+        return builder.build();
+    }
 
     /**
-     * Get the value of the before field in the record; may be null.
-     *
-     * @return the value of the before field
+     * Get the value of the data field in the record; may not be null.
      */
-    public Object before() {
-        return before;
-    }
-
-    /**
-     * Get the value of the after field in the record; may be null.
-     *
-     * @return the value of the after field
-     */
-    public Object after() {
-        return after;
+    public Struct data() {
+        Struct data = new Struct(dataSchema());
+
+        for (Field field : dataSchema.fields()) {
+            data.put(field, record.get(field));
+        }
+
+        return data;
     }
 
     /**
@@ -149,21 +148,10 @@ public Schema ts_msSchema() {
     }
 
     /**
-     * Get the schema of the before field in the record; may be null.
-     *
-     * @return the schema of the before field
-     */
-    public Schema beforeSchema() {
-        return beforeSchema;
-    }
-
-    /**
-     * Get the schema of the after field in the record; may be null.
-     *
-     * @return the schema of the after field
+     * Get the schema of the data field in the record; may not be null.
      */
-    public Schema afterSchema() {
-        return afterSchema;
+    public Schema dataSchema() {
+        return dataSchema;
     }
 
     /**
@@ -207,7 +195,7 @@ public static final class MysqlRecordParser extends RecordParser {
                 QUERY_KEY);
 
         MysqlRecordParser(Schema schema, Struct record) {
-            super(schema, record);
+            super(schema, record, Envelope.FieldName.BEFORE, Envelope.FieldName.AFTER);
         }
 
         @Override
@@ -237,7 +225,7 @@ public static final class PostgresRecordParser extends RecordParser {
                 LSN_KEY);
 
         PostgresRecordParser(Schema schema, Struct record) {
-            super(schema, record);
+            super(schema, record, Envelope.FieldName.BEFORE, Envelope.FieldName.AFTER);
         }
 
         @Override
@@ -269,7 +257,7 @@ public static final class MongodbRecordParser extends RecordParser {
                 COLLECTION);
 
         MongodbRecordParser(Schema schema, Struct record) {
-            super(schema, record);
+            super(schema, record, Envelope.FieldName.AFTER, "patch");
         }
 
         @Override
@@ -299,7 +287,7 @@ public static final class SqlserverRecordParser extends RecordParser {
                 EVENT_SERIAL_NO_KEY);
 
         SqlserverRecordParser(Schema schema, Struct record) {
-            super(schema, record);
+            super(schema, record, Envelope.FieldName.BEFORE, Envelope.FieldName.AFTER);
         }
 
         @Override

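A short usage sketch of the reworked parser, based only on the methods visible in this diff (create(), dataSchema(), data()); the wrapper class, method name and import location below are assumptions for illustration:

import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;

import io.debezium.converters.RecordParser; // package assumed; not shown in the diff

class RecordParserUsageSketch {
    static Struct cloudEventsData(SourceRecord record) {
        // create() picks the connector-specific subclass; data()/dataSchema() expose exactly
        // the fields that subclass declared: before/after for MySQL, Postgres and SQL Server,
        // after/"patch" for MongoDB.
        RecordParser parser = RecordParser.create(record.valueSchema(), record.value());
        // parser.dataSchema() returns the matching schema of the CloudEvents data attribute
        return parser.data();
    }
}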
@@ -200,7 +200,7 @@ public static void shouldConvertToCloudEventsInJsonWithDataAsAvro(SourceRecord r
         }
     }
 
-    public static void shouldConvertToCloudEventsInAvro(SourceRecord record) {
+    public static void shouldConvertToCloudEventsInAvro(SourceRecord record, String connectorName, String serverName) {
         Map<String, Object> config = new HashMap<>();
         config.put("serializer.type", "avro");
         config.put("data.serializer.type", "avro");
@@ -250,9 +250,9 @@ public static void shouldConvertToCloudEventsInAvro(SourceRecord record) {
         msg = "inspecting all required CloudEvents fields in the value";
         avroValue = (Struct) avroSchemaAndValue.value();
         assertThat(avroValue.get(CloudEventsMaker.FieldName.ID)).isNotNull();
-        assertThat(avroValue.getString(CloudEventsMaker.FieldName.SOURCE)).isEqualTo("/debezium/postgresql/test_server");
+        assertThat(avroValue.getString(CloudEventsMaker.FieldName.SOURCE)).isEqualTo("/debezium/" + connectorName + "/" + serverName);
         assertThat(avroValue.get(CloudEventsMaker.FieldName.SPECVERSION)).isEqualTo("1.0");
-        assertThat(avroValue.get(CloudEventsMaker.FieldName.TYPE)).isEqualTo("io.debezium.postgresql.datachangeevent");
+        assertThat(avroValue.get(CloudEventsMaker.FieldName.TYPE)).isEqualTo("io.debezium." + connectorName + ".datachangeevent");
         assertThat(avroValue.get(CloudEventsMaker.FieldName.DATACONTENTTYPE)).isEqualTo("avro/binary");
         assertThat(avroValue.getString(CloudEventsMaker.FieldName.DATASCHEMA)).startsWith("http://fake-url/schemas/ids/");
         assertThat(avroValue.get(CloudEventsMaker.FieldName.TIME)).isNotNull();
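With the connector and server name passed in, the test derives the expected CloudEvents attributes instead of hard-coding the PostgreSQL values; each connector IT calls the helper like this (values taken from the hunks above):

// Example call from the PostgreSQL IT:
CloudEventsConverterTest.shouldConvertToCloudEventsInAvro(record, "postgresql", "test_server");
// ...which now asserts, among other checks:
//   source: "/debezium/postgresql/test_server"
//   type:   "io.debezium.postgresql.datachangeevent"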