DBZ-1169 Support user set Schema version

The cache mechanism had to be adapted in order to support non-versioned
and versioned schemas, a test now confirms it's the same valueSchema
instance created once.
This commit is contained in:
Renato Mefi 2019-03-24 15:41:23 +01:00 committed by Jiri Pechanec
parent 4aec4d5e92
commit 87a3278dfe
3 changed files with 98 additions and 7 deletions

View File

@ -49,12 +49,15 @@ public class EventRouter<R extends ConnectRecord<R>> implements Transformation<R
private String fieldEventTimestamp; private String fieldEventTimestamp;
private String fieldPayload; private String fieldPayload;
private String fieldPayloadId; private String fieldPayloadId;
private String fieldSchemaVersion;
private String routeByField; private String routeByField;
private Schema valueSchema;
private List<AdditionalField> additionalFields; private List<AdditionalField> additionalFields;
private Schema defaultValueSchema;
private final Map<Integer, Schema> versionedValueSchema = new HashMap<>();
@Override @Override
public R apply(R r) { public R apply(R r) {
// Ignoring tombstones // Ignoring tombstones
@ -94,9 +97,9 @@ public R apply(R r) {
Headers headers = r.headers(); Headers headers = r.headers();
headers.add("id", eventId, eventValueSchema.field(fieldEventId).schema()); headers.add("id", eventId, eventValueSchema.field(fieldEventId).schema());
if (valueSchema == null) { Schema valueSchema = (fieldSchemaVersion == null)
valueSchema = buildValueSchema(eventValueSchema); ? getValueSchema(eventValueSchema)
} : getValueSchema(eventValueSchema, eventStruct.getInt32(fieldSchemaVersion));
Struct value = new Struct(valueSchema) Struct value = new Struct(valueSchema)
.put(ENVELOPE_EVENT_TYPE, eventType) .put(ENVELOPE_EVENT_TYPE, eventType)
@ -179,6 +182,7 @@ public void configure(Map<String, ?> configMap) {
fieldEventTimestamp = config.getString(EventRouterConfigDefinition.FIELD_EVENT_TIMESTAMP); fieldEventTimestamp = config.getString(EventRouterConfigDefinition.FIELD_EVENT_TIMESTAMP);
fieldPayload = config.getString(EventRouterConfigDefinition.FIELD_PAYLOAD); fieldPayload = config.getString(EventRouterConfigDefinition.FIELD_PAYLOAD);
fieldPayloadId = config.getString(EventRouterConfigDefinition.FIELD_PAYLOAD_ID); fieldPayloadId = config.getString(EventRouterConfigDefinition.FIELD_PAYLOAD_ID);
fieldSchemaVersion = config.getString(EventRouterConfigDefinition.FIELD_SCHEMA_VERSION);
routeByField = config.getString(EventRouterConfigDefinition.ROUTE_BY_FIELD); routeByField = config.getString(EventRouterConfigDefinition.ROUTE_BY_FIELD);
@ -196,7 +200,26 @@ public void configure(Map<String, ?> configMap) {
additionalFields = parseAdditionalFieldsConfig(config); additionalFields = parseAdditionalFieldsConfig(config);
} }
private Schema buildValueSchema(Schema debeziumEventSchema) { private Schema getValueSchema(Schema debeziumEventSchema) {
if (defaultValueSchema == null) {
defaultValueSchema = getSchemaBuilder(debeziumEventSchema).build();
}
return defaultValueSchema;
}
private Schema getValueSchema(Schema debeziumEventSchema, Integer version) {
if (!versionedValueSchema.containsKey(version)) {
final Schema schema = getSchemaBuilder(debeziumEventSchema)
.version(version)
.build();
versionedValueSchema.put(version, schema);
}
return versionedValueSchema.get(version);
}
private SchemaBuilder getSchemaBuilder(Schema debeziumEventSchema) {
SchemaBuilder schemaBuilder = SchemaBuilder.struct(); SchemaBuilder schemaBuilder = SchemaBuilder.struct();
// Add default fields // Add default fields
@ -214,6 +237,6 @@ private Schema buildValueSchema(Schema debeziumEventSchema) {
} }
})); }));
return schemaBuilder.build(); return schemaBuilder;
} }
} }

View File

@ -190,6 +190,13 @@ public String getAlias() {
" is a list of colon-delimited pairs or trios when you desire to have aliases," + " is a list of colon-delimited pairs or trios when you desire to have aliases," +
" e.g. <code>id:header,field_name:envelope:alias</code> "); " e.g. <code>id:header,field_name:envelope:alias</code> ");
static final Field FIELD_SCHEMA_VERSION = Field.create("table.field.schema.version")
.withDisplayName("Event Schema Version Field")
.withType(ConfigDef.Type.STRING)
.withWidth(ConfigDef.Width.MEDIUM)
.withImportance(ConfigDef.Importance.LOW)
.withDescription("The column which contains the Schema version within the outbox table");
static final Field ROUTE_BY_FIELD = Field.create("route.by.field") static final Field ROUTE_BY_FIELD = Field.create("route.by.field")
.withDisplayName("Field to route events by") .withDisplayName("Field to route events by")
.withType(ConfigDef.Type.STRING) .withType(ConfigDef.Type.STRING)
@ -241,7 +248,7 @@ public static ConfigDef configDef() {
Field.group( Field.group(
config, config,
"Table", "Table",
FIELD_EVENT_ID, FIELD_EVENT_KEY, FIELD_EVENT_TYPE, FIELD_PAYLOAD, FIELD_PAYLOAD_ID, FIELD_EVENT_TIMESTAMP, FIELDS_ADDITIONAL_PLACEMENT FIELD_EVENT_ID, FIELD_EVENT_KEY, FIELD_EVENT_TYPE, FIELD_PAYLOAD, FIELD_PAYLOAD_ID, FIELD_EVENT_TIMESTAMP, FIELDS_ADDITIONAL_PLACEMENT, FIELD_SCHEMA_VERSION
); );
Field.group( Field.group(
config, config,

View File

@ -79,6 +79,8 @@ public void canExtractTableFields() {
assertThat(eventRouted).isNotNull(); assertThat(eventRouted).isNotNull();
assertThat(((Struct) eventRouted.value()).getString("payload")).isEqualTo("{}"); assertThat(((Struct) eventRouted.value()).getString("payload")).isEqualTo("{}");
assertThat(eventRouted.valueSchema().version()).isNull();
} }
@Test @Test
@ -315,6 +317,65 @@ public void canInfluenceTableColumnTypes() {
assertThat(header.value()).isEqualTo(2); assertThat(header.value()).isEqualTo(2);
} }
@Test
public void canSetSchemaVersion() {
final EventRouter<SourceRecord> router = new EventRouter<>();
final Map<String, String> config = new HashMap<>();
config.put(EventRouterConfigDefinition.FIELD_SCHEMA_VERSION.name(), "version");
router.configure(config);
Map<String, Schema> extraFields = new HashMap<>();
extraFields.put("version", Schema.INT32_SCHEMA);
Map<String, Object> extraValuesV1 = new HashMap<>();
extraValuesV1.put("version", 1);
final SourceRecord eventRecordV1 = createEventRecord(
"166080d9-3b0e-4a04-81fe-2058a7386f1f",
"UserCreated",
"420b186d",
"User",
"{}",
extraFields,
extraValuesV1
);
final SourceRecord eventRoutedV1 = router.apply(eventRecordV1);
assertThat(eventRoutedV1.valueSchema().version()).isEqualTo(1);
Map<String, Object> extraValuesV3 = new HashMap<>();
extraValuesV3.put("version", 3);
final SourceRecord eventRecordV3 = createEventRecord(
"166080d9-3b0e-4a04-81fe-2058a7386f1f",
"UserCreated",
"420b186d",
"User",
"{}",
extraFields,
extraValuesV3
);
final SourceRecord eventRoutedV3 = router.apply(eventRecordV3);
assertThat(eventRoutedV3.valueSchema().version()).isEqualTo(3);
// This one will now use the cached version
final SourceRecord eventRecordV1E2 = createEventRecord(
"18f94a39-b931-41b7-837c-6fc23b013597",
"UserCreated",
"1b10b70b",
"User",
"{}",
extraFields,
extraValuesV1
);
final SourceRecord eventRoutedV1E2 = router.apply(eventRecordV1E2);
assertThat(eventRoutedV1E2.valueSchema().version()).isEqualTo(1);
assertThat(eventRoutedV1.valueSchema()).isSameAs(eventRoutedV1E2.valueSchema());
}
@Test @Test
public void canSetPayloadTypeIntoTheEnvelope() { public void canSetPayloadTypeIntoTheEnvelope() {
final EventRouter<SourceRecord> router = new EventRouter<>(); final EventRouter<SourceRecord> router = new EventRouter<>();