DBZ-5496: Improve compatibility with Azure SQL

If the SQL Server connector is configured to capture changes from a
single database, pass it to the JDBC connection string in order to
bypass the limitation of Azure SQL which does not allow switchiing
between databases.

See: https://docs.microsoft.com/en-us/sql/t-sql/language-elements/use-transact-sql?view=sql-server-ver15#arguments
This commit is contained in:
Sergei Morozov 2022-09-14 14:19:27 -07:00 committed by Jiri Pechanec
parent f16d8432da
commit 39f3358747
6 changed files with 64 additions and 13 deletions

View File

@ -102,6 +102,7 @@ public class SqlServerConnection extends JdbcConnection {
private static final String URL_PATTERN = "jdbc:sqlserver://${" + JdbcConfiguration.HOSTNAME + "}:${" + JdbcConfiguration.PORT + "}";
private final boolean useSingleDatabase;
private final String getAllChangesForTable;
private final int queryFetchSize;
@ -122,8 +123,9 @@ public class SqlServerConnection extends JdbcConnection {
* @param skippedOperations a set of {@link Envelope.Operation} to skip in streaming
*/
public SqlServerConnection(JdbcConfiguration config, SqlServerValueConverters valueConverters,
Set<Envelope.Operation> skippedOperations) {
super(config, createConnectionFactory(), OPENING_QUOTING_CHARACTER, CLOSING_QUOTING_CHARACTER);
Set<Envelope.Operation> skippedOperations,
boolean useSingleDatabase) {
super(config, createConnectionFactory(useSingleDatabase), OPENING_QUOTING_CHARACTER, CLOSING_QUOTING_CHARACTER);
defaultValueConverter = new SqlServerDefaultValueConverter(this::connection, valueConverters);
this.queryFetchSize = config().getInteger(CommonConnectorConfig.QUERY_FETCH_SIZE);
@ -159,6 +161,7 @@ public SqlServerConnection(JdbcConfiguration config, SqlServerValueConverters va
getAllChangesForTable = get_all_changes_for_table.replaceFirst(STATEMENTS_PLACEHOLDER,
Matcher.quoteReplacement(", " + LSN_TIMESTAMP_SELECT_STATEMENT));
this.useSingleDatabase = useSingleDatabase;
this.optionRecompile = false;
}
@ -172,8 +175,9 @@ public SqlServerConnection(JdbcConfiguration config, SqlServerValueConverters va
* @param optionRecompile Includes query option RECOMPILE on incremental snapshots
*/
public SqlServerConnection(JdbcConfiguration config, SqlServerValueConverters valueConverters,
Set<Envelope.Operation> skippedOperations, boolean optionRecompile) {
this(config, valueConverters, skippedOperations);
Set<Envelope.Operation> skippedOperations, boolean useSingleDatabase,
boolean optionRecompile) {
this(config, valueConverters, skippedOperations, useSingleDatabase);
this.optionRecompile = optionRecompile;
}
@ -192,20 +196,29 @@ private boolean hasSkippedOperations(Set<Envelope.Operation> skippedOperations)
return false;
}
private static ConnectionFactory createConnectionFactory() {
return JdbcConnection.patternBasedFactory(URL_PATTERN,
private static ConnectionFactory createConnectionFactory(boolean useSingleDatabase) {
return JdbcConnection.patternBasedFactory(createUrlPattern(useSingleDatabase),
SQLServerDriver.class.getName(),
SqlServerConnection.class.getClassLoader(),
JdbcConfiguration.PORT.withDefault(SqlServerConnectorConfig.PORT.defaultValueAsString()));
}
private static String createUrlPattern(boolean useSingleDatabase) {
String pattern = URL_PATTERN;
if (useSingleDatabase) {
pattern += ";databaseName=${" + JdbcConfiguration.DATABASE + "}";
}
return pattern;
}
/**
* Returns a JDBC connection string for the current configuration.
*
* @return a {@code String} where the variables in {@code urlPattern} are replaced with values from the configuration
*/
public String connectionString() {
return connectionString(URL_PATTERN);
return connectionString(createUrlPattern(useSingleDatabase));
}
@Override

View File

@ -152,6 +152,7 @@ protected Map<String, ConfigValue> validateAllFields(Configuration config) {
}
private SqlServerConnection connect(SqlServerConnectorConfig sqlServerConfig) {
return new SqlServerConnection(sqlServerConfig.getJdbcConfig(), null, Collections.emptySet());
return new SqlServerConnection(sqlServerConfig.getJdbcConfig(), null, Collections.emptySet(),
sqlServerConfig.useSingleDatabase());
}
}

View File

@ -26,6 +26,7 @@
import io.debezium.connector.AbstractSourceInfo;
import io.debezium.connector.SourceInfoStructMaker;
import io.debezium.document.Document;
import io.debezium.jdbc.JdbcConfiguration;
import io.debezium.relational.ColumnFilterMode;
import io.debezium.relational.HistorizedRelationalDatabaseConnectorConfig;
import io.debezium.relational.RelationalDatabaseConnectorConfig;
@ -376,6 +377,21 @@ public String getInstanceName() {
return instanceName;
}
public boolean useSingleDatabase() {
return this.databaseNames.size() == 1;
}
@Override
public JdbcConfiguration getJdbcConfig() {
JdbcConfiguration config = super.getJdbcConfig();
if (useSingleDatabase()) {
config = JdbcConfiguration.adapt(config.edit()
.with(JdbcConfiguration.DATABASE, databaseNames.get(0))
.build());
}
return config;
}
public SnapshotIsolationMode getSnapshotIsolationMode() {
return this.snapshotIsolationMode;
}

View File

@ -71,9 +71,10 @@ public ChangeEventSourceCoordinator<SqlServerPartition, SqlServerOffsetContext>
connectorConfig.getTemporalPrecisionMode(), connectorConfig.binaryHandlingMode());
dataConnection = new SqlServerConnection(connectorConfig.getJdbcConfig(), valueConverters,
connectorConfig.getSkippedOperations(), connectorConfig.getOptionRecompile());
connectorConfig.getSkippedOperations(), connectorConfig.useSingleDatabase(),
connectorConfig.getOptionRecompile());
metadataConnection = new SqlServerConnection(connectorConfig.getJdbcConfig(), valueConverters,
connectorConfig.getSkippedOperations());
connectorConfig.getSkippedOperations(), connectorConfig.useSingleDatabase());
this.schema = new SqlServerDatabaseSchema(connectorConfig, metadataConnection.getDefaultValueConverter(), valueConverters, topicNamingStrategy,
schemaNameAdjuster);

View File

@ -457,6 +457,24 @@ public void testAccessToCDCTableBasedOnUserRoleAccess() throws Exception {
}
}
@Test
@FixFor("DBZ-5496")
public void shouldConnectToASingleDatabase() throws Exception {
TestHelper.createTestDatabase();
try (SqlServerConnection connection = TestHelper.testConnection()) {
Assertions.assertThat(connection.connection().getCatalog()).isEqualTo(TestHelper.TEST_DATABASE_1);
}
}
@Test
@FixFor("DBZ-5496")
public void shouldNotConnectToAnyOfMultipleDatabase() throws Exception {
TestHelper.createTestDatabases(TestHelper.TEST_DATABASE_1, TestHelper.TEST_DATABASE_2);
try (SqlServerConnection connection = TestHelper.multiPartitionTestConnection()) {
Assertions.assertThat(connection.connection().getCatalog()).isEqualTo("master");
}
}
private long toMillis(OffsetDateTime datetime) {
return datetime.toInstant().toEpochMilli();
}

View File

@ -237,7 +237,8 @@ private static void dropTestDatabase(SqlServerConnection connection, String data
public static SqlServerConnection adminConnection() {
return new SqlServerConnection(TestHelper.defaultJdbcConfig(),
new SqlServerValueConverters(JdbcValueConverters.DecimalMode.PRECISE, TemporalPrecisionMode.ADAPTIVE, null), Collections.emptySet());
new SqlServerValueConverters(JdbcValueConverters.DecimalMode.PRECISE, TemporalPrecisionMode.ADAPTIVE, null),
Collections.emptySet(), false);
}
public static SqlServerConnection testConnection() {
@ -262,13 +263,14 @@ public static SqlServerConnection testConnection(String databaseName) {
public static SqlServerConnection testConnection(JdbcConfiguration config) {
return new SqlServerConnection(config,
new SqlServerValueConverters(JdbcValueConverters.DecimalMode.PRECISE, TemporalPrecisionMode.ADAPTIVE, null), Collections.emptySet());
new SqlServerValueConverters(JdbcValueConverters.DecimalMode.PRECISE, TemporalPrecisionMode.ADAPTIVE, null),
Collections.emptySet(), false);
}
public static SqlServerConnection testConnectionWithOptionRecompile() {
JdbcConfiguration config = JdbcConfiguration.adapt(defaultJdbcConfig()
.edit()
.with(JdbcConfiguration.ON_CONNECT_STATEMENTS, "USE [" + TEST_DATABASE_1 + "]")
.with(JdbcConfiguration.DATABASE, TEST_DATABASE_1)
.build());
return new SqlServerConnection(config,