Removing extra initialisation of "config" variable in PG config
This commit is contained in:
parent
9fe033ce24
commit
72b06366fe
@ -327,8 +327,8 @@ public static SnapshotMode parse(String value, String defaultValue) {
|
||||
public MongoDbConnectorConfig(Configuration config) {
|
||||
super(config, config.getString(LOGICAL_NAME));
|
||||
|
||||
String snapshotModeValue = config.getString(SNAPSHOT_MODE);
|
||||
this.snapshotMode = SnapshotMode.parse(snapshotModeValue, SNAPSHOT_MODE.defaultValueAsString());
|
||||
String snapshotModeValue = config.getString(MongoDbConnectorConfig.SNAPSHOT_MODE);
|
||||
this.snapshotMode = SnapshotMode.parse(snapshotModeValue, MongoDbConnectorConfig.SNAPSHOT_MODE.defaultValueAsString());
|
||||
}
|
||||
|
||||
protected static ConfigDef configDef() {
|
||||
@ -379,10 +379,11 @@ private static int validateBlacklistField(Configuration config, ValidationOutput
|
||||
* Returns the number of documents to return per fetch by default. Default to {@code 0}, which indicates
|
||||
* that the server chooses an appropriate fetch size.
|
||||
*
|
||||
* @param config configuration
|
||||
* @return the default fetch size
|
||||
*/
|
||||
@Override
|
||||
protected int defaultSnapshotFetchSize() {
|
||||
protected int defaultSnapshotFetchSize(Configuration config) {
|
||||
return DEFAULT_SNAPSHOT_FETCH_SIZE;
|
||||
}
|
||||
|
||||
|
@ -1123,6 +1123,7 @@ public static final Field MASK_COLUMN(int length) {
|
||||
EVENT_DESERIALIZATION_FAILURE_HANDLING_MODE,
|
||||
INCONSISTENT_SCHEMA_HANDLING_MODE,
|
||||
CommonConnectorConfig.SNAPSHOT_DELAY_MS,
|
||||
CommonConnectorConfig.SNAPSHOT_FETCH_SIZE,
|
||||
DDL_PARSER_MODE,
|
||||
CommonConnectorConfig.TOMBSTONES_ON_DELETE, ENABLE_TIME_ADJUSTER);
|
||||
|
||||
@ -1193,7 +1194,7 @@ public SnapshotNewTables getSnapshotNewTables() {
|
||||
}
|
||||
|
||||
@Override
|
||||
protected int defaultSnapshotFetchSize() {
|
||||
protected int defaultSnapshotFetchSize(Configuration config) {
|
||||
return DEFAULT_SNAPSHOT_FETCH_SIZE;
|
||||
}
|
||||
|
||||
@ -1214,7 +1215,7 @@ protected static ConfigDef configDef() {
|
||||
Field.group(config, "Connector", CONNECTION_TIMEOUT_MS, KEEP_ALIVE, KEEP_ALIVE_INTERVAL_MS, CommonConnectorConfig.MAX_QUEUE_SIZE,
|
||||
CommonConnectorConfig.MAX_BATCH_SIZE, CommonConnectorConfig.POLL_INTERVAL_MS,
|
||||
SNAPSHOT_MODE, SNAPSHOT_LOCKING_MODE, SNAPSHOT_NEW_TABLES, SNAPSHOT_MINIMAL_LOCKING, TIME_PRECISION_MODE, DECIMAL_HANDLING_MODE,
|
||||
BIGINT_UNSIGNED_HANDLING_MODE, SNAPSHOT_DELAY_MS, DDL_PARSER_MODE, ENABLE_TIME_ADJUSTER);
|
||||
BIGINT_UNSIGNED_HANDLING_MODE, SNAPSHOT_DELAY_MS, SNAPSHOT_FETCH_SIZE, DDL_PARSER_MODE, ENABLE_TIME_ADJUSTER);
|
||||
return config;
|
||||
}
|
||||
|
||||
|
@ -845,7 +845,6 @@ public static SchemaRefreshMode parse(String value) {
|
||||
SNAPSHOT_SELECT_STATEMENT_OVERRIDES_BY_TABLE, SCHEMA_REFRESH_MODE, CommonConnectorConfig.TOMBSTONES_ON_DELETE,
|
||||
XMIN_FETCH_INTERVAL, SNAPSHOT_MODE_CLASS);
|
||||
|
||||
private final Configuration config;
|
||||
private final TemporalPrecisionMode temporalPrecisionMode;
|
||||
private final HStoreHandlingMode hStoreHandlingMode;
|
||||
private final SnapshotMode snapshotMode;
|
||||
@ -860,7 +859,6 @@ protected PostgresConnectorConfig(Configuration config) {
|
||||
null
|
||||
);
|
||||
|
||||
this.config = config;
|
||||
this.temporalPrecisionMode = TemporalPrecisionMode.parse(config.getString(TIME_PRECISION_MODE));
|
||||
String hstoreHandlingModeStr = config.getString(PostgresConnectorConfig.HSTORE_HANDLING_MODE);
|
||||
HStoreHandlingMode hStoreHandlingMode = HStoreHandlingMode.parse(hstoreHandlingModeStr);
|
||||
@ -870,35 +868,35 @@ protected PostgresConnectorConfig(Configuration config) {
|
||||
}
|
||||
|
||||
protected String hostname() {
|
||||
return config.getString(HOSTNAME);
|
||||
return getConfig().getString(HOSTNAME);
|
||||
}
|
||||
|
||||
protected int port() {
|
||||
return config.getInteger(PORT);
|
||||
return getConfig().getInteger(PORT);
|
||||
}
|
||||
|
||||
protected String databaseName() {
|
||||
return config.getString(DATABASE_NAME);
|
||||
return getConfig().getString(DATABASE_NAME);
|
||||
}
|
||||
|
||||
protected LogicalDecoder plugin() {
|
||||
return LogicalDecoder.parse(config.getString(PLUGIN_NAME));
|
||||
return LogicalDecoder.parse(getConfig().getString(PLUGIN_NAME));
|
||||
}
|
||||
|
||||
protected String slotName() {
|
||||
return config.getString(SLOT_NAME);
|
||||
return getConfig().getString(SLOT_NAME);
|
||||
}
|
||||
|
||||
protected boolean dropSlotOnStop() {
|
||||
return config.getBoolean(DROP_SLOT_ON_STOP);
|
||||
return getConfig().getBoolean(DROP_SLOT_ON_STOP);
|
||||
}
|
||||
|
||||
protected String streamParams() {
|
||||
return config.getString(STREAM_PARAMS);
|
||||
return getConfig().getString(STREAM_PARAMS);
|
||||
}
|
||||
|
||||
protected Integer statusUpdateIntervalMillis() {
|
||||
return config.getInteger(STATUS_UPDATE_INTERVAL_MS, null);
|
||||
return getConfig().getInteger(STATUS_UPDATE_INTERVAL_MS, null);
|
||||
}
|
||||
|
||||
protected TemporalPrecisionMode temporalPrecisionMode() {
|
||||
@ -910,11 +908,11 @@ protected HStoreHandlingMode hStoreHandlingMode() {
|
||||
}
|
||||
|
||||
protected boolean includeUnknownDatatypes() {
|
||||
return config.getBoolean(INCLUDE_UNKNOWN_DATATYPES);
|
||||
return getConfig().getBoolean(INCLUDE_UNKNOWN_DATATYPES);
|
||||
}
|
||||
|
||||
public Configuration jdbcConfig() {
|
||||
return config.subset(DATABASE_CONFIG_PREFIX, true);
|
||||
return getConfig().subset(DATABASE_CONFIG_PREFIX, true);
|
||||
}
|
||||
|
||||
protected TopicSelectionStrategy topicSelectionStrategy() {
|
||||
@ -925,47 +923,47 @@ protected TopicSelectionStrategy topicSelectionStrategy() {
|
||||
}
|
||||
|
||||
protected Map<String, ConfigValue> validate() {
|
||||
return config.validate(ALL_FIELDS);
|
||||
return getConfig().validate(ALL_FIELDS);
|
||||
}
|
||||
|
||||
protected String schemaBlacklist() {
|
||||
return config.getString(SCHEMA_BLACKLIST);
|
||||
return getConfig().getString(SCHEMA_BLACKLIST);
|
||||
}
|
||||
|
||||
protected String schemaWhitelist() {
|
||||
return config.getString(SCHEMA_WHITELIST);
|
||||
return getConfig().getString(SCHEMA_WHITELIST);
|
||||
}
|
||||
|
||||
protected String tableBlacklist() {
|
||||
return config.getString(TABLE_BLACKLIST);
|
||||
return getConfig().getString(TABLE_BLACKLIST);
|
||||
}
|
||||
|
||||
protected String tableWhitelist() {
|
||||
return config.getString(TABLE_WHITELIST);
|
||||
return getConfig().getString(TABLE_WHITELIST);
|
||||
}
|
||||
|
||||
protected String columnBlacklist() {
|
||||
return config.getString(COLUMN_BLACKLIST);
|
||||
return getConfig().getString(COLUMN_BLACKLIST);
|
||||
}
|
||||
|
||||
protected long snapshotLockTimeoutMillis() {
|
||||
return config.getLong(PostgresConnectorConfig.SNAPSHOT_LOCK_TIMEOUT_MS);
|
||||
return getConfig().getLong(PostgresConnectorConfig.SNAPSHOT_LOCK_TIMEOUT_MS);
|
||||
}
|
||||
|
||||
protected Snapshotter getSnapshotter() {
|
||||
return this.snapshotMode.getSnapshotter(config);
|
||||
return this.snapshotMode.getSnapshotter(getConfig());
|
||||
}
|
||||
|
||||
public String snapshotSelectOverrides() {
|
||||
return config.getString(PostgresConnectorConfig.SNAPSHOT_SELECT_STATEMENT_OVERRIDES_BY_TABLE);
|
||||
return getConfig().getString(PostgresConnectorConfig.SNAPSHOT_SELECT_STATEMENT_OVERRIDES_BY_TABLE);
|
||||
}
|
||||
|
||||
public String snapshotSelectOverrideForTable(String table) {
|
||||
return config.getString(PostgresConnectorConfig.SNAPSHOT_SELECT_STATEMENT_OVERRIDES_BY_TABLE + "." + table);
|
||||
return getConfig().getString(PostgresConnectorConfig.SNAPSHOT_SELECT_STATEMENT_OVERRIDES_BY_TABLE + "." + table);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected int defaultSnapshotFetchSize() {
|
||||
protected int defaultSnapshotFetchSize(Configuration config) {
|
||||
// we use getString() method because it supports null as the return value
|
||||
String rowsFetchSize = config.getString(ROWS_FETCH_SIZE);
|
||||
if (rowsFetchSize != null) {
|
||||
@ -984,7 +982,7 @@ protected boolean skipRefreshSchemaOnMissingToastableData() {
|
||||
}
|
||||
|
||||
protected Duration xminFetchInterval() {
|
||||
return Duration.ofMillis(config.getLong(PostgresConnectorConfig.XMIN_FETCH_INTERVAL));
|
||||
return Duration.ofMillis(getConfig().getLong(PostgresConnectorConfig.XMIN_FETCH_INTERVAL));
|
||||
}
|
||||
|
||||
protected static ConfigDef configDef() {
|
||||
|
@ -69,22 +69,21 @@ public class CommonConnectorConfig {
|
||||
.withValidation(Field::isPositiveInteger);
|
||||
|
||||
public static final Field SNAPSHOT_DELAY_MS = Field.create("snapshot.delay.ms")
|
||||
.withDisplayName("Snapshot Delay (milliseconds)")
|
||||
.withType(Type.LONG)
|
||||
.withWidth(Width.MEDIUM)
|
||||
.withImportance(Importance.LOW)
|
||||
.withDescription("The number of milliseconds to delay before a snapshot will begin.")
|
||||
.withDefault(0L)
|
||||
.withValidation(Field::isNonNegativeLong);
|
||||
.withDisplayName("Snapshot Delay (milliseconds)")
|
||||
.withType(Type.LONG)
|
||||
.withWidth(Width.MEDIUM)
|
||||
.withImportance(Importance.LOW)
|
||||
.withDescription("The number of milliseconds to delay before a snapshot will begin.")
|
||||
.withDefault(0L)
|
||||
.withValidation(Field::isNonNegativeLong);
|
||||
|
||||
public static final Field SNAPSHOT_FETCH_SIZE = Field.create("snapshot.fetch.size")
|
||||
.withDisplayName("Snapshot fetch size")
|
||||
.withType(Type.INT)
|
||||
.withWidth(Width.MEDIUM)
|
||||
.withImportance(Importance.MEDIUM)
|
||||
.withDescription("The maximum number of records that should be loaded into memory while performing a snapshot")
|
||||
.withValidation(Field::isNonNegativeInteger);
|
||||
|
||||
.withDisplayName("Snapshot fetch size")
|
||||
.withType(Type.INT)
|
||||
.withWidth(Width.MEDIUM)
|
||||
.withImportance(Importance.MEDIUM)
|
||||
.withDescription("The maximum number of records that should be loaded into memory while performing a snapshot")
|
||||
.withValidation(Field::isNonNegativeInteger);
|
||||
|
||||
private final Configuration config;
|
||||
private final boolean emitTombstoneOnDelete;
|
||||
@ -105,16 +104,17 @@ protected CommonConnectorConfig(Configuration config, String logicalName) {
|
||||
this.logicalName = logicalName;
|
||||
this.heartbeatTopicsPrefix = config.getString(Heartbeat.HEARTBEAT_TOPICS_PREFIX);
|
||||
this.snapshotDelayMs = Duration.ofMillis(config.getLong(SNAPSHOT_DELAY_MS));
|
||||
this.snapshotFetchSize = config.getInteger(SNAPSHOT_FETCH_SIZE, this::defaultSnapshotFetchSize);
|
||||
this.snapshotFetchSize = config.getInteger(SNAPSHOT_FETCH_SIZE, () -> defaultSnapshotFetchSize(config));
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the number of records to return per fetch by default.
|
||||
* <p><b>Important:</b> Each connector config must override this method to specify its default value.</p>
|
||||
*
|
||||
* @param config configuration
|
||||
* @return the default fetch size
|
||||
*/
|
||||
protected int defaultSnapshotFetchSize() {
|
||||
protected int defaultSnapshotFetchSize(Configuration config) {
|
||||
throw new UnsupportedOperationException("not implemented");
|
||||
}
|
||||
|
||||
|
@ -82,7 +82,7 @@ public DatabaseHistory getDatabaseHistory() {
|
||||
protected abstract HistoryRecordComparator getHistoryRecordComparator();
|
||||
|
||||
@Override
|
||||
protected int defaultSnapshotFetchSize() {
|
||||
protected int defaultSnapshotFetchSize(Configuration config) {
|
||||
return DEFAULT_SNAPSHOT_FETCH_SIZE;
|
||||
}
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user