DBZ-6641: Schema name changed with Custom topic naming strategy

This commit is contained in:
Anil Dasari 2023-07-09 23:20:57 -07:00 committed by Jiri Pechanec
parent 55650fac7d
commit a4c0bf5614
2 changed files with 7 additions and 2 deletions

View File

@ -31,6 +31,10 @@ public interface TopicNamingStrategy<I extends DataCollectionId> {
String transactionTopic();
default String recordSchemaPrefix(I id) {
return dataChangeTopic(id);
}
/**
* Sanitize the given character whether is a legal character of a Kafka topic name.
* Legal characters are {@code [a-zA-Z0-9._-]}.

View File

@ -124,8 +124,9 @@ public TableSchemaBuilder(ValueConverterProvider valueConverterProvider,
public TableSchema create(TopicNamingStrategy topicNamingStrategy, Table table, ColumnNameFilter filter, ColumnMappers mappers, KeyMapper keysMapper) {
// Build the schemas ...
final TableId tableId = table.id();
final String schemaNamePrefix = topicNamingStrategy.dataChangeTopic(tableId);
final String envelopSchemaName = Envelope.schemaName(schemaNamePrefix);
final String schemaNamePrefix = topicNamingStrategy.recordSchemaPrefix(tableId);
final String envelopeSchemaPrefix = topicNamingStrategy.dataChangeTopic(tableId);
final String envelopSchemaName = Envelope.schemaName(envelopeSchemaPrefix);
LOGGER.debug("Mapping table '{}' to schemas under '{}'", tableId, schemaNamePrefix);
SchemaBuilder valSchemaBuilder = SchemaBuilder.struct().name(schemaNameAdjuster.adjust(schemaNamePrefix + ".Value"));
SchemaBuilder keySchemaBuilder = SchemaBuilder.struct().name(schemaNameAdjuster.adjust(schemaNamePrefix + ".Key"));