diff --git a/debezium-api/src/main/java/io/debezium/spi/topic/TopicNamingStrategy.java b/debezium-api/src/main/java/io/debezium/spi/topic/TopicNamingStrategy.java index f0f48135c..63c50ab2d 100644 --- a/debezium-api/src/main/java/io/debezium/spi/topic/TopicNamingStrategy.java +++ b/debezium-api/src/main/java/io/debezium/spi/topic/TopicNamingStrategy.java @@ -31,6 +31,10 @@ public interface TopicNamingStrategy { String transactionTopic(); + default String recordSchemaPrefix(I id) { + return dataChangeTopic(id); + } + /** * Sanitize the given character whether is a legal character of a Kafka topic name. * Legal characters are {@code [a-zA-Z0-9._-]}. diff --git a/debezium-core/src/main/java/io/debezium/relational/TableSchemaBuilder.java b/debezium-core/src/main/java/io/debezium/relational/TableSchemaBuilder.java index a54b954d9..6c6192955 100644 --- a/debezium-core/src/main/java/io/debezium/relational/TableSchemaBuilder.java +++ b/debezium-core/src/main/java/io/debezium/relational/TableSchemaBuilder.java @@ -124,8 +124,9 @@ public TableSchemaBuilder(ValueConverterProvider valueConverterProvider, public TableSchema create(TopicNamingStrategy topicNamingStrategy, Table table, ColumnNameFilter filter, ColumnMappers mappers, KeyMapper keysMapper) { // Build the schemas ... final TableId tableId = table.id(); - final String schemaNamePrefix = topicNamingStrategy.dataChangeTopic(tableId); - final String envelopSchemaName = Envelope.schemaName(schemaNamePrefix); + final String schemaNamePrefix = topicNamingStrategy.recordSchemaPrefix(tableId); + final String envelopeSchemaPrefix = topicNamingStrategy.dataChangeTopic(tableId); + final String envelopSchemaName = Envelope.schemaName(envelopeSchemaPrefix); LOGGER.debug("Mapping table '{}' to schemas under '{}'", tableId, schemaNamePrefix); SchemaBuilder valSchemaBuilder = SchemaBuilder.struct().name(schemaNameAdjuster.adjust(schemaNamePrefix + ".Value")); SchemaBuilder keySchemaBuilder = SchemaBuilder.struct().name(schemaNameAdjuster.adjust(schemaNamePrefix + ".Key"));