diff --git a/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerConnection.java b/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerConnection.java index 7682450e5..18a8e6ede 100644 --- a/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerConnection.java +++ b/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerConnection.java @@ -21,6 +21,7 @@ import java.util.Map; import java.util.Optional; import java.util.Set; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.Supplier; import java.util.regex.Matcher; import java.util.stream.Collectors; @@ -352,6 +353,19 @@ public Lsn incrementLsn(String databaseName, Lsn lsn) throws SQLException { }, "Increment LSN query must return exactly one value")); } + /** + * Check if the user with which connection object is created has + * access to CDC table. + * + * @return boolean indicating the presence/absence of access + * @throws SQLException + */ + public boolean checkIfConnectedUserHasAccessToCDCTable() throws SQLException { + final AtomicBoolean userHasAccess = new AtomicBoolean(); + this.query("EXEC sys.sp_cdc_help_change_data_capture", rs -> userHasAccess.set(rs.next())); + return userHasAccess.get(); + } + /** * Creates an exclusive lock for a given table. * diff --git a/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerConnector.java b/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerConnector.java index 73b825c65..aa682ce36 100644 --- a/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerConnector.java +++ b/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerConnector.java @@ -129,6 +129,14 @@ protected void validateConnection(Map configValues, Configu connection.execute("SELECT @@VERSION"); LOGGER.debug("Successfully tested connection for {} with user '{}'", connection.connectionString(), connection.username()); + LOGGER.info("Checking if user has access to CDC table"); + boolean userHasAccessToCDCTable = connection.checkIfConnectedUserHasAccessToCDCTable(); + if (!userHasAccessToCDCTable + && sqlServerConfig.getSnapshotMode() != SqlServerConnectorConfig.SnapshotMode.INITIAL_ONLY) { + String errorMessage = "User " + userValue.value() + " does not have access to CDC table and can only be used in initial_only snapshot mode"; + LOGGER.error(errorMessage); + userValue.addErrorMessage(errorMessage); + } } catch (Exception e) { LOGGER.error("Failed testing connection for {} with user '{}'", config.withMaskedPasswords(), diff --git a/debezium-connector-sqlserver/src/test/java/io/debezium/connector/sqlserver/SqlServerConnectionIT.java b/debezium-connector-sqlserver/src/test/java/io/debezium/connector/sqlserver/SqlServerConnectionIT.java index 247c2fae2..dbf285b7d 100644 --- a/debezium-connector-sqlserver/src/test/java/io/debezium/connector/sqlserver/SqlServerConnectionIT.java +++ b/debezium-connector-sqlserver/src/test/java/io/debezium/connector/sqlserver/SqlServerConnectionIT.java @@ -413,6 +413,48 @@ public void shouldProperlyGetDefaultColumnNullValues() throws Exception { } } + @Test + @FixFor("DBZ-4346") + public void testAccessToCDCTableBasedOnUserRoleAccess() throws Exception { + // Setup a user with only read-only access + try (SqlServerConnection connection = TestHelper.adminConnection()) { + connection.connect(); + connection.execute("CREATE DATABASE testDB"); + connection.execute("USE testDB"); + + String testUserCreateSql = "IF EXISTS (select 1 from sys.server_principals where name = 'test_user')\n" + + "DROP LOGIN test_user\n" + + "CREATE LOGIN test_user WITH PASSWORD = 'Password!'\n" + + "CREATE USER test_user FOR LOGIN test_user\n" + + "ALTER ROLE db_denydatareader ADD MEMBER test_user"; + + connection.execute(testUserCreateSql); + + // NOTE: you cannot enable CDC on master + TestHelper.enableDbCdc(connection, "testDB"); + + // create table if exists + String sql = "IF EXISTS (select 1 from sys.objects w" + + "here name = 'testTable' and type = 'u')\n" + + "DROP TABLE testTable\n" + + "CREATE TABLE testTable (ID int not null identity(1, 1) primary key, NUMBER int, TEXT text)"; + connection.execute(sql); + // then enable CDC and wrapper functions + TestHelper.enableTableCdc(connection, "testTable"); + + // sa user should have access to CDC table + Assertions.assertThat(connection.checkIfConnectedUserHasAccessToCDCTable()).isTrue(); + } + + // Re-connect with the newly created user + try (SqlServerConnection connection = TestHelper.testConnection( + TestHelper.jdbcConfig("test_user", "Password!"))) { + // This user shouldn't have access to CDC table + connection.execute("USE testDB"); + Assertions.assertThat(connection.checkIfConnectedUserHasAccessToCDCTable()).isFalse(); + } + } + private long toMillis(OffsetDateTime datetime) { return datetime.toInstant().toEpochMilli(); } diff --git a/debezium-connector-sqlserver/src/test/java/io/debezium/connector/sqlserver/SqlServerConnectorIT.java b/debezium-connector-sqlserver/src/test/java/io/debezium/connector/sqlserver/SqlServerConnectorIT.java index 002f8b5c3..4197b8909 100644 --- a/debezium-connector-sqlserver/src/test/java/io/debezium/connector/sqlserver/SqlServerConnectorIT.java +++ b/debezium-connector-sqlserver/src/test/java/io/debezium/connector/sqlserver/SqlServerConnectorIT.java @@ -31,6 +31,7 @@ import java.util.UUID; import java.util.concurrent.TimeUnit; +import org.apache.kafka.common.config.Config; import org.apache.kafka.connect.data.Field; import org.apache.kafka.connect.data.Schema; import org.apache.kafka.connect.data.SchemaBuilder; @@ -895,6 +896,54 @@ public void testWhitelistTable() throws Exception { stopConnector(); } + @Test + @FixFor("DBZ-4346") + public void shouldReportConfigurationErrorForUserNotHavingAccessToCDCTableInInitialMode() throws Exception { + // First create a new user with only db_datareader role + String testUserCreateSql = "IF EXISTS (select 1 from sys.server_principals where name = 'test_user')\n" + + "DROP LOGIN test_user\n" + + "CREATE LOGIN test_user WITH PASSWORD = 'Password!'\n" + + "CREATE USER test_user FOR LOGIN test_user\n" + + "ALTER ROLE db_denydatareader ADD MEMBER test_user"; + + connection.execute(testUserCreateSql); + + final Configuration config = TestHelper.defaultConfig() + .with(SqlServerConnectorConfig.SNAPSHOT_MODE, SnapshotMode.INITIAL) + .with(SqlServerConnectorConfig.TABLE_INCLUDE_LIST, "^dbo.tableb$") + .with(SqlServerConnectorConfig.USER, "test_user") + .build(); + + SqlServerConnector connector = new SqlServerConnector(); + Config validatedConfig = connector.validate(config.asMap()); + + assertConfigurationErrors(validatedConfig, SqlServerConnectorConfig.USER, 1); + } + + @Test + @FixFor("DBZ-4346") + public void shouldNotReportConfigurationErrorForUserNotHavingAccessToCDCTableInInitialOnlyMode() throws Exception { + // First create a new user with only db_datareader role + String testUserCreateSql = "IF EXISTS (select 1 from sys.server_principals where name = 'test_user')\n" + + "DROP LOGIN test_user\n" + + "CREATE LOGIN test_user WITH PASSWORD = 'Password!'\n" + + "CREATE USER test_user FOR LOGIN test_user\n" + + "ALTER ROLE db_denydatareader ADD MEMBER test_user"; + + connection.execute(testUserCreateSql); + + final Configuration config = TestHelper.defaultConfig() + .with(SqlServerConnectorConfig.SNAPSHOT_MODE, SnapshotMode.INITIAL_ONLY) + .with(SqlServerConnectorConfig.TABLE_INCLUDE_LIST, "^dbo.tableb$") + .with(SqlServerConnectorConfig.USER, "test_user") + .build(); + + SqlServerConnector connector = new SqlServerConnector(); + Config validatedConfig = connector.validate(config.asMap()); + + assertNoConfigurationErrors(validatedConfig, SqlServerConnectorConfig.USER); + } + @Test public void testTableIncludeList() throws Exception { final int RECORDS_PER_TABLE = 5; diff --git a/debezium-connector-sqlserver/src/test/java/io/debezium/connector/sqlserver/util/TestHelper.java b/debezium-connector-sqlserver/src/test/java/io/debezium/connector/sqlserver/util/TestHelper.java index 7ddfa0bdf..cf1ee26ec 100644 --- a/debezium-connector-sqlserver/src/test/java/io/debezium/connector/sqlserver/util/TestHelper.java +++ b/debezium-connector-sqlserver/src/test/java/io/debezium/connector/sqlserver/util/TestHelper.java @@ -112,6 +112,13 @@ public static JdbcConfiguration defaultJdbcConfig() { .build(); } + public static JdbcConfiguration jdbcConfig(String user, String password) { + return JdbcConfiguration.copy(defaultJdbcConfig()) + .withUser(user) + .withPassword(password) + .build(); + } + public static Configuration.Builder defaultConnectorConfig() { JdbcConfiguration jdbcConfiguration = defaultJdbcConfig(); Configuration.Builder builder = Configuration.create(); @@ -260,7 +267,7 @@ public static SqlServerConnection testConnection(String databaseName) { return testConnection(config); } - private static SqlServerConnection testConnection(JdbcConfiguration config) { + public static SqlServerConnection testConnection(JdbcConfiguration config) { return new SqlServerConnection(config, SourceTimestampMode.getDefaultMode(), new SqlServerValueConverters(JdbcValueConverters.DecimalMode.PRECISE, TemporalPrecisionMode.ADAPTIVE, null), () -> TestHelper.class.getClassLoader(), Collections.emptySet(), true);