DBZ-5924 respect SQL-Server database.instance config option
This commit is contained in:
parent
dd96ccc27d
commit
58eb77d7e4
@ -100,9 +100,11 @@ public class SqlServerConnection extends JdbcConnection {
|
|||||||
private static final String OPENING_QUOTING_CHARACTER = "[";
|
private static final String OPENING_QUOTING_CHARACTER = "[";
|
||||||
private static final String CLOSING_QUOTING_CHARACTER = "]";
|
private static final String CLOSING_QUOTING_CHARACTER = "]";
|
||||||
|
|
||||||
private static final String URL_PATTERN = "jdbc:sqlserver://${" + JdbcConfiguration.HOSTNAME + "}:${" + JdbcConfiguration.PORT + "}";
|
private static final String URL_PATTERN = "jdbc:sqlserver://${" + JdbcConfiguration.HOSTNAME + "}";
|
||||||
|
|
||||||
|
private final JdbcConfiguration config;
|
||||||
private final boolean useSingleDatabase;
|
private final boolean useSingleDatabase;
|
||||||
|
private final String instanceName;
|
||||||
private final String getAllChangesForTable;
|
private final String getAllChangesForTable;
|
||||||
private final int queryFetchSize;
|
private final int queryFetchSize;
|
||||||
|
|
||||||
@ -124,8 +126,8 @@ public class SqlServerConnection extends JdbcConnection {
|
|||||||
*/
|
*/
|
||||||
public SqlServerConnection(JdbcConfiguration config, SqlServerValueConverters valueConverters,
|
public SqlServerConnection(JdbcConfiguration config, SqlServerValueConverters valueConverters,
|
||||||
Set<Envelope.Operation> skippedOperations,
|
Set<Envelope.Operation> skippedOperations,
|
||||||
boolean useSingleDatabase) {
|
boolean useSingleDatabase, String instanceName) {
|
||||||
super(config, createConnectionFactory(useSingleDatabase), OPENING_QUOTING_CHARACTER, CLOSING_QUOTING_CHARACTER);
|
super(config, createConnectionFactory(config, useSingleDatabase, instanceName), OPENING_QUOTING_CHARACTER, CLOSING_QUOTING_CHARACTER);
|
||||||
|
|
||||||
defaultValueConverter = new SqlServerDefaultValueConverter(this::connection, valueConverters);
|
defaultValueConverter = new SqlServerDefaultValueConverter(this::connection, valueConverters);
|
||||||
this.queryFetchSize = config().getInteger(CommonConnectorConfig.QUERY_FETCH_SIZE);
|
this.queryFetchSize = config().getInteger(CommonConnectorConfig.QUERY_FETCH_SIZE);
|
||||||
@ -161,7 +163,9 @@ public SqlServerConnection(JdbcConfiguration config, SqlServerValueConverters va
|
|||||||
|
|
||||||
getAllChangesForTable = get_all_changes_for_table.replaceFirst(STATEMENTS_PLACEHOLDER,
|
getAllChangesForTable = get_all_changes_for_table.replaceFirst(STATEMENTS_PLACEHOLDER,
|
||||||
Matcher.quoteReplacement(", " + LSN_TIMESTAMP_SELECT_STATEMENT));
|
Matcher.quoteReplacement(", " + LSN_TIMESTAMP_SELECT_STATEMENT));
|
||||||
|
this.config = config;
|
||||||
this.useSingleDatabase = useSingleDatabase;
|
this.useSingleDatabase = useSingleDatabase;
|
||||||
|
this.instanceName = instanceName;
|
||||||
|
|
||||||
this.optionRecompile = false;
|
this.optionRecompile = false;
|
||||||
}
|
}
|
||||||
@ -176,8 +180,8 @@ public SqlServerConnection(JdbcConfiguration config, SqlServerValueConverters va
|
|||||||
*/
|
*/
|
||||||
public SqlServerConnection(JdbcConfiguration config, SqlServerValueConverters valueConverters,
|
public SqlServerConnection(JdbcConfiguration config, SqlServerValueConverters valueConverters,
|
||||||
Set<Envelope.Operation> skippedOperations, boolean useSingleDatabase,
|
Set<Envelope.Operation> skippedOperations, boolean useSingleDatabase,
|
||||||
boolean optionRecompile) {
|
String instanceName, boolean optionRecompile) {
|
||||||
this(config, valueConverters, skippedOperations, useSingleDatabase);
|
this(config, valueConverters, skippedOperations, useSingleDatabase, instanceName);
|
||||||
|
|
||||||
this.optionRecompile = optionRecompile;
|
this.optionRecompile = optionRecompile;
|
||||||
}
|
}
|
||||||
@ -196,15 +200,24 @@ private boolean hasSkippedOperations(Set<Envelope.Operation> skippedOperations)
|
|||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
private static ConnectionFactory createConnectionFactory(boolean useSingleDatabase) {
|
private static ConnectionFactory createConnectionFactory(JdbcConfiguration config, boolean useSingleDatabase, String instanceName) {
|
||||||
return JdbcConnection.patternBasedFactory(createUrlPattern(useSingleDatabase),
|
return JdbcConnection.patternBasedFactory(createUrlPattern(config, useSingleDatabase, instanceName),
|
||||||
SQLServerDriver.class.getName(),
|
SQLServerDriver.class.getName(),
|
||||||
SqlServerConnection.class.getClassLoader(),
|
SqlServerConnection.class.getClassLoader(),
|
||||||
JdbcConfiguration.PORT.withDefault(SqlServerConnectorConfig.PORT.defaultValueAsString()));
|
JdbcConfiguration.PORT.withDefault(SqlServerConnectorConfig.PORT.defaultValueAsString()));
|
||||||
}
|
}
|
||||||
|
|
||||||
private static String createUrlPattern(boolean useSingleDatabase) {
|
private static String createUrlPattern(JdbcConfiguration config, boolean useSingleDatabase, String instanceName) {
|
||||||
String pattern = URL_PATTERN;
|
String pattern = URL_PATTERN;
|
||||||
|
if (instanceName != null) {
|
||||||
|
pattern += "\\" + instanceName;
|
||||||
|
if (config.getPortAsString() != null) {
|
||||||
|
pattern += ":${" + JdbcConfiguration.PORT + "}";
|
||||||
|
}
|
||||||
|
}
|
||||||
|
else {
|
||||||
|
pattern += ":${" + JdbcConfiguration.PORT + "}";
|
||||||
|
}
|
||||||
if (useSingleDatabase) {
|
if (useSingleDatabase) {
|
||||||
pattern += ";databaseName=${" + JdbcConfiguration.DATABASE + "}";
|
pattern += ";databaseName=${" + JdbcConfiguration.DATABASE + "}";
|
||||||
}
|
}
|
||||||
@ -218,7 +231,7 @@ private static String createUrlPattern(boolean useSingleDatabase) {
|
|||||||
* @return a {@code String} where the variables in {@code urlPattern} are replaced with values from the configuration
|
* @return a {@code String} where the variables in {@code urlPattern} are replaced with values from the configuration
|
||||||
*/
|
*/
|
||||||
public String connectionString() {
|
public String connectionString() {
|
||||||
return connectionString(createUrlPattern(useSingleDatabase));
|
return connectionString(createUrlPattern(config, useSingleDatabase, instanceName));
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -153,6 +153,6 @@ protected Map<String, ConfigValue> validateAllFields(Configuration config) {
|
|||||||
|
|
||||||
private SqlServerConnection connect(SqlServerConnectorConfig sqlServerConfig) {
|
private SqlServerConnection connect(SqlServerConnectorConfig sqlServerConfig) {
|
||||||
return new SqlServerConnection(sqlServerConfig.getJdbcConfig(), null, Collections.emptySet(),
|
return new SqlServerConnection(sqlServerConfig.getJdbcConfig(), null, Collections.emptySet(),
|
||||||
sqlServerConfig.useSingleDatabase());
|
sqlServerConfig.useSingleDatabase(), sqlServerConfig.getInstanceName());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -72,9 +72,10 @@ public ChangeEventSourceCoordinator<SqlServerPartition, SqlServerOffsetContext>
|
|||||||
|
|
||||||
dataConnection = new SqlServerConnection(connectorConfig.getJdbcConfig(), valueConverters,
|
dataConnection = new SqlServerConnection(connectorConfig.getJdbcConfig(), valueConverters,
|
||||||
connectorConfig.getSkippedOperations(), connectorConfig.useSingleDatabase(),
|
connectorConfig.getSkippedOperations(), connectorConfig.useSingleDatabase(),
|
||||||
connectorConfig.getOptionRecompile());
|
connectorConfig.getInstanceName(), connectorConfig.getOptionRecompile());
|
||||||
metadataConnection = new SqlServerConnection(connectorConfig.getJdbcConfig(), valueConverters,
|
metadataConnection = new SqlServerConnection(connectorConfig.getJdbcConfig(), valueConverters,
|
||||||
connectorConfig.getSkippedOperations(), connectorConfig.useSingleDatabase());
|
connectorConfig.getSkippedOperations(), connectorConfig.useSingleDatabase(),
|
||||||
|
connectorConfig.getInstanceName());
|
||||||
|
|
||||||
this.schema = new SqlServerDatabaseSchema(connectorConfig, metadataConnection.getDefaultValueConverter(), valueConverters, topicNamingStrategy,
|
this.schema = new SqlServerDatabaseSchema(connectorConfig, metadataConnection.getDefaultValueConverter(), valueConverters, topicNamingStrategy,
|
||||||
schemaNameAdjuster);
|
schemaNameAdjuster);
|
||||||
|
@ -238,7 +238,7 @@ private static void dropTestDatabase(SqlServerConnection connection, String data
|
|||||||
public static SqlServerConnection adminConnection() {
|
public static SqlServerConnection adminConnection() {
|
||||||
return new SqlServerConnection(TestHelper.defaultJdbcConfig(),
|
return new SqlServerConnection(TestHelper.defaultJdbcConfig(),
|
||||||
new SqlServerValueConverters(JdbcValueConverters.DecimalMode.PRECISE, TemporalPrecisionMode.ADAPTIVE, null),
|
new SqlServerValueConverters(JdbcValueConverters.DecimalMode.PRECISE, TemporalPrecisionMode.ADAPTIVE, null),
|
||||||
Collections.emptySet(), false);
|
Collections.emptySet(), false, null);
|
||||||
}
|
}
|
||||||
|
|
||||||
public static SqlServerConnection testConnection() {
|
public static SqlServerConnection testConnection() {
|
||||||
@ -264,7 +264,7 @@ public static SqlServerConnection testConnection(String databaseName) {
|
|||||||
public static SqlServerConnection testConnection(JdbcConfiguration config) {
|
public static SqlServerConnection testConnection(JdbcConfiguration config) {
|
||||||
return new SqlServerConnection(config,
|
return new SqlServerConnection(config,
|
||||||
new SqlServerValueConverters(JdbcValueConverters.DecimalMode.PRECISE, TemporalPrecisionMode.ADAPTIVE, null),
|
new SqlServerValueConverters(JdbcValueConverters.DecimalMode.PRECISE, TemporalPrecisionMode.ADAPTIVE, null),
|
||||||
Collections.emptySet(), false);
|
Collections.emptySet(), false, null);
|
||||||
}
|
}
|
||||||
|
|
||||||
public static SqlServerConnection testConnectionWithOptionRecompile() {
|
public static SqlServerConnection testConnectionWithOptionRecompile() {
|
||||||
@ -275,7 +275,7 @@ public static SqlServerConnection testConnectionWithOptionRecompile() {
|
|||||||
|
|
||||||
return new SqlServerConnection(config,
|
return new SqlServerConnection(config,
|
||||||
new SqlServerValueConverters(JdbcValueConverters.DecimalMode.PRECISE, TemporalPrecisionMode.ADAPTIVE, null),
|
new SqlServerValueConverters(JdbcValueConverters.DecimalMode.PRECISE, TemporalPrecisionMode.ADAPTIVE, null),
|
||||||
Collections.emptySet(), true);
|
Collections.emptySet(), true, null, true);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
Loading…
Reference in New Issue
Block a user