DBZ-4783: Refactoring and cleanup

This commit is contained in:
Sergei Morozov 2022-02-25 19:10:34 -08:00 committed by Gunnar Morling
parent cc11ac193a
commit d6e96a5108
2 changed files with 21 additions and 28 deletions

View File

@ -11,7 +11,6 @@
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
@ -26,7 +25,6 @@
import io.debezium.config.Configuration;
import io.debezium.connector.common.RelationalBaseSourceConnector;
import io.debezium.relational.RelationalDatabaseConnectorConfig;
import io.debezium.util.Strings;
/**
* The main connector class used to instantiate configuration and execution classes
@ -61,28 +59,21 @@ public List<Map<String, String>> taskConfigs(int maxTasks) {
throw new IllegalArgumentException("Only a single connector task may be started in single-partition mode");
}
final Configuration config = Configuration.from(properties);
final SqlServerConnectorConfig sqlServerConfig = new SqlServerConnectorConfig(config);
final boolean multiPartitionMode = sqlServerConfig.isMultiPartitionModeEnabled();
final SqlServerConnectorConfig config = new SqlServerConnectorConfig(Configuration.from(properties));
List<String> databaseNames;
try (SqlServerConnection connection = connect(sqlServerConfig)) {
if (multiPartitionMode) {
databaseNames = Arrays.asList(properties.get(DATABASE_NAMES.name()).split(","));
}
else {
databaseNames = Collections.singletonList(properties.get(DATABASE_NAME.name()));
}
return buildTaskConfigs(connection, databaseNames, multiPartitionMode, maxTasks);
try (SqlServerConnection connection = connect(config)) {
return buildTaskConfigs(connection, config, maxTasks);
}
catch (SQLException e) {
throw new IllegalArgumentException("Could not build task configs", e);
}
}
private List<Map<String, String>> buildTaskConfigs(SqlServerConnection connection, List<String> databaseNames,
boolean multiPartitionMode, int maxTasks) {
private List<Map<String, String>> buildTaskConfigs(SqlServerConnection connection, SqlServerConnectorConfig config,
int maxTasks) {
final boolean multiPartitionMode = config.isMultiPartitionModeEnabled();
List<String> databaseNames = config.getDatabaseNames();
// Initialize the database list for each task
List<List<String>> databasesByTask = new ArrayList<>();
for (int i = 0; i < maxTasks; i++) {
@ -125,13 +116,12 @@ public ConfigDef config() {
@Override
protected void validateConnection(Map<String, ConfigValue> configValues, Configuration config) {
final SqlServerConnectorConfig sqlServerConfig = new SqlServerConnectorConfig(config);
if (Strings.isNullOrEmpty(sqlServerConfig.getDatabaseName())) {
throw new IllegalArgumentException("Either '" + DATABASE_NAME + "' or '" + DATABASE_NAMES
+ "' option must be specified");
if (!configValues.get(DATABASE_NAME.name()).errorMessages().isEmpty()
|| !configValues.get(DATABASE_NAMES.name()).errorMessages().isEmpty()) {
return;
}
final SqlServerConnectorConfig sqlServerConfig = new SqlServerConnectorConfig(config);
final ConfigValue hostnameValue = configValues.get(RelationalDatabaseConnectorConfig.HOSTNAME.name());
final ConfigValue userValue = configValues.get(RelationalDatabaseConnectorConfig.USER.name());
// Try to connect to the database ...

View File

@ -7,6 +7,9 @@
import java.time.DateTimeException;
import java.time.ZoneId;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigDef.Importance;
@ -372,7 +375,7 @@ public static ConfigDef configDef() {
return CONFIG_DEFINITION.configDef();
}
private final String databaseName;
private final List<String> databaseNames;
private final String instanceName;
private final SnapshotMode snapshotMode;
private final SnapshotIsolationMode snapshotIsolationMode;
@ -391,16 +394,16 @@ public SqlServerConnectorConfig(Configuration config) {
if (databaseName != null) {
multiPartitionMode = false;
this.databaseName = databaseName;
this.databaseNames = Collections.singletonList(databaseName);
}
else if (databaseNames != null) {
multiPartitionMode = true;
this.databaseName = databaseNames;
this.databaseNames = Arrays.asList(databaseNames.split(","));
LOGGER.info("Multi-partition mode is enabled");
}
else {
multiPartitionMode = false;
this.databaseName = null;
this.databaseNames = Collections.emptyList();
}
this.instanceName = config.getString(INSTANCE);
@ -429,8 +432,8 @@ public Configuration jdbcConfig() {
return getConfig().subset(DATABASE_CONFIG_PREFIX, true);
}
public String getDatabaseName() {
return databaseName;
public List<String> getDatabaseNames() {
return databaseNames;
}
public String getInstanceName() {