DBZ-735 Verify Avro serialization

This commit is contained in:
Jiri Pechanec 2020-11-25 10:10:34 +01:00 committed by Gunnar Morling
parent 8c5b540346
commit b7fede7d53
4 changed files with 88 additions and 7 deletions

View File

@ -49,6 +49,7 @@ public void beforeEach() {
DATABASE.createAndInitialize();
initializeConnectorTestFramework();
Testing.Files.delete(DB_HISTORY_PATH);
skipAvroValidation(); // https://github.com/confluentinc/schema-registry/issues/1693
}
@After

View File

@ -69,6 +69,7 @@ public void beforeEach() {
DATABASE.createAndInitialize();
initializeConnectorTestFramework();
Testing.Files.delete(DB_HISTORY_PATH);
skipAvroValidation(); // https://github.com/confluentinc/schema-registry/issues/1693
}
@After

View File

@ -28,6 +28,7 @@
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.Schema.Type;
import org.apache.kafka.connect.data.SchemaAndValue;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.errors.DataException;
import org.apache.kafka.connect.json.JsonConverter;
@ -700,6 +701,16 @@ else if (ZonedTimestamp.SCHEMA_NAME.equals(schemaName)) {
* @param record the record to validate; may not be null
*/
public static void isValid(SourceRecord record) {
}
/**
* Validate that a {@link SourceRecord}'s key and value can each be converted to a byte[] and then back to an equivalent
* {@link SourceRecord}.
*
* @param record the record to validate; may not be null
* @param ignoreAvro true when Avro check should not be executed
*/
public static void isValid(SourceRecord record, boolean ignoreAvro) {
// print(record);
JsonNode keyJson = null;
@ -766,17 +777,21 @@ public static void isValid(SourceRecord record) {
validateSchemaNames(record.keySchema());
validateSchemaNames(record.valueSchema());
// Introduced due to https://github.com/confluentinc/schema-registry/issues/1693
if (ignoreAvro) {
return;
}
// Serialize and deserialize the key using the Avro converter, and check that we got the same result ...
msg = "serializing key using Avro converter";
byte[] avroKeyBytes = avroValueConverter.fromConnectData(record.topic(), record.keySchema(), record.key());
msg = "deserializing key using Avro converter";
avroKeyWithSchema = avroValueConverter.toConnectData(record.topic(), avroKeyBytes);
msg = "comparing key schema to that serialized/deserialized with Avro converter";
assertEquals(keyWithSchema.schema(), record.keySchema());
assertEquals(setVersion(avroKeyWithSchema.schema(), null), record.keySchema());
msg = "comparing key to that serialized/deserialized with Avro converter";
assertEquals(keyWithSchema.value(), record.key());
assertEquals(setVersion(avroKeyWithSchema, null).value(), record.key());
msg = "comparing key to its schema";
schemaMatchesStruct(keyWithSchema);
schemaMatchesStruct(avroKeyWithSchema);
// Serialize and deserialize the value using the Avro converter, and check that we got the same result ...
msg = "serializing value using Avro converter";
@ -784,11 +799,11 @@ public static void isValid(SourceRecord record) {
msg = "deserializing value using Avro converter";
avroValueWithSchema = avroValueConverter.toConnectData(record.topic(), avroValueBytes);
msg = "comparing value schema to that serialized/deserialized with Avro converter";
assertEquals(valueWithSchema.schema(), record.valueSchema());
assertEquals(setVersion(avroValueWithSchema.schema(), null), record.valueSchema());
msg = "comparing value to that serialized/deserialized with Avro converter";
assertEquals(valueWithSchema.value(), record.value());
assertEquals(setVersion(avroValueWithSchema, null).value(), record.value());
msg = "comparing value to its schema";
schemaMatchesStruct(valueWithSchema);
schemaMatchesStruct(avroValueWithSchema);
}
catch (Throwable t) {
@ -1134,4 +1149,59 @@ private static boolean areFieldListsEqual(List<Field> fields1, List<Field> field
return true;
}
/**
* Sets the version of a passed schema to a new value.
*
* @param schema the schema to be updated
* @param version the target version value
* @return the new schema with the same structure but updated version
*/
private static Schema setVersion(Schema schema, Integer version) {
if (schema == null) {
return null;
}
final SchemaBuilder builder = new SchemaBuilder(schema.type())
.name(schema.name())
.version(version)
.doc(schema.doc());
if (schema.defaultValue() != null) {
builder.defaultValue(schema.defaultValue());
}
if (schema.isOptional()) {
builder.optional();
}
if (schema.parameters() != null) {
builder.parameters(schema.parameters());
}
if (schema.fields() != null) {
for (Field f : schema.fields()) {
builder.field(f.name(), f.schema());
}
}
return builder.build();
}
/**
* Sets the version of a passed schema to a new value.
*
* @param value the value with schema to be updated
* @param version the target version value
* @return the new value with the same schema but updated version
*/
private static SchemaAndValue setVersion(SchemaAndValue value, Integer version) {
final Schema schema = setVersion(value.schema(), version);
if (schema == null) {
return value;
}
if (schema.type() != Type.STRUCT) {
return new SchemaAndValue(schema, value);
}
final Struct struct = new Struct(schema);
final Struct old = (Struct) value.value();
for (Field f : schema.fields()) {
struct.put(f, old.getWithoutDefault(f.name()));
}
return new SchemaAndValue(schema, struct);
}
}

View File

@ -105,6 +105,7 @@ public abstract class AbstractConnectorTest implements Testing {
private JsonConverter valueJsonConverter = new JsonConverter();
private JsonDeserializer keyJsonDeserializer = new JsonDeserializer();
private JsonDeserializer valueJsonDeserializer = new JsonDeserializer();
private boolean skipAvroValidation = false;
@Rule
public TestRule logTestName = new TestLogger(logger);
@ -438,7 +439,7 @@ else if (Testing.Print.isEnabled()) {
print(record);
}
if (assertRecords) {
VerifyRecord.isValid(record);
VerifyRecord.isValid(record, skipAvroValidation);
}
}
else {
@ -769,6 +770,14 @@ protected boolean waitForAvailableRecords(long timeout, TimeUnit unit) {
return !consumedLines.isEmpty();
}
/**
* Disable record validation using Avro converter.
* Introduced to workaround https://github.com/confluentinc/schema-registry/issues/1693
*/
protected void skipAvroValidation() {
skipAvroValidation = true;
}
/**
* Assert that the connector is currently running.
*/