DBZ-1963 Emit outbox events with value schema names

This commit is contained in:
Chris Cranford 2020-04-30 18:15:43 -04:00 committed by Gunnar Morling
parent 7e14a351d9
commit a1cb06299c
4 changed files with 51 additions and 3 deletions

View File

@ -133,6 +133,7 @@ public void shouldConsumeRecordsFromInsert() throws Exception {
assertThat(routedEvent.topic()).isEqualTo("outbox.event.User");
Struct valueStruct = requireStruct(routedEvent.value(), "test payload");
assertThat(valueStruct.schema().name()).isEqualTo("test_server.outboxsmtit.outbox.Value-outbox");
assertThat(valueStruct.getString("eventType")).isEqualTo("UserCreated");
JsonNode payload = (new ObjectMapper()).readTree(valueStruct.getString("payload"));
assertThat(payload.get("email")).isEqualTo(null);
@ -235,6 +236,7 @@ public void shouldSupportAllFeatures() throws Exception {
// Validate metadata
Schema expectedSchema = SchemaBuilder.struct()
.version(1)
.name("test_server.outboxsmtit.outbox.Value-outbox")
.field("payload", Json.builder().optional().build())
.field("eventType", Schema.STRING_SCHEMA)
.field("eventVersion", Schema.INT32_SCHEMA)
@ -471,6 +473,37 @@ public void shouldProduceTombstoneEventForEmptyPayload() throws Exception {
assertThat(eventRouted.value()).isNull();
}
@Test
@FixFor("DBZ-1963")
public void shouldEmitValueSchemaWithNonDefaultName() throws Exception {
outboxEventRouter = new EventRouter<>();
final Map<String, String> config = new HashMap<>();
config.put("debezium.schema.name.suffix", "-outbox-non-default");
outboxEventRouter.configure(config);
startConnectorWithInitialSnapshotRecord();
TestHelper.execute(createEventInsert(
UUID.fromString("59a42efd-b015-44a9-9dde-cb36d9002425"),
"UserCreated",
"User",
"10711fa5",
"{}",
""));
SourceRecords actualRecords = consumeRecordsByTopic(1);
assertThat(actualRecords.topics().size()).isEqualTo(1);
SourceRecord newEventRecord = actualRecords.recordsForTopic(topicName("outboxsmtit.outbox")).get(0);
SourceRecord routedEvent = outboxEventRouter.apply(newEventRecord);
assertThat(routedEvent).isNotNull();
assertThat(routedEvent.topic()).isEqualTo("outbox.event.User");
Struct valueStruct = requireStruct(routedEvent.value(), "test payload");
assertThat(valueStruct.schema().name()).isEqualTo("test_server.outboxsmtit.outbox.Value-outbox-non-default");
}
private void startConnectorWithInitialSnapshotRecord() throws Exception {
TestHelper.execute(createEventInsert(
UUID.fromString("70f52ae3-f671-4bac-ae62-1b9be6e73700"),

View File

@ -65,6 +65,7 @@ public class EventRouter<R extends ConnectRecord<R>> implements Transformation<R
private List<AdditionalField> additionalFields;
private Schema defaultValueSchema;
private String valueSchemaSuffix;
private final Map<Integer, Schema> versionedValueSchema = new HashMap<>();
private boolean onlyHeadersInOutputMessage = false;
@ -267,6 +268,7 @@ public void configure(Map<String, ?> configMap) {
fieldSchemaVersion = config.getString(EventRouterConfigDefinition.FIELD_SCHEMA_VERSION);
routeByField = config.getString(EventRouterConfigDefinition.ROUTE_BY_FIELD);
routeTombstoneOnEmptyPayload = config.getBoolean(EventRouterConfigDefinition.ROUTE_TOMBSTONE_ON_EMPTY_PAYLOAD);
valueSchemaSuffix = config.getString(EventRouterConfigDefinition.SCHEMA_NAME_SUFFIX);
final Map<String, String> regexRouterConfig = new HashMap<>();
regexRouterConfig.put("regex", config.getString(EventRouterConfigDefinition.ROUTE_TOPIC_REGEX));
@ -303,7 +305,7 @@ private Schema getValueSchema(Schema debeziumEventSchema, Integer version) {
}
private SchemaBuilder getSchemaBuilder(Schema debeziumEventSchema) {
SchemaBuilder schemaBuilder = SchemaBuilder.struct();
SchemaBuilder schemaBuilder = SchemaBuilder.struct().name(debeziumEventSchema.name() + valueSchemaSuffix);
// Add payload field
schemaBuilder

View File

@ -226,6 +226,13 @@ public String getAlias() {
" in case something else is processed this transform can log it as warning, error or stop the" +
" process");
static final Field SCHEMA_NAME_SUFFIX = Field.create("debezium.schema.name.suffix")
.withDisplayName("The suffix to append to value schema")
.withDefault("-outbox")
.withWidth(ConfigDef.Width.MEDIUM)
.withImportance(ConfigDef.Importance.MEDIUM)
.withDescription("A suffix that is appended to the source message value schema for the emitted outbox message");
static final Field[] CONFIG_FIELDS = {
FIELD_EVENT_ID,
FIELD_EVENT_KEY,
@ -239,7 +246,8 @@ public String getAlias() {
ROUTE_TOPIC_REGEX,
ROUTE_TOPIC_REPLACEMENT,
ROUTE_TOMBSTONE_ON_EMPTY_PAYLOAD,
OPERATION_INVALID_BEHAVIOR
OPERATION_INVALID_BEHAVIOR,
SCHEMA_NAME_SUFFIX
};
/**
@ -264,7 +272,7 @@ public static ConfigDef configDef() {
Field.group(
config,
"Debezium",
OPERATION_INVALID_BEHAVIOR);
OPERATION_INVALID_BEHAVIOR, SCHEMA_NAME_SUFFIX);
return config;
}

View File

@ -156,6 +156,11 @@ In such case the field could be placed either in envelope or among other headers
|`warn`
|Debezium
|While Debezium is monitoring the table, it's not expecting to see 'update' row events, in case it happens, this transform can log it as warning, error or stop the process. Options are `warn`, `error` and `fatal`
|`debezium.schema.name.suffix`
|`-outbox`
|Debezium
|Specifies the suffix to append to the original message's value schema name for the emitted outbox message.
|=======================
=== Default table columns