DBZ-2952 Remove Postgres replication slot validation (maybe part 1 to fix user's issues)

This commit is contained in:
rkerner 2021-01-26 14:43:00 +01:00 committed by Gunnar Morling
parent 3a55caf8b5
commit 6aa13e756a
2 changed files with 7 additions and 33 deletions

View File

@ -36,7 +36,7 @@
*/ */
public class PostgresConnector extends SourceConnector { public class PostgresConnector extends SourceConnector {
private Logger logger = LoggerFactory.getLogger(getClass()); private static final Logger LOGGER = LoggerFactory.getLogger(PostgresConnector.class);
private Map<String, String> props; private Map<String, String> props;
public PostgresConnector() { public PostgresConnector() {
@ -91,7 +91,7 @@ public Config validate(Map<String, String> connectorConfigs) {
final String passwordStringValue = config.getConfig().getString(PostgresConnectorConfig.PASSWORD); final String passwordStringValue = config.getConfig().getString(PostgresConnectorConfig.PASSWORD);
if (Strings.isNullOrEmpty(passwordStringValue)) { if (Strings.isNullOrEmpty(passwordStringValue)) {
logger.warn("The connection password is empty"); LOGGER.warn("The connection password is empty");
} }
// If there are no errors on any of these ... // If there are no errors on any of these ...
@ -107,7 +107,7 @@ public Config validate(Map<String, String> connectorConfigs) {
try { try {
// check connection // check connection
connection.execute("SELECT version()"); connection.execute("SELECT version()");
logger.info("Successfully tested connection for {} with user '{}'", connection.connectionString(), LOGGER.info("Successfully tested connection for {} with user '{}'", connection.connectionString(),
connection.username()); connection.username());
// check server wal_level // check server wal_level
final String walLevel = connection.queryAndMap( final String walLevel = connection.queryAndMap(
@ -115,7 +115,7 @@ public Config validate(Map<String, String> connectorConfigs) {
connection.singleResultMapper(rs -> rs.getString("wal_level"), "Could not fetch wal_level")); connection.singleResultMapper(rs -> rs.getString("wal_level"), "Could not fetch wal_level"));
if (!"logical".equals(walLevel)) { if (!"logical".equals(walLevel)) {
final String errorMessage = "Postgres server wal_level property must be \"logical\" but is: " + walLevel; final String errorMessage = "Postgres server wal_level property must be \"logical\" but is: " + walLevel;
logger.error(errorMessage); LOGGER.error(errorMessage);
hostnameResult.addErrorMessage(errorMessage); hostnameResult.addErrorMessage(errorMessage);
} }
// check user for LOGIN and REPLICATION roles // check user for LOGIN and REPLICATION roles
@ -143,27 +143,11 @@ public Config validate(Map<String, String> connectorConfigs) {
|| rs.getBoolean("aws_repladmin")), || rs.getBoolean("aws_repladmin")),
"Could not fetch roles"))) { "Could not fetch roles"))) {
final String errorMessage = "Postgres roles LOGIN and REPLICATION are not assigned to user: " + connection.username(); final String errorMessage = "Postgres roles LOGIN and REPLICATION are not assigned to user: " + connection.username();
logger.error(errorMessage); LOGGER.error(errorMessage);
}
// check replication slot
final String slotName = config.slotName();
if (connection.prepareQueryAndMap(
"SELECT * FROM pg_replication_slots WHERE slot_name = ?",
statement -> statement.setString(1, slotName),
rs -> {
if (rs.next()) {
return rs.getBoolean("active");
}
return false;
})) {
final String errorMessage = "Slot name \"" + slotName
+ "\" already exists and is active. Choose a unique name or stop the other process occupying the slot.";
logger.error(errorMessage);
slotNameResult.addErrorMessage(errorMessage);
} }
} }
catch (SQLException e) { catch (SQLException e) {
logger.error("Failed testing connection for {} with user '{}': {}", connection.connectionString(), LOGGER.error("Failed testing connection for {} with user '{}': {}", connection.connectionString(),
connection.username(), e.getMessage()); connection.username(), e.getMessage());
hostnameResult.addErrorMessage("Error while validating connector config: " + e.getMessage()); hostnameResult.addErrorMessage("Error while validating connector config: " + e.getMessage());
} }

View File

@ -18,11 +18,9 @@
import java.sql.SQLException; import java.sql.SQLException;
import java.time.Instant; import java.time.Instant;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections; import java.util.Collections;
import java.util.HashSet; import java.util.HashSet;
import java.util.List; import java.util.List;
import java.util.Map;
import java.util.Random; import java.util.Random;
import java.util.Set; import java.util.Set;
import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletableFuture;
@ -31,7 +29,6 @@
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Predicate; import java.util.function.Predicate;
import java.util.stream.Collectors;
import java.util.stream.IntStream; import java.util.stream.IntStream;
import org.apache.kafka.common.config.Config; import org.apache.kafka.common.config.Config;
@ -182,15 +179,8 @@ public void shouldNotStartWithInvalidSlotConfigAndUserRoles() throws Exception {
.with(PostgresConnectorConfig.SLOT_NAME, ReplicationConnection.Builder.DEFAULT_SLOT_NAME) .with(PostgresConnectorConfig.SLOT_NAME, ReplicationConnection.Builder.DEFAULT_SLOT_NAME)
.build(); .build();
List<ConfigValue> validatedConfig = new PostgresConnector().validate(failingConfig.asMap()).configValues(); List<ConfigValue> validatedConfig = new PostgresConnector().validate(failingConfig.asMap()).configValues();
Map<String, ConfigValue> results = validatedConfig.stream().collect(Collectors.toMap(ConfigValue::name, o -> o));
assertFalse("Expected error on \"slot.name\" property did not happen!", results.get("slot.name").errorMessages().isEmpty()); final List<String> invalidProperties = Collections.singletonList("database.user");
assertEquals(
"Slot name \"" + ReplicationConnection.Builder.DEFAULT_SLOT_NAME
+ "\" already exists and is active. Choose a unique name or stop the other process occupying the slot.",
results.get("slot.name").errorMessages().get(0));
final List<String> invalidProperties = Arrays.asList("database.user", "slot.name");
validatedConfig.forEach( validatedConfig.forEach(
configValue -> { configValue -> {
if (!invalidProperties.contains(configValue.name())) { if (!invalidProperties.contains(configValue.name())) {