DBZ-1478 Dynamic mapping and configuration support & cleaned up code
This commit is contained in:
parent
dace9da957
commit
d7208791bd
@ -32,11 +32,11 @@
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>io.quarkus</groupId>
|
||||
<artifactId>quarkus-hibernate-validator-deployment</artifactId>
|
||||
<artifactId>quarkus-jackson-deployment</artifactId>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>io.quarkus</groupId>
|
||||
<artifactId>quarkus-jackson-deployment</artifactId>
|
||||
<artifactId>quarkus-resteasy-deployment</artifactId>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>io.quarkus</groupId>
|
||||
|
@ -5,6 +5,8 @@
|
||||
*/
|
||||
package io.debezium.quarkus.outbox;
|
||||
|
||||
import io.quarkus.runtime.annotations.ConfigItem;
|
||||
import io.quarkus.runtime.annotations.ConfigPhase;
|
||||
import io.quarkus.runtime.annotations.ConfigRoot;
|
||||
|
||||
/**
|
||||
@ -14,7 +16,47 @@
|
||||
*
|
||||
* @author Chris Cranford
|
||||
*/
|
||||
@ConfigRoot
|
||||
@ConfigRoot(phase = ConfigPhase.BUILD_TIME)
|
||||
public class DebeziumOutboxConfig {
|
||||
/**
|
||||
* The table name to be used for the outbox table
|
||||
*/
|
||||
@ConfigItem(defaultValue = "OutboxEvent")
|
||||
public String tableName;
|
||||
|
||||
/**
|
||||
* The column name that contains the event id in the outbox table
|
||||
*/
|
||||
@ConfigItem(defaultValue = "id")
|
||||
public String columnIdName;
|
||||
|
||||
/**
|
||||
* The column name that contains the event key within the outbox table
|
||||
*/
|
||||
@ConfigItem(defaultValue = "aggregateid")
|
||||
public String aggregateIdName;
|
||||
|
||||
/**
|
||||
* The column name that contains the event type in the outbox table
|
||||
*/
|
||||
@ConfigItem(defaultValue = "type")
|
||||
public String typeName;
|
||||
|
||||
/**
|
||||
* The column name that contains the timestamp in the outbox table
|
||||
*/
|
||||
@ConfigItem(defaultValue = "timestamp")
|
||||
public String timestampName;
|
||||
|
||||
/**
|
||||
* The column name that contains the event payload in the outbox table
|
||||
*/
|
||||
@ConfigItem(defaultValue = "payload")
|
||||
public String payloadName;
|
||||
|
||||
/**
|
||||
* The column name that determines how the events will be routed in the outbox table
|
||||
*/
|
||||
@ConfigItem(defaultValue = "aggregatetype")
|
||||
public String aggregateTypeName;
|
||||
}
|
||||
|
@ -0,0 +1,33 @@
|
||||
/*
|
||||
* Copyright Debezium Authors.
|
||||
*
|
||||
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
|
||||
*/
|
||||
package io.debezium.quarkus.outbox;
|
||||
|
||||
import org.jboss.jandex.Type;
|
||||
|
||||
import io.quarkus.builder.item.SimpleBuildItem;
|
||||
|
||||
/**
|
||||
* A build item that represents data relevant to the OutboxEvent entity mapping.
|
||||
*
|
||||
* @author Chris Cranford
|
||||
*/
|
||||
public final class OutboxEventEntityBuildItem extends SimpleBuildItem {
|
||||
private final Type aggregateIdType;
|
||||
private final Type payloadType;
|
||||
|
||||
public OutboxEventEntityBuildItem(Type aggregateIdType, Type payloadType) {
|
||||
this.aggregateIdType = aggregateIdType;
|
||||
this.payloadType = payloadType;
|
||||
}
|
||||
|
||||
public Type getAggregateIdType() {
|
||||
return aggregateIdType;
|
||||
}
|
||||
|
||||
public Type getPayloadType() {
|
||||
return payloadType;
|
||||
}
|
||||
}
|
@ -0,0 +1,109 @@
|
||||
/*
|
||||
* Copyright Debezium Authors.
|
||||
*
|
||||
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
|
||||
*/
|
||||
package io.debezium.quarkus.outbox;
|
||||
|
||||
import static io.debezium.outbox.quarkus.internal.OutboxConstants.OUTBOX_ENTITY_FULLNAME;
|
||||
|
||||
import java.util.UUID;
|
||||
|
||||
import org.hibernate.boot.jaxb.hbm.spi.JaxbHbmBasicAttributeType;
|
||||
import org.hibernate.boot.jaxb.hbm.spi.JaxbHbmColumnType;
|
||||
import org.hibernate.boot.jaxb.hbm.spi.JaxbHbmGeneratorSpecificationType;
|
||||
import org.hibernate.boot.jaxb.hbm.spi.JaxbHbmHibernateMapping;
|
||||
import org.hibernate.boot.jaxb.hbm.spi.JaxbHbmIdentifierGeneratorDefinitionType;
|
||||
import org.hibernate.boot.jaxb.hbm.spi.JaxbHbmRootEntityType;
|
||||
import org.hibernate.boot.jaxb.hbm.spi.JaxbHbmSimpleIdType;
|
||||
|
||||
import io.debezium.outbox.quarkus.internal.JsonNodeAttributeConverter;
|
||||
|
||||
/**
|
||||
* Helper class that can produce a JAXB HBM mapping for the OutboxEvent entity.
|
||||
*
|
||||
* @author Chris Cranford
|
||||
*/
|
||||
public class OutboxEventHbmWriter {
|
||||
|
||||
static JaxbHbmHibernateMapping write(DebeziumOutboxConfig config, OutboxEventEntityBuildItem outboxEventEntityBuildItem) {
|
||||
final JaxbHbmHibernateMapping mapping = new JaxbHbmHibernateMapping();
|
||||
|
||||
final JaxbHbmRootEntityType entityType = new JaxbHbmRootEntityType();
|
||||
entityType.setEntityName(OUTBOX_ENTITY_FULLNAME);
|
||||
entityType.setTable(config.tableName);
|
||||
mapping.getClazz().add(entityType);
|
||||
|
||||
// Setup generator
|
||||
final JaxbHbmIdentifierGeneratorDefinitionType generatorType = new JaxbHbmIdentifierGeneratorDefinitionType();
|
||||
generatorType.setName("uuid2");
|
||||
generatorType.setClazz("uuid2");
|
||||
mapping.getIdentifierGenerator().add(generatorType);
|
||||
|
||||
// Setup the ID
|
||||
final JaxbHbmSimpleIdType idType = new JaxbHbmSimpleIdType();
|
||||
idType.setName("id");
|
||||
idType.setColumnAttribute(config.columnIdName);
|
||||
idType.setTypeAttribute(UUID.class.getName());
|
||||
|
||||
final JaxbHbmGeneratorSpecificationType generatorSpecType = new JaxbHbmGeneratorSpecificationType();
|
||||
generatorSpecType.setClazz("uuid2");
|
||||
idType.setGenerator(generatorSpecType);
|
||||
|
||||
entityType.setId(idType);
|
||||
|
||||
// Setup the aggregateType
|
||||
final JaxbHbmBasicAttributeType aggregateType = new JaxbHbmBasicAttributeType();
|
||||
aggregateType.setName("aggregateType");
|
||||
aggregateType.setColumnAttribute(config.aggregateTypeName);
|
||||
aggregateType.setTypeAttribute("string");
|
||||
aggregateType.setNotNull(true);
|
||||
entityType.getAttributes().add(aggregateType);
|
||||
|
||||
// Setup the aggregateIdType
|
||||
final JaxbHbmBasicAttributeType aggregateIdType = new JaxbHbmBasicAttributeType();
|
||||
aggregateIdType.setName("aggregateId");
|
||||
aggregateIdType.setColumnAttribute(config.aggregateIdName);
|
||||
aggregateIdType.setTypeAttribute(outboxEventEntityBuildItem.getAggregateIdType().name().toString());
|
||||
aggregateIdType.setNotNull(true);
|
||||
entityType.getAttributes().add(aggregateIdType);
|
||||
|
||||
// Setup the typeType
|
||||
final JaxbHbmBasicAttributeType typeType = new JaxbHbmBasicAttributeType();
|
||||
typeType.setName("type");
|
||||
typeType.setColumnAttribute(config.typeName);
|
||||
typeType.setTypeAttribute("string");
|
||||
typeType.setNotNull(true);
|
||||
entityType.getAttributes().add(typeType);
|
||||
|
||||
// Setup the timestampType
|
||||
final JaxbHbmBasicAttributeType timestampType = new JaxbHbmBasicAttributeType();
|
||||
timestampType.setName("timestamp");
|
||||
timestampType.setColumnAttribute(config.timestampName);
|
||||
timestampType.setTypeAttribute("Instant");
|
||||
timestampType.setNotNull(true);
|
||||
entityType.getAttributes().add(timestampType);
|
||||
|
||||
// Setup the payloadType
|
||||
final JaxbHbmBasicAttributeType payloadType = new JaxbHbmBasicAttributeType();
|
||||
payloadType.setName("payload");
|
||||
|
||||
// todo: this needs some more testing with varied data types
|
||||
final String payloadClassType = outboxEventEntityBuildItem.getPayloadType().name().toString();
|
||||
if (payloadClassType.equals("com.fasterxml.jackson.databind.JsonNode")) {
|
||||
payloadType.setTypeAttribute("converted::" + JsonNodeAttributeConverter.class.getName());
|
||||
|
||||
final JaxbHbmColumnType columnType = new JaxbHbmColumnType();
|
||||
columnType.setName(config.payloadName);
|
||||
columnType.setSqlType("varchar(8000)");
|
||||
payloadType.getColumnOrFormula().add(columnType);
|
||||
}
|
||||
else {
|
||||
payloadType.setColumnAttribute(config.payloadName);
|
||||
payloadType.setTypeAttribute(outboxEventEntityBuildItem.getPayloadType().name().toString());
|
||||
}
|
||||
entityType.getAttributes().add(payloadType);
|
||||
|
||||
return mapping;
|
||||
}
|
||||
}
|
@ -5,21 +5,48 @@
|
||||
*/
|
||||
package io.debezium.quarkus.outbox;
|
||||
|
||||
import static io.debezium.outbox.quarkus.internal.OutboxConstants.OUTBOX_ENTITY_HBMXML;
|
||||
|
||||
import java.io.ByteArrayOutputStream;
|
||||
import java.io.IOException;
|
||||
import java.io.PrintWriter;
|
||||
|
||||
import javax.xml.bind.JAXBContext;
|
||||
import javax.xml.bind.JAXBException;
|
||||
import javax.xml.bind.Marshaller;
|
||||
|
||||
import org.hibernate.boot.jaxb.hbm.spi.JaxbHbmHibernateMapping;
|
||||
import org.jboss.jandex.ClassInfo;
|
||||
import org.jboss.jandex.DotName;
|
||||
import org.jboss.jandex.ParameterizedType;
|
||||
import org.jboss.jandex.Type;
|
||||
import org.jboss.logging.Logger;
|
||||
|
||||
import com.fasterxml.jackson.databind.JsonNode;
|
||||
|
||||
import io.debezium.outbox.quarkus.ExportedEvent;
|
||||
import io.debezium.outbox.quarkus.internal.DebeziumOutboxRecorder;
|
||||
import io.debezium.outbox.quarkus.internal.DebeziumOutboxRuntimeConfig;
|
||||
import io.debezium.outbox.quarkus.internal.EventDispatcher;
|
||||
import io.quarkus.arc.deployment.AdditionalBeanBuildItem;
|
||||
import io.quarkus.deployment.annotations.BuildProducer;
|
||||
import io.quarkus.deployment.annotations.BuildStep;
|
||||
import io.quarkus.deployment.annotations.ExecutionTime;
|
||||
import io.quarkus.deployment.annotations.Record;
|
||||
import io.quarkus.deployment.builditem.CombinedIndexBuildItem;
|
||||
import io.quarkus.deployment.builditem.FeatureBuildItem;
|
||||
import io.quarkus.deployment.builditem.nativeimage.ReflectiveClassBuildItem;
|
||||
import io.quarkus.deployment.builditem.GeneratedResourceBuildItem;
|
||||
|
||||
/**
|
||||
* Quarkus deployment processor for the Debezium "outbox" extension.
|
||||
*
|
||||
* @author Chris Cranford
|
||||
*/
|
||||
public class OutboxProcessor {
|
||||
public final class OutboxProcessor {
|
||||
|
||||
private static final Logger LOGGER = Logger.getLogger(OutboxProcessor.class);
|
||||
|
||||
private static final String DEBEZIUM_OUTBOX = "debezium-outbox";
|
||||
private static final String DEBEZIUM_OUTBOX_CONFIG_PREFIX = "quarkus.debezium-outbox.";
|
||||
|
||||
/**
|
||||
* Debezium Outbox configuration
|
||||
@ -27,17 +54,112 @@ public class OutboxProcessor {
|
||||
DebeziumOutboxConfig debeziumOutboxConfig;
|
||||
|
||||
@BuildStep
|
||||
FeatureBuildItem feature() {
|
||||
public FeatureBuildItem feature() {
|
||||
return new FeatureBuildItem(DEBEZIUM_OUTBOX);
|
||||
}
|
||||
|
||||
@BuildStep
|
||||
void registerForReflection(BuildProducer<ReflectiveClassBuildItem> reflectiveClass) {
|
||||
reflectiveClass.produce(new ReflectiveClassBuildItem(true, true, true, ExportedEvent.class.getName()));
|
||||
public void produceOutboxBuildItem(CombinedIndexBuildItem index,
|
||||
BuildProducer<OutboxEventEntityBuildItem> outboxEventEntityProducer) {
|
||||
final DotName exportedEvent = DotName.createSimple(ExportedEvent.class.getName());
|
||||
|
||||
Type aggregateIdType = Type.create(DotName.createSimple(String.class.getName()), Type.Kind.CLASS);
|
||||
Type payloadType = Type.create(DotName.createSimple(JsonNode.class.getName()), Type.Kind.CLASS);
|
||||
|
||||
boolean parameterizedTypesDetected = false;
|
||||
for (ClassInfo classInfo : index.getIndex().getAllKnownImplementors(exportedEvent)) {
|
||||
LOGGER.infof("Found ExportedEvent type: %s", classInfo.name());
|
||||
for (Type interfaceType : classInfo.interfaceTypes()) {
|
||||
if (interfaceType.name().equals(exportedEvent)) {
|
||||
if (interfaceType.kind().equals(Type.Kind.PARAMETERIZED_TYPE)) {
|
||||
final ParameterizedType pType = interfaceType.asParameterizedType();
|
||||
if (pType.arguments().size() != 2) {
|
||||
throw new IllegalArgumentException(
|
||||
String.format(
|
||||
"Expected 2 parameterized types for class %s using interface ExportedEvent",
|
||||
classInfo.name()));
|
||||
}
|
||||
|
||||
final Type pTypeAggregateType = pType.arguments().get(0);
|
||||
final Type pTypePayloadType = pType.arguments().get(1);
|
||||
LOGGER.debug(" * Implements ExportedEvent with generic parameters:");
|
||||
LOGGER.debugf(" AggregateId: %s", pTypeAggregateType.name().toString());
|
||||
LOGGER.debugf(" Payload: %s", pTypePayloadType.name().toString());
|
||||
|
||||
if (parameterizedTypesDetected) {
|
||||
if (!pTypeAggregateType.equals(aggregateIdType)) {
|
||||
throw new IllegalStateException(
|
||||
String.format(
|
||||
"Class %s implements ExportedEvent and expected aggregate-id parameter type " +
|
||||
"to be %s but was %s. All ExportedEvent implementors must use the same parameter types.",
|
||||
classInfo.name(),
|
||||
aggregateIdType.name(),
|
||||
pTypeAggregateType.name()));
|
||||
}
|
||||
if (!pTypePayloadType.equals(payloadType)) {
|
||||
throw new IllegalStateException(
|
||||
String.format(
|
||||
"Class %s implements ExportedEvent and expected payload parameter type to be " +
|
||||
"%s but was %s. All ExportedEvent implementors must use the same parameter types.",
|
||||
classInfo.name(),
|
||||
payloadType.name(),
|
||||
pTypePayloadType.name()));
|
||||
}
|
||||
}
|
||||
else {
|
||||
aggregateIdType = pTypeAggregateType;
|
||||
payloadType = pTypePayloadType;
|
||||
parameterizedTypesDetected = true;
|
||||
}
|
||||
}
|
||||
else {
|
||||
LOGGER.debug(" * Implements ExportedEvent without parameters, using:");
|
||||
LOGGER.debugf(" AggregateId: %s", aggregateIdType.name().toString());
|
||||
LOGGER.debugf(" Payload: %s", payloadType.name().toString());
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
LOGGER.infof("Binding Aggregate Id as '%s'.", aggregateIdType.name().toString());
|
||||
LOGGER.infof("Binding Payload as '%s'.", payloadType.name().toString());
|
||||
|
||||
outboxEventEntityProducer.produce(new OutboxEventEntityBuildItem(aggregateIdType, payloadType));
|
||||
}
|
||||
|
||||
@BuildStep
|
||||
void build(BuildProducer<AdditionalBeanBuildItem> additionalBean) {
|
||||
additionalBean.produce(AdditionalBeanBuildItem.unremovableOf(EventDispatcher.class));
|
||||
@Record(ExecutionTime.RUNTIME_INIT)
|
||||
void configureconfigureDebeziumOutbox(DebeziumOutboxRecorder recorder, DebeziumOutboxRuntimeConfig config) {
|
||||
recorder.configureRuntimeProperties(config);
|
||||
}
|
||||
|
||||
@BuildStep(loadsApplicationClasses = true)
|
||||
public void build(OutboxEventEntityBuildItem outboxBuildItem,
|
||||
BuildProducer<AdditionalBeanBuildItem> additionalBeanProducer,
|
||||
BuildProducer<GeneratedResourceBuildItem> generatedResourcesProducer) {
|
||||
additionalBeanProducer.produce(AdditionalBeanBuildItem.unremovableOf(EventDispatcher.class));
|
||||
generateHbmMapping(outboxBuildItem, generatedResourcesProducer);
|
||||
}
|
||||
|
||||
private void generateHbmMapping(OutboxEventEntityBuildItem outboxBuildItem,
|
||||
BuildProducer<GeneratedResourceBuildItem> generatedResourcesProducer) {
|
||||
try {
|
||||
try (ByteArrayOutputStream os = new ByteArrayOutputStream()) {
|
||||
final JaxbHbmHibernateMapping jaxbMapping = OutboxEventHbmWriter.write(debeziumOutboxConfig, outboxBuildItem);
|
||||
|
||||
final JAXBContext context = JAXBContext.newInstance(JaxbHbmHibernateMapping.class);
|
||||
final Marshaller marshaller = context.createMarshaller();
|
||||
marshaller.setProperty(Marshaller.JAXB_FORMATTED_OUTPUT, Boolean.TRUE);
|
||||
|
||||
final PrintWriter writer = new PrintWriter(os);
|
||||
marshaller.marshal(jaxbMapping, writer);
|
||||
|
||||
LOGGER.debugf("Outbox entity HBM mapping:\n%s", new String(os.toByteArray()));
|
||||
generatedResourcesProducer.produce(new GeneratedResourceBuildItem(OUTBOX_ENTITY_HBMXML, os.toByteArray()));
|
||||
}
|
||||
}
|
||||
catch (JAXBException | IOException e) {
|
||||
throw new IllegalStateException("Failed to produce Outbox HBM mapping", e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -30,8 +30,9 @@
|
||||
<artifactId>quarkus-hibernate-orm</artifactId>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>io.quarkus</groupId>
|
||||
<artifactId>quarkus-hibernate-validator</artifactId>
|
||||
<groupId>jakarta.validation</groupId>
|
||||
<artifactId>jakarta.validation-api</artifactId>
|
||||
<version>${validation-api.version}</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>io.quarkus</groupId>
|
||||
|
@ -0,0 +1,73 @@
|
||||
/*
|
||||
* Copyright Debezium Authors.
|
||||
*
|
||||
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
|
||||
*/
|
||||
package io.debezium.outbox.quarkus.internal;
|
||||
|
||||
import static io.debezium.outbox.quarkus.internal.OutboxConstants.OUTBOX_ENTITY_HBMXML;
|
||||
|
||||
import java.io.BufferedInputStream;
|
||||
import java.io.BufferedReader;
|
||||
import java.io.BufferedWriter;
|
||||
import java.io.ByteArrayInputStream;
|
||||
import java.io.ByteArrayOutputStream;
|
||||
import java.io.InputStream;
|
||||
import java.io.InputStreamReader;
|
||||
import java.io.OutputStreamWriter;
|
||||
import java.io.Writer;
|
||||
import java.nio.charset.StandardCharsets;
|
||||
import java.util.Collection;
|
||||
import java.util.Collections;
|
||||
|
||||
import org.hibernate.boot.jaxb.Origin;
|
||||
import org.hibernate.boot.jaxb.SourceType;
|
||||
import org.hibernate.boot.jaxb.hbm.spi.JaxbHbmHibernateMapping;
|
||||
import org.hibernate.boot.jaxb.internal.MappingBinder;
|
||||
import org.hibernate.boot.jaxb.spi.Binding;
|
||||
import org.hibernate.boot.model.source.internal.hbm.MappingDocument;
|
||||
import org.hibernate.boot.spi.AdditionalJaxbMappingProducer;
|
||||
import org.hibernate.boot.spi.MetadataBuildingContext;
|
||||
import org.hibernate.boot.spi.MetadataImplementor;
|
||||
import org.jboss.jandex.IndexView;
|
||||
|
||||
/**
|
||||
* An {@link AdditionalJaxbMappingProducer} implementation that provides Hibernate ORM
|
||||
* with a HBM XML mapping for an map-mode entity configuration for the OutboxEvent
|
||||
* entity data type.
|
||||
*
|
||||
* @author Chris Cranford
|
||||
*/
|
||||
public class AdditionalJaxbMappingProducerImpl implements AdditionalJaxbMappingProducer {
|
||||
@Override
|
||||
public Collection<MappingDocument> produceAdditionalMappings(MetadataImplementor metadata,
|
||||
IndexView jandexIndex,
|
||||
MappingBinder mappingBinder,
|
||||
MetadataBuildingContext buildingContext) {
|
||||
final Origin origin = new Origin(SourceType.FILE, OUTBOX_ENTITY_HBMXML);
|
||||
try {
|
||||
final InputStream stream = getClass().getResourceAsStream("/" + OUTBOX_ENTITY_HBMXML);
|
||||
try (ByteArrayOutputStream baos = new ByteArrayOutputStream()) {
|
||||
final Writer writer = new BufferedWriter(new OutputStreamWriter(baos, StandardCharsets.UTF_8));
|
||||
try (BufferedReader reader = new BufferedReader(new InputStreamReader(stream))) {
|
||||
String line;
|
||||
while ((line = reader.readLine()) != null) {
|
||||
writer.write(line);
|
||||
}
|
||||
writer.flush();
|
||||
}
|
||||
|
||||
try (ByteArrayInputStream bais = new ByteArrayInputStream(baos.toByteArray())) {
|
||||
try (BufferedInputStream bis = new BufferedInputStream(bais)) {
|
||||
final Binding<?> jaxbBinding = mappingBinder.bind(bis, origin);
|
||||
final JaxbHbmHibernateMapping mapping = (JaxbHbmHibernateMapping) jaxbBinding.getRoot();
|
||||
return Collections.singletonList(new MappingDocument(mapping, origin, buildingContext));
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
catch (Exception e) {
|
||||
throw new RuntimeException("Failed to submit OutboxEvent.hbm.xml mapping to Hibernate ORM", e);
|
||||
}
|
||||
}
|
||||
}
|
@ -0,0 +1,22 @@
|
||||
/*
|
||||
* Copyright Debezium Authors.
|
||||
*
|
||||
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
|
||||
*/
|
||||
package io.debezium.outbox.quarkus.internal;
|
||||
|
||||
import io.quarkus.arc.Arc;
|
||||
import io.quarkus.runtime.annotations.Recorder;
|
||||
|
||||
/**
|
||||
* Recorder that configures the {@link EventDispatcher} at runtime.
|
||||
*
|
||||
* @author Chris Cranford
|
||||
*/
|
||||
@Recorder
|
||||
public class DebeziumOutboxRecorder {
|
||||
public void configureRuntimeProperties(DebeziumOutboxRuntimeConfig outboxRuntimeConfig) {
|
||||
EventDispatcher dispatcher = Arc.container().instance(EventDispatcher.class).get();
|
||||
dispatcher.setOutboxRuntimeProperties(outboxRuntimeConfig);
|
||||
}
|
||||
}
|
@ -0,0 +1,24 @@
|
||||
/*
|
||||
* Copyright Debezium Authors.
|
||||
*
|
||||
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
|
||||
*/
|
||||
package io.debezium.outbox.quarkus.internal;
|
||||
|
||||
import io.quarkus.runtime.annotations.ConfigItem;
|
||||
import io.quarkus.runtime.annotations.ConfigPhase;
|
||||
import io.quarkus.runtime.annotations.ConfigRoot;
|
||||
|
||||
/**
|
||||
* Debezium outbox Quarkus extension runtime configuration properties.
|
||||
*
|
||||
* @author Chris Cranford
|
||||
*/
|
||||
@ConfigRoot(phase = ConfigPhase.RUN_TIME, name = "debezium-outbox")
|
||||
public class DebeziumOutboxRuntimeConfig {
|
||||
/**
|
||||
* Remove outbox entity after being inserted. Default is {@code true}.
|
||||
*/
|
||||
@ConfigItem(defaultValue = "true")
|
||||
public Boolean removeAfterInsert;
|
||||
}
|
@ -5,16 +5,19 @@
|
||||
*/
|
||||
package io.debezium.outbox.quarkus.internal;
|
||||
|
||||
import static io.debezium.outbox.quarkus.internal.OutboxConstants.OUTBOX_ENTITY_FULLNAME;
|
||||
|
||||
import java.util.HashMap;
|
||||
|
||||
import javax.enterprise.context.ApplicationScoped;
|
||||
import javax.enterprise.event.Observes;
|
||||
import javax.inject.Inject;
|
||||
import javax.persistence.EntityManager;
|
||||
|
||||
import org.hibernate.Session;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import com.fasterxml.jackson.databind.JsonNode;
|
||||
|
||||
import io.debezium.outbox.quarkus.ExportedEvent;
|
||||
|
||||
/**
|
||||
@ -32,7 +35,18 @@ public class EventDispatcher {
|
||||
@Inject
|
||||
EntityManager entityManager;
|
||||
|
||||
public EventDispatcher() {
|
||||
/**
|
||||
* Debezium runtime configuration
|
||||
*/
|
||||
DebeziumOutboxRuntimeConfig config;
|
||||
|
||||
/**
|
||||
* Set the runtime configuration properties.
|
||||
*
|
||||
* @param outboxRuntimeProperties the configuration properties
|
||||
*/
|
||||
public void setOutboxRuntimeProperties(DebeziumOutboxRuntimeConfig outboxRuntimeProperties) {
|
||||
config = outboxRuntimeProperties;
|
||||
}
|
||||
|
||||
/**
|
||||
@ -43,17 +57,21 @@ public EventDispatcher() {
|
||||
public void onExportedEvent(@Observes ExportedEvent<?, ?> event) {
|
||||
LOGGER.debug("An exported event was found for type {}", event.getType());
|
||||
|
||||
// Create an OutboxEvent object based on the ExportedEvent interface
|
||||
final OutboxEvent outboxEvent = new OutboxEvent(
|
||||
event.getAggregateType(),
|
||||
(String) event.getAggregateId(),
|
||||
event.getType(),
|
||||
(JsonNode) event.getPayload(),
|
||||
event.getTimestamp());
|
||||
// Define the entity map-mode object using property names and values
|
||||
final HashMap<String, Object> dataMap = new HashMap<>();
|
||||
dataMap.put("aggregateType", event.getAggregateType());
|
||||
dataMap.put("aggregateId", event.getAggregateId());
|
||||
dataMap.put("type", event.getType());
|
||||
dataMap.put("payload", event.getPayload());
|
||||
dataMap.put("timestamp", event.getTimestamp());
|
||||
|
||||
// We want the events table to remain empty; however this triggers both an INSERT and DELETE
|
||||
// in the database transaction log which is sufficient for Debezium to process the event.
|
||||
entityManager.persist(outboxEvent);
|
||||
entityManager.remove(outboxEvent);
|
||||
// Unwrap to Hibernate session and save
|
||||
Session session = entityManager.unwrap(Session.class);
|
||||
session.save(OUTBOX_ENTITY_FULLNAME, dataMap);
|
||||
|
||||
// Remove entity if the configuration deems doing so, leaving useful for debugging
|
||||
if (config.removeAfterInsert) {
|
||||
session.delete(OUTBOX_ENTITY_FULLNAME, dataMap);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -16,6 +16,8 @@
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
|
||||
/**
|
||||
* Converter that knows how to convert between a {@link String} and {@link JsonNode}.
|
||||
*
|
||||
* @author Chris Cranford
|
||||
*/
|
||||
public class JsonNodeAttributeConverter implements AttributeConverter<JsonNode, String> {
|
||||
|
@ -0,0 +1,20 @@
|
||||
/*
|
||||
* Copyright Debezium Authors.
|
||||
*
|
||||
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
|
||||
*/
|
||||
package io.debezium.outbox.quarkus.internal;
|
||||
|
||||
/**
|
||||
* Constant values used by the Debezium outbox Quarkus extension.
|
||||
*
|
||||
* @author Chris Cranford
|
||||
*/
|
||||
public final class OutboxConstants {
|
||||
|
||||
public static final String OUTBOX_ENTITY_HBMXML = "META-INF/OutboxEvent.hbm.xml";
|
||||
public static final String OUTBOX_ENTITY_FULLNAME = "io.debezium.outbox.quarkus.internal.OutboxEvent";
|
||||
|
||||
private OutboxConstants() {
|
||||
}
|
||||
}
|
@ -1,114 +0,0 @@
|
||||
/*
|
||||
* Copyright Debezium Authors.
|
||||
*
|
||||
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
|
||||
*/
|
||||
package io.debezium.outbox.quarkus.internal;
|
||||
|
||||
import java.time.Instant;
|
||||
import java.util.UUID;
|
||||
|
||||
import javax.persistence.Column;
|
||||
import javax.persistence.Convert;
|
||||
import javax.persistence.Entity;
|
||||
import javax.persistence.GeneratedValue;
|
||||
import javax.persistence.Id;
|
||||
import javax.validation.constraints.NotNull;
|
||||
|
||||
import com.fasterxml.jackson.databind.JsonNode;
|
||||
|
||||
import io.debezium.outbox.quarkus.ExportedEvent;
|
||||
|
||||
/**
|
||||
* The outbox event entity.
|
||||
*
|
||||
* The contents of the {@link ExportedEvent} will be replicated to this entity definition and persisted to
|
||||
* the database in order for Debezium to capture the event.
|
||||
*
|
||||
* @author Chris Cranford
|
||||
*/
|
||||
@Entity
|
||||
public class OutboxEvent {
|
||||
@Id
|
||||
@GeneratedValue
|
||||
private UUID id;
|
||||
|
||||
@NotNull
|
||||
private String aggregateType;
|
||||
|
||||
@NotNull
|
||||
private String aggregateId;
|
||||
|
||||
@NotNull
|
||||
private String type;
|
||||
|
||||
@NotNull
|
||||
private Instant timestamp;
|
||||
|
||||
// todo: for now mapping this as varchar(8000).
|
||||
// Need to investigate if we can write our own hibernate type implementation that does not
|
||||
// cause the DeletedElementException when building a native image.
|
||||
@NotNull
|
||||
@Column(columnDefinition = "varchar(8000)")
|
||||
@Convert(converter = JsonNodeAttributeConverter.class)
|
||||
private JsonNode payload;
|
||||
|
||||
OutboxEvent() {
|
||||
}
|
||||
|
||||
public OutboxEvent(String aggregateType, String aggregateId, String type, JsonNode payload, Instant timestamp) {
|
||||
this.aggregateType = aggregateType;
|
||||
this.aggregateId = aggregateId;
|
||||
this.type = type;
|
||||
this.payload = payload;
|
||||
this.timestamp = timestamp;
|
||||
}
|
||||
|
||||
public UUID getId() {
|
||||
return id;
|
||||
}
|
||||
|
||||
public void setId(UUID id) {
|
||||
this.id = id;
|
||||
}
|
||||
|
||||
public String getAggregateType() {
|
||||
return aggregateType;
|
||||
}
|
||||
|
||||
public void setAggregateType(String aggregateType) {
|
||||
this.aggregateType = aggregateType;
|
||||
}
|
||||
|
||||
public String getAggregateId() {
|
||||
return aggregateId;
|
||||
}
|
||||
|
||||
public void setAggregateId(String aggregateId) {
|
||||
this.aggregateId = aggregateId;
|
||||
}
|
||||
|
||||
public String getType() {
|
||||
return type;
|
||||
}
|
||||
|
||||
public void setType(String type) {
|
||||
this.type = type;
|
||||
}
|
||||
|
||||
public Instant getTimestamp() {
|
||||
return timestamp;
|
||||
}
|
||||
|
||||
public void setTimestamp(Instant timestamp) {
|
||||
this.timestamp = timestamp;
|
||||
}
|
||||
|
||||
public JsonNode getPayload() {
|
||||
return payload;
|
||||
}
|
||||
|
||||
public void setPayload(JsonNode payload) {
|
||||
this.payload = payload;
|
||||
}
|
||||
}
|
@ -0,0 +1 @@
|
||||
io.debezium.outbox.quarkus.internal.AdditionalJaxbMappingProducerImpl
|
@ -14,22 +14,16 @@
|
||||
|
||||
<name>Debezium Quarkus</name>
|
||||
<description>Debezium extensions for Quarkus</description>
|
||||
<url>http://debezium.io</url>
|
||||
<inceptionYear>2019</inceptionYear>
|
||||
|
||||
<properties>
|
||||
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
|
||||
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
|
||||
|
||||
<quarkus.version>1.1.0.Final</quarkus.version>
|
||||
<validation-api.version>2.0.2</validation-api.version>
|
||||
|
||||
<maven-compiler-plugin.version>3.8.0</maven-compiler-plugin.version>
|
||||
<maven.compiler.target>1.8</maven.compiler.target>
|
||||
<maven.compiler.source>1.8</maven.compiler.source>
|
||||
<maven.compiler.testTarget>${maven.compiler.target}</maven.compiler.testTarget>
|
||||
<maven.compiler.testSource>${maven.compiler.source}</maven.compiler.testSource>
|
||||
|
||||
<maven-surefire-plugin.version>3.0.0-M4</maven-surefire-plugin.version>
|
||||
</properties>
|
||||
|
||||
<build>
|
||||
@ -38,7 +32,7 @@
|
||||
<plugin>
|
||||
<groupId>org.apache.maven.plugins</groupId>
|
||||
<artifactId>maven-compiler-plugin</artifactId>
|
||||
<version>${maven-compiler-plugin.version}</version>
|
||||
<version>${version.compiler.plugin}</version>
|
||||
<configuration>
|
||||
<showDeprecation>true</showDeprecation>
|
||||
<showWarnings>true</showWarnings>
|
||||
@ -51,7 +45,7 @@
|
||||
<plugin>
|
||||
<groupId>org.apache.maven.plugins</groupId>
|
||||
<artifactId>maven-surefire-plugin</artifactId>
|
||||
<version>${maven-surefire-plugin.version}</version>
|
||||
<version>${version.surefire.plugin}</version>
|
||||
<configuration>
|
||||
<failIfNoTests>false</failIfNoTests>
|
||||
<systemProperties>
|
||||
@ -63,7 +57,7 @@
|
||||
<plugin>
|
||||
<groupId>org.apache.maven.plugins</groupId>
|
||||
<artifactId>maven-failsafe-plugin</artifactId>
|
||||
<version>${maven-surefire-plugin.version}</version>
|
||||
<version>${version.surefire.plugin}</version>
|
||||
</plugin>
|
||||
|
||||
<plugin>
|
||||
|
Loading…
Reference in New Issue
Block a user