DBZ-1711 Introduce more build-time column configuration options

The extension's columns for the outbox table can now be fully customized by using
build-time configuration properties to influence a column's database definition,
the columns length/precision/scale/nullability as well as specifying a JPA
AttributeConverter.  While not all theses options apply to every column, such as
the identifier, the goal is to allow flexibility.
This commit is contained in:
Chris Cranford 2020-01-23 16:24:01 -05:00 committed by Chris Cranford
parent 1da43853c5
commit 8c10e0ae66
3 changed files with 585 additions and 89 deletions

View File

@ -5,6 +5,9 @@
*/
package io.debezium.outbox.quarkus.deployment;
import java.util.Optional;
import io.quarkus.runtime.annotations.ConfigGroup;
import io.quarkus.runtime.annotations.ConfigItem;
import io.quarkus.runtime.annotations.ConfigPhase;
import io.quarkus.runtime.annotations.ConfigRoot;
@ -25,38 +28,283 @@ public class DebeziumOutboxConfig {
public String tableName;
/**
* The column name that contains the event id in the outbox table
* Outbox identifier configurable attributes
*/
@ConfigItem(defaultValue = "id")
public String idColumnName;
@ConfigItem
public DebeziumOutboxConfigId id;
/**
* The column name that contains the event key within the outbox table
* Outbox aggregate-id configurable attributes
*/
@ConfigItem(defaultValue = "aggregateid")
public String aggregateIdColumnName;
@ConfigItem
public DebeziumOutboxConfigAggregateId aggregateId;
/**
* The column name that contains the event type in the outbox table
* Outbox aggregate-type configurable attributes
*/
@ConfigItem(defaultValue = "type")
public String typeColumnName;
@ConfigItem
public DebeziumOutboxConfigAggregateType aggregateType;
/**
* The column name that contains the timestamp in the outbox table
* Outbox type configurable attributes
*/
@ConfigItem(defaultValue = "timestamp")
public String timestampColumnName;
@ConfigItem
public DebeziumOutboxConfigType type;
/**
* The column name that contains the event payload in the outbox table
* Outbox timestamp configurable attributes
*/
@ConfigItem(defaultValue = "payload")
public String payloadColumnName;
@ConfigItem
public DebeziumOutboxConfigTimestamp timestamp;
/**
* The column name that determines how the events will be routed in the outbox table
* Outbox payload configurable attributes
*/
@ConfigItem(defaultValue = "aggregatetype")
public String aggregateTypeColumnName;
@ConfigItem
public DebeziumOutboxConfigPayload payload;
@ConfigGroup
public static class DebeziumOutboxConfigId {
/**
* The column name.
*/
@ConfigItem(defaultValue = "id")
public String name;
/**
* The column definition.
*/
@ConfigItem
public Optional<String> columnDefinition;
}
@ConfigGroup
public static class DebeziumOutboxConfigAggregateType {
/**
* The column name.
*/
@ConfigItem(defaultValue = "aggregatetype")
public String name;
/**
* The column definition.
*/
@ConfigItem
public Optional<String> columnDefinition;
/**
* The column length.
*/
@ConfigItem
public Optional<Integer> length;
/**
* The column scale.
*/
@ConfigItem
public Optional<Integer> scale;
/**
* The column precision.
*/
@ConfigItem
public Optional<Integer> precision;
/**
* The column's attribute converter fully qualified class name.
* @see javax.persistence.AttributeConverter
*/
@ConfigItem
public Optional<String> converter;
/**
* Whether the column is nullable.
*/
@ConfigItem(defaultValue = "false")
public boolean nullable;
}
@ConfigGroup
public static class DebeziumOutboxConfigAggregateId {
/**
* The column name.
*/
@ConfigItem(defaultValue = "aggregateid")
public String name;
/**
* The column definition.
*/
@ConfigItem
public Optional<String> columnDefinition;
/**
* The column length.
*/
@ConfigItem
public Optional<Integer> length;
/**
* The column scale.
*/
@ConfigItem
public Optional<Integer> scale;
/**
* The column precision.
*/
@ConfigItem
public Optional<Integer> precision;
/**
* The column's attribute converter fully qualified class name.
* @see javax.persistence.AttributeConverter
*/
@ConfigItem
public Optional<String> converter;
/**
* Whether the column is nullable.
*/
@ConfigItem(defaultValue = "false")
public boolean nullable;
}
@ConfigGroup
public static class DebeziumOutboxConfigType {
/**
* The column name.
*/
@ConfigItem(defaultValue = "type")
public String name;
/**
* The column definition.
*/
@ConfigItem
public Optional<String> columnDefinition;
/**
* The column length.
*/
@ConfigItem
public Optional<Integer> length;
/**
* The column scale.
*/
@ConfigItem
public Optional<Integer> scale;
/**
* The column precision.
*/
@ConfigItem
public Optional<Integer> precision;
/**
* The column's attribute converter fully qualified class name.
* @see javax.persistence.AttributeConverter
*/
@ConfigItem
public Optional<String> converter;
/**
* Whether the column is nullable.
*/
@ConfigItem(defaultValue = "false")
public boolean nullable;
}
@ConfigGroup
public static class DebeziumOutboxConfigTimestamp {
/**
* The column name.
*/
@ConfigItem(defaultValue = "timestamp")
public String name;
/**
* The column definition.
*/
@ConfigItem
public Optional<String> columnDefinition;
/**
* The column length.
*/
@ConfigItem
public Optional<Integer> length;
/**
* The column scale.
*/
@ConfigItem
public Optional<Integer> scale;
/**
* The column precision.
*/
@ConfigItem
public Optional<Integer> precision;
/**
* The column's attribute converter fully qualified class name.
* @see javax.persistence.AttributeConverter
*/
@ConfigItem
public Optional<String> converter;
/**
* Whether the column is nullable.
*/
@ConfigItem(defaultValue = "false")
public boolean nullable;
}
@ConfigGroup
public static class DebeziumOutboxConfigPayload {
/**
* The column name.
*/
@ConfigItem(defaultValue = "payload")
public String name;
/**
* The column definition.
*/
@ConfigItem
public Optional<String> columnDefinition;
/**
* The column length.
*/
@ConfigItem
public Optional<Integer> length;
/**
* The column scale.
*/
@ConfigItem
public Optional<Integer> scale;
/**
* The column precision.
*/
@ConfigItem
public Optional<Integer> precision;
/**
* The column's attribute converter fully qualified class name.
* @see javax.persistence.AttributeConverter
*/
@ConfigItem
public Optional<String> converter;
/**
* Whether the column is nullable.
*/
@ConfigItem(defaultValue = "true")
public boolean nullable;
}
}

View File

@ -26,6 +26,8 @@
*/
public class OutboxEventHbmWriter {
private static final String JACKSON_JSONNODE = "com.fasterxml.jackson.databind.JsonNode";
static JaxbHbmHibernateMapping write(DebeziumOutboxConfig config, OutboxEventEntityBuildItem outboxEventEntityBuildItem) {
final JaxbHbmHibernateMapping mapping = new JaxbHbmHibernateMapping();
@ -40,70 +42,161 @@ static JaxbHbmHibernateMapping write(DebeziumOutboxConfig config, OutboxEventEnt
generatorType.setClazz("uuid2");
mapping.getIdentifierGenerator().add(generatorType);
// Setup the ID
final JaxbHbmSimpleIdType idType = new JaxbHbmSimpleIdType();
idType.setName("id");
idType.setColumnAttribute(config.idColumnName);
idType.setTypeAttribute(UUID.class.getName());
final JaxbHbmGeneratorSpecificationType generatorSpecType = new JaxbHbmGeneratorSpecificationType();
generatorSpecType.setClazz("uuid2");
idType.setGenerator(generatorSpecType);
entityType.setId(idType);
// Setup the aggregateType
final JaxbHbmBasicAttributeType aggregateType = new JaxbHbmBasicAttributeType();
aggregateType.setName("aggregateType");
aggregateType.setColumnAttribute(config.aggregateTypeColumnName);
aggregateType.setTypeAttribute("string");
aggregateType.setNotNull(true);
entityType.getAttributes().add(aggregateType);
// Setup the aggregateIdType
final JaxbHbmBasicAttributeType aggregateIdType = new JaxbHbmBasicAttributeType();
aggregateIdType.setName("aggregateId");
aggregateIdType.setColumnAttribute(config.aggregateIdColumnName);
aggregateIdType.setTypeAttribute(outboxEventEntityBuildItem.getAggregateIdType().name().toString());
aggregateIdType.setNotNull(true);
entityType.getAttributes().add(aggregateIdType);
// Setup the typeType
final JaxbHbmBasicAttributeType typeType = new JaxbHbmBasicAttributeType();
typeType.setName("type");
typeType.setColumnAttribute(config.typeColumnName);
typeType.setTypeAttribute("string");
typeType.setNotNull(true);
entityType.getAttributes().add(typeType);
// Setup the timestampType
final JaxbHbmBasicAttributeType timestampType = new JaxbHbmBasicAttributeType();
timestampType.setName("timestamp");
timestampType.setColumnAttribute(config.timestampColumnName);
timestampType.setTypeAttribute("Instant");
timestampType.setNotNull(true);
entityType.getAttributes().add(timestampType);
// Setup the payloadType
final JaxbHbmBasicAttributeType payloadType = new JaxbHbmBasicAttributeType();
payloadType.setName("payload");
// todo: this needs some more testing with varied data types
final String payloadClassType = outboxEventEntityBuildItem.getPayloadType().name().toString();
if (payloadClassType.equals("com.fasterxml.jackson.databind.JsonNode")) {
payloadType.setTypeAttribute("converted::" + JsonNodeAttributeConverter.class.getName());
final JaxbHbmColumnType columnType = new JaxbHbmColumnType();
columnType.setName(config.payloadColumnName);
columnType.setSqlType("varchar(8000)");
payloadType.getColumnOrFormula().add(columnType);
}
else {
payloadType.setColumnAttribute(config.payloadColumnName);
payloadType.setTypeAttribute(outboxEventEntityBuildItem.getPayloadType().name().toString());
}
entityType.getAttributes().add(payloadType);
// Setup attributes
entityType.setId(createIdAttribute(config));
entityType.getAttributes().add(createAggregateTypeAttribute(config));
entityType.getAttributes().add(createAggregateIdAttribute(config, outboxEventEntityBuildItem));
entityType.getAttributes().add(createTypeAttribute(config));
entityType.getAttributes().add(createTimestampAttribute(config));
entityType.getAttributes().add(createPayloadAttribute(config, outboxEventEntityBuildItem));
return mapping;
}
private static JaxbHbmSimpleIdType createIdAttribute(DebeziumOutboxConfig config) {
final JaxbHbmSimpleIdType attribute = new JaxbHbmSimpleIdType();
attribute.setName("id");
attribute.setTypeAttribute(UUID.class.getName());
final JaxbHbmColumnType column = new JaxbHbmColumnType();
column.setName(config.id.name);
config.id.columnDefinition.ifPresent(column::setSqlType);
attribute.getColumn().add(column);
final JaxbHbmGeneratorSpecificationType generator = new JaxbHbmGeneratorSpecificationType();
generator.setClazz("uuid2");
attribute.setGenerator(generator);
return attribute;
}
private static JaxbHbmBasicAttributeType createAggregateTypeAttribute(DebeziumOutboxConfig config) {
final JaxbHbmBasicAttributeType attribute = new JaxbHbmBasicAttributeType();
attribute.setName("aggregateType");
attribute.setNotNull(!config.aggregateType.nullable);
if (config.aggregateType.converter.isPresent()) {
attribute.setTypeAttribute("converted::" + config.aggregateType.converter.get());
}
else {
attribute.setTypeAttribute("string");
}
final JaxbHbmColumnType column = new JaxbHbmColumnType();
column.setName(config.aggregateType.name);
config.aggregateType.columnDefinition.ifPresent(column::setSqlType);
config.aggregateType.length.ifPresent(column::setLength);
config.aggregateType.scale.ifPresent(column::setScale);
config.aggregateType.precision.ifPresent(column::setPrecision);
attribute.getColumnOrFormula().add(column);
return attribute;
}
private static JaxbHbmBasicAttributeType createAggregateIdAttribute(DebeziumOutboxConfig config,
OutboxEventEntityBuildItem outboxEventEntityBuildItem) {
final JaxbHbmBasicAttributeType attribute = new JaxbHbmBasicAttributeType();
attribute.setName("aggregateId");
attribute.setNotNull(!config.aggregateId.nullable);
if (config.aggregateId.converter.isPresent()) {
attribute.setTypeAttribute("converted::" + config.aggregateId.converter.get());
}
else {
attribute.setTypeAttribute(outboxEventEntityBuildItem.getAggregateIdType().name().toString());
}
final JaxbHbmColumnType column = new JaxbHbmColumnType();
column.setName(config.aggregateId.name);
config.aggregateId.columnDefinition.ifPresent(column::setSqlType);
config.aggregateId.length.ifPresent(column::setLength);
config.aggregateId.scale.ifPresent(column::setScale);
config.aggregateId.precision.ifPresent(column::setPrecision);
attribute.getColumnOrFormula().add(column);
return attribute;
}
private static JaxbHbmBasicAttributeType createTypeAttribute(DebeziumOutboxConfig config) {
final JaxbHbmBasicAttributeType attribute = new JaxbHbmBasicAttributeType();
attribute.setName("type");
attribute.setNotNull(!config.type.nullable);
if (config.type.converter.isPresent()) {
attribute.setTypeAttribute("converted::" + config.type.converter.get());
}
else {
attribute.setTypeAttribute("string");
}
final JaxbHbmColumnType column = new JaxbHbmColumnType();
column.setName(config.type.name);
config.type.columnDefinition.ifPresent(column::setSqlType);
config.type.length.ifPresent(column::setLength);
config.type.precision.ifPresent(column::setPrecision);
config.type.scale.ifPresent(column::setScale);
attribute.getColumnOrFormula().add(column);
return attribute;
}
private static JaxbHbmBasicAttributeType createTimestampAttribute(DebeziumOutboxConfig config) {
final JaxbHbmBasicAttributeType attribute = new JaxbHbmBasicAttributeType();
attribute.setName("timestamp");
attribute.setNotNull(!config.timestamp.nullable);
if (config.timestamp.converter.isPresent()) {
attribute.setTypeAttribute("converted::" + config.timestamp.converter.get());
}
else {
attribute.setTypeAttribute("Instant");
}
final JaxbHbmColumnType column = new JaxbHbmColumnType();
column.setName(config.timestamp.name);
config.timestamp.columnDefinition.ifPresent(column::setSqlType);
config.timestamp.length.ifPresent(column::setLength);
config.timestamp.precision.ifPresent(column::setPrecision);
config.timestamp.scale.ifPresent(column::setScale);
attribute.getColumnOrFormula().add(column);
return attribute;
}
private static JaxbHbmBasicAttributeType createPayloadAttribute(DebeziumOutboxConfig config,
OutboxEventEntityBuildItem outboxEventEntityBuildItem) {
final boolean isJacksonJsonNode = isPayloadJacksonJsonNode(outboxEventEntityBuildItem);
final JaxbHbmBasicAttributeType attribute = new JaxbHbmBasicAttributeType();
attribute.setName("payload");
attribute.setNotNull(!config.payload.nullable);
if (config.payload.converter.isPresent()) {
attribute.setTypeAttribute("converted::" + config.payload.converter.get());
}
else if (isJacksonJsonNode) {
attribute.setTypeAttribute("converted::" + JsonNodeAttributeConverter.class.getName());
}
else {
attribute.setTypeAttribute(outboxEventEntityBuildItem.getPayloadType().name().toString());
}
final JaxbHbmColumnType column = new JaxbHbmColumnType();
column.setName(config.payload.name);
if (config.payload.columnDefinition.isPresent()) {
column.setSqlType(config.payload.columnDefinition.get());
}
else if (isJacksonJsonNode) {
column.setSqlType("varchar(8000)");
}
config.payload.length.ifPresent(column::setLength);
config.payload.precision.ifPresent(column::setPrecision);
config.payload.scale.ifPresent(column::setScale);
attribute.getColumnOrFormula().add(column);
return attribute;
}
private static boolean isPayloadJacksonJsonNode(OutboxEventEntityBuildItem outboxEventEntityBuildItem) {
return outboxEventEntityBuildItem.getPayloadType().name().toString().equals(JACKSON_JSONNODE);
}
}

View File

@ -137,35 +137,190 @@ The table name to be used when creating the outbox table.
|OutboxEvent
|`quarkus.debezium-outbox.id-column-name`::
|`quarkus.debezium-outbox.id.name`::
The column name for the event id column.
|string
|`id`
|`quarkus.debezium-outbox.aggregate-id-column.name`::
|`quarkus.debezium-outbox.id.column-definition`::
The database-specific column definition for the event id column.
|string
|
|`quarkus.debezium-outbox.aggregate-id.name`::
The column name for the event key column.
|string
|`aggregateid`
|`quarkus.debezium-outbox.type-column-name`::
|`quarkus.debezium-outbox.aggregate-id.column-definition`::
The database-specific column definition for the aggregate id.
|string
|
|`quarkus.debezium-outbox.aggregate-id.length`::
The length of the event key column.
|integer
|
|`quarkus.debezium-outbox.aggregate-id.scale`::
The scale of the event key column.
|integer
|
|`quarkus.debezium-outbox.aggregate-id.precision`::
The precision of the event key column.
|integer
|
|`quarkus.debezium-outbox.aggregate-id.converter`::
The JPA AttributeConverter for the event key column.
|string
|
|`quarkus.debezium-outbox.aggregate-id.nullable`::
Whether the event key column is nullable.
|boolean
|false
|`quarkus.debezium-outbox.aggregate-type-name`::
The column name for the event aggregate type column.
|string
|`aggregatetype`
|`quarkus.debezium-outbox.aggregate-type.column-definition`::
The database-specific column definition for the aggregate type.
|string
|
|`quarkus.debezium-outbox.aggregate-type.length`::
The length of the event aggregate type column.
|integer
|
|`quarkus.debezium-outbox.aggregate-type.scale`::
The scale of the event aggregate type column.
|integer
|
|`quarkus.debezium-outbox.aggregate-type.precision`::
The precision of the event aggregate type column.
|integer
|
|`quarkus.debezium-outbox.aggregate-type.converter`::
The JPA AttributeConverter for the event aggregate type column.
|string
|
|`quarkus.debezium-outbox.aggregate-type.nullable`::
Whether the event aggregate type column is nullable.
|boolean
|false
|`quarkus.debezium-outbox.type.name`::
The column name for the event type column.
|string
|`type`
|`quarkus.debezium-outbox.timestamp-column-name`::
|`quarkus.debezium-outbox.type.column-definition`::
The database-specific column definition for the event type.
|string
|
|`quarkus.debezium-outbox.type.length`::
The length of the event type column.
|integer
|
|`quarkus.debezium-outbox.type.scale`::
The scale of the event type column.
|integer
|
|`quarkus.debezium-outbox.type.precision`::
The precision of the event type column.
|integer
|
|`quarkus.debezium-outbox.type.converter`::
The JPA AttributeConverter for the event type column.
|string
|
|`quarkus.debezium-outbox.type.nullable`::
Whether the event type column is nullable.
|boolean
|false
|`quarkus.debezium-outbox.timestamp.name`::
The column name for the event timestamp column.
|string
|`timestamp`
|`quarkus.debezium-outbox.payload-column-name`::
|`quarkus.debezium-outbox.timestamp.column-definition`::
The database-specific column definition for the event timestamp.
|string
|
|`quarkus.debezium-outbox.timestamp.length`::
The length of the event timestamp column.
|integer
|
|`quarkus.debezium-outbox.timestamp.scale`::
The scale of the event timestamp column.
|integer
|
|`quarkus.debezium-outbox.timestamp.precision`::
The precision of the event timestamp column.
|integer
|
|`quarkus.debezium-outbox.timestamp.converter`::
The JPA AttributeConverter for the event timestamp column.
|string
|
|`quarkus.debezium-outbox.timestamp.nullable`::
Whether the event timestamp column is nullable.
|boolean
|false
|`quarkus.debezium-outbox.payload.name`::
The column name for the event payload column.
|string
|`payload`
|`quarkus.debezium-outbox.aggregate-type-column-name`::
The column name for the event aggregate type column.
|`quarkus.debezium-outbox.payload.column-definition`::
The database-specific column definition for the event payload.
|string
|`aggregatetype`
|
|`quarkus.debezium-outbox.payload.length`::
The length of the event payload column.
|integer
|
|`quarkus.debezium-outbox.payload.scale`::
The scale of the event payload column.
|integer
|
|`quarkus.debezium-outbox.payload.precision`::
The precision of the event payload column.
|integer
|
|`quarkus.debezium-outbox.payload.converter`::
The JPA AttributeConverter for the event payload column.
|string
|
|`quarkus.debezium-outbox.payload.nullable`::
Whether the event payload column is nullable.
|boolean
|true
|=======================