DBZ-1751 Centralize envelope schema naming

This commit is contained in:
Jiri Pechanec 2020-02-04 15:23:44 +01:00 committed by Gunnar Morling
parent c5902c3c32
commit 77f9a61b95
9 changed files with 43 additions and 9 deletions

View File

@ -29,6 +29,7 @@
import io.debezium.annotation.ThreadSafe;
import io.debezium.connector.mongodb.FieldSelector.FieldFilter;
import io.debezium.data.Envelope;
import io.debezium.data.Envelope.FieldName;
import io.debezium.data.Envelope.Operation;
import io.debezium.data.Json;
@ -137,7 +138,7 @@ protected RecordsForCollection(CollectionId collectionId, FieldFilter fieldFilte
.field("id", Schema.STRING_SCHEMA)
.build();
this.valueSchema = SchemaBuilder.struct()
.name(adjuster.adjust(topicName + ".Envelope"))
.name(adjuster.adjust(Envelope.schemaName(topicName)))
.field(FieldName.AFTER, Json.builder().optional().build())
.field("patch", Json.builder().optional().build())
.field(FieldName.SOURCE, source.schema())

View File

@ -223,7 +223,7 @@ private R newRecord(R record, BsonDocument keyDocument, BsonDocument valueDocume
if (valueDocument.size() > 0) {
String newValueSchemaName = record.valueSchema().name();
if (newValueSchemaName.endsWith(".Envelope")) {
if (Envelope.isEnvelopeSchema(newValueSchemaName)) {
newValueSchemaName = newValueSchemaName.substring(0, newValueSchemaName.length() - 9);
}
SchemaBuilder valueSchemaBuilder = SchemaBuilder.struct().name(newValueSchemaName);

View File

@ -59,6 +59,7 @@
import io.debezium.connector.SnapshotRecord;
import io.debezium.connector.postgresql.data.Ltree;
import io.debezium.data.Bits;
import io.debezium.data.Envelope;
import io.debezium.data.Json;
import io.debezium.data.SchemaUtil;
import io.debezium.data.SpecialValueDecimal;
@ -936,7 +937,7 @@ protected void assertRecordOffsetAndSnapshotSource(SourceRecord record, boolean
assertNull("Last snapshot marker not expected, but found", lastSnapshotRecord);
}
final Struct envelope = (Struct) record.value();
if (envelope != null && envelope.schema().name().endsWith(".Envelope")) {
if (envelope != null && Envelope.isEnvelopeSchema(envelope.schema())) {
final Struct source = (Struct) envelope.get("source");
final SnapshotRecord sourceSnapshot = SnapshotRecord.fromSource(source);
if (shouldBeSnapshot) {

View File

@ -1451,7 +1451,7 @@ public void testEmptyChangesProducesHeartbeat() throws Exception {
// Expecting 1 data change
Awaitility.await().atMost(TestHelper.waitTimeForRecords(), TimeUnit.SECONDS).until(() -> {
final SourceRecord record = consumeRecord();
return record != null && record.valueSchema().name().endsWith(".Envelope");
return record != null && Envelope.isEnvelopeSchema(record.valueSchema());
});
// Expecting one empty DDL change

View File

@ -170,7 +170,7 @@ public byte[] fromConnectData(String topic, Schema schema, Object value) {
if (schema == null || value == null) {
return null;
}
if (!schema.name().endsWith(".Envelope")) {
if (!Envelope.isEnvelopeSchema(schema)) {
// TODO Handling of non-data messages like schema change or transaction metadata
return null;
}

View File

@ -123,6 +123,11 @@ public static final class FieldName {
ALL_FIELD_NAMES = Collections.unmodifiableSet(fields);
}
/**
* A suffix appended to each schema name representing Envelope
*/
public static String SCHEMA_NAME_SUFFIX = ".Envelope";
/**
* A builder of an envelope schema.
*/
@ -349,4 +354,30 @@ public static Operation operationFor(SourceRecord record) {
}
return null;
}
/**
* Converts an event type name into envelope schema name
*
* @param type
* @return Envelope schema name
*/
public static String schemaName(String type) {
return type + SCHEMA_NAME_SUFFIX;
}
/**
* @param schemaName
* @return true if schema name conforms to Envelope naming
*/
public static boolean isEnvelopeSchema(String schemaName) {
return schemaName.endsWith(SCHEMA_NAME_SUFFIX);
}
/**
* @param schema
* @return true if schema name conforms to Envelope naming
*/
public static boolean isEnvelopeSchema(Schema schema) {
return isEnvelopeSchema(schema.name());
}
}

View File

@ -14,6 +14,7 @@
import org.slf4j.LoggerFactory;
import io.debezium.config.CommonConnectorConfig;
import io.debezium.data.Envelope;
import io.debezium.relational.Key.KeyMapper;
import io.debezium.relational.Tables.ColumnNameFilter;
import io.debezium.relational.Tables.TableFilter;
@ -136,7 +137,7 @@ protected void removeSchema(TableId id) {
}
private String getEnvelopeSchemaName(Table table) {
return topicSelector.topicNameFor(table.id()) + ".Envelope";
return Envelope.schemaName(topicSelector.topicNameFor(table.id()));
}
/**

View File

@ -317,7 +317,7 @@ private Schema updateEnvelopeSchema(Schema oldEnvelopeSchema, String newTopicNam
}
envelopeBuilder.field(fieldName, fieldSchema);
}
envelopeBuilder.name(schemaNameAdjuster.adjust(newTopicName + ".Envelope"));
envelopeBuilder.name(schemaNameAdjuster.adjust(Envelope.schemaName(newTopicName)));
newEnvelopeSchema = envelopeBuilder.build();
envelopeSchemaUpdateCache.put(oldEnvelopeSchema, newEnvelopeSchema);

View File

@ -10,6 +10,7 @@
import org.slf4j.LoggerFactory;
import io.debezium.config.Configuration;
import io.debezium.data.Envelope;
/**
* A class used by all Debezium supplied SMTs to centralize common logic.
@ -19,7 +20,6 @@
*/
public class SmtManager<R extends ConnectRecord<R>> {
private static final String ENVELOPE_SCHEMA_NAME_SUFFIX = ".Envelope";
private static final String RECORD_ENVELOPE_KEY_SCHEMA_NAME_SUFFIX = ".Key";
private static final Logger LOGGER = LoggerFactory.getLogger(SmtManager.class);
@ -30,7 +30,7 @@ public SmtManager(Configuration config) {
public boolean isValidEnvelope(final R record) {
if (record.valueSchema() == null ||
record.valueSchema().name() == null ||
!record.valueSchema().name().endsWith(ENVELOPE_SCHEMA_NAME_SUFFIX)) {
!Envelope.isEnvelopeSchema(record.valueSchema())) {
LOGGER.warn("Expected Envelope for transformation, passing it unchanged");
return false;
}