DBZ-5594 Clean up "logical name" config

This commit is contained in:
harveyyue 2022-09-08 15:43:43 +08:00 committed by Vojtěch Juránek
parent 1a4cb1458a
commit d1153e8ad4
12 changed files with 52 additions and 63 deletions

View File

@ -94,7 +94,7 @@ public void shouldFailToValidateInvalidConfiguration() {
Config result = connector.validate(config.asMap());
assertConfigurationErrors(result, MongoDbConnectorConfig.HOSTS, 1);
assertNoConfigurationErrors(result, CommonConnectorConfig.TOPIC_PREFIX);
assertConfigurationErrors(result, CommonConnectorConfig.TOPIC_PREFIX, 1);
assertNoConfigurationErrors(result, MongoDbConnectorConfig.USER);
assertNoConfigurationErrors(result, MongoDbConnectorConfig.PASSWORD);
assertNoConfigurationErrors(result, MongoDbConnectorConfig.AUTO_DISCOVER_MEMBERS);

View File

@ -67,6 +67,7 @@ public class MySqlAntlrDdlParserTest {
private SimpleDdlParserListener listener;
private MySqlValueConverters converters;
private TableSchemaBuilder tableSchemaBuilder;
private Properties properties;
@Before
public void beforeEach() {
@ -82,6 +83,8 @@ public void beforeEach() {
converters,
new MySqlDefaultValueConverter(converters),
SchemaNameAdjuster.NO_OP, new CustomConverterRegistry(null), SchemaBuilder.struct().build(), false, false);
properties = new Properties();
properties.put("topic.prefix", "test");
}
@Test
@ -3427,7 +3430,7 @@ private MysqlDdlParserWithSimpleTestListener(DdlChanges changesListener, boolean
}
private Schema getColumnSchema(Table table, String column) {
TableSchema schema = tableSchemaBuilder.create(new DefaultTopicNamingStrategy(new Properties(), "test-1"), table, null, null, null);
TableSchema schema = tableSchemaBuilder.create(new DefaultTopicNamingStrategy(properties), table, null, null, null);
return schema.getEnvelopeSchema().schema().field("after").schema().field(column).schema();
}
}

View File

@ -130,7 +130,7 @@ public void shouldFailToValidateInvalidConfiguration() {
assertConfigurationErrors(result, MySqlConnectorConfig.HOSTNAME, 1);
assertNoConfigurationErrors(result, MySqlConnectorConfig.PORT);
assertConfigurationErrors(result, MySqlConnectorConfig.USER, 1);
assertNoConfigurationErrors(result, CommonConnectorConfig.TOPIC_PREFIX);
assertConfigurationErrors(result, CommonConnectorConfig.TOPIC_PREFIX, 1);
assertConfigurationErrors(result, MySqlConnectorConfig.SERVER_ID);
assertNoConfigurationErrors(result, MySqlConnectorConfig.TABLES_IGNORE_BUILTIN);
assertNoConfigurationErrors(result, MySqlConnectorConfig.DATABASE_INCLUDE_LIST);

View File

@ -605,7 +605,7 @@ public void parseNumericAndDecimalToIntDefaultValue() {
assertThat(((MySqlAntlrDdlParser) parser).getParsingExceptionsFromWalker().size()).isEqualTo(0);
assertThat(tables.size()).isEqualTo(1);
TableSchema schema = tableSchemaBuilder.create(new DefaultTopicNamingStrategy(new Properties(), "test"), table, null, null, null);
TableSchema schema = tableSchemaBuilder.create(defaultTopicNamingStrategy(), table, null, null, null);
assertThat(getColumnSchema(schema, "c0").defaultValue()).isEqualTo((short) 10);
assertThat(getColumnSchema(schema, "c1").defaultValue()).isEqualTo(5);
assertThat(getColumnSchema(schema, "c2").defaultValue()).isEqualTo(0L);
@ -620,11 +620,17 @@ private Schema getColumnSchema(Table table, String column) {
}
private Schema getColumnSchema(Table table, String column, TableSchemaBuilder tableSchemaBuilder) {
TableSchema schema = tableSchemaBuilder.create(new DefaultTopicNamingStrategy(new Properties(), "test"), table, null, null, null);
TableSchema schema = tableSchemaBuilder.create(defaultTopicNamingStrategy(), table, null, null, null);
return schema.getEnvelopeSchema().schema().field("after").schema().field(column).schema();
}
private Schema getColumnSchema(TableSchema tableSchema, String column) {
return tableSchema.valueSchema().field(column).schema();
}
private DefaultTopicNamingStrategy defaultTopicNamingStrategy() {
Properties properties = new Properties();
properties.put("topic.prefix", "test");
return new DefaultTopicNamingStrategy(properties);
}
}

View File

@ -16,7 +16,6 @@
import org.junit.Test;
import io.debezium.config.CommonConnectorConfig;
import io.debezium.config.Configuration;
import io.debezium.config.Field;
import io.debezium.relational.TableId;
@ -31,7 +30,7 @@ public void testDataChangeTopic() {
final String logicalName = "mysql-server-1";
final Properties props = new Properties();
props.put("topic.delimiter", ".");
props.put(CommonConnectorConfig.LOGICAL_NAME, logicalName);
props.put("topic.prefix", logicalName);
final DefaultTopicNamingStrategy defaultStrategy = new DefaultTopicNamingStrategy(props);
String dataChangeTopic = defaultStrategy.dataChangeTopic(tableId);
assertThat(dataChangeTopic).isEqualTo("mysql-server-1.test_db.dbz_4180");
@ -53,7 +52,7 @@ public void testDataChangeTopic() {
public void testSchemaChangeTopic() {
final String logicalName = "mysql-server-1";
final Properties props = new Properties();
props.put(CommonConnectorConfig.LOGICAL_NAME, logicalName);
props.put("topic.prefix", logicalName);
final DefaultTopicNamingStrategy defaultStrategy = new DefaultTopicNamingStrategy(props);
String schemaChangeTopic = defaultStrategy.schemaChangeTopic();
assertThat(schemaChangeTopic).isEqualTo("mysql-server-1");
@ -68,7 +67,7 @@ public void testSchemaChangeTopic() {
public void testTransactionTopic() {
final String logicalName = "mysql-server-1";
final Properties props = new Properties();
props.put(CommonConnectorConfig.LOGICAL_NAME, logicalName);
props.put("topic.prefix", logicalName);
final DefaultTopicNamingStrategy mySqlStrategy = new DefaultTopicNamingStrategy(props);
String transactionTopic = mySqlStrategy.transactionTopic();
String expectedTopic = "mysql-server-1." + DefaultTopicNamingStrategy.DEFAULT_TRANSACTION_TOPIC;
@ -79,7 +78,7 @@ public void testTransactionTopic() {
public void testHeartbeatTopic() {
final String logicalName = "mysql-server-1";
final Properties props = new Properties();
props.put(CommonConnectorConfig.LOGICAL_NAME, logicalName);
props.put("topic.prefix", logicalName);
final DefaultTopicNamingStrategy mySqlStrategy = new DefaultTopicNamingStrategy(props);
String heartbeatTopic = mySqlStrategy.heartbeatTopic();
String expectedTopic = DefaultTopicNamingStrategy.DEFAULT_HEARTBEAT_TOPIC_PREFIX + ".mysql-server-1";
@ -95,7 +94,7 @@ public void testLogicTableTopic() {
props.put("topic.regex.enable", "true");
props.put("topic.regex", "(.*)(dbz_4180|test)(.*)");
props.put("topic.replacement", "$1$2_all_shards");
props.put(CommonConnectorConfig.LOGICAL_NAME, logicalName);
props.put("topic.prefix", logicalName);
final DefaultRegexTopicNamingStrategy byLogicalStrategy = new DefaultRegexTopicNamingStrategy(props);
String dataChangeTopic = byLogicalStrategy.dataChangeTopic(tableId);

View File

@ -214,7 +214,7 @@ public void shouldValidateConfiguration() throws Exception {
assertConfigurationErrors(validatedConfig, PostgresConnectorConfig.HOSTNAME, 1);
assertConfigurationErrors(validatedConfig, PostgresConnectorConfig.USER, 1);
assertConfigurationErrors(validatedConfig, PostgresConnectorConfig.DATABASE_NAME, 1);
assertNoConfigurationErrors(validatedConfig, CommonConnectorConfig.TOPIC_PREFIX);
assertConfigurationErrors(validatedConfig, CommonConnectorConfig.TOPIC_PREFIX, 1);
// validate the non required fields
validateConfigField(validatedConfig, PostgresConnectorConfig.PLUGIN_NAME, LogicalDecoder.DECODERBUFS.getValue());

View File

@ -500,8 +500,10 @@ private void assertColumnHasNotDefaultValue(Table table, String columnName) {
}
private void assertColumnHasDefaultValue(Table table, String columnName, Object expectedValue, TableSchemaBuilder tableSchemaBuilder) {
TableSchema schema = tableSchemaBuilder.create(new SchemaTopicNamingStrategy(new Properties(), "test", tableSchemaBuilder.isMultiPartitionMode()), table, null,
null, null);
Properties properties = new Properties();
properties.put("topic.prefix", "test");
TableSchema schema = tableSchemaBuilder.create(new SchemaTopicNamingStrategy(properties, tableSchemaBuilder.isMultiPartitionMode()),
table, null, null, null);
Schema columnSchema = schema.getEnvelopeSchema().schema().field("after").schema().field(columnName).schema();
Column column = table.columnWithName(columnName);

View File

@ -54,8 +54,6 @@
*/
public abstract class CommonConnectorConfig {
public static final String TASK_ID = "task.id";
public static final String LOGICAL_NAME = "logical.name";
public static final String LOGIC_NAME_PLACEHOLDER = "${logical.name}";
public static final Pattern TOPIC_NAME_PATTERN = Pattern.compile("^[a-zA-Z0-9_.\\-]+$");
public static final String MULTI_PARTITION_MODE = "multi.partition.mode";
@ -322,11 +320,12 @@ public static SchemaNameAdjustmentMode parse(String value) {
.withGroup(Field.createGroupEntry(Field.Group.CONNECTION, 0))
.withWidth(Width.MEDIUM)
.withImportance(Importance.HIGH)
.withDefault(LOGIC_NAME_PLACEHOLDER)
.withValidation(CommonConnectorConfig::validateTopicName)
.required()
.withDescription("The name of the prefix to be used for all topics, the placeholder " + LOGIC_NAME_PLACEHOLDER +
" can be used for referring to the connector's logical name as default value.");
.withDescription("Topic prefix that identifies and provides a namespace for the particular database " +
"server/cluster is capturing changes. The topic prefix should be unique across all other connectors, " +
"since it is used as a prefix for all Kafka topic names that receive events emitted by this connector. " +
"Only alphanumeric characters, hyphens, dots and underscores must be accepted.");
public static final Field RETRIABLE_RESTART_WAIT = Field.create("retriable.restart.connector.wait.ms")
.withDisplayName("Retriable restart wait (ms)")
@ -768,7 +767,6 @@ public TopicNamingStrategy getTopicNamingStrategy(Field topicNamingStrategyField
@SuppressWarnings("unchecked")
public TopicNamingStrategy getTopicNamingStrategy(Field topicNamingStrategyField, boolean multiPartitionMode) {
Properties props = config.asProperties();
props.put(LOGICAL_NAME, logicalName);
props.put(MULTI_PARTITION_MODE, multiPartitionMode);
String strategyName = config.getString(topicNamingStrategyField);
TopicNamingStrategy topicNamingStrategy = config.getInstance(topicNamingStrategyField, TopicNamingStrategy.class, props);
@ -974,10 +972,6 @@ public Heartbeat createHeartbeat(TopicNamingStrategy topicNamingStrategy, Schema
public static int validateTopicName(Configuration config, Field field, Field.ValidationOutput problems) {
String name = config.getString(field);
if (name.equals(LOGIC_NAME_PLACEHOLDER)) {
return 0;
}
if (name != null) {
if (!TOPIC_NAME_PATTERN.asPredicate().test(name)) {
problems.accept(field, name, name + " has invalid format (only the underscore, hyphen, dot and alphanumeric characters are allowed)");

View File

@ -18,7 +18,6 @@
import io.debezium.config.CommonConnectorConfig;
import io.debezium.config.Configuration;
import io.debezium.config.Field;
import io.debezium.relational.RelationalDatabaseConnectorConfig;
import io.debezium.spi.schema.DataCollectionId;
import io.debezium.spi.topic.TopicNamingStrategy;
import io.debezium.util.BoundedConcurrentHashMap;
@ -60,7 +59,7 @@ public abstract class AbstractTopicNamingStrategy<I extends DataCollectionId> im
.withDefault(DEFAULT_HEARTBEAT_TOPIC_PREFIX)
.withValidation(CommonConnectorConfig::validateTopicName)
.withDescription("Specify the heartbeat topic name. Defaults to " +
DEFAULT_HEARTBEAT_TOPIC_PREFIX + "." + CommonConnectorConfig.LOGIC_NAME_PLACEHOLDER);
DEFAULT_HEARTBEAT_TOPIC_PREFIX + ".${topic.prefix}");
public static final Field TOPIC_TRANSACTION = Field.create("topic.transaction")
.withDisplayName("Transaction topic name")
@ -70,25 +69,17 @@ public abstract class AbstractTopicNamingStrategy<I extends DataCollectionId> im
.withDefault(DEFAULT_TRANSACTION_TOPIC)
.withValidation(CommonConnectorConfig::validateTopicName)
.withDescription("Specify the transaction topic name. Defaults to " +
CommonConnectorConfig.LOGIC_NAME_PLACEHOLDER + "." + DEFAULT_TRANSACTION_TOPIC);
"${topic.prefix}." + DEFAULT_TRANSACTION_TOPIC);
private static final Logger LOGGER = LoggerFactory.getLogger(AbstractTopicNamingStrategy.class);
protected BoundedConcurrentHashMap<I, String> topicNames;
protected String logicalName;
protected String delimiter;
protected String prefix;
protected String transaction;
protected String heartbeatPrefix;
public AbstractTopicNamingStrategy(Properties props) {
this.logicalName = props.getProperty(RelationalDatabaseConnectorConfig.LOGICAL_NAME);
assert logicalName != null;
this.configure(props);
}
public AbstractTopicNamingStrategy(Properties props, String logicalName) {
this.logicalName = logicalName;
this.configure(props);
}
@ -111,14 +102,10 @@ public void configure(Properties props) {
10,
BoundedConcurrentHashMap.Eviction.LRU);
delimiter = config.getString(TOPIC_DELIMITER);
if (config.getString(CommonConnectorConfig.TOPIC_PREFIX).equals(CommonConnectorConfig.LOGIC_NAME_PLACEHOLDER)) {
prefix = logicalName;
}
else {
prefix = config.getString(CommonConnectorConfig.TOPIC_PREFIX);
}
heartbeatPrefix = config.getString(TOPIC_HEARTBEAT_PREFIX);
transaction = config.getString(TOPIC_TRANSACTION);
prefix = config.getString(CommonConnectorConfig.TOPIC_PREFIX);
assert prefix != null;
}
@Override

View File

@ -24,12 +24,8 @@ public DefaultTopicNamingStrategy(Properties props) {
super(props);
}
public DefaultTopicNamingStrategy(Properties props, String logicalName) {
super(props, logicalName);
}
public static DefaultTopicNamingStrategy create(CommonConnectorConfig config) {
return new DefaultTopicNamingStrategy(config.getConfig().asProperties(), config.getLogicalName());
return new DefaultTopicNamingStrategy(config.getConfig().asProperties());
}
@Override

View File

@ -28,8 +28,8 @@ public SchemaTopicNamingStrategy(Properties props) {
: Boolean.parseBoolean(props.get(CommonConnectorConfig.MULTI_PARTITION_MODE).toString());
}
public SchemaTopicNamingStrategy(Properties props, String logicalName, boolean multiPartitionMode) {
super(props, logicalName);
public SchemaTopicNamingStrategy(Properties props, boolean multiPartitionMode) {
super(props);
this.multiPartitionMode = multiPartitionMode;
}
@ -38,7 +38,7 @@ public static SchemaTopicNamingStrategy create(CommonConnectorConfig config) {
}
public static SchemaTopicNamingStrategy create(CommonConnectorConfig config, boolean multiPartitionMode) {
return new SchemaTopicNamingStrategy(config.getConfig().asProperties(), config.getLogicalName(), multiPartitionMode);
return new SchemaTopicNamingStrategy(config.getConfig().asProperties(), multiPartitionMode);
}
@Override

View File

@ -42,7 +42,6 @@ public class TableSchemaBuilderTest {
private static final String AVRO_UNSUPPORTED_NAME = "9-`~!@#$%^&*()+=[]{}\\|;:\"'<>,.?/";
private static final String AVRO_UNSUPPORTED_NAME_CONVERTED = "_9_______________________________";
private final String prefix = "";
private final TableId id = new TableId("catalog", "schema", "table");
private final Object[] data = new Object[]{ "c1value", 3.142d, java.sql.Date.valueOf("2001-10-31"), 4, new byte[]{ 71, 117, 110, 110, 97, 114 }, null, "c7value",
"c8value", "c9value", null };
@ -63,13 +62,16 @@ public class TableSchemaBuilderTest {
private TopicNamingStrategy topicNamingStrategy;
private SchemaNameAdjuster adjuster;
private final CustomConverterRegistry customConverterRegistry = new CustomConverterRegistry(null);
private Properties topicProperties;
@Before
public void beforeEach() {
adjuster = SchemaNameAdjuster.create("_", (original, replacement, conflict) -> {
fail("Should not have come across an invalid schema name");
});
topicNamingStrategy = new SchemaTopicNamingStrategy(new Properties(), prefix, false);
topicProperties = new Properties();
topicProperties.put("topic.prefix", "test");
topicNamingStrategy = new SchemaTopicNamingStrategy(topicProperties, false);
schema = null;
table = Table.editor()
.tableId(id)
@ -167,8 +169,8 @@ public void shouldBuildCorrectSchemaNames() {
SchemaBuilder.struct().build(), false, false)
.create(topicNamingStrategy, table, null, null, null);
assertThat(schema).isNotNull();
assertThat(schema.keySchema().name()).isEqualTo("schema.table.Key");
assertThat(schema.valueSchema().name()).isEqualTo("schema.table.Value");
assertThat(schema.keySchema().name()).isEqualTo("test.schema.table.Key");
assertThat(schema.valueSchema().name()).isEqualTo("test.schema.table.Value");
// only catalog
table = table.edit()
@ -177,11 +179,11 @@ public void shouldBuildCorrectSchemaNames() {
schema = new TableSchemaBuilder(new JdbcValueConverters(), null, adjuster, customConverterRegistry,
SchemaBuilder.struct().build(), false, false)
.create(new DefaultTopicNamingStrategy(new Properties(), prefix), table, null, null, null);
.create(new DefaultTopicNamingStrategy(topicProperties), table, null, null, null);
assertThat(schema).isNotNull();
assertThat(schema.keySchema().name()).isEqualTo("testDb.testTable.Key");
assertThat(schema.valueSchema().name()).isEqualTo("testDb.testTable.Value");
assertThat(schema.keySchema().name()).isEqualTo("test.testDb.testTable.Key");
assertThat(schema.valueSchema().name()).isEqualTo("test.testDb.testTable.Value");
// only schema
table = table.edit()
@ -193,8 +195,8 @@ public void shouldBuildCorrectSchemaNames() {
.create(topicNamingStrategy, table, null, null, null);
assertThat(schema).isNotNull();
assertThat(schema.keySchema().name()).isEqualTo("testSchema.testTable.Key");
assertThat(schema.valueSchema().name()).isEqualTo("testSchema.testTable.Value");
assertThat(schema.keySchema().name()).isEqualTo("test.testSchema.testTable.Key");
assertThat(schema.valueSchema().name()).isEqualTo("test.testSchema.testTable.Value");
// neither catalog nor schema
table = table.edit()
@ -206,8 +208,8 @@ public void shouldBuildCorrectSchemaNames() {
.create(topicNamingStrategy, table, null, null, null);
assertThat(schema).isNotNull();
assertThat(schema.keySchema().name()).isEqualTo("testTable.Key");
assertThat(schema.valueSchema().name()).isEqualTo("testTable.Value");
assertThat(schema.keySchema().name()).isEqualTo("test.testTable.Key");
assertThat(schema.valueSchema().name()).isEqualTo("test.testTable.Value");
}
@Test
@ -216,10 +218,10 @@ public void shouldBuildCorrectSchemaNamesInMultiPartitionMode() {
// table id with catalog and schema
schema = new TableSchemaBuilder(new JdbcValueConverters(), null, adjuster, customConverterRegistry,
SchemaBuilder.struct().build(), false, true)
.create(new SchemaTopicNamingStrategy(new Properties(), prefix, true), table, null, null, null);
.create(new SchemaTopicNamingStrategy(topicProperties, true), table, null, null, null);
assertThat(schema).isNotNull();
assertThat(schema.keySchema().name()).isEqualTo("catalog.schema.table.Key");
assertThat(schema.valueSchema().name()).isEqualTo("catalog.schema.table.Value");
assertThat(schema.keySchema().name()).isEqualTo("test.catalog.schema.table.Key");
assertThat(schema.valueSchema().name()).isEqualTo("test.catalog.schema.table.Value");
}
@Test