From 39f33587473b733ec4a880cde5116af96e4a1edd Mon Sep 17 00:00:00 2001 From: Sergei Morozov Date: Wed, 14 Sep 2022 14:19:27 -0700 Subject: [PATCH] DBZ-5496: Improve compatibility with Azure SQL If the SQL Server connector is configured to capture changes from a single database, pass it to the JDBC connection string in order to bypass the limitation of Azure SQL which does not allow switchiing between databases. See: https://docs.microsoft.com/en-us/sql/t-sql/language-elements/use-transact-sql?view=sql-server-ver15#arguments --- .../sqlserver/SqlServerConnection.java | 27 ++++++++++++++----- .../sqlserver/SqlServerConnector.java | 3 ++- .../sqlserver/SqlServerConnectorConfig.java | 16 +++++++++++ .../sqlserver/SqlServerConnectorTask.java | 5 ++-- .../sqlserver/SqlServerConnectionIT.java | 18 +++++++++++++ .../connector/sqlserver/util/TestHelper.java | 8 +++--- 6 files changed, 64 insertions(+), 13 deletions(-) diff --git a/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerConnection.java b/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerConnection.java index a7e646d8f..c640deb7c 100644 --- a/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerConnection.java +++ b/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerConnection.java @@ -102,6 +102,7 @@ public class SqlServerConnection extends JdbcConnection { private static final String URL_PATTERN = "jdbc:sqlserver://${" + JdbcConfiguration.HOSTNAME + "}:${" + JdbcConfiguration.PORT + "}"; + private final boolean useSingleDatabase; private final String getAllChangesForTable; private final int queryFetchSize; @@ -122,8 +123,9 @@ public class SqlServerConnection extends JdbcConnection { * @param skippedOperations a set of {@link Envelope.Operation} to skip in streaming */ public SqlServerConnection(JdbcConfiguration config, SqlServerValueConverters valueConverters, - Set skippedOperations) { - super(config, createConnectionFactory(), OPENING_QUOTING_CHARACTER, CLOSING_QUOTING_CHARACTER); + Set skippedOperations, + boolean useSingleDatabase) { + super(config, createConnectionFactory(useSingleDatabase), OPENING_QUOTING_CHARACTER, CLOSING_QUOTING_CHARACTER); defaultValueConverter = new SqlServerDefaultValueConverter(this::connection, valueConverters); this.queryFetchSize = config().getInteger(CommonConnectorConfig.QUERY_FETCH_SIZE); @@ -159,6 +161,7 @@ public SqlServerConnection(JdbcConfiguration config, SqlServerValueConverters va getAllChangesForTable = get_all_changes_for_table.replaceFirst(STATEMENTS_PLACEHOLDER, Matcher.quoteReplacement(", " + LSN_TIMESTAMP_SELECT_STATEMENT)); + this.useSingleDatabase = useSingleDatabase; this.optionRecompile = false; } @@ -172,8 +175,9 @@ public SqlServerConnection(JdbcConfiguration config, SqlServerValueConverters va * @param optionRecompile Includes query option RECOMPILE on incremental snapshots */ public SqlServerConnection(JdbcConfiguration config, SqlServerValueConverters valueConverters, - Set skippedOperations, boolean optionRecompile) { - this(config, valueConverters, skippedOperations); + Set skippedOperations, boolean useSingleDatabase, + boolean optionRecompile) { + this(config, valueConverters, skippedOperations, useSingleDatabase); this.optionRecompile = optionRecompile; } @@ -192,20 +196,29 @@ private boolean hasSkippedOperations(Set skippedOperations) return false; } - private static ConnectionFactory createConnectionFactory() { - return JdbcConnection.patternBasedFactory(URL_PATTERN, + private static ConnectionFactory createConnectionFactory(boolean useSingleDatabase) { + return JdbcConnection.patternBasedFactory(createUrlPattern(useSingleDatabase), SQLServerDriver.class.getName(), SqlServerConnection.class.getClassLoader(), JdbcConfiguration.PORT.withDefault(SqlServerConnectorConfig.PORT.defaultValueAsString())); } + private static String createUrlPattern(boolean useSingleDatabase) { + String pattern = URL_PATTERN; + if (useSingleDatabase) { + pattern += ";databaseName=${" + JdbcConfiguration.DATABASE + "}"; + } + + return pattern; + } + /** * Returns a JDBC connection string for the current configuration. * * @return a {@code String} where the variables in {@code urlPattern} are replaced with values from the configuration */ public String connectionString() { - return connectionString(URL_PATTERN); + return connectionString(createUrlPattern(useSingleDatabase)); } @Override diff --git a/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerConnector.java b/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerConnector.java index 58ac16f84..11fd675e3 100644 --- a/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerConnector.java +++ b/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerConnector.java @@ -152,6 +152,7 @@ protected Map validateAllFields(Configuration config) { } private SqlServerConnection connect(SqlServerConnectorConfig sqlServerConfig) { - return new SqlServerConnection(sqlServerConfig.getJdbcConfig(), null, Collections.emptySet()); + return new SqlServerConnection(sqlServerConfig.getJdbcConfig(), null, Collections.emptySet(), + sqlServerConfig.useSingleDatabase()); } } diff --git a/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerConnectorConfig.java b/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerConnectorConfig.java index 12f3e3aab..3f06a71e0 100644 --- a/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerConnectorConfig.java +++ b/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerConnectorConfig.java @@ -26,6 +26,7 @@ import io.debezium.connector.AbstractSourceInfo; import io.debezium.connector.SourceInfoStructMaker; import io.debezium.document.Document; +import io.debezium.jdbc.JdbcConfiguration; import io.debezium.relational.ColumnFilterMode; import io.debezium.relational.HistorizedRelationalDatabaseConnectorConfig; import io.debezium.relational.RelationalDatabaseConnectorConfig; @@ -376,6 +377,21 @@ public String getInstanceName() { return instanceName; } + public boolean useSingleDatabase() { + return this.databaseNames.size() == 1; + } + + @Override + public JdbcConfiguration getJdbcConfig() { + JdbcConfiguration config = super.getJdbcConfig(); + if (useSingleDatabase()) { + config = JdbcConfiguration.adapt(config.edit() + .with(JdbcConfiguration.DATABASE, databaseNames.get(0)) + .build()); + } + return config; + } + public SnapshotIsolationMode getSnapshotIsolationMode() { return this.snapshotIsolationMode; } diff --git a/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerConnectorTask.java b/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerConnectorTask.java index 6ddaacfdd..029c7852e 100644 --- a/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerConnectorTask.java +++ b/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerConnectorTask.java @@ -71,9 +71,10 @@ public ChangeEventSourceCoordinator connectorConfig.getTemporalPrecisionMode(), connectorConfig.binaryHandlingMode()); dataConnection = new SqlServerConnection(connectorConfig.getJdbcConfig(), valueConverters, - connectorConfig.getSkippedOperations(), connectorConfig.getOptionRecompile()); + connectorConfig.getSkippedOperations(), connectorConfig.useSingleDatabase(), + connectorConfig.getOptionRecompile()); metadataConnection = new SqlServerConnection(connectorConfig.getJdbcConfig(), valueConverters, - connectorConfig.getSkippedOperations()); + connectorConfig.getSkippedOperations(), connectorConfig.useSingleDatabase()); this.schema = new SqlServerDatabaseSchema(connectorConfig, metadataConnection.getDefaultValueConverter(), valueConverters, topicNamingStrategy, schemaNameAdjuster); diff --git a/debezium-connector-sqlserver/src/test/java/io/debezium/connector/sqlserver/SqlServerConnectionIT.java b/debezium-connector-sqlserver/src/test/java/io/debezium/connector/sqlserver/SqlServerConnectionIT.java index 761dbf5ca..201d0558f 100644 --- a/debezium-connector-sqlserver/src/test/java/io/debezium/connector/sqlserver/SqlServerConnectionIT.java +++ b/debezium-connector-sqlserver/src/test/java/io/debezium/connector/sqlserver/SqlServerConnectionIT.java @@ -457,6 +457,24 @@ public void testAccessToCDCTableBasedOnUserRoleAccess() throws Exception { } } + @Test + @FixFor("DBZ-5496") + public void shouldConnectToASingleDatabase() throws Exception { + TestHelper.createTestDatabase(); + try (SqlServerConnection connection = TestHelper.testConnection()) { + Assertions.assertThat(connection.connection().getCatalog()).isEqualTo(TestHelper.TEST_DATABASE_1); + } + } + + @Test + @FixFor("DBZ-5496") + public void shouldNotConnectToAnyOfMultipleDatabase() throws Exception { + TestHelper.createTestDatabases(TestHelper.TEST_DATABASE_1, TestHelper.TEST_DATABASE_2); + try (SqlServerConnection connection = TestHelper.multiPartitionTestConnection()) { + Assertions.assertThat(connection.connection().getCatalog()).isEqualTo("master"); + } + } + private long toMillis(OffsetDateTime datetime) { return datetime.toInstant().toEpochMilli(); } diff --git a/debezium-connector-sqlserver/src/test/java/io/debezium/connector/sqlserver/util/TestHelper.java b/debezium-connector-sqlserver/src/test/java/io/debezium/connector/sqlserver/util/TestHelper.java index bdbefdbe6..b3632e60b 100644 --- a/debezium-connector-sqlserver/src/test/java/io/debezium/connector/sqlserver/util/TestHelper.java +++ b/debezium-connector-sqlserver/src/test/java/io/debezium/connector/sqlserver/util/TestHelper.java @@ -237,7 +237,8 @@ private static void dropTestDatabase(SqlServerConnection connection, String data public static SqlServerConnection adminConnection() { return new SqlServerConnection(TestHelper.defaultJdbcConfig(), - new SqlServerValueConverters(JdbcValueConverters.DecimalMode.PRECISE, TemporalPrecisionMode.ADAPTIVE, null), Collections.emptySet()); + new SqlServerValueConverters(JdbcValueConverters.DecimalMode.PRECISE, TemporalPrecisionMode.ADAPTIVE, null), + Collections.emptySet(), false); } public static SqlServerConnection testConnection() { @@ -262,13 +263,14 @@ public static SqlServerConnection testConnection(String databaseName) { public static SqlServerConnection testConnection(JdbcConfiguration config) { return new SqlServerConnection(config, - new SqlServerValueConverters(JdbcValueConverters.DecimalMode.PRECISE, TemporalPrecisionMode.ADAPTIVE, null), Collections.emptySet()); + new SqlServerValueConverters(JdbcValueConverters.DecimalMode.PRECISE, TemporalPrecisionMode.ADAPTIVE, null), + Collections.emptySet(), false); } public static SqlServerConnection testConnectionWithOptionRecompile() { JdbcConfiguration config = JdbcConfiguration.adapt(defaultJdbcConfig() .edit() - .with(JdbcConfiguration.ON_CONNECT_STATEMENTS, "USE [" + TEST_DATABASE_1 + "]") + .with(JdbcConfiguration.DATABASE, TEST_DATABASE_1) .build()); return new SqlServerConnection(config,