diff --git a/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/MysqlDefaultValueIT.java b/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/MysqlDefaultValueIT.java index 8d38db986..117ab663c 100644 --- a/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/MysqlDefaultValueIT.java +++ b/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/MysqlDefaultValueIT.java @@ -21,10 +21,6 @@ import java.time.format.DateTimeFormatter; import java.time.temporal.TemporalAccessor; -import io.debezium.jdbc.TemporalPrecisionMode; -import io.debezium.time.MicroTimestamp; -import io.debezium.time.Timestamp; -import io.debezium.time.ZonedTimestamp; import org.apache.kafka.connect.data.Schema; import org.apache.kafka.connect.source.SourceRecord; import org.junit.After; @@ -34,6 +30,10 @@ import io.debezium.config.Configuration; import io.debezium.embedded.AbstractConnectorTest; import io.debezium.jdbc.JdbcValueConverters; +import io.debezium.jdbc.TemporalPrecisionMode; +import io.debezium.time.MicroTimestamp; +import io.debezium.time.Timestamp; +import io.debezium.time.ZonedTimestamp; import io.debezium.util.Testing; /** @@ -75,6 +75,8 @@ public void unsignedTinyIntTest() throws InterruptedException { SourceRecords records = consumeRecordsByTopic(EVENT_COUNT); SourceRecord record = records.recordsForTopic(DATABASE.topicForTable("UNSIGNED_TINYINT_TABLE")).get(0); + validate(record); + Schema schemaA = record.valueSchema().fields().get(1).schema().fields().get(0).schema(); Schema schemaB = record.valueSchema().fields().get(1).schema().fields().get(1).schema(); Schema schemaC = record.valueSchema().fields().get(1).schema().fields().get(2).schema(); @@ -105,6 +107,8 @@ public void unsignedSmallIntTest() throws InterruptedException { SourceRecords records = consumeRecordsByTopic(EVENT_COUNT); SourceRecord record = records.recordsForTopic(DATABASE.topicForTable("UNSIGNED_SMALLINT_TABLE")).get(0); + validate(record); + Schema schemaA = record.valueSchema().fields().get(1).schema().fields().get(0).schema(); Schema schemaB = record.valueSchema().fields().get(1).schema().fields().get(1).schema(); Schema schemaC = record.valueSchema().fields().get(1).schema().fields().get(2).schema(); @@ -135,6 +139,8 @@ public void unsignedMediumIntTest() throws InterruptedException { SourceRecords records = consumeRecordsByTopic(EVENT_COUNT); SourceRecord record = records.recordsForTopic(DATABASE.topicForTable("UNSIGNED_MEDIUMINT_TABLE")).get(0); + validate(record); + Schema schemaA = record.valueSchema().fields().get(1).schema().fields().get(0).schema(); Schema schemaB = record.valueSchema().fields().get(1).schema().fields().get(1).schema(); Schema schemaC = record.valueSchema().fields().get(1).schema().fields().get(2).schema(); @@ -165,6 +171,8 @@ public void unsignedIntTest() throws InterruptedException { SourceRecords records = consumeRecordsByTopic(EVENT_COUNT); SourceRecord record = records.recordsForTopic(DATABASE.topicForTable("UNSIGNED_INT_TABLE")).get(0); + validate(record); + Schema schemaA = record.valueSchema().fields().get(1).schema().fields().get(0).schema(); Schema schemaB = record.valueSchema().fields().get(1).schema().fields().get(1).schema(); Schema schemaC = record.valueSchema().fields().get(1).schema().fields().get(2).schema(); @@ -195,6 +203,8 @@ public void unsignedBigIntToLongTest() throws InterruptedException { SourceRecords records = consumeRecordsByTopic(EVENT_COUNT); SourceRecord record = records.recordsForTopic(DATABASE.topicForTable("UNSIGNED_BIGINT_TABLE")).get(0); + validate(record); + Schema schemaA = record.valueSchema().fields().get(1).schema().fields().get(0).schema(); Schema schemaB = record.valueSchema().fields().get(1).schema().fields().get(1).schema(); Schema schemaC = record.valueSchema().fields().get(1).schema().fields().get(2).schema(); @@ -226,6 +236,11 @@ public void unsignedBigIntToBigDecimalTest() throws InterruptedException { SourceRecords records = consumeRecordsByTopic(EVENT_COUNT); SourceRecord record = records.recordsForTopic(DATABASE.topicForTable("UNSIGNED_BIGINT_TABLE")).get(0); + + // TODO can't validate due to https://github.com/confluentinc/schema-registry/issues/833 + // enable once that's resolved upstream + // validate(record); + Schema schemaA = record.valueSchema().fields().get(1).schema().fields().get(0).schema(); Schema schemaB = record.valueSchema().fields().get(1).schema().fields().get(1).schema(); Schema schemaC = record.valueSchema().fields().get(1).schema().fields().get(2).schema(); @@ -256,6 +271,8 @@ public void stringTest() throws InterruptedException { SourceRecords records = consumeRecordsByTopic(EVENT_COUNT); SourceRecord record = records.recordsForTopic(DATABASE.topicForTable("STRING_TABLE")).get(0); + validate(record); + Schema schemaA = record.valueSchema().fields().get(1).schema().fields().get(0).schema(); Schema schemaB = record.valueSchema().fields().get(1).schema().fields().get(1).schema(); Schema schemaC = record.valueSchema().fields().get(1).schema().fields().get(2).schema(); @@ -285,6 +302,8 @@ public void unsignedBitTest() throws InterruptedException { SourceRecords records = consumeRecordsByTopic(EVENT_COUNT); SourceRecord record = records.recordsForTopic(DATABASE.topicForTable("BIT_TABLE")).get(0); + validate(record); + Schema schemaA = record.valueSchema().fields().get(1).schema().fields().get(0).schema(); Schema schemaB = record.valueSchema().fields().get(1).schema().fields().get(1).schema(); Schema schemaC = record.valueSchema().fields().get(1).schema().fields().get(2).schema(); @@ -318,6 +337,8 @@ public void booleanTest() throws InterruptedException { SourceRecords records = consumeRecordsByTopic(EVENT_COUNT); SourceRecord record = records.recordsForTopic(DATABASE.topicForTable("BOOLEAN_TABLE")).get(0); + validate(record); + Schema schemaA = record.valueSchema().fields().get(1).schema().fields().get(0).schema(); Schema schemaB = record.valueSchema().fields().get(1).schema().fields().get(1).schema(); Schema schemaC = record.valueSchema().fields().get(1).schema().fields().get(2).schema(); @@ -341,6 +362,8 @@ public void numberTest() throws InterruptedException { SourceRecords records = consumeRecordsByTopic(EVENT_COUNT); SourceRecord record = records.recordsForTopic(DATABASE.topicForTable("NUMBER_TABLE")).get(0); + validate(record); + Schema schemaA = record.valueSchema().fields().get(1).schema().fields().get(0).schema(); Schema schemaB = record.valueSchema().fields().get(1).schema().fields().get(1).schema(); Schema schemaC = record.valueSchema().fields().get(1).schema().fields().get(2).schema(); @@ -364,6 +387,8 @@ public void floatAndDoubleTest() throws InterruptedException { SourceRecords records = consumeRecordsByTopic(EVENT_COUNT); SourceRecord record = records.recordsForTopic(DATABASE.topicForTable("FlOAT_DOUBLE_TABLE")).get(0); + validate(record); + Schema schemaA = record.valueSchema().fields().get(1).schema().fields().get(0).schema(); Schema schemaB = record.valueSchema().fields().get(1).schema().fields().get(1).schema(); assertThat(schemaA.defaultValue()).isEqualTo(0d); @@ -381,6 +406,8 @@ public void realTest() throws InterruptedException { SourceRecords records = consumeRecordsByTopic(EVENT_COUNT); SourceRecord record = records.recordsForTopic(DATABASE.topicForTable("REAL_TABLE")).get(0); + validate(record); + Schema schemaA = record.valueSchema().fields().get(1).schema().fields().get(0).schema(); Schema schemaB = record.valueSchema().fields().get(1).schema().fields().get(1).schema(); assertThat(schemaA.defaultValue()).isEqualTo(1d); @@ -399,6 +426,8 @@ public void numericAndDecimalToDoubleTest() throws InterruptedException { SourceRecords records = consumeRecordsByTopic(EVENT_COUNT); SourceRecord record = records.recordsForTopic(DATABASE.topicForTable("NUMERIC_DECIMAL_TABLE")).get(0); + validate(record); + Schema schemaA = record.valueSchema().fields().get(1).schema().fields().get(0).schema(); Schema schemaB = record.valueSchema().fields().get(1).schema().fields().get(1).schema(); Schema schemaC = record.valueSchema().fields().get(1).schema().fields().get(2).schema(); @@ -419,6 +448,11 @@ public void numericAndDecimalToDecimalTest() throws InterruptedException { SourceRecords records = consumeRecordsByTopic(EVENT_COUNT); SourceRecord record = records.recordsForTopic(DATABASE.topicForTable("NUMERIC_DECIMAL_TABLE")).get(0); + + // TODO can't validate due to https://github.com/confluentinc/schema-registry/issues/833 + // enable once that's resolved upstream + // validate(record); + Schema schemaA = record.valueSchema().fields().get(1).schema().fields().get(0).schema(); Schema schemaB = record.valueSchema().fields().get(1).schema().fields().get(1).schema(); assertThat(schemaA.defaultValue()).isEqualTo(BigDecimal.valueOf(1.23)); @@ -437,6 +471,8 @@ public void dateAndTimeTest() throws InterruptedException { SourceRecords records = consumeRecordsByTopic(7); final SourceRecord record = records.recordsForTopic(DATABASE.topicForTable("DATE_TIME_TABLE")).get(0); + validate(record); + Schema schemaA = record.valueSchema().fields().get(1).schema().fields().get(0).schema(); Schema schemaB = record.valueSchema().fields().get(1).schema().fields().get(1).schema(); Schema schemaC = record.valueSchema().fields().get(1).schema().fields().get(2).schema(); @@ -492,6 +528,8 @@ public void timeTypeWithAdaptiveMode() throws InterruptedException { SourceRecords records = consumeRecordsByTopic(7); final SourceRecord record = records.recordsForTopic(DATABASE.topicForTable("DATE_TIME_TABLE")).get(0); + validate(record); + Schema schemaA = record.valueSchema().fields().get(1).schema().fields().get(0).schema(); Schema schemaB = record.valueSchema().fields().get(1).schema().fields().get(1).schema(); Schema schemaC = record.valueSchema().fields().get(1).schema().fields().get(2).schema(); @@ -540,6 +578,11 @@ public void timeTypeWithConnectMode() throws InterruptedException { SourceRecords records = consumeRecordsByTopic(7); final SourceRecord record = records.recordsForTopic(DATABASE.topicForTable("DATE_TIME_TABLE")).get(0); + + // TODO can't validate due to https://github.com/confluentinc/schema-registry/issues/833 + // enable once that's resolved upstream + // validate(record); + Schema schemaA = record.valueSchema().fields().get(1).schema().fields().get(0).schema(); Schema schemaB = record.valueSchema().fields().get(1).schema().fields().get(1).schema(); Schema schemaC = record.valueSchema().fields().get(1).schema().fields().get(2).schema(); diff --git a/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/AbstractRecordsProducerTest.java b/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/AbstractRecordsProducerTest.java index e48ae9206..5a558d02f 100644 --- a/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/AbstractRecordsProducerTest.java +++ b/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/AbstractRecordsProducerTest.java @@ -51,6 +51,7 @@ import io.debezium.data.Json; import io.debezium.data.Uuid; import io.debezium.data.VariableScaleDecimal; +import io.debezium.data.VerifyRecord; import io.debezium.data.Xml; import io.debezium.data.geometry.Geography; import io.debezium.data.geometry.Geometry; @@ -547,12 +548,12 @@ protected static TableId tableIdFromInsertStmt(String statement) { } protected static class SchemaAndValueField { - private final Object schema; + private final Schema schema; private final Object value; private final String fieldName; private Supplier assertValueOnlyIf = null; - public SchemaAndValueField(String fieldName, Object schema, Object value) { + public SchemaAndValueField(String fieldName, Schema schema, Object value) { this.schema = schema; this.value = value; this.fieldName = fieldName; @@ -632,7 +633,7 @@ private void assertSchema(Struct content) { Schema schema = content.schema(); Field field = schema.field(fieldName); assertNotNull(fieldName + " not found in schema " + schema, field); - assertEquals("Schema for " + field.name() + " does not match the actual value", this.schema, field.schema()); + VerifyRecord.assertConnectSchemasAreEqual(this.schema, field.schema()); } } diff --git a/debezium-core/src/test/java/io/debezium/data/VerifyRecord.java b/debezium-core/src/test/java/io/debezium/data/VerifyRecord.java index 81da1b8ec..19f134fd1 100644 --- a/debezium-core/src/test/java/io/debezium/data/VerifyRecord.java +++ b/debezium-core/src/test/java/io/debezium/data/VerifyRecord.java @@ -18,10 +18,12 @@ import java.util.Collection; import java.util.HashMap; import java.util.Iterator; +import java.util.List; import java.util.Map; import java.util.Objects; import java.util.function.Predicate; +import org.apache.kafka.connect.data.ConnectSchema; import org.apache.kafka.connect.data.Field; import org.apache.kafka.connect.data.Schema; import org.apache.kafka.connect.data.Schema.Type; @@ -428,6 +430,16 @@ public static void assertEquals(SourceRecord actual, SourceRecord expected, Pred assertEquals(actualValueSchema, actual.value(), expected.value(), "value", "", ignoreFields, comparatorsByName, comparatorsBySchemaName); } + public static void assertConnectSchemasAreEqual(Schema schema1, Schema schema2) { + if (!areConnectSchemasEqual(schema1, schema2)) { + // failing with an assertion message that shows the difference + assertThat(SchemaUtil.asString(schema1)).isEqualTo(SchemaUtil.asString(schema2)); + + // fall-back just in case + fail(SchemaUtil.asString(schema1) + " was not equal to " + SchemaUtil.asString(schema2)); + } + } + protected static String nameOf(String keyOrValue, String field) { if (field == null || field.trim().isEmpty()) { return keyOrValue; @@ -814,7 +826,11 @@ protected static String prettyJson(JsonNode json) { protected static void assertEquals(Object o1, Object o2) { // assertThat(o1).isEqualTo(o2); - if (!equals(o1, o2)) { + + if (o1 instanceof Schema && o2 instanceof Schema) { + assertConnectSchemasAreEqual((Schema) o1, (Schema) o2); + } + else if (!equals(o1, o2)) { fail(SchemaUtil.asString(o1) + " was not equal to " + SchemaUtil.asString(o2)); } } @@ -823,7 +839,9 @@ protected static void assertEquals(Object o1, Object o2) { protected static boolean equals(Object o1, Object o2) { if (o1 == o2) return true; if (o1 == null) return o2 == null ? true : false; - if (o2 == null) return false; + if (o2 == null) { + return false; + } if (o1 instanceof ByteBuffer) { o1 = ((ByteBuffer) o1).array(); } @@ -841,22 +859,31 @@ protected static boolean equals(Object o1, Object o2) { if (o1 instanceof Map && o2 instanceof Map) { Map m1 = (Map) o1; Map m2 = (Map) o2; - if (!m1.keySet().equals(m2.keySet())) return false; + if (!m1.keySet().equals(m2.keySet())) { + return false; + } + for (Map.Entry entry : m1.entrySet()) { Object v1 = entry.getValue(); Object v2 = m2.get(entry.getKey()); - if (!equals(v1, v2)) return false; + if (!equals(v1, v2)) { + return false; + } } return true; } if (o1 instanceof Collection && o2 instanceof Collection) { Collection m1 = (Collection) o1; Collection m2 = (Collection) o2; - if (m1.size() != m2.size()) return false; + if (m1.size() != m2.size()) { + return false; + } Iterator iter1 = m1.iterator(); Iterator iter2 = m2.iterator(); while (iter1.hasNext() && iter2.hasNext()) { - if (!equals(iter1.next(), iter2.next())) return false; + if (!equals(iter1.next(), iter2.next())) { + return false; + } } return true; } @@ -867,14 +894,19 @@ protected static boolean equals(Object o1, Object o2) { // does not work for non-primitive values. Struct struct1 = (Struct) o1; Struct struct2 = (Struct) o2; - if (!Objects.equals(struct1.schema(), struct2.schema())) { + if (!areConnectSchemasEqual(struct1.schema(), struct2.schema())) { return false; } Object[] array1 = valuesFor(struct1); Object[] array2 = valuesFor(struct2); - boolean result = deepEquals(array1, array2); - return result; + + return deepEquals(array1, array2); } + + if (o1 instanceof ConnectSchema && o1 instanceof ConnectSchema) { + return areConnectSchemasEqual((ConnectSchema)o1, (ConnectSchema)o2); + } + return Objects.equals(o1, o2); } @@ -940,4 +972,68 @@ else if (e1 instanceof boolean[] && e2 instanceof boolean[]) eq = equals(e1, e2); return eq; } + + private static boolean areConnectSchemasEqual(Schema schema1, Schema schema2) { + if (schema1 == schema2) { + return true; + } + if (schema1 == null && schema2 != null || schema1 != null && schema2 == null) { + return false; + } + if (schema1.getClass() != schema2.getClass()) { + return false; + } + + boolean keySchemasEqual = true; + boolean valueSchemasEqual = true; + boolean fieldsEqual = true; + if (schema1.type() == Type.MAP && schema2.type() == Type.MAP) { + keySchemasEqual = Objects.equals(schema1.keySchema(), schema2.keySchema()); + valueSchemasEqual = Objects.equals(schema1.valueSchema(), schema2.valueSchema()); + } + else if (schema1.type() == Type.ARRAY && schema2.type() == Type.ARRAY) { + valueSchemasEqual = Objects.equals(schema1.valueSchema(), schema2.valueSchema()); + } + else if (schema1.type() == Type.STRUCT && schema2.type() == Type.STRUCT) { + fieldsEqual = areFieldListsEqual(schema1.fields(), schema2.fields()); + } + + boolean equal = Objects.equals(schema1.isOptional(), schema2.isOptional()) && + Objects.equals(schema1.version(), schema2.version()) && + Objects.equals(schema1.name(), schema2.name()) && + Objects.equals(schema1.doc(), schema2.doc()) && + Objects.equals(schema1.type(), schema2.type()) && + Objects.deepEquals(schema1.defaultValue(), schema2.defaultValue()) && + fieldsEqual && + keySchemasEqual && + valueSchemasEqual && + Objects.equals(schema1.parameters(), schema2.parameters()); + + return equal; + } + + private static boolean areFieldListsEqual(List fields1, List fields2) { + if (fields1 == null && fields2 != null || fields1 != null && fields2 == null) { + return false; + } + + if (fields1.size() != fields2.size()) { + return false; + } + + for(int i = 0; i < fields1.size(); i++) { + Field field1 = fields1.get(i); + Field field2 = fields2.get(i); + + boolean equal = Objects.equals(field1.index(), field2.index()) && + Objects.equals(field1.name(), field2.name()) && + areConnectSchemasEqual(field1.schema(), field2.schema()); + + if (!equal) { + return false; + } + } + + return true; + } }