From 6474c68e420415e810232b9a4ca0ec272ab0f1cf Mon Sep 17 00:00:00 2001 From: Roman Kudryashov Date: Sat, 25 Nov 2023 07:10:04 +0300 Subject: [PATCH] DBZ-7159 Simplify CE validator API by setting serializer type at the configuration phase --- .../converters/CloudEventsConverter.java | 6 ++++-- .../converters/spi/CloudEventsValidator.java | 18 ++++++++++++------ 2 files changed, 16 insertions(+), 8 deletions(-) diff --git a/debezium-core/src/main/java/io/debezium/converters/CloudEventsConverter.java b/debezium-core/src/main/java/io/debezium/converters/CloudEventsConverter.java index c092dd654..6ba1847e9 100644 --- a/debezium-core/src/main/java/io/debezium/converters/CloudEventsConverter.java +++ b/debezium-core/src/main/java/io/debezium/converters/CloudEventsConverter.java @@ -234,6 +234,8 @@ else if (avroConfig.hasKey(CONFLUENT_SCHEMA_REGISTRY_URL_CONFIG)) { avroConverter.configure(avroConfig.asMap(), false); } } + + cloudEventsValidator.configure(ceSerializerType); } protected Map configureConverterType(boolean isKey, Map config) { @@ -377,7 +379,7 @@ public SchemaAndValue toConnectData(String topic, byte[] value) { // The conversion back thus must be schemaless. // If data are in schema/payload envelope they are extracted final SchemaAndValue connectData = jsonCloudEventsConverter.toConnectData(topic, value); - cloudEventsValidator.verifyIsCloudEvent(connectData, ceSerializerType); + cloudEventsValidator.verifyIsCloudEvent(connectData); final JsonNode jsonValue = jsonDeserializer.deserialize(topic, value); SchemaAndValue dataField = reconvertData(topic, jsonValue.get(CloudEventsMaker.FieldName.DATA), dataSerializerType, enableJsonSchemas); @@ -392,7 +394,7 @@ public SchemaAndValue toConnectData(String topic, byte[] value) { // First reconvert the whole CloudEvents // Then reconvert the "data" field SchemaAndValue ceSchemaAndValue = avroConverter.toConnectData(topic, value); - cloudEventsValidator.verifyIsCloudEvent(ceSchemaAndValue, ceSerializerType); + cloudEventsValidator.verifyIsCloudEvent(ceSchemaAndValue); Schema incompleteSchema = ceSchemaAndValue.schema(); Struct ceValue = (Struct) ceSchemaAndValue.value(); byte[] data = ceValue.getBytes(CloudEventsMaker.FieldName.DATA); diff --git a/debezium-core/src/main/java/io/debezium/converters/spi/CloudEventsValidator.java b/debezium-core/src/main/java/io/debezium/converters/spi/CloudEventsValidator.java index f02acdf72..35f072e96 100644 --- a/debezium-core/src/main/java/io/debezium/converters/spi/CloudEventsValidator.java +++ b/debezium-core/src/main/java/io/debezium/converters/spi/CloudEventsValidator.java @@ -29,17 +29,23 @@ public class CloudEventsValidator { CloudEventsMaker.FieldName.SPECVERSION, CloudEventsMaker.FieldName.TYPE); - public boolean isCloudEvent(SchemaAndValue schemaAndValue, SerializerType serializerType) { - return baseCheck(schemaAndValue, serializerType) && checkFields(schemaAndValue.value(), serializerType); + private SerializerType serializerType; + + public void configure(SerializerType serializerType) { + this.serializerType = serializerType; } - public void verifyIsCloudEvent(SchemaAndValue schemaAndValue, SerializerType serializerType) { - if (!isCloudEvent(schemaAndValue, serializerType)) { + public boolean isCloudEvent(SchemaAndValue schemaAndValue) { + return baseCheck(schemaAndValue) && checkFields(schemaAndValue.value()); + } + + public void verifyIsCloudEvent(SchemaAndValue schemaAndValue) { + if (!isCloudEvent(schemaAndValue)) { throw new DataException("A deserialized record's value is not a CloudEvent: value=" + schemaAndValue.value()); } } - private boolean baseCheck(SchemaAndValue schemaAndValue, SerializerType serializerType) { + private boolean baseCheck(SchemaAndValue schemaAndValue) { switch (serializerType) { case JSON: return schemaAndValue.schema() == null && schemaAndValue.value() instanceof Map; @@ -50,7 +56,7 @@ private boolean baseCheck(SchemaAndValue schemaAndValue, SerializerType serializ } } - private boolean checkFields(Object value, SerializerType serializerType) { + private boolean checkFields(Object value) { final List fieldNames; switch (serializerType) { case JSON: