DBZ-2975: Introduce opt-in configuration for multi-partition mode

This commit is contained in:
Sergei Morozov 2021-08-17 17:28:29 -07:00 committed by Gunnar Morling
parent ade15cd8f3
commit b06b5aecbc
6 changed files with 174 additions and 10 deletions

View File

@ -21,6 +21,7 @@
import io.debezium.connector.common.RelationalBaseSourceConnector;
import io.debezium.relational.RelationalDatabaseConnectorConfig;
import io.debezium.util.Clock;
import io.debezium.util.Strings;
/**
* The main connector class used to instantiate configuration and execution classes
@ -58,8 +59,15 @@ public List<Map<String, String>> taskConfigs(int maxTasks) {
Map<String, String> taskConfig = new HashMap<>(properties);
Configuration config = Configuration.from(properties);
try (SqlServerConnection connection = connect(config)) {
taskConfig.put(RelationalDatabaseConnectorConfig.DATABASE_NAME.name(), connection.retrieveRealDatabaseName());
final SqlServerConnectorConfig sqlServerConfig = new SqlServerConnectorConfig(config);
try (SqlServerConnection connection = connect(sqlServerConfig)) {
final String realDatabaseName = connection.retrieveRealDatabaseName();
if (!sqlServerConfig.isMultiPartitionModeEnabled()) {
taskConfig.put(SqlServerConnectorConfig.DATABASE_NAME.name(), realDatabaseName);
}
else {
taskConfig.put(SqlServerConnectorConfig.DATABASE_NAMES.name(), realDatabaseName);
}
}
catch (SQLException e) {
throw new RuntimeException("Could not retrieve real database name", e);
@ -79,15 +87,18 @@ public ConfigDef config() {
@Override
protected void validateConnection(Map<String, ConfigValue> configValues, Configuration config) {
final ConfigValue databaseValue = configValues.get(RelationalDatabaseConnectorConfig.DATABASE_NAME.name());
if (!databaseValue.errorMessages().isEmpty()) {
return;
final SqlServerConnectorConfig sqlServerConfig = new SqlServerConnectorConfig(config);
if (Strings.isNullOrEmpty(sqlServerConfig.getDatabaseName())) {
throw new IllegalArgumentException("Either '" + SqlServerConnectorConfig.DATABASE_NAME
+ "' or '" + SqlServerConnectorConfig.DATABASE_NAMES
+ "' option must be specified");
}
final ConfigValue hostnameValue = configValues.get(RelationalDatabaseConnectorConfig.HOSTNAME.name());
final ConfigValue userValue = configValues.get(RelationalDatabaseConnectorConfig.USER.name());
// Try to connect to the database ...
try (SqlServerConnection connection = connect(config)) {
try (SqlServerConnection connection = connect(sqlServerConfig)) {
connection.execute("SELECT @@VERSION");
LOGGER.debug("Successfully tested connection for {} with user '{}'", connection.connectionString(),
connection.username());
@ -105,8 +116,7 @@ protected Map<String, ConfigValue> validateAllFields(Configuration config) {
return config.validate(SqlServerConnectorConfig.ALL_FIELDS);
}
private SqlServerConnection connect(Configuration config) {
SqlServerConnectorConfig sqlServerConfig = new SqlServerConnectorConfig(config);
private SqlServerConnection connect(SqlServerConnectorConfig sqlServerConfig) {
return new SqlServerConnection(sqlServerConfig.jdbcConfig(), Clock.system(),
sqlServerConfig.getSourceTimestampMode(), null);
}

View File

@ -225,6 +225,18 @@ public static SnapshotIsolationMode parse(String value, String defaultValue) {
.withValidation(Field::isOptional)
.withDescription("The SQL Server instance name");
public static final Field DATABASE_NAME = RelationalDatabaseConnectorConfig.DATABASE_NAME
.withNoValidation()
.withValidation(SqlServerConnectorConfig::validateDatabaseName);
public static final Field DATABASE_NAMES = Field.create(DATABASE_CONFIG_PREFIX + "names")
.withDisplayName("Databases")
.withType(Type.LIST)
.withWidth(Width.MEDIUM)
.withImportance(Importance.HIGH)
.withValidation(SqlServerConnectorConfig::validateDatabaseNames)
.withDescription("The names of the databases from which the connector should capture changes");
/**
* @deprecated The connector will determine the database server timezone offset automatically.
*/
@ -309,6 +321,7 @@ public static SnapshotIsolationMode parse(String value, String defaultValue) {
.name("SQL Server")
.type(
DATABASE_NAME,
DATABASE_NAMES,
HOSTNAME,
PORT,
USER,
@ -344,12 +357,29 @@ public static ConfigDef configDef() {
private final SourceTimestampMode sourceTimestampMode;
private final boolean readOnlyDatabaseConnection;
private final int maxTransactionsPerIteration;
private final boolean multiPartitionMode;
public SqlServerConnectorConfig(Configuration config) {
super(SqlServerConnector.class, config, config.getString(SERVER_NAME), new SystemTablesPredicate(), x -> x.schema() + "." + x.table(), true,
ColumnFilterMode.SCHEMA);
this.databaseName = config.getString(DATABASE_NAME);
final String databaseName = config.getString(DATABASE_NAME.name());
final String databaseNames = config.getString(DATABASE_NAMES.name());
if (databaseName != null) {
multiPartitionMode = false;
this.databaseName = databaseName;
}
else if (databaseNames != null) {
multiPartitionMode = true;
this.databaseName = databaseNames;
LOGGER.info("Multi-partition mode is enabled");
}
else {
multiPartitionMode = false;
this.databaseName = null;
}
this.instanceName = config.getString(INSTANCE);
this.snapshotMode = SnapshotMode.parse(config.getString(SNAPSHOT_MODE), SNAPSHOT_MODE.defaultValueAsString());
@ -382,6 +412,10 @@ public String getInstanceName() {
return instanceName;
}
public boolean isMultiPartitionModeEnabled() {
return multiPartitionMode;
}
public SnapshotIsolationMode getSnapshotIsolationMode() {
return this.snapshotIsolationMode;
}
@ -447,4 +481,30 @@ public String getContextName() {
public String getConnectorName() {
return Module.name();
}
private static int validateDatabaseName(Configuration config, Field field, Field.ValidationOutput problems) {
if (config.hasKey(field) && config.hasKey(DATABASE_NAMES)) {
problems.accept(field, null, "Cannot be specified alongside " + DATABASE_NAMES);
return 1;
}
return 0;
}
private static int validateDatabaseNames(Configuration config, Field field, Field.ValidationOutput problems) {
String databaseNames = config.getString(field);
int count = 0;
if (databaseNames != null) {
if (config.hasKey(DATABASE_NAME)) {
problems.accept(field, null, "Cannot be specified alongside " + DATABASE_NAME);
++count;
}
if (databaseNames.contains(",")) {
problems.accept(field, databaseNames, "Only a single database name is currently supported");
++count;
}
}
return count;
}
}

View File

@ -0,0 +1,74 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.connector.sqlserver;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import io.debezium.config.Configuration;
import io.debezium.relational.history.KafkaDatabaseHistory;
public class SqlServerConnectorConfigTest {
private static final Logger LOGGER = LoggerFactory.getLogger(SqlServerConnectorConfigTest.class);
@Test
public void noDatabaseName() {
final SqlServerConnectorConfig connectorConfig = new SqlServerConnectorConfig(
defaultConfig().build());
assertTrue(connectorConfig.validateAndRecord(SqlServerConnectorConfig.ALL_FIELDS, LOGGER::error));
}
@Test
public void onlyDatabaseName() {
final SqlServerConnectorConfig connectorConfig = new SqlServerConnectorConfig(
defaultConfig()
.with(SqlServerConnectorConfig.DATABASE_NAME, "testDB")
.build());
assertTrue(connectorConfig.validateAndRecord(SqlServerConnectorConfig.ALL_FIELDS, LOGGER::error));
}
@Test
public void onlyDatabaseNames() {
final SqlServerConnectorConfig connectorConfig = new SqlServerConnectorConfig(
defaultConfig()
.with(SqlServerConnectorConfig.DATABASE_NAMES, "testDB")
.build());
assertTrue(connectorConfig.validateAndRecord(SqlServerConnectorConfig.ALL_FIELDS, LOGGER::error));
}
@Test
public void databaseNameAndDatabaseNames() {
final SqlServerConnectorConfig connectorConfig = new SqlServerConnectorConfig(
defaultConfig()
.with(SqlServerConnectorConfig.DATABASE_NAME, "testDB")
.with(SqlServerConnectorConfig.DATABASE_NAMES, "testDB")
.build());
assertFalse(connectorConfig.validateAndRecord(SqlServerConnectorConfig.ALL_FIELDS, LOGGER::error));
}
@Test
public void multipleDatabaseNames() {
final SqlServerConnectorConfig connectorConfig = new SqlServerConnectorConfig(
defaultConfig()
.with(SqlServerConnectorConfig.DATABASE_NAMES, "testDB1,testDB2")
.build());
assertFalse(connectorConfig.validateAndRecord(SqlServerConnectorConfig.ALL_FIELDS, LOGGER::error));
}
private Configuration.Builder defaultConfig() {
return Configuration.create()
.with(SqlServerConnectorConfig.SERVER_NAME, "server")
.with(SqlServerConnectorConfig.HOSTNAME, "localhost")
.with(SqlServerConnectorConfig.USER, "debezium")
.with(KafkaDatabaseHistory.BOOTSTRAP_SERVERS, "localhost:9092")
.with(KafkaDatabaseHistory.TOPIC, "history");
}
}

View File

@ -982,6 +982,17 @@ default boolean hasKey(String key) {
return getString(key) != null;
}
/**
* Determine whether this configuration contains a key-value pair associated with the given field and the value
* is non-null.
*
* @param field the field; may not be null
* @return true if the configuration contains the key, or false otherwise
*/
default boolean hasKey(Field field) {
return hasKey(field.name());
}
/**
* Get the set of keys in this configuration.
*

View File

@ -182,7 +182,7 @@ public static DecimalHandlingMode parse(String value, String defaultValue) {
.withWidth(Width.MEDIUM)
.withImportance(Importance.HIGH)
.withValidation(Field::isRequired)
.withDescription("The name of the database the connector should be monitoring");
.withDescription("The name of the database from which the connector should capture changes");
public static final Field SERVER_NAME = Field.create(DATABASE_CONFIG_PREFIX + "server.name")
.withDisplayName("Namespace")

View File

@ -2004,6 +2004,15 @@ The following configuration properties are _required_ unless a default value is
|[[sqlserver-property-database-dbname]]<<sqlserver-property-database-dbname, `+database.dbname+`>>
|
|The name of the SQL Server database from which to stream the changes
Must not be used with `database.names`.
|[[sqlserver-property-database-names]]<<sqlserver-property-database-names, `+database.names+`>>
|
|The comma-separated list of the SQL Server database names from which to stream the changes.
Currently, only one database name is supported. Must not be used with `database.dbname`.
This option is *experimental* and must not be used in production. Using it will make the behavior of the connector
incompatible with the default configuration with no upgrade or downgrade path.
|[[sqlserver-property-database-server-name]]<<sqlserver-property-database-server-name, `+database.server.name+`>>
|