DBZ-4346 Adding validation to SQL Server connector to fail when a user not having access to CDC table is used with modes other than INITIAL_ONLY mode

Removing unwanted commits
This commit is contained in:
Sagar Rao 2022-05-02 17:01:38 +05:30 committed by Chris Cranford
parent 0edb86d861
commit e4b1486353
5 changed files with 121 additions and 1 deletions

View File

@ -21,6 +21,7 @@
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Supplier;
import java.util.regex.Matcher;
import java.util.stream.Collectors;
@ -352,6 +353,19 @@ public Lsn incrementLsn(String databaseName, Lsn lsn) throws SQLException {
}, "Increment LSN query must return exactly one value"));
}
/**
* Check if the user with which connection object is created has
* access to CDC table.
*
* @return boolean indicating the presence/absence of access
* @throws SQLException
*/
public boolean checkIfConnectedUserHasAccessToCDCTable() throws SQLException {
final AtomicBoolean userHasAccess = new AtomicBoolean();
this.query("EXEC sys.sp_cdc_help_change_data_capture", rs -> userHasAccess.set(rs.next()));
return userHasAccess.get();
}
/**
* Creates an exclusive lock for a given table.
*

View File

@ -129,6 +129,14 @@ protected void validateConnection(Map<String, ConfigValue> configValues, Configu
connection.execute("SELECT @@VERSION");
LOGGER.debug("Successfully tested connection for {} with user '{}'", connection.connectionString(),
connection.username());
LOGGER.info("Checking if user has access to CDC table");
boolean userHasAccessToCDCTable = connection.checkIfConnectedUserHasAccessToCDCTable();
if (!userHasAccessToCDCTable
&& sqlServerConfig.getSnapshotMode() != SqlServerConnectorConfig.SnapshotMode.INITIAL_ONLY) {
String errorMessage = "User " + userValue.value() + " does not have access to CDC table and can only be used in initial_only snapshot mode";
LOGGER.error(errorMessage);
userValue.addErrorMessage(errorMessage);
}
}
catch (Exception e) {
LOGGER.error("Failed testing connection for {} with user '{}'", config.withMaskedPasswords(),

View File

@ -413,6 +413,48 @@ public void shouldProperlyGetDefaultColumnNullValues() throws Exception {
}
}
@Test
@FixFor("DBZ-4346")
public void testAccessToCDCTableBasedOnUserRoleAccess() throws Exception {
// Setup a user with only read-only access
try (SqlServerConnection connection = TestHelper.adminConnection()) {
connection.connect();
connection.execute("CREATE DATABASE testDB");
connection.execute("USE testDB");
String testUserCreateSql = "IF EXISTS (select 1 from sys.server_principals where name = 'test_user')\n"
+ "DROP LOGIN test_user\n"
+ "CREATE LOGIN test_user WITH PASSWORD = 'Password!'\n"
+ "CREATE USER test_user FOR LOGIN test_user\n"
+ "ALTER ROLE db_denydatareader ADD MEMBER test_user";
connection.execute(testUserCreateSql);
// NOTE: you cannot enable CDC on master
TestHelper.enableDbCdc(connection, "testDB");
// create table if exists
String sql = "IF EXISTS (select 1 from sys.objects w" +
"here name = 'testTable' and type = 'u')\n"
+ "DROP TABLE testTable\n"
+ "CREATE TABLE testTable (ID int not null identity(1, 1) primary key, NUMBER int, TEXT text)";
connection.execute(sql);
// then enable CDC and wrapper functions
TestHelper.enableTableCdc(connection, "testTable");
// sa user should have access to CDC table
Assertions.assertThat(connection.checkIfConnectedUserHasAccessToCDCTable()).isTrue();
}
// Re-connect with the newly created user
try (SqlServerConnection connection = TestHelper.testConnection(
TestHelper.jdbcConfig("test_user", "Password!"))) {
// This user shouldn't have access to CDC table
connection.execute("USE testDB");
Assertions.assertThat(connection.checkIfConnectedUserHasAccessToCDCTable()).isFalse();
}
}
private long toMillis(OffsetDateTime datetime) {
return datetime.toInstant().toEpochMilli();
}

View File

@ -31,6 +31,7 @@
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.common.config.Config;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
@ -895,6 +896,54 @@ public void testWhitelistTable() throws Exception {
stopConnector();
}
@Test
@FixFor("DBZ-4346")
public void shouldReportConfigurationErrorForUserNotHavingAccessToCDCTableInInitialMode() throws Exception {
// First create a new user with only db_datareader role
String testUserCreateSql = "IF EXISTS (select 1 from sys.server_principals where name = 'test_user')\n"
+ "DROP LOGIN test_user\n"
+ "CREATE LOGIN test_user WITH PASSWORD = 'Password!'\n"
+ "CREATE USER test_user FOR LOGIN test_user\n"
+ "ALTER ROLE db_denydatareader ADD MEMBER test_user";
connection.execute(testUserCreateSql);
final Configuration config = TestHelper.defaultConfig()
.with(SqlServerConnectorConfig.SNAPSHOT_MODE, SnapshotMode.INITIAL)
.with(SqlServerConnectorConfig.TABLE_INCLUDE_LIST, "^dbo.tableb$")
.with(SqlServerConnectorConfig.USER, "test_user")
.build();
SqlServerConnector connector = new SqlServerConnector();
Config validatedConfig = connector.validate(config.asMap());
assertConfigurationErrors(validatedConfig, SqlServerConnectorConfig.USER, 1);
}
@Test
@FixFor("DBZ-4346")
public void shouldNotReportConfigurationErrorForUserNotHavingAccessToCDCTableInInitialOnlyMode() throws Exception {
// First create a new user with only db_datareader role
String testUserCreateSql = "IF EXISTS (select 1 from sys.server_principals where name = 'test_user')\n"
+ "DROP LOGIN test_user\n"
+ "CREATE LOGIN test_user WITH PASSWORD = 'Password!'\n"
+ "CREATE USER test_user FOR LOGIN test_user\n"
+ "ALTER ROLE db_denydatareader ADD MEMBER test_user";
connection.execute(testUserCreateSql);
final Configuration config = TestHelper.defaultConfig()
.with(SqlServerConnectorConfig.SNAPSHOT_MODE, SnapshotMode.INITIAL_ONLY)
.with(SqlServerConnectorConfig.TABLE_INCLUDE_LIST, "^dbo.tableb$")
.with(SqlServerConnectorConfig.USER, "test_user")
.build();
SqlServerConnector connector = new SqlServerConnector();
Config validatedConfig = connector.validate(config.asMap());
assertNoConfigurationErrors(validatedConfig, SqlServerConnectorConfig.USER);
}
@Test
public void testTableIncludeList() throws Exception {
final int RECORDS_PER_TABLE = 5;

View File

@ -112,6 +112,13 @@ public static JdbcConfiguration defaultJdbcConfig() {
.build();
}
public static JdbcConfiguration jdbcConfig(String user, String password) {
return JdbcConfiguration.copy(defaultJdbcConfig())
.withUser(user)
.withPassword(password)
.build();
}
public static Configuration.Builder defaultConnectorConfig() {
JdbcConfiguration jdbcConfiguration = defaultJdbcConfig();
Configuration.Builder builder = Configuration.create();
@ -260,7 +267,7 @@ public static SqlServerConnection testConnection(String databaseName) {
return testConnection(config);
}
private static SqlServerConnection testConnection(JdbcConfiguration config) {
public static SqlServerConnection testConnection(JdbcConfiguration config) {
return new SqlServerConnection(config, SourceTimestampMode.getDefaultMode(),
new SqlServerValueConverters(JdbcValueConverters.DecimalMode.PRECISE, TemporalPrecisionMode.ADAPTIVE, null), () -> TestHelper.class.getClassLoader(),
Collections.emptySet(), true);