DBZ-5671 Remove logic name parameter from sub connector config

This commit is contained in:
harveyyue 2022-10-02 12:30:22 +08:00 committed by Chris Cranford
parent 7d2cf85279
commit f6b72f060c
14 changed files with 39 additions and 35 deletions

View File

@ -539,7 +539,7 @@ public static ConfigDef configDef() {
private final int cursorMaxAwaitTimeMs; private final int cursorMaxAwaitTimeMs;
public MongoDbConnectorConfig(Configuration config) { public MongoDbConnectorConfig(Configuration config) {
super(config, config.getString(CommonConnectorConfig.TOPIC_PREFIX), DEFAULT_SNAPSHOT_FETCH_SIZE); super(config, DEFAULT_SNAPSHOT_FETCH_SIZE);
String snapshotModeValue = config.getString(MongoDbConnectorConfig.SNAPSHOT_MODE); String snapshotModeValue = config.getString(MongoDbConnectorConfig.SNAPSHOT_MODE);
this.snapshotMode = SnapshotMode.parse(snapshotModeValue, MongoDbConnectorConfig.SNAPSHOT_MODE.defaultValueAsString()); this.snapshotMode = SnapshotMode.parse(snapshotModeValue, MongoDbConnectorConfig.SNAPSHOT_MODE.defaultValueAsString());

View File

@ -17,7 +17,6 @@
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import io.debezium.config.CommonConnectorConfig;
import io.debezium.config.ConfigDefinition; import io.debezium.config.ConfigDefinition;
import io.debezium.config.Configuration; import io.debezium.config.Configuration;
import io.debezium.config.EnumeratedValue; import io.debezium.config.EnumeratedValue;
@ -963,7 +962,6 @@ public MySqlConnectorConfig(Configuration config) {
super( super(
MySqlConnector.class, MySqlConnector.class,
config, config,
config.getString(CommonConnectorConfig.TOPIC_PREFIX),
TableFilter.fromPredicate(MySqlConnectorConfig::isNotBuiltInTable), TableFilter.fromPredicate(MySqlConnectorConfig::isNotBuiltInTable),
true, true,
DEFAULT_SNAPSHOT_FETCH_SIZE, DEFAULT_SNAPSHOT_FETCH_SIZE,

View File

@ -22,7 +22,6 @@
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import io.debezium.DebeziumException; import io.debezium.DebeziumException;
import io.debezium.config.CommonConnectorConfig;
import io.debezium.config.ConfigDefinition; import io.debezium.config.ConfigDefinition;
import io.debezium.config.Configuration; import io.debezium.config.Configuration;
import io.debezium.config.EnumeratedValue; import io.debezium.config.EnumeratedValue;
@ -573,8 +572,13 @@ public static ConfigDef configDef() {
private final TransactionSnapshotBoundaryMode logMiningTransactionSnapshotBoundaryMode; private final TransactionSnapshotBoundaryMode logMiningTransactionSnapshotBoundaryMode;
public OracleConnectorConfig(Configuration config) { public OracleConnectorConfig(Configuration config) {
super(OracleConnector.class, config, config.getString(CommonConnectorConfig.TOPIC_PREFIX), new SystemTablesPredicate(config), super(
x -> x.schema() + "." + x.table(), true, ColumnFilterMode.SCHEMA, false); OracleConnector.class, config,
new SystemTablesPredicate(config),
x -> x.schema() + "." + x.table(),
true,
ColumnFilterMode.SCHEMA,
false);
this.databaseName = toUpperCase(config.getString(DATABASE_NAME)); this.databaseName = toUpperCase(config.getString(DATABASE_NAME));
this.pdbName = toUpperCase(config.getString(PDB_NAME)); this.pdbName = toUpperCase(config.getString(PDB_NAME));

View File

@ -851,7 +851,6 @@ public static AutoCreateMode parse(String value, String defaultValue) {
public PostgresConnectorConfig(Configuration config) { public PostgresConnectorConfig(Configuration config) {
super( super(
config, config,
config.getString(CommonConnectorConfig.TOPIC_PREFIX),
new SystemTablesPredicate(), new SystemTablesPredicate(),
x -> x.schema() + "." + x.table(), x -> x.schema() + "." + x.table(),
DEFAULT_SNAPSHOT_FETCH_SIZE, DEFAULT_SNAPSHOT_FETCH_SIZE,

View File

@ -18,7 +18,6 @@
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import io.debezium.config.CommonConnectorConfig;
import io.debezium.config.ConfigDefinition; import io.debezium.config.ConfigDefinition;
import io.debezium.config.Configuration; import io.debezium.config.Configuration;
import io.debezium.config.EnumeratedValue; import io.debezium.config.EnumeratedValue;
@ -335,9 +334,14 @@ public static ConfigDef configDef() {
private final boolean optionRecompile; private final boolean optionRecompile;
public SqlServerConnectorConfig(Configuration config) { public SqlServerConnectorConfig(Configuration config) {
super(SqlServerConnector.class, config, config.getString(CommonConnectorConfig.TOPIC_PREFIX), new SystemTablesPredicate(), super(
x -> x.schema() + "." + x.table(), true, SqlServerConnector.class,
ColumnFilterMode.SCHEMA, true); config,
new SystemTablesPredicate(),
x -> x.schema() + "." + x.table(),
true,
ColumnFilterMode.SCHEMA,
true);
final String databaseNames = config.getString(DATABASE_NAMES.name()); final String databaseNames = config.getString(DATABASE_NAMES.name());

View File

@ -595,7 +595,7 @@ public static SchemaNameAdjustmentMode parse(String value) {
private final int maxBatchSize; private final int maxBatchSize;
private final long maxQueueSizeInBytes; private final long maxQueueSizeInBytes;
private final Duration pollInterval; private final Duration pollInterval;
private final String logicalName; protected final String logicalName;
private final String heartbeatTopicsPrefix; private final String heartbeatTopicsPrefix;
private final Duration heartbeatInterval; private final Duration heartbeatInterval;
private final Duration snapshotDelay; private final Duration snapshotDelay;
@ -616,14 +616,14 @@ public static SchemaNameAdjustmentMode parse(String value) {
private final EnumSet<Operation> skippedOperations; private final EnumSet<Operation> skippedOperations;
private final String taskId; private final String taskId;
protected CommonConnectorConfig(Configuration config, String logicalName, int defaultSnapshotFetchSize) { protected CommonConnectorConfig(Configuration config, int defaultSnapshotFetchSize) {
this.config = config; this.config = config;
this.emitTombstoneOnDelete = config.getBoolean(CommonConnectorConfig.TOMBSTONES_ON_DELETE); this.emitTombstoneOnDelete = config.getBoolean(CommonConnectorConfig.TOMBSTONES_ON_DELETE);
this.maxQueueSize = config.getInteger(MAX_QUEUE_SIZE); this.maxQueueSize = config.getInteger(MAX_QUEUE_SIZE);
this.maxBatchSize = config.getInteger(MAX_BATCH_SIZE); this.maxBatchSize = config.getInteger(MAX_BATCH_SIZE);
this.pollInterval = config.getDuration(POLL_INTERVAL_MS, ChronoUnit.MILLIS); this.pollInterval = config.getDuration(POLL_INTERVAL_MS, ChronoUnit.MILLIS);
this.maxQueueSizeInBytes = config.getLong(MAX_QUEUE_SIZE_IN_BYTES); this.maxQueueSizeInBytes = config.getLong(MAX_QUEUE_SIZE_IN_BYTES);
this.logicalName = logicalName; this.logicalName = config.getString(CommonConnectorConfig.TOPIC_PREFIX);
this.heartbeatTopicsPrefix = config.getString(Heartbeat.HEARTBEAT_TOPICS_PREFIX); this.heartbeatTopicsPrefix = config.getString(Heartbeat.HEARTBEAT_TOPICS_PREFIX);
this.heartbeatInterval = config.getDuration(Heartbeat.HEARTBEAT_INTERVAL, ChronoUnit.MILLIS); this.heartbeatInterval = config.getDuration(Heartbeat.HEARTBEAT_INTERVAL, ChronoUnit.MILLIS);
this.snapshotDelay = Duration.ofMillis(config.getLong(SNAPSHOT_DELAY_MS)); this.snapshotDelay = Duration.ofMillis(config.getLong(SNAPSHOT_DELAY_MS));

View File

@ -33,7 +33,6 @@ public abstract class HistorizedRelationalDatabaseConnectorConfig extends Relati
private static final String DEFAULT_SCHEMA_HISTORY = "io.debezium.storage.kafka.history.KafkaSchemaHistory"; private static final String DEFAULT_SCHEMA_HISTORY = "io.debezium.storage.kafka.history.KafkaSchemaHistory";
private boolean useCatalogBeforeSchema; private boolean useCatalogBeforeSchema;
private final String logicalName;
private final Class<? extends SourceConnector> connectorClass; private final Class<? extends SourceConnector> connectorClass;
private final boolean multiPartitionMode; private final boolean multiPartitionMode;
@ -60,26 +59,24 @@ public abstract class HistorizedRelationalDatabaseConnectorConfig extends Relati
.create(); .create();
protected HistorizedRelationalDatabaseConnectorConfig(Class<? extends SourceConnector> connectorClass, protected HistorizedRelationalDatabaseConnectorConfig(Class<? extends SourceConnector> connectorClass,
Configuration config, String logicalName, Configuration config,
TableFilter systemTablesFilter, TableFilter systemTablesFilter,
boolean useCatalogBeforeSchema, boolean useCatalogBeforeSchema,
int defaultSnapshotFetchSize, int defaultSnapshotFetchSize,
ColumnFilterMode columnFilterMode, ColumnFilterMode columnFilterMode,
boolean multiPartitionMode) { boolean multiPartitionMode) {
super(config, logicalName, systemTablesFilter, TableId::toString, defaultSnapshotFetchSize, columnFilterMode, useCatalogBeforeSchema); super(config, systemTablesFilter, TableId::toString, defaultSnapshotFetchSize, columnFilterMode, useCatalogBeforeSchema);
this.useCatalogBeforeSchema = useCatalogBeforeSchema; this.useCatalogBeforeSchema = useCatalogBeforeSchema;
this.logicalName = logicalName;
this.connectorClass = connectorClass; this.connectorClass = connectorClass;
this.multiPartitionMode = multiPartitionMode; this.multiPartitionMode = multiPartitionMode;
} }
protected HistorizedRelationalDatabaseConnectorConfig(Class<? extends SourceConnector> connectorClass, Configuration config, String logicalName, protected HistorizedRelationalDatabaseConnectorConfig(Class<? extends SourceConnector> connectorClass, Configuration config,
TableFilter systemTablesFilter, TableIdToStringMapper tableIdMapper, TableFilter systemTablesFilter, TableIdToStringMapper tableIdMapper,
boolean useCatalogBeforeSchema, ColumnFilterMode columnFilterMode, boolean useCatalogBeforeSchema, ColumnFilterMode columnFilterMode,
boolean multiPartitionMode) { boolean multiPartitionMode) {
super(config, logicalName, systemTablesFilter, tableIdMapper, DEFAULT_SNAPSHOT_FETCH_SIZE, columnFilterMode, useCatalogBeforeSchema); super(config, systemTablesFilter, tableIdMapper, DEFAULT_SNAPSHOT_FETCH_SIZE, columnFilterMode, useCatalogBeforeSchema);
this.useCatalogBeforeSchema = useCatalogBeforeSchema; this.useCatalogBeforeSchema = useCatalogBeforeSchema;
this.logicalName = logicalName;
this.connectorClass = connectorClass; this.connectorClass = connectorClass;
this.multiPartitionMode = multiPartitionMode; this.multiPartitionMode = multiPartitionMode;
} }

View File

@ -500,10 +500,10 @@ public static DecimalHandlingMode parse(String value, String defaultValue) {
private final JdbcConfiguration jdbcConfig; private final JdbcConfiguration jdbcConfig;
private final String heartbeatActionQuery; private final String heartbeatActionQuery;
protected RelationalDatabaseConnectorConfig(Configuration config, String logicalName, TableFilter systemTablesFilter, protected RelationalDatabaseConnectorConfig(Configuration config, TableFilter systemTablesFilter,
TableIdToStringMapper tableIdMapper, int defaultSnapshotFetchSize, TableIdToStringMapper tableIdMapper, int defaultSnapshotFetchSize,
ColumnFilterMode columnFilterMode, boolean useCatalogBeforeSchema) { ColumnFilterMode columnFilterMode, boolean useCatalogBeforeSchema) {
super(config, logicalName, defaultSnapshotFetchSize); super(config, defaultSnapshotFetchSize);
this.temporalPrecisionMode = TemporalPrecisionMode.parse(config.getString(TIME_PRECISION_MODE)); this.temporalPrecisionMode = TemporalPrecisionMode.parse(config.getString(TIME_PRECISION_MODE));
this.keyMapper = CustomKeyMapper.getInstance(config.getString(MSG_KEY_COLUMNS), tableIdMapper); this.keyMapper = CustomKeyMapper.getInstance(config.getString(MSG_KEY_COLUMNS), tableIdMapper);

View File

@ -14,9 +14,9 @@
public class TestRelationalDatabaseConfig extends RelationalDatabaseConnectorConfig { public class TestRelationalDatabaseConfig extends RelationalDatabaseConnectorConfig {
public TestRelationalDatabaseConfig(Configuration config, String logicalName, Tables.TableFilter systemTablesFilter, public TestRelationalDatabaseConfig(Configuration config, Tables.TableFilter systemTablesFilter,
Selectors.TableIdToStringMapper tableIdMapper, int defaultSnapshotFetchSize) { Selectors.TableIdToStringMapper tableIdMapper, int defaultSnapshotFetchSize) {
super(config, logicalName, systemTablesFilter, tableIdMapper, defaultSnapshotFetchSize, ColumnFilterMode.SCHEMA, false); super(config, systemTablesFilter, tableIdMapper, defaultSnapshotFetchSize, ColumnFilterMode.SCHEMA, false);
} }
@Override @Override

View File

@ -25,7 +25,7 @@ public class ErrorHandlerTest {
private static final class TestConnectorConfig extends CommonConnectorConfig { private static final class TestConnectorConfig extends CommonConnectorConfig {
protected TestConnectorConfig(Configuration config) { protected TestConnectorConfig(Configuration config) {
super(config, "test", 0); super(config, 0);
} }
@Override @Override

View File

@ -127,7 +127,8 @@ public void shouldIgnoreInvalidEnvelope() throws Exception {
} }
protected CommonConnectorConfig config() { protected CommonConnectorConfig config() {
return new CommonConnectorConfig(Configuration.create().with(CommonConnectorConfig.SIGNAL_DATA_COLLECTION, "debezium.signal").build(), "core", 0) { return new CommonConnectorConfig(Configuration.create().with(CommonConnectorConfig.SIGNAL_DATA_COLLECTION, "debezium.signal")
.with(CommonConnectorConfig.TOPIC_PREFIX, "core").build(), 0) {
@Override @Override
protected SourceInfoStructMaker<?> getSourceInfoStructMaker(Version version) { protected SourceInfoStructMaker<?> getSourceInfoStructMaker(Version version) {
return null; return null;

View File

@ -26,8 +26,9 @@ public class SignalBasedSnapshotChangeEventSourceTest {
protected RelationalDatabaseConnectorConfig config() { protected RelationalDatabaseConnectorConfig config() {
return new RelationalDatabaseConnectorConfig( return new RelationalDatabaseConnectorConfig(
Configuration.create().with(RelationalDatabaseConnectorConfig.SIGNAL_DATA_COLLECTION, "debezium.signal").build(), Configuration.create().with(RelationalDatabaseConnectorConfig.SIGNAL_DATA_COLLECTION, "debezium.signal")
"core", null, null, 0, ColumnFilterMode.CATALOG, true) { .with(RelationalDatabaseConnectorConfig.TOPIC_PREFIX, "core").build(),
null, null, 0, ColumnFilterMode.CATALOG, true) {
@Override @Override
protected SourceInfoStructMaker<?> getSourceInfoStructMaker(Version version) { protected SourceInfoStructMaker<?> getSourceInfoStructMaker(Version version) {
return null; return null;

View File

@ -506,7 +506,7 @@ public void mapperConvertersShouldLeaveEmptyDatesAsZero() {
Object[] data = new Object[]{ null }; Object[] data = new Object[]{ null };
ColumnMappers mappers = ColumnMappers.create(new TestRelationalDatabaseConfig(config, "test", null, null, 0)); ColumnMappers mappers = ColumnMappers.create(new TestRelationalDatabaseConfig(config, null, null, 0));
schema = new TableSchemaBuilder(new JdbcValueConverters(), null, adjuster, customConverterRegistry, schema = new TableSchemaBuilder(new JdbcValueConverters(), null, adjuster, customConverterRegistry,
SchemaBuilder.struct().build(), false, false) SchemaBuilder.struct().build(), false, false)

View File

@ -48,7 +48,7 @@ public void shouldNotFindMapperForUnmatchedColumn() {
.with("column.truncate.to.10.chars", fullyQualifiedNames) .with("column.truncate.to.10.chars", fullyQualifiedNames)
.build(); .build();
mappers = ColumnMappers.create(new TestRelationalDatabaseConfig(config, "test", null, null, 0)); mappers = ColumnMappers.create(new TestRelationalDatabaseConfig(config, null, null, 0));
converter = mappers.mappingConverterFor(tableId, column2); converter = mappers.mappingConverterFor(tableId, column2);
assertThat(converter).isNull(); assertThat(converter).isNull();
} }
@ -59,7 +59,7 @@ public void shouldTruncateStrings() {
.with("column.truncate.to.10.chars", fullyQualifiedNames.toUpperCase()) .with("column.truncate.to.10.chars", fullyQualifiedNames.toUpperCase())
.build(); .build();
mappers = ColumnMappers.create(new TestRelationalDatabaseConfig(config, "test", null, null, 0)); mappers = ColumnMappers.create(new TestRelationalDatabaseConfig(config, null, null, 0));
converter = mappers.mappingConverterFor(tableId, column); converter = mappers.mappingConverterFor(tableId, column);
assertThat(converter).isNotNull(); assertThat(converter).isNotNull();
@ -82,7 +82,7 @@ public void shouldMaskStringsToFixedLength() {
.with("column.mask.with.10.chars", fullyQualifiedNames) .with("column.mask.with.10.chars", fullyQualifiedNames)
.build(); .build();
mappers = ColumnMappers.create(new TestRelationalDatabaseConfig(config, "test", null, null, 0)); // exact case mappers = ColumnMappers.create(new TestRelationalDatabaseConfig(config, null, null, 0)); // exact case
converter = mappers.mappingConverterFor(tableId, column); converter = mappers.mappingConverterFor(tableId, column);
assertThat(converter).isNotNull(); assertThat(converter).isNotNull();
assertThat(converter.convert("12345678901234567890")).isEqualTo(maskValue); assertThat(converter.convert("12345678901234567890")).isEqualTo(maskValue);
@ -149,7 +149,7 @@ public void shouldMaskStringsToMaskHashV1Column() {
.with("column.mask.hash.SHA-256.with.salt.salt123", fullyQualifiedNames) .with("column.mask.hash.SHA-256.with.salt.salt123", fullyQualifiedNames)
.build(); .build();
mappers = ColumnMappers.create(new TestRelationalDatabaseConfig(config, "test", null, null, 0)); // exact case mappers = ColumnMappers.create(new TestRelationalDatabaseConfig(config, null, null, 0)); // exact case
converter = mappers.mappingConverterFor(tableId, column); converter = mappers.mappingConverterFor(tableId, column);
assertThat(converter).isNotNull(); assertThat(converter).isNotNull();
assertThat(converter.convert("12345678901234567890")).isEqualTo(maskValue); assertThat(converter.convert("12345678901234567890")).isEqualTo(maskValue);
@ -165,7 +165,7 @@ public void shouldMaskStringsToMaskHashV2Column() {
.with("column.mask.hash.v2.SHA-256.with.salt.salt123", fullyQualifiedNames) .with("column.mask.hash.v2.SHA-256.with.salt.salt123", fullyQualifiedNames)
.build(); .build();
mappers = ColumnMappers.create(new TestRelationalDatabaseConfig(config, "test", null, null, 0)); // exact case mappers = ColumnMappers.create(new TestRelationalDatabaseConfig(config, null, null, 0)); // exact case
converter = mappers.mappingConverterFor(tableId, column); converter = mappers.mappingConverterFor(tableId, column);
assertThat(converter).isNotNull(); assertThat(converter).isNotNull();
assertThat(converter.convert("12345678901234567890")).isEqualTo(maskValue); assertThat(converter.convert("12345678901234567890")).isEqualTo(maskValue);