DBZ-5229 Restore Kafka-based database history options in SchemaGenerator

Chris Cranford 2022-08-26 13:58:38 -04:00 committed by Chris Cranford
parent 3b11c656fe
commit 92ca57d731
3 changed files with 63 additions and 36 deletions


@@ -229,6 +229,20 @@
</systemPropertyVariables>
</configuration>
</plugin>
<plugin>
<groupId>io.debezium</groupId>
<artifactId>debezium-schema-generator</artifactId>
<version>${project.version}</version>
<executions>
<execution>
<id>generate-connector-metadata</id>
<goals>
<goal>generate-api-spec</goal>
</goals>
<phase>prepare-package</phase>
</execution>
</executions>
</plugin>
</plugins>
</build>


@@ -18,6 +18,10 @@
<groupId>io.debezium</groupId>
<artifactId>debezium-core</artifactId>
</dependency>
<dependency>
<groupId>io.debezium</groupId>
<artifactId>debezium-storage-kafka</artifactId>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>


@@ -19,7 +19,9 @@
import io.debezium.config.Field;
import io.debezium.metadata.ConnectorMetadata;
import io.debezium.relational.HistorizedRelationalDatabaseConnectorConfig;
import io.debezium.schemagenerator.schema.Schema.FieldFilter;
import io.debezium.storage.kafka.history.KafkaDatabaseHistory;
import io.smallrye.openapi.api.models.media.SchemaImpl;
public class JsonSchemaCreatorService {
@@ -121,42 +123,7 @@ public Schema buildConnectorSchema() {
orderedPropertiesByCategory.put(category, new TreeMap<>());
});
connectorMetadata.getConnectorFields().forEach(field -> {
String propertyName = field.name();
Field checkedField = checkField(field);
if (null != checkedField) {
SchemaImpl propertySchema = new SchemaImpl(propertyName);
Set<?> allowedValues = checkedField.allowedValues();
if (null != allowedValues && !allowedValues.isEmpty()) {
propertySchema.enumeration(new ArrayList<>(allowedValues));
}
if (checkedField.isRequired()) {
propertySchema.nullable(false);
schema.addRequired(propertyName);
}
propertySchema.description(checkedField.description());
propertySchema.defaultValue(checkedField.defaultValue());
JsonSchemaType jsonSchemaType = toJsonSchemaType(checkedField.type());
propertySchema.type(jsonSchemaType.schemaType);
if (null != jsonSchemaType.format) {
propertySchema.format(jsonSchemaType.format);
}
propertySchema.title(checkedField.displayName());
Map<String, Object> extensions = new HashMap<>();
extensions.put("name", checkedField.name()); // @TODO remove "x-name" in favor of map key?
Field.GroupEntry groupEntry = checkedField.group();
extensions.put("category", groupEntry.getGroup().name());
propertySchema.extensions(extensions);
SortedMap<Integer, SchemaImpl> groupProperties = orderedPropertiesByCategory.get(groupEntry.getGroup());
if (groupProperties.containsKey(groupEntry.getPositionInGroup())) {
errors.add("[ERROR] Position in group \"" + groupEntry.getGroup().name() + "\" for property \""
+ propertyName + "\" is used more than once for connector \"" + connectorName + "\".");
}
else {
groupProperties.put(groupEntry.getPositionInGroup(), propertySchema);
}
}
});
connectorMetadata.getConnectorFields().forEach(field -> processField(schema, orderedPropertiesByCategory, field));
Arrays.stream(Field.Group.values()).forEach(
group -> orderedPropertiesByCategory.get(group).forEach((position, propertySchema) -> schema.addProperty(propertySchema.getName(), propertySchema)));
@@ -170,4 +137,46 @@ public Schema buildConnectorSchema() {
return schema;
}
private void processField(Schema schema, Map<Field.Group, SortedMap<Integer, SchemaImpl>> orderedPropertiesByCategory, Field field) {
String propertyName = field.name();
Field checkedField = checkField(field);
if (null != checkedField) {
SchemaImpl propertySchema = new SchemaImpl(propertyName);
Set<?> allowedValues = checkedField.allowedValues();
if (null != allowedValues && !allowedValues.isEmpty()) {
propertySchema.enumeration(new ArrayList<>(allowedValues));
}
if (checkedField.isRequired()) {
propertySchema.nullable(false);
schema.addRequired(propertyName);
}
propertySchema.description(checkedField.description());
propertySchema.defaultValue(checkedField.defaultValue());
JsonSchemaType jsonSchemaType = toJsonSchemaType(checkedField.type());
propertySchema.type(jsonSchemaType.schemaType);
if (null != jsonSchemaType.format) {
propertySchema.format(jsonSchemaType.format);
}
propertySchema.title(checkedField.displayName());
Map<String, Object> extensions = new HashMap<>();
extensions.put("name", checkedField.name()); // @TODO remove "x-name" in favor of map key?
Field.GroupEntry groupEntry = checkedField.group();
extensions.put("category", groupEntry.getGroup().name());
propertySchema.extensions(extensions);
SortedMap<Integer, SchemaImpl> groupProperties = orderedPropertiesByCategory.get(groupEntry.getGroup());
if (groupProperties.containsKey(groupEntry.getPositionInGroup())) {
errors.add("[ERROR] Position in group \"" + groupEntry.getGroup().name() + "\" for property \""
+ propertyName + "\" is used more than once for connector \"" + connectorName + "\".");
}
else {
groupProperties.put(groupEntry.getPositionInGroup(), propertySchema);
}
if (propertyName.equals(HistorizedRelationalDatabaseConnectorConfig.DATABASE_HISTORY.name())) {
// todo: how to eventually support varied storage modules
KafkaDatabaseHistory.ALL_FIELDS.forEach(historyField -> processField(schema, orderedPropertiesByCategory, historyField));
}
}
}
}
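
For reference, a minimal standalone sketch (not part of this commit; the class name KafkaHistoryFieldsPreview is made up for illustration) of the Kafka-based database history options that the generator now folds into a connector's schema whenever it encounters the "database.history" property. It relies only on KafkaDatabaseHistory.ALL_FIELDS and the Field accessors (name(), displayName(), description()) that already appear in the diff above:

import io.debezium.config.Field;
import io.debezium.storage.kafka.history.KafkaDatabaseHistory;

// Hypothetical helper, not part of this commit: prints the Kafka history fields
// that processField() recursively appends to the connector schema.
public class KafkaHistoryFieldsPreview {

    public static void main(String[] args) {
        KafkaDatabaseHistory.ALL_FIELDS.forEach(historyField -> System.out
                .println(historyField.name() + " - " + historyField.displayName() + ": " + historyField.description()));
    }
}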