DBZ-1963 Suggested changes

This commit is contained in:
Chris Cranford 2020-05-04 14:44:35 -04:00 committed by Gunnar Morling
parent a1cb06299c
commit 81c3a808da
4 changed files with 33 additions and 58 deletions

View File

@ -132,8 +132,11 @@ public void shouldConsumeRecordsFromInsert() throws Exception {
assertThat(routedEvent).isNotNull();
assertThat(routedEvent.topic()).isEqualTo("outbox.event.User");
assertThat(routedEvent.keySchema()).isEqualTo(Schema.STRING_SCHEMA);
assertThat(routedEvent.key()).isEqualTo("10711fa5");
Struct valueStruct = requireStruct(routedEvent.value(), "test payload");
assertThat(valueStruct.schema().name()).isEqualTo("test_server.outboxsmtit.outbox.Value-outbox");
assertThat(valueStruct.schema().name()).isEqualTo("test_server.outboxsmtit.outbox.User.Value");
assertThat(valueStruct.getString("eventType")).isEqualTo("UserCreated");
JsonNode payload = (new ObjectMapper()).readTree(valueStruct.getString("payload"));
assertThat(payload.get("email")).isEqualTo(null);
@ -236,7 +239,7 @@ public void shouldSupportAllFeatures() throws Exception {
// Validate metadata
Schema expectedSchema = SchemaBuilder.struct()
.version(1)
.name("test_server.outboxsmtit.outbox.Value-outbox")
.name("test_server.outboxsmtit.outbox.UserEmail.Value")
.field("payload", Json.builder().optional().build())
.field("eventType", Schema.STRING_SCHEMA)
.field("eventVersion", Schema.INT32_SCHEMA)
@ -473,37 +476,6 @@ public void shouldProduceTombstoneEventForEmptyPayload() throws Exception {
assertThat(eventRouted.value()).isNull();
}
@Test
@FixFor("DBZ-1963")
public void shouldEmitValueSchemaWithNonDefaultName() throws Exception {
outboxEventRouter = new EventRouter<>();
final Map<String, String> config = new HashMap<>();
config.put("debezium.schema.name.suffix", "-outbox-non-default");
outboxEventRouter.configure(config);
startConnectorWithInitialSnapshotRecord();
TestHelper.execute(createEventInsert(
UUID.fromString("59a42efd-b015-44a9-9dde-cb36d9002425"),
"UserCreated",
"User",
"10711fa5",
"{}",
""));
SourceRecords actualRecords = consumeRecordsByTopic(1);
assertThat(actualRecords.topics().size()).isEqualTo(1);
SourceRecord newEventRecord = actualRecords.recordsForTopic(topicName("outboxsmtit.outbox")).get(0);
SourceRecord routedEvent = outboxEventRouter.apply(newEventRecord);
assertThat(routedEvent).isNotNull();
assertThat(routedEvent.topic()).isEqualTo("outbox.event.User");
Struct valueStruct = requireStruct(routedEvent.value(), "test payload");
assertThat(valueStruct.schema().name()).isEqualTo("test_server.outboxsmtit.outbox.Value-outbox-non-default");
}
private void startConnectorWithInitialSnapshotRecord() throws Exception {
TestHelper.execute(createEventInsert(
UUID.fromString("70f52ae3-f671-4bac-ae62-1b9be6e73700"),

View File

@ -65,7 +65,6 @@ public class EventRouter<R extends ConnectRecord<R>> implements Transformation<R
private List<AdditionalField> additionalFields;
private Schema defaultValueSchema;
private String valueSchemaSuffix;
private final Map<Integer, Schema> versionedValueSchema = new HashMap<>();
private boolean onlyHeadersInOutputMessage = false;
@ -126,8 +125,8 @@ public R apply(R r) {
final Schema structValueSchema = onlyHeadersInOutputMessage ? null
: (fieldSchemaVersion == null)
? getValueSchema(eventValueSchema)
: getValueSchema(eventValueSchema, eventStruct.getInt32(fieldSchemaVersion));
? getValueSchema(eventValueSchema, eventStruct.getString(routeByField))
: getValueSchema(eventValueSchema, eventStruct.getInt32(fieldSchemaVersion), eventStruct.getString(routeByField));
final Struct structValue = onlyHeadersInOutputMessage ? null : new Struct(structValueSchema).put(ENVELOPE_PAYLOAD, payload);
@ -268,7 +267,6 @@ public void configure(Map<String, ?> configMap) {
fieldSchemaVersion = config.getString(EventRouterConfigDefinition.FIELD_SCHEMA_VERSION);
routeByField = config.getString(EventRouterConfigDefinition.ROUTE_BY_FIELD);
routeTombstoneOnEmptyPayload = config.getBoolean(EventRouterConfigDefinition.ROUTE_TOMBSTONE_ON_EMPTY_PAYLOAD);
valueSchemaSuffix = config.getString(EventRouterConfigDefinition.SCHEMA_NAME_SUFFIX);
final Map<String, String> regexRouterConfig = new HashMap<>();
regexRouterConfig.put("regex", config.getString(EventRouterConfigDefinition.ROUTE_TOPIC_REGEX));
@ -285,17 +283,17 @@ public void configure(Map<String, ?> configMap) {
onlyHeadersInOutputMessage = !additionalFields.stream().anyMatch(field -> field.getPlacement() == EventRouterConfigDefinition.AdditionalFieldPlacement.ENVELOPE);
}
private Schema getValueSchema(Schema debeziumEventSchema) {
private Schema getValueSchema(Schema debeziumEventSchema, String routedTopic) {
if (defaultValueSchema == null) {
defaultValueSchema = getSchemaBuilder(debeziumEventSchema).build();
defaultValueSchema = getSchemaBuilder(debeziumEventSchema, routedTopic).build();
}
return defaultValueSchema;
}
private Schema getValueSchema(Schema debeziumEventSchema, Integer version) {
private Schema getValueSchema(Schema debeziumEventSchema, Integer version, String routedTopic) {
if (!versionedValueSchema.containsKey(version)) {
final Schema schema = getSchemaBuilder(debeziumEventSchema)
final Schema schema = getSchemaBuilder(debeziumEventSchema, routedTopic)
.version(version)
.build();
versionedValueSchema.put(version, schema);
@ -304,8 +302,8 @@ private Schema getValueSchema(Schema debeziumEventSchema, Integer version) {
return versionedValueSchema.get(version);
}
private SchemaBuilder getSchemaBuilder(Schema debeziumEventSchema) {
SchemaBuilder schemaBuilder = SchemaBuilder.struct().name(debeziumEventSchema.name() + valueSchemaSuffix);
private SchemaBuilder getSchemaBuilder(Schema debeziumEventSchema, String routedTopic) {
SchemaBuilder schemaBuilder = SchemaBuilder.struct().name(getSchemaName(debeziumEventSchema, routedTopic));
// Add payload field
schemaBuilder
@ -322,4 +320,22 @@ private SchemaBuilder getSchemaBuilder(Schema debeziumEventSchema) {
return schemaBuilder;
}
private String getSchemaName(Schema debeziumEventSchema, String routedTopic) {
final String schemaName;
final String originalSchemaName = debeziumEventSchema.name();
if (originalSchemaName != null) {
final int lastDot = originalSchemaName.lastIndexOf('.');
if (lastDot != -1) {
schemaName = originalSchemaName.substring(0, lastDot + 1) + routedTopic + "." + originalSchemaName.substring(lastDot + 1);
}
else {
schemaName = routedTopic + "." + originalSchemaName;
}
}
else {
schemaName = routedTopic;
}
return schemaName;
}
}

View File

@ -226,13 +226,6 @@ public String getAlias() {
" in case something else is processed this transform can log it as warning, error or stop the" +
" process");
static final Field SCHEMA_NAME_SUFFIX = Field.create("debezium.schema.name.suffix")
.withDisplayName("The suffix to append to value schema")
.withDefault("-outbox")
.withWidth(ConfigDef.Width.MEDIUM)
.withImportance(ConfigDef.Importance.MEDIUM)
.withDescription("A suffix that is appended to the source message value schema for the emitted outbox message");
static final Field[] CONFIG_FIELDS = {
FIELD_EVENT_ID,
FIELD_EVENT_KEY,
@ -246,8 +239,7 @@ public String getAlias() {
ROUTE_TOPIC_REGEX,
ROUTE_TOPIC_REPLACEMENT,
ROUTE_TOMBSTONE_ON_EMPTY_PAYLOAD,
OPERATION_INVALID_BEHAVIOR,
SCHEMA_NAME_SUFFIX
OPERATION_INVALID_BEHAVIOR
};
/**
@ -272,7 +264,7 @@ public static ConfigDef configDef() {
Field.group(
config,
"Debezium",
OPERATION_INVALID_BEHAVIOR, SCHEMA_NAME_SUFFIX);
OPERATION_INVALID_BEHAVIOR);
return config;
}

View File

@ -156,11 +156,6 @@ In such case the field could be placed either in envelope or among other headers
|`warn`
|Debezium
|While Debezium is monitoring the table, it's not expecting to see 'update' row events, in case it happens, this transform can log it as warning, error or stop the process. Options are `warn`, `error` and `fatal`
|`debezium.schema.name.suffix`
|`-outbox`
|Debezium
|Specifies the suffix to append to the original message's value schema name for the emitted outbox message.
|=======================
=== Default table columns