From b7fede7d53bda04a779d29e80e9ccaf499674619 Mon Sep 17 00:00:00 2001 From: Jiri Pechanec Date: Wed, 25 Nov 2020 10:10:34 +0100 Subject: [PATCH] DBZ-735 Verify Avro serialization --- .../connector/mysql/MySqlDecimalIT.java | 1 + .../connector/mysql/MysqlDefaultValueIT.java | 1 + .../java/io/debezium/data/VerifyRecord.java | 82 +++++++++++++++++-- .../embedded/AbstractConnectorTest.java | 11 ++- 4 files changed, 88 insertions(+), 7 deletions(-) diff --git a/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/MySqlDecimalIT.java b/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/MySqlDecimalIT.java index 8a06018c8..ab344a90c 100644 --- a/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/MySqlDecimalIT.java +++ b/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/MySqlDecimalIT.java @@ -49,6 +49,7 @@ public void beforeEach() { DATABASE.createAndInitialize(); initializeConnectorTestFramework(); Testing.Files.delete(DB_HISTORY_PATH); + skipAvroValidation(); // https://github.com/confluentinc/schema-registry/issues/1693 } @After diff --git a/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/MysqlDefaultValueIT.java b/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/MysqlDefaultValueIT.java index 3246d1b49..780352696 100644 --- a/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/MysqlDefaultValueIT.java +++ b/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/MysqlDefaultValueIT.java @@ -69,6 +69,7 @@ public void beforeEach() { DATABASE.createAndInitialize(); initializeConnectorTestFramework(); Testing.Files.delete(DB_HISTORY_PATH); + skipAvroValidation(); // https://github.com/confluentinc/schema-registry/issues/1693 } @After diff --git a/debezium-core/src/test/java/io/debezium/data/VerifyRecord.java b/debezium-core/src/test/java/io/debezium/data/VerifyRecord.java index 86f6bfe10..7797f27b7 100644 --- a/debezium-core/src/test/java/io/debezium/data/VerifyRecord.java +++ b/debezium-core/src/test/java/io/debezium/data/VerifyRecord.java @@ -28,6 +28,7 @@ import org.apache.kafka.connect.data.Schema; import org.apache.kafka.connect.data.Schema.Type; import org.apache.kafka.connect.data.SchemaAndValue; +import org.apache.kafka.connect.data.SchemaBuilder; import org.apache.kafka.connect.data.Struct; import org.apache.kafka.connect.errors.DataException; import org.apache.kafka.connect.json.JsonConverter; @@ -700,6 +701,16 @@ else if (ZonedTimestamp.SCHEMA_NAME.equals(schemaName)) { * @param record the record to validate; may not be null */ public static void isValid(SourceRecord record) { + } + + /** + * Validate that a {@link SourceRecord}'s key and value can each be converted to a byte[] and then back to an equivalent + * {@link SourceRecord}. + * + * @param record the record to validate; may not be null + * @param ignoreAvro true when Avro check should not be executed + */ + public static void isValid(SourceRecord record, boolean ignoreAvro) { // print(record); JsonNode keyJson = null; @@ -766,17 +777,21 @@ public static void isValid(SourceRecord record) { validateSchemaNames(record.keySchema()); validateSchemaNames(record.valueSchema()); + // Introduced due to https://github.com/confluentinc/schema-registry/issues/1693 + if (ignoreAvro) { + return; + } // Serialize and deserialize the key using the Avro converter, and check that we got the same result ... msg = "serializing key using Avro converter"; byte[] avroKeyBytes = avroValueConverter.fromConnectData(record.topic(), record.keySchema(), record.key()); msg = "deserializing key using Avro converter"; avroKeyWithSchema = avroValueConverter.toConnectData(record.topic(), avroKeyBytes); msg = "comparing key schema to that serialized/deserialized with Avro converter"; - assertEquals(keyWithSchema.schema(), record.keySchema()); + assertEquals(setVersion(avroKeyWithSchema.schema(), null), record.keySchema()); msg = "comparing key to that serialized/deserialized with Avro converter"; - assertEquals(keyWithSchema.value(), record.key()); + assertEquals(setVersion(avroKeyWithSchema, null).value(), record.key()); msg = "comparing key to its schema"; - schemaMatchesStruct(keyWithSchema); + schemaMatchesStruct(avroKeyWithSchema); // Serialize and deserialize the value using the Avro converter, and check that we got the same result ... msg = "serializing value using Avro converter"; @@ -784,11 +799,11 @@ public static void isValid(SourceRecord record) { msg = "deserializing value using Avro converter"; avroValueWithSchema = avroValueConverter.toConnectData(record.topic(), avroValueBytes); msg = "comparing value schema to that serialized/deserialized with Avro converter"; - assertEquals(valueWithSchema.schema(), record.valueSchema()); + assertEquals(setVersion(avroValueWithSchema.schema(), null), record.valueSchema()); msg = "comparing value to that serialized/deserialized with Avro converter"; - assertEquals(valueWithSchema.value(), record.value()); + assertEquals(setVersion(avroValueWithSchema, null).value(), record.value()); msg = "comparing value to its schema"; - schemaMatchesStruct(valueWithSchema); + schemaMatchesStruct(avroValueWithSchema); } catch (Throwable t) { @@ -1134,4 +1149,59 @@ private static boolean areFieldListsEqual(List fields1, List field return true; } + + /** + * Sets the version of a passed schema to a new value. + * + * @param schema the schema to be updated + * @param version the target version value + * @return the new schema with the same structure but updated version + */ + private static Schema setVersion(Schema schema, Integer version) { + if (schema == null) { + return null; + } + final SchemaBuilder builder = new SchemaBuilder(schema.type()) + .name(schema.name()) + .version(version) + .doc(schema.doc()); + if (schema.defaultValue() != null) { + builder.defaultValue(schema.defaultValue()); + } + if (schema.isOptional()) { + builder.optional(); + } + if (schema.parameters() != null) { + builder.parameters(schema.parameters()); + } + if (schema.fields() != null) { + for (Field f : schema.fields()) { + builder.field(f.name(), f.schema()); + } + } + return builder.build(); + } + + /** + * Sets the version of a passed schema to a new value. + * + * @param value the value with schema to be updated + * @param version the target version value + * @return the new value with the same schema but updated version + */ + private static SchemaAndValue setVersion(SchemaAndValue value, Integer version) { + final Schema schema = setVersion(value.schema(), version); + if (schema == null) { + return value; + } + if (schema.type() != Type.STRUCT) { + return new SchemaAndValue(schema, value); + } + final Struct struct = new Struct(schema); + final Struct old = (Struct) value.value(); + for (Field f : schema.fields()) { + struct.put(f, old.getWithoutDefault(f.name())); + } + return new SchemaAndValue(schema, struct); + } } diff --git a/debezium-embedded/src/test/java/io/debezium/embedded/AbstractConnectorTest.java b/debezium-embedded/src/test/java/io/debezium/embedded/AbstractConnectorTest.java index dcfc89a23..675dacf8f 100644 --- a/debezium-embedded/src/test/java/io/debezium/embedded/AbstractConnectorTest.java +++ b/debezium-embedded/src/test/java/io/debezium/embedded/AbstractConnectorTest.java @@ -105,6 +105,7 @@ public abstract class AbstractConnectorTest implements Testing { private JsonConverter valueJsonConverter = new JsonConverter(); private JsonDeserializer keyJsonDeserializer = new JsonDeserializer(); private JsonDeserializer valueJsonDeserializer = new JsonDeserializer(); + private boolean skipAvroValidation = false; @Rule public TestRule logTestName = new TestLogger(logger); @@ -438,7 +439,7 @@ else if (Testing.Print.isEnabled()) { print(record); } if (assertRecords) { - VerifyRecord.isValid(record); + VerifyRecord.isValid(record, skipAvroValidation); } } else { @@ -769,6 +770,14 @@ protected boolean waitForAvailableRecords(long timeout, TimeUnit unit) { return !consumedLines.isEmpty(); } + /** + * Disable record validation using Avro converter. + * Introduced to workaround https://github.com/confluentinc/schema-registry/issues/1693 + */ + protected void skipAvroValidation() { + skipAvroValidation = true; + } + /** * Assert that the connector is currently running. */