DBZ-1052 CE support for tx metadata in data messages

This commit is contained in:
Jiri Pechanec 2020-01-30 08:49:08 +01:00 committed by Gunnar Morling
parent 0e671d6167
commit ba10a229b6
7 changed files with 99 additions and 37 deletions

View File

@ -941,9 +941,9 @@ public void shouldOutputRecordsInCloudEventsFormat() throws Exception {
SourceRecords records = consumeRecordsByTopic(12); SourceRecords records = consumeRecordsByTopic(12);
List<SourceRecord> topicRecords = records.recordsForTopic("mongo.dbit.restaurants"); List<SourceRecord> topicRecords = records.recordsForTopic("mongo.dbit.restaurants");
for (SourceRecord record : topicRecords) { for (SourceRecord record : topicRecords) {
CloudEventsConverterTest.shouldConvertToCloudEventsInJson(record); CloudEventsConverterTest.shouldConvertToCloudEventsInJson(record, false);
CloudEventsConverterTest.shouldConvertToCloudEventsInJsonWithDataAsAvro(record); CloudEventsConverterTest.shouldConvertToCloudEventsInJsonWithDataAsAvro(record, false);
CloudEventsConverterTest.shouldConvertToCloudEventsInAvro(record, "mongodb", "mongo"); CloudEventsConverterTest.shouldConvertToCloudEventsInAvro(record, "mongodb", "mongo", false);
} }
storeDocuments("dbit", "restaurants", "restaurants2.json"); storeDocuments("dbit", "restaurants", "restaurants2.json");
@ -952,9 +952,9 @@ public void shouldOutputRecordsInCloudEventsFormat() throws Exception {
SourceRecords records2 = consumeRecordsByTopic(4); SourceRecords records2 = consumeRecordsByTopic(4);
List<SourceRecord> topicRecords2 = records2.recordsForTopic("mongo.dbit.restaurants"); List<SourceRecord> topicRecords2 = records2.recordsForTopic("mongo.dbit.restaurants");
for (SourceRecord record : topicRecords2) { for (SourceRecord record : topicRecords2) {
CloudEventsConverterTest.shouldConvertToCloudEventsInJson(record); CloudEventsConverterTest.shouldConvertToCloudEventsInJson(record, false);
CloudEventsConverterTest.shouldConvertToCloudEventsInJsonWithDataAsAvro(record); CloudEventsConverterTest.shouldConvertToCloudEventsInJsonWithDataAsAvro(record, false);
CloudEventsConverterTest.shouldConvertToCloudEventsInAvro(record, "mongodb", "mongo"); CloudEventsConverterTest.shouldConvertToCloudEventsInAvro(record, "mongodb", "mongo", false);
} }
stopConnector(); stopConnector();

View File

@ -1969,9 +1969,9 @@ public void shouldOutputRecordsInCloudEventsFormat() throws Exception {
final List<SourceRecord> table = records.recordsForTopic(DATABASE.topicForTable(tableName)); final List<SourceRecord> table = records.recordsForTopic(DATABASE.topicForTable(tableName));
for (SourceRecord record : table) { for (SourceRecord record : table) {
CloudEventsConverterTest.shouldConvertToCloudEventsInJson(record); CloudEventsConverterTest.shouldConvertToCloudEventsInJson(record, false);
CloudEventsConverterTest.shouldConvertToCloudEventsInJsonWithDataAsAvro(record); CloudEventsConverterTest.shouldConvertToCloudEventsInJsonWithDataAsAvro(record, false);
CloudEventsConverterTest.shouldConvertToCloudEventsInAvro(record, "mysql", "myServer1"); CloudEventsConverterTest.shouldConvertToCloudEventsInAvro(record, "mysql", "myServer1", false);
} }
} }

View File

@ -1375,6 +1375,7 @@ public void shouldOutputRecordsInCloudEventsFormat() throws Exception {
TestHelper.execute(SETUP_TABLES_STMT); TestHelper.execute(SETUP_TABLES_STMT);
Configuration.Builder configBuilder = TestHelper.defaultConfig() Configuration.Builder configBuilder = TestHelper.defaultConfig()
.with(PostgresConnectorConfig.SNAPSHOT_MODE, SnapshotMode.INITIAL.getValue()) .with(PostgresConnectorConfig.SNAPSHOT_MODE, SnapshotMode.INITIAL.getValue())
.with(CommonConnectorConfig.PROVIDE_TRANSACTION_METADATA, true)
.with(PostgresConnectorConfig.DROP_SLOT_ON_STOP, Boolean.TRUE); .with(PostgresConnectorConfig.DROP_SLOT_ON_STOP, Boolean.TRUE);
start(PostgresConnector.class, configBuilder.build()); start(PostgresConnector.class, configBuilder.build());
@ -1384,21 +1385,22 @@ public void shouldOutputRecordsInCloudEventsFormat() throws Exception {
List<SourceRecord> snapshot = snapshotRecords.allRecordsInOrder(); List<SourceRecord> snapshot = snapshotRecords.allRecordsInOrder();
for (SourceRecord record : snapshot) { for (SourceRecord record : snapshot) {
CloudEventsConverterTest.shouldConvertToCloudEventsInJson(record); CloudEventsConverterTest.shouldConvertToCloudEventsInJson(record, false);
CloudEventsConverterTest.shouldConvertToCloudEventsInJsonWithDataAsAvro(record); CloudEventsConverterTest.shouldConvertToCloudEventsInJsonWithDataAsAvro(record, false);
CloudEventsConverterTest.shouldConvertToCloudEventsInAvro(record, "postgresql", "test_server"); CloudEventsConverterTest.shouldConvertToCloudEventsInAvro(record, "postgresql", "test_server", false);
} }
// insert some more records and test streaming // insert some more records and test streaming
TestHelper.execute(INSERT_STMT); TestHelper.execute(INSERT_STMT);
SourceRecords streamingRecords = consumeRecordsByTopic(2); Testing.Print.enable();
List<SourceRecord> streaming = streamingRecords.allRecordsInOrder(); SourceRecords streamingRecords = consumeRecordsByTopic(2 + 2);
List<SourceRecord> streaming = streamingRecords.allRecordsInOrder().subList(1, 3);
for (SourceRecord record : streaming) { for (SourceRecord record : streaming) {
CloudEventsConverterTest.shouldConvertToCloudEventsInJson(record); CloudEventsConverterTest.shouldConvertToCloudEventsInJson(record, true);
CloudEventsConverterTest.shouldConvertToCloudEventsInJsonWithDataAsAvro(record); CloudEventsConverterTest.shouldConvertToCloudEventsInJsonWithDataAsAvro(record, true);
CloudEventsConverterTest.shouldConvertToCloudEventsInAvro(record, "postgresql", "test_server"); CloudEventsConverterTest.shouldConvertToCloudEventsInAvro(record, "postgresql", "test_server", true);
} }
stopConnector(); stopConnector();

View File

@ -496,9 +496,9 @@ public void shouldOutputRecordsInCloudEventsFormat() throws Exception {
// test snapshot // test snapshot
for (SourceRecord sourceRecord : snapshotTable1) { for (SourceRecord sourceRecord : snapshotTable1) {
CloudEventsConverterTest.shouldConvertToCloudEventsInJson(sourceRecord); CloudEventsConverterTest.shouldConvertToCloudEventsInJson(sourceRecord, false);
CloudEventsConverterTest.shouldConvertToCloudEventsInJsonWithDataAsAvro(sourceRecord); CloudEventsConverterTest.shouldConvertToCloudEventsInJsonWithDataAsAvro(sourceRecord, false);
CloudEventsConverterTest.shouldConvertToCloudEventsInAvro(sourceRecord, "sqlserver", "server1"); CloudEventsConverterTest.shouldConvertToCloudEventsInAvro(sourceRecord, "sqlserver", "server1", false);
} }
for (int i = 0; i < STREAMING_RECORDS_PER_TABLE; i++) { for (int i = 0; i < STREAMING_RECORDS_PER_TABLE; i++) {
@ -514,9 +514,9 @@ public void shouldOutputRecordsInCloudEventsFormat() throws Exception {
// test streaming // test streaming
for (SourceRecord sourceRecord : streamingTable1) { for (SourceRecord sourceRecord : streamingTable1) {
CloudEventsConverterTest.shouldConvertToCloudEventsInJson(sourceRecord); CloudEventsConverterTest.shouldConvertToCloudEventsInJson(sourceRecord, false);
CloudEventsConverterTest.shouldConvertToCloudEventsInJsonWithDataAsAvro(sourceRecord); CloudEventsConverterTest.shouldConvertToCloudEventsInJsonWithDataAsAvro(sourceRecord, false);
CloudEventsConverterTest.shouldConvertToCloudEventsInAvro(sourceRecord, "sqlserver", "server1"); CloudEventsConverterTest.shouldConvertToCloudEventsInAvro(sourceRecord, "sqlserver", "server1", false);
} }
} }

View File

@ -15,6 +15,7 @@
import java.util.HashMap; import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.function.Function;
import org.apache.kafka.common.errors.SerializationException; import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.connect.data.Field; import org.apache.kafka.connect.data.Field;
@ -41,6 +42,7 @@
import io.debezium.config.Configuration; import io.debezium.config.Configuration;
import io.debezium.config.Instantiator; import io.debezium.config.Instantiator;
import io.debezium.data.Envelope; import io.debezium.data.Envelope;
import io.debezium.pipeline.txmetadata.TransactionMonitor;
import io.debezium.util.SchemaNameAdjuster; import io.debezium.util.SchemaNameAdjuster;
/** /**
@ -68,6 +70,7 @@
public class CloudEventsConverter implements Converter { public class CloudEventsConverter implements Converter {
private static final String EXTENSION_NAME_PREFIX = "iodebezium"; private static final String EXTENSION_NAME_PREFIX = "iodebezium";
private static final String TX_ATTRIBUTE_PREFIX = "tx";
/** /**
* Instantiated reflectively to avoid hard dependency to Avro converter. * Instantiated reflectively to avoid hard dependency to Avro converter.
@ -167,6 +170,10 @@ public byte[] fromConnectData(String topic, Schema schema, Object value) {
if (schema == null || value == null) { if (schema == null || value == null) {
return null; return null;
} }
if (!schema.name().endsWith(".Envelope")) {
// TODO Handling of non-data messages like schema change or transaction metadata
return null;
}
if (schema.type() != STRUCT) { if (schema.type() != STRUCT) {
throw new DataException("Mismatching schema"); throw new DataException("Mismatching schema");
} }
@ -361,6 +368,7 @@ private SchemaAndValue convertToCloudEventsFormat(RecordParser parser, CloudEven
SchemaNameAdjuster schemaNameAdjuster = SchemaNameAdjuster.create(LOGGER); SchemaNameAdjuster schemaNameAdjuster = SchemaNameAdjuster.create(LOGGER);
Struct source = parser.source(); Struct source = parser.source();
Schema sourceSchema = parser.source().schema(); Schema sourceSchema = parser.source().schema();
final Struct transaction = parser.transaction();
// construct schema of CloudEvents envelope // construct schema of CloudEvents envelope
CESchemaBuilder ceSchemaBuilder = defineSchema() CESchemaBuilder ceSchemaBuilder = defineSchema()
@ -378,9 +386,10 @@ private SchemaAndValue convertToCloudEventsFormat(RecordParser parser, CloudEven
ceSchemaBuilder.withSchema(adjustExtensionName(Envelope.FieldName.OPERATION), Schema.STRING_SCHEMA); ceSchemaBuilder.withSchema(adjustExtensionName(Envelope.FieldName.OPERATION), Schema.STRING_SCHEMA);
for (Field field : sourceSchema.fields()) { ceSchemaFromSchema(sourceSchema, ceSchemaBuilder, CloudEventsConverter::adjustExtensionName, false);
ceSchemaBuilder.withSchema(adjustExtensionName(field.name()), convertToCeExtensionSchema(field.schema()));
} // transaction attributes
ceSchemaFromSchema(TransactionMonitor.TRANSACTION_BLOCK_SCHEMA, ceSchemaBuilder, CloudEventsConverter::txExtensionName, true);
ceSchemaBuilder.withSchema(CloudEventsMaker.FieldName.DATA, dataSchemaType); ceSchemaBuilder.withSchema(CloudEventsMaker.FieldName.DATA, dataSchemaType);
@ -400,12 +409,10 @@ private SchemaAndValue convertToCloudEventsFormat(RecordParser parser, CloudEven
ceValueBuilder.withValue(adjustExtensionName(Envelope.FieldName.OPERATION), parser.op()); ceValueBuilder.withValue(adjustExtensionName(Envelope.FieldName.OPERATION), parser.op());
for (Field field : sourceSchema.fields()) { ceValueFromStruct(source, sourceSchema, ceValueBuilder, CloudEventsConverter::adjustExtensionName);
Object value = source.get(field);
if (field.schema().type() == Type.INT64 && value != null) { if (transaction != null) {
value = String.valueOf((long) value); ceValueFromStruct(transaction, TransactionMonitor.TRANSACTION_BLOCK_SCHEMA, ceValueBuilder, CloudEventsConverter::txExtensionName);
}
ceValueBuilder.withValue(adjustExtensionName(field.name()), value);
} }
ceValueBuilder.withValue(CloudEventsMaker.FieldName.DATA, serializedData); ceValueBuilder.withValue(CloudEventsMaker.FieldName.DATA, serializedData);
@ -413,11 +420,27 @@ private SchemaAndValue convertToCloudEventsFormat(RecordParser parser, CloudEven
return new SchemaAndValue(ceSchema, ceValueBuilder.build()); return new SchemaAndValue(ceSchema, ceValueBuilder.build());
} }
/**
 * Copies the fields of {@code struct} into the CloudEvents value as extension attributes.
 * <p>
 * The fields are enumerated from {@code schema}; each attribute name is derived from the
 * field name through {@code nameMapper}. Non-null int64 values are rendered as strings,
 * since CE extension attributes can only carry int64 in string form.
 *
 * @param struct the struct whose field values are copied
 * @param schema the schema listing the fields to copy
 * @param ceValueBuilder the builder receiving the extension attribute values
 * @param nameMapper maps a field name to the CE extension attribute name
 */
private void ceValueFromStruct(Struct struct, Schema schema, CEValueBuilder ceValueBuilder, Function<String, String> nameMapper) {
    for (Field attribute : schema.fields()) {
        final Object raw = struct.get(attribute);
        // int64 has no native CE extension representation, so it travels as its decimal string
        final boolean stringify = raw != null && attribute.schema().type() == Type.INT64;
        final Object ceValue = stringify ? String.valueOf((long) raw) : raw;
        ceValueBuilder.withValue(nameMapper.apply(attribute.name()), ceValue);
    }
}
/**
 * Declares one CloudEvents extension attribute for every field of {@code schema}.
 *
 * @param schema the schema whose fields become extension attributes
 * @param ceSchemaBuilder the builder receiving the attribute schemas
 * @param nameMapper maps a field name to the CE extension attribute name
 * @param alwaysOptional when {@code true}, every attribute is declared optional
 *            regardless of the optionality of the originating field
 */
private void ceSchemaFromSchema(Schema schema, CESchemaBuilder ceSchemaBuilder, Function<String, String> nameMapper, boolean alwaysOptional) {
    for (Field attribute : schema.fields()) {
        final String attributeName = nameMapper.apply(attribute.name());
        final Schema attributeSchema = convertToCeExtensionSchema(attribute.schema(), alwaysOptional);
        ceSchemaBuilder.withSchema(attributeName, attributeSchema);
    }
}
/** /**
* Converts the given source attribute schema into a corresponding CE extension schema. * Converts the given source attribute schema into a corresponding CE extension schema.
* The types supported there are limited, e.g. int64 can only be represented as string. * The types supported there are limited, e.g. int64 can only be represented as string.
*/ */
private Schema convertToCeExtensionSchema(Schema schema) { private Schema convertToCeExtensionSchema(Schema schema, boolean alwaysOptional) {
SchemaBuilder ceExtensionSchema; SchemaBuilder ceExtensionSchema;
if (schema.type() == Type.BOOLEAN) { if (schema.type() == Type.BOOLEAN) {
@ -437,13 +460,17 @@ else if (schema.type() == Type.STRING || schema.type() == Type.INT64) {
throw new IllegalArgumentException("Source field of type " + schema.type() + " cannot be converted into CloudEvents extension attribute."); throw new IllegalArgumentException("Source field of type " + schema.type() + " cannot be converted into CloudEvents extension attribute.");
} }
if (schema.isOptional()) { if (alwaysOptional || schema.isOptional()) {
ceExtensionSchema.optional(); ceExtensionSchema.optional();
} }
return ceExtensionSchema.build(); return ceExtensionSchema.build();
} }
/**
 * Converts the given source attribute schema into a CE extension schema, keeping the
 * optionality of the original schema (i.e. without forcing the result to be optional).
 */
private Schema convertToCeExtensionSchema(Schema schema) {
    final boolean alwaysOptional = false;
    return convertToCeExtensionSchema(schema, alwaysOptional);
}
private static CESchemaBuilder defineSchema() { private static CESchemaBuilder defineSchema() {
return new CESchemaBuilder() { return new CESchemaBuilder() {
private final SchemaBuilder builder = SchemaBuilder.struct(); private final SchemaBuilder builder = SchemaBuilder.struct();
@ -540,6 +567,10 @@ static String adjustExtensionName(String original) {
return sb.toString(); return sb.toString();
} }
/**
 * Builds the CE extension attribute name for a field of the transaction block: the field
 * name is prefixed with {@code TX_ATTRIBUTE_PREFIX} and then passed through
 * {@link #adjustExtensionName(String)}.
 */
private static String txExtensionName(String name) {
    final String prefixed = TX_ATTRIBUTE_PREFIX + name;
    return adjustExtensionName(prefixed);
}
private static boolean isValidExtensionNameCharacter(char c) { private static boolean isValidExtensionNameCharacter(char c) {
return (c >= 'A' && c <= 'Z') || (c >= 'a' && c <= 'z') || (c >= '0' && c <= '9'); return (c >= 'A' && c <= 'Z') || (c >= 'a' && c <= 'z') || (c >= '0' && c <= '9');
} }

View File

@ -28,6 +28,7 @@ public abstract class RecordParser {
private final Struct record; private final Struct record;
private final Struct source; private final Struct source;
private final Struct transaction;
private final String op; private final String op;
private final Schema opSchema; private final Schema opSchema;
private final String ts_ms; private final String ts_ms;
@ -71,6 +72,7 @@ public static RecordParser create(Schema schema, Object value) {
protected RecordParser(Schema schema, Struct record, String... dataFields) { protected RecordParser(Schema schema, Struct record, String... dataFields) {
this.record = record; this.record = record;
this.source = record.getStruct(Envelope.FieldName.SOURCE); this.source = record.getStruct(Envelope.FieldName.SOURCE);
this.transaction = record.schema().field(Envelope.FieldName.TRANSACTION) != null ? record.getStruct(Envelope.FieldName.TRANSACTION) : null;
this.op = record.getString(Envelope.FieldName.OPERATION); this.op = record.getString(Envelope.FieldName.OPERATION);
this.opSchema = schema.field(Envelope.FieldName.OPERATION).schema(); this.opSchema = schema.field(Envelope.FieldName.OPERATION).schema();
this.ts_ms = record.getInt64(Envelope.FieldName.TIMESTAMP).toString(); this.ts_ms = record.getInt64(Envelope.FieldName.TIMESTAMP).toString();
@ -111,6 +113,15 @@ public Struct source() {
return source; return source;
} }
/**
 * Get the value of the transaction field in the record.
 * <p>
 * The field is only read when the envelope schema declares it, so callers must be
 * prepared for a {@code null} result.
 *
 * @return the value of the transaction field; {@code null} if the record's schema has no
 *         transaction field (presumably also when the field is present but unset)
 */
public Struct transaction() {
return transaction;
}
/** /**
* Get the value of the op field in the record. * Get the value of the op field in the record.
* *

View File

@ -31,7 +31,7 @@
public class CloudEventsConverterTest { public class CloudEventsConverterTest {
public static void shouldConvertToCloudEventsInJson(SourceRecord record) { public static void shouldConvertToCloudEventsInJson(SourceRecord record, boolean hasTransaction) {
Map<String, Object> config = new HashMap<>(); Map<String, Object> config = new HashMap<>();
config.put("serializer.type", "json"); config.put("serializer.type", "json");
config.put("data.serializer.type", "json"); config.put("data.serializer.type", "json");
@ -92,6 +92,12 @@ public static void shouldConvertToCloudEventsInJson(SourceRecord record) {
msg = "inspecting required CloudEvents extension attributes for Debezium"; msg = "inspecting required CloudEvents extension attributes for Debezium";
assertThat(valueJson.get("iodebeziumop")).isNotNull(); assertThat(valueJson.get("iodebeziumop")).isNotNull();
assertThat(valueJson.get("iodebeziumtsms")).isNotNull(); assertThat(valueJson.get("iodebeziumtsms")).isNotNull();
if (hasTransaction) {
msg = "inspecting transaction metadata attributes";
assertThat(valueJson.get("iodebeziumtxid")).isNotNull();
assertThat(valueJson.get("iodebeziumtxtotalorder")).isNotNull();
assertThat(valueJson.get("iodebeziumtxdatacollectionorder")).isNotNull();
}
msg = "inspecting the data field in the value"; msg = "inspecting the data field in the value";
dataJson = valueJson.get(CloudEventsMaker.FieldName.DATA); dataJson = valueJson.get(CloudEventsMaker.FieldName.DATA);
assertThat(dataJson.get(CloudEventsMaker.FieldName.SCHEMA_FIELD_NAME)).isNotNull(); assertThat(dataJson.get(CloudEventsMaker.FieldName.SCHEMA_FIELD_NAME)).isNotNull();
@ -113,7 +119,7 @@ public static void shouldConvertToCloudEventsInJson(SourceRecord record) {
} }
} }
public static void shouldConvertToCloudEventsInJsonWithDataAsAvro(SourceRecord record) { public static void shouldConvertToCloudEventsInJsonWithDataAsAvro(SourceRecord record, boolean hasTransaction) {
Map<String, Object> config = new HashMap<>(); Map<String, Object> config = new HashMap<>();
config.put("serializer.type", "json"); config.put("serializer.type", "json");
config.put("data.serializer.type", "avro"); config.put("data.serializer.type", "avro");
@ -177,6 +183,12 @@ public static void shouldConvertToCloudEventsInJsonWithDataAsAvro(SourceRecord r
msg = "inspecting required CloudEvents extension attributes for Debezium"; msg = "inspecting required CloudEvents extension attributes for Debezium";
assertThat(valueJson.get("iodebeziumop")).isNotNull(); assertThat(valueJson.get("iodebeziumop")).isNotNull();
assertThat(valueJson.get("iodebeziumtsms")).isNotNull(); assertThat(valueJson.get("iodebeziumtsms")).isNotNull();
if (hasTransaction) {
msg = "inspecting transaction metadata attributes";
assertThat(valueJson.get("iodebeziumtxid")).isNotNull();
assertThat(valueJson.get("iodebeziumtxtotalorder")).isNotNull();
assertThat(valueJson.get("iodebeziumtxdatacollectionorder")).isNotNull();
}
msg = "inspecting the data field in the value"; msg = "inspecting the data field in the value";
dataJson = valueJson.get(CloudEventsMaker.FieldName.DATA); dataJson = valueJson.get(CloudEventsMaker.FieldName.DATA);
assertThat(dataJson).isNotNull(); assertThat(dataJson).isNotNull();
@ -200,7 +212,7 @@ public static void shouldConvertToCloudEventsInJsonWithDataAsAvro(SourceRecord r
} }
} }
public static void shouldConvertToCloudEventsInAvro(SourceRecord record, String connectorName, String serverName) { public static void shouldConvertToCloudEventsInAvro(SourceRecord record, String connectorName, String serverName, boolean hasTransaction) {
Map<String, Object> config = new HashMap<>(); Map<String, Object> config = new HashMap<>();
config.put("serializer.type", "avro"); config.put("serializer.type", "avro");
config.put("data.serializer.type", "avro"); config.put("data.serializer.type", "avro");
@ -260,6 +272,12 @@ public static void shouldConvertToCloudEventsInAvro(SourceRecord record, String
msg = "inspecting required CloudEvents extension attributes in the value"; msg = "inspecting required CloudEvents extension attributes in the value";
assertThat(avroValue.get(CloudEventsConverter.adjustExtensionName(Envelope.FieldName.OPERATION))).isNotNull(); assertThat(avroValue.get(CloudEventsConverter.adjustExtensionName(Envelope.FieldName.OPERATION))).isNotNull();
assertThat(avroValue.get(CloudEventsConverter.adjustExtensionName(Envelope.FieldName.TIMESTAMP))).isNotNull(); assertThat(avroValue.get(CloudEventsConverter.adjustExtensionName(Envelope.FieldName.TIMESTAMP))).isNotNull();
if (hasTransaction) {
msg = "inspecting transaction metadata attributes";
assertThat(avroValue.get("iodebeziumtxid")).isNotNull();
assertThat(avroValue.get("iodebeziumtxtotalorder")).isNotNull();
assertThat(avroValue.get("iodebeziumtxdatacollectionorder")).isNotNull();
}
msg = "inspecting the data field in the value"; msg = "inspecting the data field in the value";
Struct avroDataField = avroValue.getStruct(CloudEventsMaker.FieldName.DATA); Struct avroDataField = avroValue.getStruct(CloudEventsMaker.FieldName.DATA);
// before field may be null // before field may be null