DBZ-1283 Only validate history topic name for affected connectors

This commit is contained in:
Chris Cranford 2019-05-29 13:54:26 -04:00 committed by Jiri Pechanec
parent 73590173b5
commit b4736a7e03
2 changed files with 36 additions and 5 deletions

View File

@ -626,6 +626,16 @@ public static SchemaRefreshMode parse(String value) {
.withImportance(Importance.MEDIUM)
.withDescription("A name of class to that creates SSL Sockets. Use org.postgresql.ssl.NonValidatingFactory to disable SSL validation in development environments");
public static final Field SERVER_NAME = Field.create(DATABASE_CONFIG_PREFIX + "server.name")
.withDisplayName("Namespace")
.withType(Type.STRING)
.withWidth(Width.MEDIUM)
.withImportance(Importance.HIGH)
.withDescription("Unique name that identifies the database server and all "
+ "recorded offsets, and that is used as a prefix for all schemas and topics. "
+ "Each distinct installation should have a separate namespace and be monitored by "
+ "at most one Debezium connector.");
/**
* A comma-separated list of regular expressions that match schema names to be monitored.
* May not be used with {@link #SCHEMA_BLACKLIST}.
@ -805,7 +815,7 @@ public static SchemaRefreshMode parse(String value) {
* The set of {@link Field}s defined as part of this configuration.
*/
public static Field.Set ALL_FIELDS = Field.setOf(PLUGIN_NAME, SLOT_NAME, DROP_SLOT_ON_STOP, STREAM_PARAMS,
DATABASE_NAME, USER, PASSWORD, HOSTNAME, PORT, ON_CONNECT_STATEMENTS, RelationalDatabaseConnectorConfig.SERVER_NAME,
DATABASE_NAME, USER, PASSWORD, HOSTNAME, PORT, ON_CONNECT_STATEMENTS, SERVER_NAME,
TOPIC_SELECTION_STRATEGY, CommonConnectorConfig.MAX_BATCH_SIZE,
CommonConnectorConfig.MAX_QUEUE_SIZE, CommonConnectorConfig.POLL_INTERVAL_MS,
CommonConnectorConfig.SNAPSHOT_DELAY_MS, CommonConnectorConfig.SNAPSHOT_FETCH_SIZE,
@ -829,7 +839,7 @@ public static SchemaRefreshMode parse(String value) {
protected PostgresConnectorConfig(Configuration config) {
super(
config,
config.getString(RelationalDatabaseConnectorConfig.SERVER_NAME),
config.getString(SERVER_NAME),
null, // TODO whitelist handling implemented locally here for the time being
null,
DEFAULT_SNAPSHOT_FETCH_SIZE
@ -950,7 +960,7 @@ protected SourceInfoStructMaker<? extends AbstractSourceInfo> getSourceInfoStruc
protected static ConfigDef configDef() {
ConfigDef config = new ConfigDef();
Field.group(config, "Postgres", SLOT_NAME, PLUGIN_NAME, RelationalDatabaseConnectorConfig.SERVER_NAME, DATABASE_NAME, HOSTNAME, PORT,
Field.group(config, "Postgres", SLOT_NAME, PLUGIN_NAME, SERVER_NAME, DATABASE_NAME, HOSTNAME, PORT,
USER, PASSWORD, ON_CONNECT_STATEMENTS, SSL_MODE, SSL_CLIENT_CERT, SSL_CLIENT_KEY_PASSWORD, SSL_ROOT_CERT, SSL_CLIENT_KEY,
DROP_SLOT_ON_STOP, STREAM_PARAMS, SSL_SOCKET_FACTORY, STATUS_UPDATE_INTERVAL_MS, TCP_KEEPALIVE, XMIN_FETCH_INTERVAL, SNAPSHOT_MODE_CLASS);
Field.group(config, "Events", SCHEMA_WHITELIST, SCHEMA_BLACKLIST, TABLE_WHITELIST, TABLE_BLACKLIST,

View File

@ -7,6 +7,8 @@
package io.debezium.connector.postgresql;
import static io.debezium.connector.postgresql.TestHelper.PK_FIELD;
import static io.debezium.connector.postgresql.TestHelper.decoderPlugin;
import static io.debezium.connector.postgresql.TestHelper.defaultJdbcConfig;
import static io.debezium.connector.postgresql.TestHelper.topicName;
import static junit.framework.TestCase.assertEquals;
import static org.fest.assertions.Assertions.assertThat;
@ -54,8 +56,8 @@
import io.debezium.embedded.AbstractConnectorTest;
import io.debezium.embedded.EmbeddedEngine;
import io.debezium.heartbeat.Heartbeat;
import io.debezium.jdbc.JdbcConfiguration;
import io.debezium.jdbc.TemporalPrecisionMode;
import io.debezium.relational.RelationalDatabaseConnectorConfig;
import io.debezium.util.Strings;
/**
@ -120,6 +122,25 @@ public void shouldValidateMinimalConfiguration() throws Exception {
configValue.errorMessages().isEmpty()));
}
@Test
@FixFor("DBZ-1283")
public void shouldValidateConfigurationWithoutSpecifyingServerName() throws Exception {
JdbcConfiguration jdbcConfiguration = defaultJdbcConfig();
Configuration.Builder builder = Configuration.create();
jdbcConfiguration.forEach((field, value) -> builder.with(PostgresConnectorConfig.DATABASE_CONFIG_PREFIX + field, value));
Configuration config = builder
.with(PostgresConnectorConfig.DROP_SLOT_ON_STOP, true)
.with(PostgresConnectorConfig.STATUS_UPDATE_INTERVAL_MS, 100)
.with(PostgresConnectorConfig.PLUGIN_NAME, decoderPlugin())
.with(PostgresConnectorConfig.SSL_MODE, PostgresConnectorConfig.SecureConnectionMode.DISABLED)
.build();
Config validateConfig = new PostgresConnector().validate(config.asMap());
validateConfig.configValues().forEach(value -> assertTrue("Unexpected error for: " + value.name(), value.errorMessages().isEmpty()));
}
@Test
public void shouldValidateConfiguration() throws Exception {
// use an empty configuration which should be invalid because of the lack of DB connection details
@ -130,9 +151,9 @@ public void shouldValidateConfiguration() throws Exception {
assertConfigurationErrors(validatedConfig, PostgresConnectorConfig.HOSTNAME, 1);
assertConfigurationErrors(validatedConfig, PostgresConnectorConfig.USER, 1);
assertConfigurationErrors(validatedConfig, PostgresConnectorConfig.DATABASE_NAME, 1);
assertConfigurationErrors(validatedConfig, RelationalDatabaseConnectorConfig.SERVER_NAME, 2);
// validate the non required fields
validateField(validatedConfig, PostgresConnectorConfig.SERVER_NAME, null);
validateField(validatedConfig, PostgresConnectorConfig.PLUGIN_NAME, LogicalDecoder.DECODERBUFS.getValue());
validateField(validatedConfig, PostgresConnectorConfig.SLOT_NAME, ReplicationConnection.Builder.DEFAULT_SLOT_NAME);
validateField(validatedConfig, PostgresConnectorConfig.DROP_SLOT_ON_STOP, Boolean.FALSE);