DBZ-7159 Simplify CE validator API by setting serializer type at the configuration phase

This commit is contained in:
Roman Kudryashov 2023-11-25 07:10:04 +03:00 committed by Jiri Pechanec
parent 79230c6014
commit 6474c68e42
2 changed files with 16 additions and 8 deletions

View File

@ -234,6 +234,8 @@ else if (avroConfig.hasKey(CONFLUENT_SCHEMA_REGISTRY_URL_CONFIG)) {
avroConverter.configure(avroConfig.asMap(), false);
}
}
cloudEventsValidator.configure(ceSerializerType);
}
protected Map<String, String> configureConverterType(boolean isKey, Map<String, String> config) {
@ -377,7 +379,7 @@ public SchemaAndValue toConnectData(String topic, byte[] value) {
// The conversion back thus must be schemaless.
// If data are in schema/payload envelope they are extracted
final SchemaAndValue connectData = jsonCloudEventsConverter.toConnectData(topic, value);
cloudEventsValidator.verifyIsCloudEvent(connectData, ceSerializerType);
cloudEventsValidator.verifyIsCloudEvent(connectData);
final JsonNode jsonValue = jsonDeserializer.deserialize(topic, value);
SchemaAndValue dataField = reconvertData(topic, jsonValue.get(CloudEventsMaker.FieldName.DATA), dataSerializerType, enableJsonSchemas);
@ -392,7 +394,7 @@ public SchemaAndValue toConnectData(String topic, byte[] value) {
// First reconvert the whole CloudEvents
// Then reconvert the "data" field
SchemaAndValue ceSchemaAndValue = avroConverter.toConnectData(topic, value);
cloudEventsValidator.verifyIsCloudEvent(ceSchemaAndValue, ceSerializerType);
cloudEventsValidator.verifyIsCloudEvent(ceSchemaAndValue);
Schema incompleteSchema = ceSchemaAndValue.schema();
Struct ceValue = (Struct) ceSchemaAndValue.value();
byte[] data = ceValue.getBytes(CloudEventsMaker.FieldName.DATA);

View File

@ -29,17 +29,23 @@ public class CloudEventsValidator {
CloudEventsMaker.FieldName.SPECVERSION,
CloudEventsMaker.FieldName.TYPE);
public boolean isCloudEvent(SchemaAndValue schemaAndValue, SerializerType serializerType) {
return baseCheck(schemaAndValue, serializerType) && checkFields(schemaAndValue.value(), serializerType);
private SerializerType serializerType;
public void configure(SerializerType serializerType) {
this.serializerType = serializerType;
}
public void verifyIsCloudEvent(SchemaAndValue schemaAndValue, SerializerType serializerType) {
if (!isCloudEvent(schemaAndValue, serializerType)) {
public boolean isCloudEvent(SchemaAndValue schemaAndValue) {
return baseCheck(schemaAndValue) && checkFields(schemaAndValue.value());
}
public void verifyIsCloudEvent(SchemaAndValue schemaAndValue) {
if (!isCloudEvent(schemaAndValue)) {
throw new DataException("A deserialized record's value is not a CloudEvent: value=" + schemaAndValue.value());
}
}
private boolean baseCheck(SchemaAndValue schemaAndValue, SerializerType serializerType) {
private boolean baseCheck(SchemaAndValue schemaAndValue) {
switch (serializerType) {
case JSON:
return schemaAndValue.schema() == null && schemaAndValue.value() instanceof Map;
@ -50,7 +56,7 @@ private boolean baseCheck(SchemaAndValue schemaAndValue, SerializerType serializ
}
}
private boolean checkFields(Object value, SerializerType serializerType) {
private boolean checkFields(Object value) {
final List<String> fieldNames;
switch (serializerType) {
case JSON: