DBZ-191 Adding record validations

This commit is contained in:
Gunnar Morling 2018-06-14 15:18:50 +02:00 committed by Jiri Pechanec
parent 84dd36df46
commit 9eb4b90ec9
3 changed files with 156 additions and 16 deletions

View File

@ -21,10 +21,6 @@
import java.time.format.DateTimeFormatter;
import java.time.temporal.TemporalAccessor;
import io.debezium.jdbc.TemporalPrecisionMode;
import io.debezium.time.MicroTimestamp;
import io.debezium.time.Timestamp;
import io.debezium.time.ZonedTimestamp;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.source.SourceRecord;
import org.junit.After;
@ -34,6 +30,10 @@
import io.debezium.config.Configuration;
import io.debezium.embedded.AbstractConnectorTest;
import io.debezium.jdbc.JdbcValueConverters;
import io.debezium.jdbc.TemporalPrecisionMode;
import io.debezium.time.MicroTimestamp;
import io.debezium.time.Timestamp;
import io.debezium.time.ZonedTimestamp;
import io.debezium.util.Testing;
/**
@ -75,6 +75,8 @@ public void unsignedTinyIntTest() throws InterruptedException {
SourceRecords records = consumeRecordsByTopic(EVENT_COUNT);
SourceRecord record = records.recordsForTopic(DATABASE.topicForTable("UNSIGNED_TINYINT_TABLE")).get(0);
validate(record);
Schema schemaA = record.valueSchema().fields().get(1).schema().fields().get(0).schema();
Schema schemaB = record.valueSchema().fields().get(1).schema().fields().get(1).schema();
Schema schemaC = record.valueSchema().fields().get(1).schema().fields().get(2).schema();
@ -105,6 +107,8 @@ public void unsignedSmallIntTest() throws InterruptedException {
SourceRecords records = consumeRecordsByTopic(EVENT_COUNT);
SourceRecord record = records.recordsForTopic(DATABASE.topicForTable("UNSIGNED_SMALLINT_TABLE")).get(0);
validate(record);
Schema schemaA = record.valueSchema().fields().get(1).schema().fields().get(0).schema();
Schema schemaB = record.valueSchema().fields().get(1).schema().fields().get(1).schema();
Schema schemaC = record.valueSchema().fields().get(1).schema().fields().get(2).schema();
@ -135,6 +139,8 @@ public void unsignedMediumIntTest() throws InterruptedException {
SourceRecords records = consumeRecordsByTopic(EVENT_COUNT);
SourceRecord record = records.recordsForTopic(DATABASE.topicForTable("UNSIGNED_MEDIUMINT_TABLE")).get(0);
validate(record);
Schema schemaA = record.valueSchema().fields().get(1).schema().fields().get(0).schema();
Schema schemaB = record.valueSchema().fields().get(1).schema().fields().get(1).schema();
Schema schemaC = record.valueSchema().fields().get(1).schema().fields().get(2).schema();
@ -165,6 +171,8 @@ public void unsignedIntTest() throws InterruptedException {
SourceRecords records = consumeRecordsByTopic(EVENT_COUNT);
SourceRecord record = records.recordsForTopic(DATABASE.topicForTable("UNSIGNED_INT_TABLE")).get(0);
validate(record);
Schema schemaA = record.valueSchema().fields().get(1).schema().fields().get(0).schema();
Schema schemaB = record.valueSchema().fields().get(1).schema().fields().get(1).schema();
Schema schemaC = record.valueSchema().fields().get(1).schema().fields().get(2).schema();
@ -195,6 +203,8 @@ public void unsignedBigIntToLongTest() throws InterruptedException {
SourceRecords records = consumeRecordsByTopic(EVENT_COUNT);
SourceRecord record = records.recordsForTopic(DATABASE.topicForTable("UNSIGNED_BIGINT_TABLE")).get(0);
validate(record);
Schema schemaA = record.valueSchema().fields().get(1).schema().fields().get(0).schema();
Schema schemaB = record.valueSchema().fields().get(1).schema().fields().get(1).schema();
Schema schemaC = record.valueSchema().fields().get(1).schema().fields().get(2).schema();
@ -226,6 +236,11 @@ public void unsignedBigIntToBigDecimalTest() throws InterruptedException {
SourceRecords records = consumeRecordsByTopic(EVENT_COUNT);
SourceRecord record = records.recordsForTopic(DATABASE.topicForTable("UNSIGNED_BIGINT_TABLE")).get(0);
// TODO can't validate due to https://github.com/confluentinc/schema-registry/issues/833
// enable once that's resolved upstream
// validate(record);
Schema schemaA = record.valueSchema().fields().get(1).schema().fields().get(0).schema();
Schema schemaB = record.valueSchema().fields().get(1).schema().fields().get(1).schema();
Schema schemaC = record.valueSchema().fields().get(1).schema().fields().get(2).schema();
@ -256,6 +271,8 @@ public void stringTest() throws InterruptedException {
SourceRecords records = consumeRecordsByTopic(EVENT_COUNT);
SourceRecord record = records.recordsForTopic(DATABASE.topicForTable("STRING_TABLE")).get(0);
validate(record);
Schema schemaA = record.valueSchema().fields().get(1).schema().fields().get(0).schema();
Schema schemaB = record.valueSchema().fields().get(1).schema().fields().get(1).schema();
Schema schemaC = record.valueSchema().fields().get(1).schema().fields().get(2).schema();
@ -285,6 +302,8 @@ public void unsignedBitTest() throws InterruptedException {
SourceRecords records = consumeRecordsByTopic(EVENT_COUNT);
SourceRecord record = records.recordsForTopic(DATABASE.topicForTable("BIT_TABLE")).get(0);
validate(record);
Schema schemaA = record.valueSchema().fields().get(1).schema().fields().get(0).schema();
Schema schemaB = record.valueSchema().fields().get(1).schema().fields().get(1).schema();
Schema schemaC = record.valueSchema().fields().get(1).schema().fields().get(2).schema();
@ -318,6 +337,8 @@ public void booleanTest() throws InterruptedException {
SourceRecords records = consumeRecordsByTopic(EVENT_COUNT);
SourceRecord record = records.recordsForTopic(DATABASE.topicForTable("BOOLEAN_TABLE")).get(0);
validate(record);
Schema schemaA = record.valueSchema().fields().get(1).schema().fields().get(0).schema();
Schema schemaB = record.valueSchema().fields().get(1).schema().fields().get(1).schema();
Schema schemaC = record.valueSchema().fields().get(1).schema().fields().get(2).schema();
@ -341,6 +362,8 @@ public void numberTest() throws InterruptedException {
SourceRecords records = consumeRecordsByTopic(EVENT_COUNT);
SourceRecord record = records.recordsForTopic(DATABASE.topicForTable("NUMBER_TABLE")).get(0);
validate(record);
Schema schemaA = record.valueSchema().fields().get(1).schema().fields().get(0).schema();
Schema schemaB = record.valueSchema().fields().get(1).schema().fields().get(1).schema();
Schema schemaC = record.valueSchema().fields().get(1).schema().fields().get(2).schema();
@ -364,6 +387,8 @@ public void floatAndDoubleTest() throws InterruptedException {
SourceRecords records = consumeRecordsByTopic(EVENT_COUNT);
SourceRecord record = records.recordsForTopic(DATABASE.topicForTable("FlOAT_DOUBLE_TABLE")).get(0);
validate(record);
Schema schemaA = record.valueSchema().fields().get(1).schema().fields().get(0).schema();
Schema schemaB = record.valueSchema().fields().get(1).schema().fields().get(1).schema();
assertThat(schemaA.defaultValue()).isEqualTo(0d);
@ -381,6 +406,8 @@ public void realTest() throws InterruptedException {
SourceRecords records = consumeRecordsByTopic(EVENT_COUNT);
SourceRecord record = records.recordsForTopic(DATABASE.topicForTable("REAL_TABLE")).get(0);
validate(record);
Schema schemaA = record.valueSchema().fields().get(1).schema().fields().get(0).schema();
Schema schemaB = record.valueSchema().fields().get(1).schema().fields().get(1).schema();
assertThat(schemaA.defaultValue()).isEqualTo(1d);
@ -399,6 +426,8 @@ public void numericAndDecimalToDoubleTest() throws InterruptedException {
SourceRecords records = consumeRecordsByTopic(EVENT_COUNT);
SourceRecord record = records.recordsForTopic(DATABASE.topicForTable("NUMERIC_DECIMAL_TABLE")).get(0);
validate(record);
Schema schemaA = record.valueSchema().fields().get(1).schema().fields().get(0).schema();
Schema schemaB = record.valueSchema().fields().get(1).schema().fields().get(1).schema();
Schema schemaC = record.valueSchema().fields().get(1).schema().fields().get(2).schema();
@ -419,6 +448,11 @@ public void numericAndDecimalToDecimalTest() throws InterruptedException {
SourceRecords records = consumeRecordsByTopic(EVENT_COUNT);
SourceRecord record = records.recordsForTopic(DATABASE.topicForTable("NUMERIC_DECIMAL_TABLE")).get(0);
// TODO can't validate due to https://github.com/confluentinc/schema-registry/issues/833
// enable once that's resolved upstream
// validate(record);
Schema schemaA = record.valueSchema().fields().get(1).schema().fields().get(0).schema();
Schema schemaB = record.valueSchema().fields().get(1).schema().fields().get(1).schema();
assertThat(schemaA.defaultValue()).isEqualTo(BigDecimal.valueOf(1.23));
@ -437,6 +471,8 @@ public void dateAndTimeTest() throws InterruptedException {
SourceRecords records = consumeRecordsByTopic(7);
final SourceRecord record = records.recordsForTopic(DATABASE.topicForTable("DATE_TIME_TABLE")).get(0);
validate(record);
Schema schemaA = record.valueSchema().fields().get(1).schema().fields().get(0).schema();
Schema schemaB = record.valueSchema().fields().get(1).schema().fields().get(1).schema();
Schema schemaC = record.valueSchema().fields().get(1).schema().fields().get(2).schema();
@ -492,6 +528,8 @@ public void timeTypeWithAdaptiveMode() throws InterruptedException {
SourceRecords records = consumeRecordsByTopic(7);
final SourceRecord record = records.recordsForTopic(DATABASE.topicForTable("DATE_TIME_TABLE")).get(0);
validate(record);
Schema schemaA = record.valueSchema().fields().get(1).schema().fields().get(0).schema();
Schema schemaB = record.valueSchema().fields().get(1).schema().fields().get(1).schema();
Schema schemaC = record.valueSchema().fields().get(1).schema().fields().get(2).schema();
@ -540,6 +578,11 @@ public void timeTypeWithConnectMode() throws InterruptedException {
SourceRecords records = consumeRecordsByTopic(7);
final SourceRecord record = records.recordsForTopic(DATABASE.topicForTable("DATE_TIME_TABLE")).get(0);
// TODO can't validate due to https://github.com/confluentinc/schema-registry/issues/833
// enable once that's resolved upstream
// validate(record);
Schema schemaA = record.valueSchema().fields().get(1).schema().fields().get(0).schema();
Schema schemaB = record.valueSchema().fields().get(1).schema().fields().get(1).schema();
Schema schemaC = record.valueSchema().fields().get(1).schema().fields().get(2).schema();

View File

@ -51,6 +51,7 @@
import io.debezium.data.Json;
import io.debezium.data.Uuid;
import io.debezium.data.VariableScaleDecimal;
import io.debezium.data.VerifyRecord;
import io.debezium.data.Xml;
import io.debezium.data.geometry.Geography;
import io.debezium.data.geometry.Geometry;
@ -547,12 +548,12 @@ protected static TableId tableIdFromInsertStmt(String statement) {
}
protected static class SchemaAndValueField {
private final Object schema;
private final Schema schema;
private final Object value;
private final String fieldName;
private Supplier<Boolean> assertValueOnlyIf = null;
public SchemaAndValueField(String fieldName, Object schema, Object value) {
public SchemaAndValueField(String fieldName, Schema schema, Object value) {
this.schema = schema;
this.value = value;
this.fieldName = fieldName;
@ -632,7 +633,7 @@ private void assertSchema(Struct content) {
Schema schema = content.schema();
Field field = schema.field(fieldName);
assertNotNull(fieldName + " not found in schema " + schema, field);
assertEquals("Schema for " + field.name() + " does not match the actual value", this.schema, field.schema());
VerifyRecord.assertConnectSchemasAreEqual(this.schema, field.schema());
}
}

View File

@ -18,10 +18,12 @@
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.function.Predicate;
import org.apache.kafka.connect.data.ConnectSchema;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.Schema.Type;
@ -428,6 +430,16 @@ public static void assertEquals(SourceRecord actual, SourceRecord expected, Pred
assertEquals(actualValueSchema, actual.value(), expected.value(), "value", "", ignoreFields, comparatorsByName, comparatorsBySchemaName);
}
public static void assertConnectSchemasAreEqual(Schema schema1, Schema schema2) {
if (!areConnectSchemasEqual(schema1, schema2)) {
// failing with an assertion message that shows the difference
assertThat(SchemaUtil.asString(schema1)).isEqualTo(SchemaUtil.asString(schema2));
// fall-back just in case
fail(SchemaUtil.asString(schema1) + " was not equal to " + SchemaUtil.asString(schema2));
}
}
protected static String nameOf(String keyOrValue, String field) {
if (field == null || field.trim().isEmpty()) {
return keyOrValue;
@ -814,7 +826,11 @@ protected static String prettyJson(JsonNode json) {
protected static void assertEquals(Object o1, Object o2) {
// assertThat(o1).isEqualTo(o2);
if (!equals(o1, o2)) {
if (o1 instanceof Schema && o2 instanceof Schema) {
assertConnectSchemasAreEqual((Schema) o1, (Schema) o2);
}
else if (!equals(o1, o2)) {
fail(SchemaUtil.asString(o1) + " was not equal to " + SchemaUtil.asString(o2));
}
}
@ -823,7 +839,9 @@ protected static void assertEquals(Object o1, Object o2) {
protected static boolean equals(Object o1, Object o2) {
if (o1 == o2) return true;
if (o1 == null) return o2 == null ? true : false;
if (o2 == null) return false;
if (o2 == null) {
return false;
}
if (o1 instanceof ByteBuffer) {
o1 = ((ByteBuffer) o1).array();
}
@ -841,22 +859,31 @@ protected static boolean equals(Object o1, Object o2) {
if (o1 instanceof Map && o2 instanceof Map) {
Map<String, Object> m1 = (Map<String, Object>) o1;
Map<String, Object> m2 = (Map<String, Object>) o2;
if (!m1.keySet().equals(m2.keySet())) return false;
if (!m1.keySet().equals(m2.keySet())) {
return false;
}
for (Map.Entry<String, Object> entry : m1.entrySet()) {
Object v1 = entry.getValue();
Object v2 = m2.get(entry.getKey());
if (!equals(v1, v2)) return false;
if (!equals(v1, v2)) {
return false;
}
}
return true;
}
if (o1 instanceof Collection && o2 instanceof Collection) {
Collection<Object> m1 = (Collection<Object>) o1;
Collection<Object> m2 = (Collection<Object>) o2;
if (m1.size() != m2.size()) return false;
if (m1.size() != m2.size()) {
return false;
}
Iterator<?> iter1 = m1.iterator();
Iterator<?> iter2 = m2.iterator();
while (iter1.hasNext() && iter2.hasNext()) {
if (!equals(iter1.next(), iter2.next())) return false;
if (!equals(iter1.next(), iter2.next())) {
return false;
}
}
return true;
}
@ -867,14 +894,19 @@ protected static boolean equals(Object o1, Object o2) {
// does not work for non-primitive values.
Struct struct1 = (Struct) o1;
Struct struct2 = (Struct) o2;
if (!Objects.equals(struct1.schema(), struct2.schema())) {
if (!areConnectSchemasEqual(struct1.schema(), struct2.schema())) {
return false;
}
Object[] array1 = valuesFor(struct1);
Object[] array2 = valuesFor(struct2);
boolean result = deepEquals(array1, array2);
return result;
return deepEquals(array1, array2);
}
if (o1 instanceof ConnectSchema && o1 instanceof ConnectSchema) {
return areConnectSchemasEqual((ConnectSchema)o1, (ConnectSchema)o2);
}
return Objects.equals(o1, o2);
}
@ -940,4 +972,68 @@ else if (e1 instanceof boolean[] && e2 instanceof boolean[])
eq = equals(e1, e2);
return eq;
}
private static boolean areConnectSchemasEqual(Schema schema1, Schema schema2) {
if (schema1 == schema2) {
return true;
}
if (schema1 == null && schema2 != null || schema1 != null && schema2 == null) {
return false;
}
if (schema1.getClass() != schema2.getClass()) {
return false;
}
boolean keySchemasEqual = true;
boolean valueSchemasEqual = true;
boolean fieldsEqual = true;
if (schema1.type() == Type.MAP && schema2.type() == Type.MAP) {
keySchemasEqual = Objects.equals(schema1.keySchema(), schema2.keySchema());
valueSchemasEqual = Objects.equals(schema1.valueSchema(), schema2.valueSchema());
}
else if (schema1.type() == Type.ARRAY && schema2.type() == Type.ARRAY) {
valueSchemasEqual = Objects.equals(schema1.valueSchema(), schema2.valueSchema());
}
else if (schema1.type() == Type.STRUCT && schema2.type() == Type.STRUCT) {
fieldsEqual = areFieldListsEqual(schema1.fields(), schema2.fields());
}
boolean equal = Objects.equals(schema1.isOptional(), schema2.isOptional()) &&
Objects.equals(schema1.version(), schema2.version()) &&
Objects.equals(schema1.name(), schema2.name()) &&
Objects.equals(schema1.doc(), schema2.doc()) &&
Objects.equals(schema1.type(), schema2.type()) &&
Objects.deepEquals(schema1.defaultValue(), schema2.defaultValue()) &&
fieldsEqual &&
keySchemasEqual &&
valueSchemasEqual &&
Objects.equals(schema1.parameters(), schema2.parameters());
return equal;
}
private static boolean areFieldListsEqual(List<Field> fields1, List<Field> fields2) {
if (fields1 == null && fields2 != null || fields1 != null && fields2 == null) {
return false;
}
if (fields1.size() != fields2.size()) {
return false;
}
for(int i = 0; i < fields1.size(); i++) {
Field field1 = fields1.get(i);
Field field2 = fields2.get(i);
boolean equal = Objects.equals(field1.index(), field2.index()) &&
Objects.equals(field1.name(), field2.name()) &&
areConnectSchemasEqual(field1.schema(), field2.schema());
if (!equal) {
return false;
}
}
return true;
}
}