diff --git a/debezium-quarkus-outbox/deployment/src/main/java/io/debezium/outbox/quarkus/deployment/DebeziumOutboxConfig.java b/debezium-quarkus-outbox/deployment/src/main/java/io/debezium/outbox/quarkus/deployment/DebeziumOutboxConfig.java index 67df66a51..78cb74e7b 100644 --- a/debezium-quarkus-outbox/deployment/src/main/java/io/debezium/outbox/quarkus/deployment/DebeziumOutboxConfig.java +++ b/debezium-quarkus-outbox/deployment/src/main/java/io/debezium/outbox/quarkus/deployment/DebeziumOutboxConfig.java @@ -5,6 +5,9 @@ */ package io.debezium.outbox.quarkus.deployment; +import java.util.Optional; + +import io.quarkus.runtime.annotations.ConfigGroup; import io.quarkus.runtime.annotations.ConfigItem; import io.quarkus.runtime.annotations.ConfigPhase; import io.quarkus.runtime.annotations.ConfigRoot; @@ -25,38 +28,283 @@ public class DebeziumOutboxConfig { public String tableName; /** - * The column name that contains the event id in the outbox table + * Outbox identifier configurable attributes */ - @ConfigItem(defaultValue = "id") - public String idColumnName; + @ConfigItem + public DebeziumOutboxConfigId id; /** - * The column name that contains the event key within the outbox table + * Outbox aggregate-id configurable attributes */ - @ConfigItem(defaultValue = "aggregateid") - public String aggregateIdColumnName; + @ConfigItem + public DebeziumOutboxConfigAggregateId aggregateId; /** - * The column name that contains the event type in the outbox table + * Outbox aggregate-type configurable attributes */ - @ConfigItem(defaultValue = "type") - public String typeColumnName; + @ConfigItem + public DebeziumOutboxConfigAggregateType aggregateType; /** - * The column name that contains the timestamp in the outbox table + * Outbox type configurable attributes */ - @ConfigItem(defaultValue = "timestamp") - public String timestampColumnName; + @ConfigItem + public DebeziumOutboxConfigType type; /** - * The column name that contains the event payload in the outbox table + * Outbox timestamp configurable attributes */ - @ConfigItem(defaultValue = "payload") - public String payloadColumnName; + @ConfigItem + public DebeziumOutboxConfigTimestamp timestamp; /** - * The column name that determines how the events will be routed in the outbox table + * Outbox payload configurable attributes */ - @ConfigItem(defaultValue = "aggregatetype") - public String aggregateTypeColumnName; + @ConfigItem + public DebeziumOutboxConfigPayload payload; + + @ConfigGroup + public static class DebeziumOutboxConfigId { + /** + * The column name. + */ + @ConfigItem(defaultValue = "id") + public String name; + + /** + * The column definition. + */ + @ConfigItem + public Optional columnDefinition; + } + + @ConfigGroup + public static class DebeziumOutboxConfigAggregateType { + /** + * The column name. + */ + @ConfigItem(defaultValue = "aggregatetype") + public String name; + + /** + * The column definition. + */ + @ConfigItem + public Optional columnDefinition; + + /** + * The column length. + */ + @ConfigItem + public Optional length; + + /** + * The column scale. + */ + @ConfigItem + public Optional scale; + + /** + * The column precision. + */ + @ConfigItem + public Optional precision; + + /** + * The column's attribute converter fully qualified class name. + * @see javax.persistence.AttributeConverter + */ + @ConfigItem + public Optional converter; + + /** + * Whether the column is nullable. + */ + @ConfigItem(defaultValue = "false") + public boolean nullable; + } + + @ConfigGroup + public static class DebeziumOutboxConfigAggregateId { + /** + * The column name. + */ + @ConfigItem(defaultValue = "aggregateid") + public String name; + + /** + * The column definition. + */ + @ConfigItem + public Optional columnDefinition; + + /** + * The column length. + */ + @ConfigItem + public Optional length; + + /** + * The column scale. + */ + @ConfigItem + public Optional scale; + + /** + * The column precision. + */ + @ConfigItem + public Optional precision; + + /** + * The column's attribute converter fully qualified class name. + * @see javax.persistence.AttributeConverter + */ + @ConfigItem + public Optional converter; + + /** + * Whether the column is nullable. + */ + @ConfigItem(defaultValue = "false") + public boolean nullable; + } + + @ConfigGroup + public static class DebeziumOutboxConfigType { + /** + * The column name. + */ + @ConfigItem(defaultValue = "type") + public String name; + + /** + * The column definition. + */ + @ConfigItem + public Optional columnDefinition; + + /** + * The column length. + */ + @ConfigItem + public Optional length; + + /** + * The column scale. + */ + @ConfigItem + public Optional scale; + + /** + * The column precision. + */ + @ConfigItem + public Optional precision; + + /** + * The column's attribute converter fully qualified class name. + * @see javax.persistence.AttributeConverter + */ + @ConfigItem + public Optional converter; + + /** + * Whether the column is nullable. + */ + @ConfigItem(defaultValue = "false") + public boolean nullable; + } + + @ConfigGroup + public static class DebeziumOutboxConfigTimestamp { + /** + * The column name. + */ + @ConfigItem(defaultValue = "timestamp") + public String name; + + /** + * The column definition. + */ + @ConfigItem + public Optional columnDefinition; + + /** + * The column length. + */ + @ConfigItem + public Optional length; + + /** + * The column scale. + */ + @ConfigItem + public Optional scale; + + /** + * The column precision. + */ + @ConfigItem + public Optional precision; + + /** + * The column's attribute converter fully qualified class name. + * @see javax.persistence.AttributeConverter + */ + @ConfigItem + public Optional converter; + + /** + * Whether the column is nullable. + */ + @ConfigItem(defaultValue = "false") + public boolean nullable; + } + + @ConfigGroup + public static class DebeziumOutboxConfigPayload { + /** + * The column name. + */ + @ConfigItem(defaultValue = "payload") + public String name; + + /** + * The column definition. + */ + @ConfigItem + public Optional columnDefinition; + + /** + * The column length. + */ + @ConfigItem + public Optional length; + + /** + * The column scale. + */ + @ConfigItem + public Optional scale; + + /** + * The column precision. + */ + @ConfigItem + public Optional precision; + + /** + * The column's attribute converter fully qualified class name. + * @see javax.persistence.AttributeConverter + */ + @ConfigItem + public Optional converter; + + /** + * Whether the column is nullable. + */ + @ConfigItem(defaultValue = "true") + public boolean nullable; + } } diff --git a/debezium-quarkus-outbox/deployment/src/main/java/io/debezium/outbox/quarkus/deployment/OutboxEventHbmWriter.java b/debezium-quarkus-outbox/deployment/src/main/java/io/debezium/outbox/quarkus/deployment/OutboxEventHbmWriter.java index aa1b88e4f..6db9efc62 100644 --- a/debezium-quarkus-outbox/deployment/src/main/java/io/debezium/outbox/quarkus/deployment/OutboxEventHbmWriter.java +++ b/debezium-quarkus-outbox/deployment/src/main/java/io/debezium/outbox/quarkus/deployment/OutboxEventHbmWriter.java @@ -26,6 +26,8 @@ */ public class OutboxEventHbmWriter { + private static final String JACKSON_JSONNODE = "com.fasterxml.jackson.databind.JsonNode"; + static JaxbHbmHibernateMapping write(DebeziumOutboxConfig config, OutboxEventEntityBuildItem outboxEventEntityBuildItem) { final JaxbHbmHibernateMapping mapping = new JaxbHbmHibernateMapping(); @@ -40,70 +42,161 @@ static JaxbHbmHibernateMapping write(DebeziumOutboxConfig config, OutboxEventEnt generatorType.setClazz("uuid2"); mapping.getIdentifierGenerator().add(generatorType); - // Setup the ID - final JaxbHbmSimpleIdType idType = new JaxbHbmSimpleIdType(); - idType.setName("id"); - idType.setColumnAttribute(config.idColumnName); - idType.setTypeAttribute(UUID.class.getName()); - - final JaxbHbmGeneratorSpecificationType generatorSpecType = new JaxbHbmGeneratorSpecificationType(); - generatorSpecType.setClazz("uuid2"); - idType.setGenerator(generatorSpecType); - - entityType.setId(idType); - - // Setup the aggregateType - final JaxbHbmBasicAttributeType aggregateType = new JaxbHbmBasicAttributeType(); - aggregateType.setName("aggregateType"); - aggregateType.setColumnAttribute(config.aggregateTypeColumnName); - aggregateType.setTypeAttribute("string"); - aggregateType.setNotNull(true); - entityType.getAttributes().add(aggregateType); - - // Setup the aggregateIdType - final JaxbHbmBasicAttributeType aggregateIdType = new JaxbHbmBasicAttributeType(); - aggregateIdType.setName("aggregateId"); - aggregateIdType.setColumnAttribute(config.aggregateIdColumnName); - aggregateIdType.setTypeAttribute(outboxEventEntityBuildItem.getAggregateIdType().name().toString()); - aggregateIdType.setNotNull(true); - entityType.getAttributes().add(aggregateIdType); - - // Setup the typeType - final JaxbHbmBasicAttributeType typeType = new JaxbHbmBasicAttributeType(); - typeType.setName("type"); - typeType.setColumnAttribute(config.typeColumnName); - typeType.setTypeAttribute("string"); - typeType.setNotNull(true); - entityType.getAttributes().add(typeType); - - // Setup the timestampType - final JaxbHbmBasicAttributeType timestampType = new JaxbHbmBasicAttributeType(); - timestampType.setName("timestamp"); - timestampType.setColumnAttribute(config.timestampColumnName); - timestampType.setTypeAttribute("Instant"); - timestampType.setNotNull(true); - entityType.getAttributes().add(timestampType); - - // Setup the payloadType - final JaxbHbmBasicAttributeType payloadType = new JaxbHbmBasicAttributeType(); - payloadType.setName("payload"); - - // todo: this needs some more testing with varied data types - final String payloadClassType = outboxEventEntityBuildItem.getPayloadType().name().toString(); - if (payloadClassType.equals("com.fasterxml.jackson.databind.JsonNode")) { - payloadType.setTypeAttribute("converted::" + JsonNodeAttributeConverter.class.getName()); - - final JaxbHbmColumnType columnType = new JaxbHbmColumnType(); - columnType.setName(config.payloadColumnName); - columnType.setSqlType("varchar(8000)"); - payloadType.getColumnOrFormula().add(columnType); - } - else { - payloadType.setColumnAttribute(config.payloadColumnName); - payloadType.setTypeAttribute(outboxEventEntityBuildItem.getPayloadType().name().toString()); - } - entityType.getAttributes().add(payloadType); + // Setup attributes + entityType.setId(createIdAttribute(config)); + entityType.getAttributes().add(createAggregateTypeAttribute(config)); + entityType.getAttributes().add(createAggregateIdAttribute(config, outboxEventEntityBuildItem)); + entityType.getAttributes().add(createTypeAttribute(config)); + entityType.getAttributes().add(createTimestampAttribute(config)); + entityType.getAttributes().add(createPayloadAttribute(config, outboxEventEntityBuildItem)); return mapping; } + + private static JaxbHbmSimpleIdType createIdAttribute(DebeziumOutboxConfig config) { + final JaxbHbmSimpleIdType attribute = new JaxbHbmSimpleIdType(); + attribute.setName("id"); + attribute.setTypeAttribute(UUID.class.getName()); + + final JaxbHbmColumnType column = new JaxbHbmColumnType(); + column.setName(config.id.name); + config.id.columnDefinition.ifPresent(column::setSqlType); + attribute.getColumn().add(column); + + final JaxbHbmGeneratorSpecificationType generator = new JaxbHbmGeneratorSpecificationType(); + generator.setClazz("uuid2"); + attribute.setGenerator(generator); + + return attribute; + } + + private static JaxbHbmBasicAttributeType createAggregateTypeAttribute(DebeziumOutboxConfig config) { + final JaxbHbmBasicAttributeType attribute = new JaxbHbmBasicAttributeType(); + attribute.setName("aggregateType"); + attribute.setNotNull(!config.aggregateType.nullable); + if (config.aggregateType.converter.isPresent()) { + attribute.setTypeAttribute("converted::" + config.aggregateType.converter.get()); + } + else { + attribute.setTypeAttribute("string"); + } + + final JaxbHbmColumnType column = new JaxbHbmColumnType(); + column.setName(config.aggregateType.name); + config.aggregateType.columnDefinition.ifPresent(column::setSqlType); + config.aggregateType.length.ifPresent(column::setLength); + config.aggregateType.scale.ifPresent(column::setScale); + config.aggregateType.precision.ifPresent(column::setPrecision); + attribute.getColumnOrFormula().add(column); + + return attribute; + } + + private static JaxbHbmBasicAttributeType createAggregateIdAttribute(DebeziumOutboxConfig config, + OutboxEventEntityBuildItem outboxEventEntityBuildItem) { + final JaxbHbmBasicAttributeType attribute = new JaxbHbmBasicAttributeType(); + attribute.setName("aggregateId"); + attribute.setNotNull(!config.aggregateId.nullable); + if (config.aggregateId.converter.isPresent()) { + attribute.setTypeAttribute("converted::" + config.aggregateId.converter.get()); + } + else { + attribute.setTypeAttribute(outboxEventEntityBuildItem.getAggregateIdType().name().toString()); + } + + final JaxbHbmColumnType column = new JaxbHbmColumnType(); + column.setName(config.aggregateId.name); + config.aggregateId.columnDefinition.ifPresent(column::setSqlType); + config.aggregateId.length.ifPresent(column::setLength); + config.aggregateId.scale.ifPresent(column::setScale); + config.aggregateId.precision.ifPresent(column::setPrecision); + attribute.getColumnOrFormula().add(column); + + return attribute; + } + + private static JaxbHbmBasicAttributeType createTypeAttribute(DebeziumOutboxConfig config) { + final JaxbHbmBasicAttributeType attribute = new JaxbHbmBasicAttributeType(); + attribute.setName("type"); + attribute.setNotNull(!config.type.nullable); + if (config.type.converter.isPresent()) { + attribute.setTypeAttribute("converted::" + config.type.converter.get()); + } + else { + attribute.setTypeAttribute("string"); + } + + final JaxbHbmColumnType column = new JaxbHbmColumnType(); + column.setName(config.type.name); + config.type.columnDefinition.ifPresent(column::setSqlType); + config.type.length.ifPresent(column::setLength); + config.type.precision.ifPresent(column::setPrecision); + config.type.scale.ifPresent(column::setScale); + attribute.getColumnOrFormula().add(column); + + return attribute; + } + + private static JaxbHbmBasicAttributeType createTimestampAttribute(DebeziumOutboxConfig config) { + final JaxbHbmBasicAttributeType attribute = new JaxbHbmBasicAttributeType(); + attribute.setName("timestamp"); + attribute.setNotNull(!config.timestamp.nullable); + if (config.timestamp.converter.isPresent()) { + attribute.setTypeAttribute("converted::" + config.timestamp.converter.get()); + } + else { + attribute.setTypeAttribute("Instant"); + } + + final JaxbHbmColumnType column = new JaxbHbmColumnType(); + column.setName(config.timestamp.name); + config.timestamp.columnDefinition.ifPresent(column::setSqlType); + config.timestamp.length.ifPresent(column::setLength); + config.timestamp.precision.ifPresent(column::setPrecision); + config.timestamp.scale.ifPresent(column::setScale); + attribute.getColumnOrFormula().add(column); + + return attribute; + } + + private static JaxbHbmBasicAttributeType createPayloadAttribute(DebeziumOutboxConfig config, + OutboxEventEntityBuildItem outboxEventEntityBuildItem) { + + final boolean isJacksonJsonNode = isPayloadJacksonJsonNode(outboxEventEntityBuildItem); + + final JaxbHbmBasicAttributeType attribute = new JaxbHbmBasicAttributeType(); + attribute.setName("payload"); + attribute.setNotNull(!config.payload.nullable); + + if (config.payload.converter.isPresent()) { + attribute.setTypeAttribute("converted::" + config.payload.converter.get()); + } + else if (isJacksonJsonNode) { + attribute.setTypeAttribute("converted::" + JsonNodeAttributeConverter.class.getName()); + } + else { + attribute.setTypeAttribute(outboxEventEntityBuildItem.getPayloadType().name().toString()); + } + + final JaxbHbmColumnType column = new JaxbHbmColumnType(); + column.setName(config.payload.name); + + if (config.payload.columnDefinition.isPresent()) { + column.setSqlType(config.payload.columnDefinition.get()); + } + else if (isJacksonJsonNode) { + column.setSqlType("varchar(8000)"); + } + + config.payload.length.ifPresent(column::setLength); + config.payload.precision.ifPresent(column::setPrecision); + config.payload.scale.ifPresent(column::setScale); + attribute.getColumnOrFormula().add(column); + + return attribute; + } + + private static boolean isPayloadJacksonJsonNode(OutboxEventEntityBuildItem outboxEventEntityBuildItem) { + return outboxEventEntityBuildItem.getPayloadType().name().toString().equals(JACKSON_JSONNODE); + } } diff --git a/documentation/modules/ROOT/pages/integrations/outbox.adoc b/documentation/modules/ROOT/pages/integrations/outbox.adoc index 938418430..f122a6ac2 100644 --- a/documentation/modules/ROOT/pages/integrations/outbox.adoc +++ b/documentation/modules/ROOT/pages/integrations/outbox.adoc @@ -137,35 +137,190 @@ The table name to be used when creating the outbox table. |OutboxEvent -|`quarkus.debezium-outbox.id-column-name`:: +|`quarkus.debezium-outbox.id.name`:: The column name for the event id column. |string |`id` -|`quarkus.debezium-outbox.aggregate-id-column.name`:: +|`quarkus.debezium-outbox.id.column-definition`:: +The database-specific column definition for the event id column. +|string +| + +|`quarkus.debezium-outbox.aggregate-id.name`:: The column name for the event key column. |string |`aggregateid` -|`quarkus.debezium-outbox.type-column-name`:: +|`quarkus.debezium-outbox.aggregate-id.column-definition`:: +The database-specific column definition for the aggregate id. +|string +| + +|`quarkus.debezium-outbox.aggregate-id.length`:: +The length of the event key column. +|integer +| + +|`quarkus.debezium-outbox.aggregate-id.scale`:: +The scale of the event key column. +|integer +| + +|`quarkus.debezium-outbox.aggregate-id.precision`:: +The precision of the event key column. +|integer +| + +|`quarkus.debezium-outbox.aggregate-id.converter`:: +The JPA AttributeConverter for the event key column. +|string +| + +|`quarkus.debezium-outbox.aggregate-id.nullable`:: +Whether the event key column is nullable. +|boolean +|false + +|`quarkus.debezium-outbox.aggregate-type-name`:: +The column name for the event aggregate type column. +|string +|`aggregatetype` + +|`quarkus.debezium-outbox.aggregate-type.column-definition`:: +The database-specific column definition for the aggregate type. +|string +| + +|`quarkus.debezium-outbox.aggregate-type.length`:: +The length of the event aggregate type column. +|integer +| + +|`quarkus.debezium-outbox.aggregate-type.scale`:: +The scale of the event aggregate type column. +|integer +| + +|`quarkus.debezium-outbox.aggregate-type.precision`:: +The precision of the event aggregate type column. +|integer +| + +|`quarkus.debezium-outbox.aggregate-type.converter`:: +The JPA AttributeConverter for the event aggregate type column. +|string +| + +|`quarkus.debezium-outbox.aggregate-type.nullable`:: +Whether the event aggregate type column is nullable. +|boolean +|false + +|`quarkus.debezium-outbox.type.name`:: The column name for the event type column. |string |`type` -|`quarkus.debezium-outbox.timestamp-column-name`:: +|`quarkus.debezium-outbox.type.column-definition`:: +The database-specific column definition for the event type. +|string +| + +|`quarkus.debezium-outbox.type.length`:: +The length of the event type column. +|integer +| + +|`quarkus.debezium-outbox.type.scale`:: +The scale of the event type column. +|integer +| + +|`quarkus.debezium-outbox.type.precision`:: +The precision of the event type column. +|integer +| + +|`quarkus.debezium-outbox.type.converter`:: +The JPA AttributeConverter for the event type column. +|string +| + +|`quarkus.debezium-outbox.type.nullable`:: +Whether the event type column is nullable. +|boolean +|false + +|`quarkus.debezium-outbox.timestamp.name`:: The column name for the event timestamp column. |string |`timestamp` -|`quarkus.debezium-outbox.payload-column-name`:: +|`quarkus.debezium-outbox.timestamp.column-definition`:: +The database-specific column definition for the event timestamp. +|string +| + +|`quarkus.debezium-outbox.timestamp.length`:: +The length of the event timestamp column. +|integer +| + +|`quarkus.debezium-outbox.timestamp.scale`:: +The scale of the event timestamp column. +|integer +| + +|`quarkus.debezium-outbox.timestamp.precision`:: +The precision of the event timestamp column. +|integer +| + +|`quarkus.debezium-outbox.timestamp.converter`:: +The JPA AttributeConverter for the event timestamp column. +|string +| + +|`quarkus.debezium-outbox.timestamp.nullable`:: +Whether the event timestamp column is nullable. +|boolean +|false + +|`quarkus.debezium-outbox.payload.name`:: The column name for the event payload column. |string |`payload` -|`quarkus.debezium-outbox.aggregate-type-column-name`:: -The column name for the event aggregate type column. +|`quarkus.debezium-outbox.payload.column-definition`:: +The database-specific column definition for the event payload. |string -|`aggregatetype` +| + +|`quarkus.debezium-outbox.payload.length`:: +The length of the event payload column. +|integer +| + +|`quarkus.debezium-outbox.payload.scale`:: +The scale of the event payload column. +|integer +| + +|`quarkus.debezium-outbox.payload.precision`:: +The precision of the event payload column. +|integer +| + +|`quarkus.debezium-outbox.payload.converter`:: +The JPA AttributeConverter for the event payload column. +|string +| + +|`quarkus.debezium-outbox.payload.nullable`:: +Whether the event payload column is nullable. +|boolean +|true |=======================