DBZ-5044 Ignore versions for schema comparison

This commit is contained in:
Jiri Pechanec 2022-07-01 14:05:03 +02:00
parent fa2a7ba60c
commit b8bcac1aef

View File

@ -806,9 +806,9 @@ public static void isValid(SourceRecord record, boolean ignoreAvro) {
msg = "deserializing key using Avro converter"; msg = "deserializing key using Avro converter";
avroKeyWithSchema = avroValueConverter.toConnectData(record.topic(), avroKeyBytes); avroKeyWithSchema = avroValueConverter.toConnectData(record.topic(), avroKeyBytes);
msg = "comparing key schema to that serialized/deserialized with Avro converter"; msg = "comparing key schema to that serialized/deserialized with Avro converter";
assertEquals(setVersion(avroKeyWithSchema.schema(), null), record.keySchema()); assertEquals(setVersion(avroKeyWithSchema.schema(), null), setVersion(record.keySchema(), null));
msg = "comparing key to that serialized/deserialized with Avro converter"; msg = "comparing key to that serialized/deserialized with Avro converter";
assertEquals(setVersion(avroKeyWithSchema, null).value(), record.key()); assertEquals(setVersion(avroKeyWithSchema, null).value(), setVersion(record.key(), null));
msg = "comparing key to its schema"; msg = "comparing key to its schema";
schemaMatchesStruct(avroKeyWithSchema); schemaMatchesStruct(avroKeyWithSchema);
@ -818,9 +818,9 @@ public static void isValid(SourceRecord record, boolean ignoreAvro) {
msg = "deserializing value using Avro converter"; msg = "deserializing value using Avro converter";
avroValueWithSchema = avroValueConverter.toConnectData(record.topic(), avroValueBytes); avroValueWithSchema = avroValueConverter.toConnectData(record.topic(), avroValueBytes);
msg = "comparing value schema to that serialized/deserialized with Avro converter"; msg = "comparing value schema to that serialized/deserialized with Avro converter";
assertEquals(setVersion(avroValueWithSchema.schema(), null), record.valueSchema()); assertEquals(setVersion(avroValueWithSchema.schema(), null), setVersion(record.valueSchema(), null));
msg = "comparing value to that serialized/deserialized with Avro converter"; msg = "comparing value to that serialized/deserialized with Avro converter";
assertEquals(setVersion(avroValueWithSchema, null).value(), record.value()); assertEquals(setVersion(avroValueWithSchema, null).value(), setVersion(record.value(), null));
msg = "comparing value to its schema"; msg = "comparing value to its schema";
schemaMatchesStruct(avroValueWithSchema); schemaMatchesStruct(avroValueWithSchema);
@ -1236,4 +1236,31 @@ private static SchemaAndValue setVersion(SchemaAndValue value, Integer version)
} }
return new SchemaAndValue(schema, struct); return new SchemaAndValue(schema, struct);
} }
/**
* Sets the version of a passed schema to a new value.
*
* @param object the value with schema to be updated
* @param version the target version value
* @return the new value with the same schema but updated version
*/
private static Object setVersion(Object obj, Integer version) {
if (!(obj instanceof Struct)) {
return obj;
}
final Struct value = (Struct) obj;
final Schema schema = setVersion(value.schema(), version);
if (schema == null) {
return value;
}
if (schema.type() != Type.STRUCT) {
return value;
}
final Struct struct = new Struct(schema);
final Struct old = value;
for (Field f : schema.fields()) {
struct.put(f, old.getWithoutDefault(f.name()));
}
return struct;
}
} }