DBZ-5043 Replace database.server.name with topic.prefix

This commit is contained in:
Vojtech Juranek 2022-08-19 17:01:53 +02:00 committed by Jiri Pechanec
parent 161a7229b6
commit 0823cedf25
46 changed files with 118 additions and 162 deletions

View File

@ -37,7 +37,6 @@
import io.debezium.schema.AbstractTopicNamingStrategy;
import io.debezium.schema.DefaultTopicNamingStrategy;
import io.debezium.storage.kafka.history.KafkaDatabaseHistory;
import io.debezium.storage.kafka.history.KafkaStorageConfiguration;
import io.debezium.util.Collect;
/**
@ -539,9 +538,6 @@ public static SecureConnectionMode parse(String value, String defaultValue) {
+ "Note that the connector may establish JDBC connections at its own discretion, so this should typically be used for configuration of session parameters only, "
+ "but not for executing DML statements. Use doubled semicolon (';;') to use a semicolon as a character and not as a delimiter.");
public static final Field SERVER_NAME = RelationalDatabaseConnectorConfig.SERVER_NAME
.withValidation(KafkaStorageConfiguration::validateServerNameIsDifferentFromHistoryTopicName);
public static final Field SERVER_ID = Field.create("database.server.id")
.withDisplayName("Cluster ID")
.withType(Type.LONG)
@ -885,7 +881,6 @@ public static SecureConnectionMode parse(String value, String defaultValue) {
.excluding(
SCHEMA_INCLUDE_LIST,
SCHEMA_EXCLUDE_LIST,
RelationalDatabaseConnectorConfig.SERVER_NAME,
RelationalDatabaseConnectorConfig.TIME_PRECISION_MODE,
RelationalDatabaseConnectorConfig.TABLE_IGNORE_BUILTIN)
.type(
@ -893,7 +888,6 @@ public static SecureConnectionMode parse(String value, String defaultValue) {
PORT,
USER,
PASSWORD,
SERVER_NAME,
ON_CONNECT_STATEMENTS,
SERVER_ID,
SERVER_ID_OFFSET,

View File

@ -25,6 +25,7 @@
import io.debezium.embedded.AbstractConnectorTest;
import io.debezium.jdbc.JdbcConnection;
import io.debezium.junit.SkipWhenDatabaseVersion;
import io.debezium.schema.AbstractTopicNamingStrategy;
import io.debezium.storage.file.history.FileDatabaseHistory;
import io.debezium.util.Testing;
@ -79,7 +80,7 @@ public void shouldCorrectlyManageRollback() throws SQLException, InterruptedExce
.with(MySqlConnectorConfig.USER, "snapper")
.with(MySqlConnectorConfig.PASSWORD, "snapperpass")
.with(MySqlConnectorConfig.SERVER_ID, 18765)
.with(MySqlConnectorConfig.SERVER_NAME, DATABASE.getServerName())
.with(AbstractTopicNamingStrategy.TOPIC_PREFIX, DATABASE.getServerName())
.with(MySqlConnectorConfig.POLL_INTERVAL_MS, 10)
.with(MySqlConnectorConfig.DATABASE_INCLUDE_LIST, DATABASE.getDatabaseName())
.with(MySqlConnectorConfig.DATABASE_HISTORY, FileDatabaseHistory.class)
@ -147,7 +148,7 @@ public void shouldProcessSavepoint() throws SQLException, InterruptedException {
.with(MySqlConnectorConfig.USER, "snapper")
.with(MySqlConnectorConfig.PASSWORD, "snapperpass")
.with(MySqlConnectorConfig.SERVER_ID, 18765)
.with(MySqlConnectorConfig.SERVER_NAME, DATABASE.getServerName())
.with(AbstractTopicNamingStrategy.TOPIC_PREFIX, DATABASE.getServerName())
.with(MySqlConnectorConfig.POLL_INTERVAL_MS, 10)
.with(MySqlConnectorConfig.DATABASE_INCLUDE_LIST, DATABASE.getDatabaseName())
.with(MySqlConnectorConfig.DATABASE_HISTORY, FileDatabaseHistory.class)
@ -208,7 +209,7 @@ public void shouldProcessLargeTransaction() throws SQLException, InterruptedExce
.with(MySqlConnectorConfig.USER, "snapper")
.with(MySqlConnectorConfig.PASSWORD, "snapperpass")
.with(MySqlConnectorConfig.SERVER_ID, 18765)
.with(MySqlConnectorConfig.SERVER_NAME, DATABASE.getServerName())
.with(AbstractTopicNamingStrategy.TOPIC_PREFIX, DATABASE.getServerName())
.with(MySqlConnectorConfig.POLL_INTERVAL_MS, 10)
.with(MySqlConnectorConfig.DATABASE_INCLUDE_LIST, DATABASE.getDatabaseName())
.with(MySqlConnectorConfig.DATABASE_HISTORY, FileDatabaseHistory.class)
@ -279,7 +280,7 @@ public void shouldProcessRolledBackSavepoint() throws SQLException, InterruptedE
.with(MySqlConnectorConfig.USER, "snapper")
.with(MySqlConnectorConfig.PASSWORD, "snapperpass")
.with(MySqlConnectorConfig.SERVER_ID, 18765)
.with(MySqlConnectorConfig.SERVER_NAME, DATABASE.getServerName())
.with(AbstractTopicNamingStrategy.TOPIC_PREFIX, DATABASE.getServerName())
.with(MySqlConnectorConfig.POLL_INTERVAL_MS, 10)
.with(MySqlConnectorConfig.DATABASE_INCLUDE_LIST, DATABASE.getDatabaseName())
.with(MySqlConnectorConfig.DATABASE_HISTORY, FileDatabaseHistory.class)

View File

@ -54,6 +54,7 @@
import io.debezium.relational.RelationalChangeRecordEmitter;
import io.debezium.relational.RelationalDatabaseSchema;
import io.debezium.relational.history.DatabaseHistory;
import io.debezium.schema.AbstractTopicNamingStrategy;
import io.debezium.schema.DatabaseSchema;
import io.debezium.storage.file.history.FileDatabaseHistory;
import io.debezium.storage.kafka.history.KafkaDatabaseHistory;
@ -103,7 +104,7 @@ public void afterEach() {
@Test
public void shouldNotStartWithInvalidConfiguration() {
config = Configuration.create()
.with(MySqlConnectorConfig.SERVER_NAME, "myserver")
.with(AbstractTopicNamingStrategy.TOPIC_PREFIX, "myserver")
.with(KafkaDatabaseHistory.TOPIC, "myserver")
.with(MySqlConnectorConfig.DATABASE_HISTORY, FileDatabaseHistory.class)
.with(FileDatabaseHistory.FILE_PATH, DB_HISTORY_PATH)
@ -130,7 +131,7 @@ public void shouldFailToValidateInvalidConfiguration() {
assertConfigurationErrors(result, MySqlConnectorConfig.HOSTNAME, 1);
assertNoConfigurationErrors(result, MySqlConnectorConfig.PORT);
assertConfigurationErrors(result, MySqlConnectorConfig.USER, 1);
assertConfigurationErrors(result, MySqlConnectorConfig.SERVER_NAME, 2);
assertNoConfigurationErrors(result, AbstractTopicNamingStrategy.TOPIC_PREFIX);
assertConfigurationErrors(result, MySqlConnectorConfig.SERVER_ID);
assertNoConfigurationErrors(result, MySqlConnectorConfig.TABLES_IGNORE_BUILTIN);
assertNoConfigurationErrors(result, MySqlConnectorConfig.DATABASE_INCLUDE_LIST);
@ -163,7 +164,7 @@ public void shouldFailToValidateInvalidConfiguration() {
public void shouldValidateAcceptableConfiguration() {
Configuration config = DATABASE.defaultJdbcConfigBuilder()
.with(MySqlConnectorConfig.SERVER_ID, 18765)
.with(MySqlConnectorConfig.SERVER_NAME, "myServer")
.with(AbstractTopicNamingStrategy.TOPIC_PREFIX, "myServer")
.with(KafkaDatabaseHistory.BOOTSTRAP_SERVERS, "some.host.com")
.with(KafkaDatabaseHistory.TOPIC, "my.db.history.topic")
.with(MySqlConnectorConfig.INCLUDE_SCHEMA_CHANGES, true)
@ -177,7 +178,7 @@ public void shouldValidateAcceptableConfiguration() {
assertNoConfigurationErrors(result, MySqlConnectorConfig.USER);
assertNoConfigurationErrors(result, MySqlConnectorConfig.PASSWORD);
assertNoConfigurationErrors(result, MySqlConnectorConfig.ON_CONNECT_STATEMENTS);
assertNoConfigurationErrors(result, MySqlConnectorConfig.SERVER_NAME);
assertNoConfigurationErrors(result, AbstractTopicNamingStrategy.TOPIC_PREFIX);
assertNoConfigurationErrors(result, MySqlConnectorConfig.SERVER_ID);
assertNoConfigurationErrors(result, MySqlConnectorConfig.TABLES_IGNORE_BUILTIN);
assertNoConfigurationErrors(result, MySqlConnectorConfig.DATABASE_INCLUDE_LIST);
@ -225,7 +226,7 @@ public void shouldValidateLockingModeNoneWithValidSnapshotModeConfiguration() {
for (final String acceptableValue : acceptableValues) {
Configuration config = DATABASE.defaultJdbcConfigBuilder()
.with(MySqlConnectorConfig.SERVER_ID, 18765)
.with(MySqlConnectorConfig.SERVER_NAME, "myServer")
.with(AbstractTopicNamingStrategy.TOPIC_PREFIX, "myServer")
.with(KafkaDatabaseHistory.BOOTSTRAP_SERVERS, "some.host.com")
.with(KafkaDatabaseHistory.TOPIC, "my.db.history.topic")
.with(MySqlConnectorConfig.INCLUDE_SCHEMA_CHANGES, true)
@ -282,7 +283,7 @@ private void shouldConsumeAllEventsFromDatabaseUsingSnapshotByField(Field dbIncl
.with(MySqlConnectorConfig.HOSTNAME, System.getProperty("database.replica.hostname", "localhost"))
.with(MySqlConnectorConfig.PORT, System.getProperty("database.replica.port", "3306"))
.with(MySqlConnectorConfig.SERVER_ID, serverId)
.with(MySqlConnectorConfig.SERVER_NAME, DATABASE.getServerName())
.with(AbstractTopicNamingStrategy.TOPIC_PREFIX, DATABASE.getServerName())
.with(MySqlConnectorConfig.POLL_INTERVAL_MS, 10)
.with(dbIncludeListField, DATABASE.getDatabaseName())
.with(MySqlConnectorConfig.DATABASE_HISTORY, FileDatabaseHistory.class)
@ -600,9 +601,9 @@ private void shouldConsumeAllEventsFromDatabaseUsingSnapshotByField(Field dbIncl
stopConnector();
// Read the last committed offsets, and verify the binlog coordinates ...
final String serverName = config.getString(MySqlConnectorConfig.SERVER_NAME);
final String serverName = config.getString(AbstractTopicNamingStrategy.TOPIC_PREFIX);
final MySqlOffsetContext.Loader loader = new MySqlOffsetContext.Loader(new MySqlConnectorConfig(Configuration.create()
.with(MySqlConnectorConfig.SERVER_NAME, serverName)
.with(AbstractTopicNamingStrategy.TOPIC_PREFIX, serverName)
.build()));
final Map<String, String> partition = new MySqlPartition(serverName, DATABASE.getDatabaseName()).getSourcePartition();
Map<String, ?> lastCommittedOffset = readLastCommittedOffset(config, partition);
@ -693,7 +694,7 @@ public void shouldUseOverriddenSelectStatementDuringSnapshotting() throws SQLExc
.with(MySqlConnectorConfig.HOSTNAME, System.getProperty("database.replica.hostname", "localhost"))
.with(MySqlConnectorConfig.PORT, System.getProperty("database.replica.port", "3306"))
.with(MySqlConnectorConfig.SERVER_ID, 28765)
.with(MySqlConnectorConfig.SERVER_NAME, DATABASE.getServerName())
.with(AbstractTopicNamingStrategy.TOPIC_PREFIX, DATABASE.getServerName())
.with(MySqlConnectorConfig.POLL_INTERVAL_MS, 10)
.with(MySqlConnectorConfig.DATABASE_INCLUDE_LIST, DATABASE.getDatabaseName())
.with(MySqlConnectorConfig.TABLE_INCLUDE_LIST, DATABASE.getDatabaseName() + ".products")
@ -741,7 +742,7 @@ public void shouldUseMultipleOverriddenSelectStatementsDuringSnapshotting() thro
.with(MySqlConnectorConfig.HOSTNAME, System.getProperty("database.replica.hostname", "localhost"))
.with(MySqlConnectorConfig.PORT, System.getProperty("database.replica.port", "3306"))
.with(MySqlConnectorConfig.SERVER_ID, 28765)
.with(MySqlConnectorConfig.SERVER_NAME, DATABASE.getServerName())
.with(AbstractTopicNamingStrategy.TOPIC_PREFIX, DATABASE.getServerName())
.with(MySqlConnectorConfig.POLL_INTERVAL_MS, 10)
.with(MySqlConnectorConfig.DATABASE_INCLUDE_LIST, DATABASE.getDatabaseName())
.with(MySqlConnectorConfig.TABLE_INCLUDE_LIST, tables)

View File

@ -28,6 +28,7 @@
import io.debezium.embedded.AbstractConnectorTest;
import io.debezium.jdbc.JdbcConfiguration;
import io.debezium.jdbc.JdbcConnection;
import io.debezium.schema.AbstractTopicNamingStrategy;
import io.debezium.util.ContainerImageVersions;
import io.debezium.util.Testing;
@ -75,7 +76,7 @@ public void afterEach() {
public Configuration.Builder defaultConfig() {
return Configuration.create()
.with(MySqlConnectorConfig.SERVER_NAME, "myServer1")
.with(AbstractTopicNamingStrategy.TOPIC_PREFIX, "myServer1")
.with(MySqlConnectorConfig.HOSTNAME, System.getProperty("database.hostname", "localhost"))
.with(CommonConnectorConfig.DATABASE_CONFIG_PREFIX + JdbcConfiguration.PORT, mySQLContainer.getMappedPort(3306))
.with(MySqlConnectorConfig.USER, "debezium")

View File

@ -27,6 +27,7 @@
import io.debezium.data.VerifyRecord;
import io.debezium.doc.FixFor;
import io.debezium.document.Document;
import io.debezium.schema.AbstractTopicNamingStrategy;
public class SourceInfoTest {
@ -45,7 +46,7 @@ public class SourceInfoTest {
@Before
public void beforeEach() {
offsetContext = MySqlOffsetContext.initial(new MySqlConnectorConfig(Configuration.create()
.with(MySqlConnectorConfig.SERVER_NAME, "server")
.with(AbstractTopicNamingStrategy.TOPIC_PREFIX, "server")
.build()));
source = offsetContext.getSource();
inTxn = false;
@ -445,7 +446,7 @@ protected Map<String, String> offset(String gtidSet, long position, int row, boo
protected SourceInfo sourceWith(Map<String, String> offset) {
offsetContext = (MySqlOffsetContext) new MySqlOffsetContext.Loader(new MySqlConnectorConfig(Configuration.create()
.with(MySqlConnectorConfig.SERVER_NAME, SERVER_NAME)
.with(AbstractTopicNamingStrategy.TOPIC_PREFIX, SERVER_NAME)
.build())).load(offset);
source = offsetContext.getSource();
source.databaseEvent("mysql");

View File

@ -24,6 +24,7 @@
import io.debezium.config.Configuration;
import io.debezium.config.Configuration.Builder;
import io.debezium.schema.AbstractTopicNamingStrategy;
import io.debezium.storage.file.history.FileDatabaseHistory;
/**
@ -128,6 +129,10 @@ public String getServerName() {
return serverName;
}
public String getTopicPrefix() {
return serverName;
}
/**
* Creates the database and populates it with initialization SQL script. To use multiline
* statements for stored procedures definition use delimiter $$ to delimit statements in the procedure.
@ -229,10 +234,10 @@ public Configuration.Builder defaultConfig() {
public Configuration.Builder defaultConfigWithoutDatabaseFilter() {
return defaultJdbcConfigBuilder()
.with(MySqlConnectorConfig.SERVER_ID, 18765)
.with(MySqlConnectorConfig.SERVER_NAME, getServerName())
.with(MySqlConnectorConfig.POLL_INTERVAL_MS, 10)
.with(MySqlConnectorConfig.DATABASE_HISTORY, FileDatabaseHistory.class)
.with(MySqlConnectorConfig.BUFFER_SIZE_FOR_BINLOG_READER, 10_000);
.with(MySqlConnectorConfig.BUFFER_SIZE_FOR_BINLOG_READER, 10_000)
.with(AbstractTopicNamingStrategy.TOPIC_PREFIX, getServerName());
}
/**

View File

@ -39,9 +39,9 @@
import io.debezium.pipeline.spi.Offsets;
import io.debezium.pipeline.spi.Partition;
import io.debezium.pipeline.txmetadata.TransactionContext;
import io.debezium.relational.RelationalDatabaseConnectorConfig;
import io.debezium.relational.Tables;
import io.debezium.relational.ddl.DdlParser;
import io.debezium.schema.AbstractTopicNamingStrategy;
import io.debezium.storage.kafka.history.KafkaDatabaseHistory;
import io.debezium.text.ParsingException;
import io.debezium.util.Collect;
@ -88,7 +88,7 @@ public void beforeEach() throws Exception {
MySqlPartition source = new MySqlPartition("my-server", "my-db");
Configuration config = Configuration.empty()
.edit()
.with(RelationalDatabaseConnectorConfig.SERVER_NAME, "dbserver1").build();
.with(AbstractTopicNamingStrategy.TOPIC_PREFIX, "dbserver1").build();
position = new MySqlOffsetContext(false, true, new TransactionContext(), new MySqlReadOnlyIncrementalSnapshotContext<>(),
new SourceInfo(new MySqlConnectorConfig(config)));

View File

@ -45,7 +45,6 @@
import io.debezium.relational.Tables.TableFilter;
import io.debezium.relational.history.HistoryRecordComparator;
import io.debezium.schema.AbstractTopicNamingStrategy;
import io.debezium.storage.kafka.history.KafkaStorageConfiguration;
import io.debezium.util.Strings;
/**
@ -133,9 +132,6 @@ public class OracleConnectorConfig extends HistorizedRelationalDatabaseConnector
+ "locks entirely which can be done by specifying 'none'. This mode is only safe to use if no schema changes are happening while the "
+ "snapshot is taken.");
public static final Field SERVER_NAME = RelationalDatabaseConnectorConfig.SERVER_NAME
.withValidation(KafkaStorageConfiguration::validateServerNameIsDifferentFromHistoryTopicName);
public static final Field CONNECTOR_ADAPTER = Field.create(DATABASE_CONFIG_PREFIX + "connection.adapter")
.withDisplayName("Connector adapter")
.withEnum(ConnectorAdapter.class, ConnectorAdapter.LOG_MINER)
@ -470,14 +466,12 @@ public class OracleConnectorConfig extends HistorizedRelationalDatabaseConnector
.excluding(
SCHEMA_INCLUDE_LIST,
SCHEMA_EXCLUDE_LIST,
RelationalDatabaseConnectorConfig.TABLE_IGNORE_BUILTIN,
SERVER_NAME)
RelationalDatabaseConnectorConfig.TABLE_IGNORE_BUILTIN)
.type(
HOSTNAME,
PORT,
USER,
PASSWORD,
SERVER_NAME,
DATABASE_NAME,
PDB_NAME,
XSTREAM_SERVER_NAME,

View File

@ -20,6 +20,7 @@
import io.debezium.config.Configuration;
import io.debezium.config.Field;
import io.debezium.doc.FixFor;
import io.debezium.schema.AbstractTopicNamingStrategy;
import io.debezium.storage.kafka.history.KafkaDatabaseHistory;
public class OracleConnectorConfigTest {
@ -31,7 +32,7 @@ public void validXtreamNoUrl() throws Exception {
final OracleConnectorConfig connectorConfig = new OracleConnectorConfig(
Configuration.create()
.with(OracleConnectorConfig.SERVER_NAME, "myserver")
.with(AbstractTopicNamingStrategy.TOPIC_PREFIX, "myserver")
.with(OracleConnectorConfig.HOSTNAME, "MyHostname")
.with(OracleConnectorConfig.DATABASE_NAME, "mydb")
.with(OracleConnectorConfig.XSTREAM_SERVER_NAME, "myserver")
@ -48,7 +49,7 @@ public void validLogminerNoUrl() throws Exception {
final OracleConnectorConfig connectorConfig = new OracleConnectorConfig(
Configuration.create()
.with(OracleConnectorConfig.CONNECTOR_ADAPTER, "logminer")
.with(OracleConnectorConfig.SERVER_NAME, "myserver")
.with(AbstractTopicNamingStrategy.TOPIC_PREFIX, "myserver")
.with(OracleConnectorConfig.HOSTNAME, "MyHostname")
.with(OracleConnectorConfig.DATABASE_NAME, "mydb")
.with(OracleConnectorConfig.USER, "debezium")
@ -63,7 +64,7 @@ public void validXtreamWithUrl() throws Exception {
final OracleConnectorConfig connectorConfig = new OracleConnectorConfig(
Configuration.create()
.with(OracleConnectorConfig.SERVER_NAME, "myserver")
.with(AbstractTopicNamingStrategy.TOPIC_PREFIX, "myserver")
.with(OracleConnectorConfig.URL, "jdbc:oci:thin:@myserver/mydatabase")
.with(OracleConnectorConfig.DATABASE_NAME, "mydb")
.with(OracleConnectorConfig.XSTREAM_SERVER_NAME, "myserver")
@ -80,7 +81,7 @@ public void validLogminerWithUrl() throws Exception {
final OracleConnectorConfig connectorConfig = new OracleConnectorConfig(
Configuration.create()
.with(OracleConnectorConfig.CONNECTOR_ADAPTER, "logminer")
.with(OracleConnectorConfig.SERVER_NAME, "myserver")
.with(AbstractTopicNamingStrategy.TOPIC_PREFIX, "myserver")
.with(OracleConnectorConfig.URL, "MyHostname")
.with(OracleConnectorConfig.DATABASE_NAME, "mydb")
.with(OracleConnectorConfig.USER, "debezium")
@ -96,7 +97,7 @@ public void validUrlTNS() throws Exception {
final OracleConnectorConfig connectorConfig = new OracleConnectorConfig(
Configuration.create()
.with(OracleConnectorConfig.CONNECTOR_ADAPTER, "logminer")
.with(OracleConnectorConfig.SERVER_NAME, "myserver")
.with(AbstractTopicNamingStrategy.TOPIC_PREFIX, "myserver")
.with(OracleConnectorConfig.URL,
"jdbc:oracle:thin:@(DESCRIPTION=(ADDRESS=(PROTOCOL=TCP)(HOST=192.68.1.11)(PORT=1701))(ADDRESS=(PROTOCOL=TCP)(HOST=192.68.1.12)(PORT=1701))(ADDRESS=(PROTOCOL=TCP)(HOST=192.68.1.13)(PORT=1701))(LOAD_BALANCE = yes)(FAILOVER = on)(CONNECT_DATA =(SERVER = DEDICATED)(SERVICE_NAME = myserver.mydomain.com)(FAILOVER_MODE =(TYPE = SELECT)(METHOD = BASIC)(RETRIES = 3)(DELAY = 5))))")
.with(OracleConnectorConfig.DATABASE_NAME, "mydb")
@ -113,7 +114,7 @@ public void invalidNoHostnameNoUri() throws Exception {
final OracleConnectorConfig connectorConfig = new OracleConnectorConfig(
Configuration.create()
.with(OracleConnectorConfig.CONNECTOR_ADAPTER, "logminer")
.with(OracleConnectorConfig.SERVER_NAME, "myserver")
.with(AbstractTopicNamingStrategy.TOPIC_PREFIX, "myserver")
.with(OracleConnectorConfig.DATABASE_NAME, "mydb")
.with(OracleConnectorConfig.USER, "debezium")
.with(KafkaDatabaseHistory.BOOTSTRAP_SERVERS, "localhost:9092")
@ -127,7 +128,7 @@ public void validBatchDefaults() throws Exception {
final OracleConnectorConfig connectorConfig = new OracleConnectorConfig(
Configuration.create()
.with(OracleConnectorConfig.SERVER_NAME, "myserver")
.with(AbstractTopicNamingStrategy.TOPIC_PREFIX, "myserver")
.build());
assertEquals(connectorConfig.getLogMiningBatchSizeDefault(), OracleConnectorConfig.DEFAULT_BATCH_SIZE);
@ -140,7 +141,7 @@ public void validSleepDefaults() throws Exception {
final OracleConnectorConfig connectorConfig = new OracleConnectorConfig(
Configuration.create()
.with(OracleConnectorConfig.SERVER_NAME, "myserver")
.with(AbstractTopicNamingStrategy.TOPIC_PREFIX, "myserver")
.build());
assertEquals(connectorConfig.getLogMiningSleepTimeDefault(), OracleConnectorConfig.DEFAULT_SLEEP_TIME);
@ -154,7 +155,7 @@ public void validSleepDefaults() throws Exception {
public void validQueryFetchSizeDefaults() throws Exception {
final OracleConnectorConfig connectorConfig = new OracleConnectorConfig(
Configuration.create()
.with(OracleConnectorConfig.SERVER_NAME, "myserver")
.with(AbstractTopicNamingStrategy.TOPIC_PREFIX, "myserver")
.build());
assertEquals(connectorConfig.getQueryFetchSize(), 0);
}
@ -164,7 +165,7 @@ public void validQueryFetchSizeDefaults() throws Exception {
public void validQueryFetchSizeAvailable() throws Exception {
final OracleConnectorConfig connectorConfig = new OracleConnectorConfig(
Configuration.create()
.with(OracleConnectorConfig.SERVER_NAME, "myserver")
.with(AbstractTopicNamingStrategy.TOPIC_PREFIX, "myserver")
.with(OracleConnectorConfig.QUERY_FETCH_SIZE, 10_000)
.build());
assertEquals(connectorConfig.getQueryFetchSize(), 10_000);
@ -174,7 +175,7 @@ public void validQueryFetchSizeAvailable() throws Exception {
@FixFor("DBZ-2754")
public void validTransactionRetentionDefaults() throws Exception {
final Configuration config = Configuration.create()
.with(OracleConnectorConfig.SERVER_NAME, "myserver")
.with(AbstractTopicNamingStrategy.TOPIC_PREFIX, "myserver")
.build();
final OracleConnectorConfig connectorConfig = new OracleConnectorConfig(config);
assertThat(connectorConfig.getLogMiningTransactionRetention()).isEqualTo(Duration.ZERO);
@ -186,7 +187,7 @@ public void testTransactionRetention() throws Exception {
final Field transactionRetentionField = OracleConnectorConfig.LOG_MINING_TRANSACTION_RETENTION;
Configuration config = Configuration.create()
.with(OracleConnectorConfig.SERVER_NAME, "myserver")
.with(AbstractTopicNamingStrategy.TOPIC_PREFIX, "myserver")
.with(transactionRetentionField, 3)
.build();
@ -208,7 +209,7 @@ public void testSnapshotLockMode() throws Exception {
final Field snapshotLockMode = OracleConnectorConfig.SNAPSHOT_LOCKING_MODE;
Configuration config = Configuration.create().with(snapshotLockMode, "shared")
.with(OracleConnectorConfig.SERVER_NAME, "myserver")
.with(AbstractTopicNamingStrategy.TOPIC_PREFIX, "myserver")
.build();
assertThat(config.validateAndRecord(Collections.singletonList(snapshotLockMode), LOGGER::error)).isTrue();
@ -216,7 +217,7 @@ public void testSnapshotLockMode() throws Exception {
assertThat(connectorConfig.getSnapshotLockingMode().usesLocking()).isTrue();
config = Configuration.create()
.with(OracleConnectorConfig.SERVER_NAME, "myserver")
.with(AbstractTopicNamingStrategy.TOPIC_PREFIX, "myserver")
.with(snapshotLockMode, "none")
.build();
@ -234,7 +235,7 @@ public void testDatabasePortAndRacNodeConfigurations() throws Exception {
// Test backward compatibility of rac.nodes using no port with database.port
Configuration config = Configuration.create()
.with(OracleConnectorConfig.SERVER_NAME, "myserver")
.with(AbstractTopicNamingStrategy.TOPIC_PREFIX, "myserver")
.with(port, "1521")
.with(racNodes, "1.2.3.4,1.2.3.5")
.build();
@ -247,7 +248,7 @@ public void testDatabasePortAndRacNodeConfigurations() throws Exception {
// Test rac.nodes using combination of with/without port with database.port
config = Configuration.create()
.with(OracleConnectorConfig.SERVER_NAME, "myserver")
.with(AbstractTopicNamingStrategy.TOPIC_PREFIX, "myserver")
.with(port, "1521")
.with(racNodes, "1.2.3.4,1.2.3.5:1522")
.build();
@ -259,7 +260,7 @@ public void testDatabasePortAndRacNodeConfigurations() throws Exception {
// Test rac.nodes using different ports with no database.port
config = Configuration.create()
.with(OracleConnectorConfig.SERVER_NAME, "myserver")
.with(AbstractTopicNamingStrategy.TOPIC_PREFIX, "myserver")
.with(racNodes, "1.2.3.4:1523,1.2.3.5:1522")
.build();
@ -271,7 +272,7 @@ public void testDatabasePortAndRacNodeConfigurations() throws Exception {
// Test rac.nodes using different ports that differ from database.port
config = Configuration.create()
.with(OracleConnectorConfig.SERVER_NAME, "myserver")
.with(AbstractTopicNamingStrategy.TOPIC_PREFIX, "myserver")
.with(port, "1521")
.with(racNodes, "1.2.3.4:1523,1.2.3.5:1522")
.build();

View File

@ -20,6 +20,7 @@
import org.junit.Test;
import io.debezium.config.CommonConnectorConfig;
import io.debezium.schema.AbstractTopicNamingStrategy;
public class OracleConnectorTest {
OracleConnector connector;
@ -32,7 +33,7 @@ public void before() {
@Test
public void testValidateUnableToConnectNoThrow() {
Map<String, String> config = new HashMap<>();
config.put(OracleConnectorConfig.SERVER_NAME.name(), "dbserver1");
config.put(AbstractTopicNamingStrategy.TOPIC_PREFIX.name(), "dbserver1");
config.put(OracleConnectorConfig.HOSTNAME.name(), "narnia");
config.put(OracleConnectorConfig.PORT.name(), "4321");
config.put(OracleConnectorConfig.DATABASE_NAME.name(), "oracle");

View File

@ -18,13 +18,13 @@
import io.debezium.pipeline.spi.Partition;
import io.debezium.pipeline.txmetadata.TransactionContext;
import io.debezium.relational.Column;
import io.debezium.relational.RelationalDatabaseConnectorConfig;
import io.debezium.relational.Table;
import io.debezium.relational.TableId;
import io.debezium.relational.ddl.DdlParser;
import io.debezium.relational.history.AbstractDatabaseHistoryTest;
import io.debezium.relational.history.HistoryRecord;
import io.debezium.relational.history.TableChanges;
import io.debezium.schema.AbstractTopicNamingStrategy;
import oracle.jdbc.OracleTypes;
@ -73,7 +73,7 @@ protected Offsets<Partition, OffsetContext> getOffsets() {
final OraclePartition source = new OraclePartition(TestHelper.SERVER_NAME, TestHelper.getDatabaseName());
final Configuration config = Configuration.empty()
.edit()
.with(RelationalDatabaseConnectorConfig.SERVER_NAME, TestHelper.SERVER_NAME)
.with(AbstractTopicNamingStrategy.TOPIC_PREFIX, TestHelper.SERVER_NAME)
.build();
final OracleOffsetContext position = new OracleOffsetContext(new OracleConnectorConfig(config), Scn.valueOf(999),
CommitScn.valueOf(999L), null, Scn.valueOf(999), Collections.emptyMap(), false, true, new TransactionContext(),

View File

@ -18,6 +18,7 @@
import io.debezium.connector.AbstractSourceInfoStructMaker;
import io.debezium.data.VerifyRecord;
import io.debezium.relational.TableId;
import io.debezium.schema.AbstractTopicNamingStrategy;
public class SourceInfoTest {
@ -27,7 +28,7 @@ public class SourceInfoTest {
public void beforeEach() {
final OracleConnectorConfig connectorConfig = new OracleConnectorConfig(
Configuration.create()
.with(OracleConnectorConfig.SERVER_NAME, "serverX")
.with(AbstractTopicNamingStrategy.TOPIC_PREFIX, "serverX")
.with(OracleConnectorConfig.DATABASE_NAME, "mydb")
.build());
source = new SourceInfo(connectorConfig);

View File

@ -25,6 +25,7 @@
import io.debezium.connector.oracle.OracleConnectorConfig.LogMiningBufferType;
import io.debezium.connector.oracle.logminer.processor.infinispan.CacheProvider;
import io.debezium.jdbc.JdbcConfiguration;
import io.debezium.schema.AbstractTopicNamingStrategy;
import io.debezium.storage.file.history.FileDatabaseHistory;
import io.debezium.util.Strings;
import io.debezium.util.Testing;
@ -160,7 +161,7 @@ public static Configuration.Builder defaultConfig() {
builder.withDefault(OracleConnectorConfig.PDB_NAME, DATABASE);
}
return builder.with(OracleConnectorConfig.SERVER_NAME, SERVER_NAME)
return builder.with(AbstractTopicNamingStrategy.TOPIC_PREFIX, SERVER_NAME)
.with(OracleConnectorConfig.DATABASE_HISTORY, FileDatabaseHistory.class)
.with(FileDatabaseHistory.FILE_PATH, DB_HISTORY_PATH)
.with(OracleConnectorConfig.INCLUDE_SCHEMA_CHANGES, false);
@ -230,7 +231,7 @@ private static Configuration.Builder testConfig() {
jdbcConfiguration.forEach(
(field, value) -> builder.with(OracleConnectorConfig.DATABASE_CONFIG_PREFIX + field, value));
builder.with(OracleConnectorConfig.SERVER_NAME, SERVER_NAME);
builder.with(AbstractTopicNamingStrategy.TOPIC_PREFIX, SERVER_NAME);
return builder;
}
@ -244,7 +245,7 @@ private static Configuration.Builder adminConfig() {
jdbcConfiguration.forEach(
(field, value) -> builder.with(OracleConnectorConfig.DATABASE_CONFIG_PREFIX + field, value));
builder.with(OracleConnectorConfig.SERVER_NAME, SERVER_NAME);
builder.with(AbstractTopicNamingStrategy.TOPIC_PREFIX, SERVER_NAME);
return builder;
}

View File

@ -93,6 +93,7 @@
import io.debezium.relational.RelationalDatabaseConnectorConfig;
import io.debezium.relational.RelationalDatabaseSchema;
import io.debezium.relational.RelationalSnapshotChangeEventSource;
import io.debezium.schema.AbstractTopicNamingStrategy;
import io.debezium.schema.DatabaseSchema;
import io.debezium.util.Strings;
import io.debezium.util.Testing;
@ -214,7 +215,7 @@ public void shouldValidateConfiguration() throws Exception {
assertConfigurationErrors(validatedConfig, PostgresConnectorConfig.HOSTNAME, 1);
assertConfigurationErrors(validatedConfig, PostgresConnectorConfig.USER, 1);
assertConfigurationErrors(validatedConfig, PostgresConnectorConfig.DATABASE_NAME, 1);
assertConfigurationErrors(validatedConfig, PostgresConnectorConfig.SERVER_NAME, 1);
assertNoConfigurationErrors(validatedConfig, AbstractTopicNamingStrategy.TOPIC_PREFIX);
// validate the non required fields
validateConfigField(validatedConfig, PostgresConnectorConfig.PLUGIN_NAME, LogicalDecoder.DECODERBUFS.getValue());

View File

@ -14,13 +14,14 @@
import io.debezium.config.Configuration;
import io.debezium.connector.base.ChangeEventQueue;
import io.debezium.pipeline.DataChangeEvent;
import io.debezium.schema.AbstractTopicNamingStrategy;
public class PostgresErrorHandlerTest {
private static final String A_CLASSIFIED_EXCEPTION = "Database connection failed when writing to copy";
private final PostgresErrorHandler errorHandler = new PostgresErrorHandler(
new PostgresConnectorConfig(Configuration.create()
.with(PostgresConnectorConfig.SERVER_NAME, "postgres")
.with(AbstractTopicNamingStrategy.TOPIC_PREFIX, "postgres")
.build()),
new ChangeEventQueue.Builder<DataChangeEvent>().build());

View File

@ -17,6 +17,7 @@
import io.debezium.data.VerifyRecord;
import io.debezium.doc.FixFor;
import io.debezium.relational.TableId;
import io.debezium.schema.AbstractTopicNamingStrategy;
import io.debezium.time.Conversions;
/**
@ -31,7 +32,7 @@ public class SourceInfoTest {
public void beforeEach() {
source = new SourceInfo(new PostgresConnectorConfig(
Configuration.create()
.with(PostgresConnectorConfig.SERVER_NAME, "serverX")
.with(AbstractTopicNamingStrategy.TOPIC_PREFIX, "serverX")
.with(PostgresConnectorConfig.DATABASE_NAME, "serverX")
.build()));
source.update(Conversions.toInstantFromMicros(123_456_789L), new TableId("catalogNameX", "schemaNameX", "tableNameX"));

View File

@ -39,7 +39,7 @@
import io.debezium.connector.postgresql.connection.PostgresDefaultValueConverter;
import io.debezium.connector.postgresql.connection.ReplicationConnection;
import io.debezium.jdbc.JdbcConfiguration;
import io.debezium.relational.RelationalDatabaseConnectorConfig;
import io.debezium.schema.AbstractTopicNamingStrategy;
import io.debezium.schema.SchemaTopicNamingStrategy;
import io.debezium.spi.topic.TopicNamingStrategy;
import io.debezium.util.Throwables;
@ -257,7 +257,7 @@ protected static Set<String> schemaNames() throws SQLException {
public static JdbcConfiguration defaultJdbcConfig() {
return JdbcConfiguration.copy(Configuration.fromSystemProperties("database."))
.with(RelationalDatabaseConnectorConfig.SERVER_NAME, "dbserver1")
.with(AbstractTopicNamingStrategy.TOPIC_PREFIX, "dbserver1")
.withDefault(JdbcConfiguration.DATABASE, "postgres")
.withDefault(JdbcConfiguration.HOSTNAME, "localhost")
.withDefault(JdbcConfiguration.PORT, 5432)
@ -270,7 +270,7 @@ public static Configuration.Builder defaultConfig() {
JdbcConfiguration jdbcConfiguration = defaultJdbcConfig();
Configuration.Builder builder = Configuration.create();
jdbcConfiguration.forEach((field, value) -> builder.with(PostgresConnectorConfig.DATABASE_CONFIG_PREFIX + field, value));
builder.with(RelationalDatabaseConnectorConfig.SERVER_NAME, TEST_SERVER)
builder.with(AbstractTopicNamingStrategy.TOPIC_PREFIX, TEST_SERVER)
.with(PostgresConnectorConfig.DROP_SLOT_ON_STOP, true)
.with(PostgresConnectorConfig.STATUS_UPDATE_INTERVAL_MS, 100)
.with(PostgresConnectorConfig.PLUGIN_NAME, decoderPlugin())
@ -310,7 +310,7 @@ public static int waitTimeForRecords() {
protected static SourceInfo sourceInfo() {
return new SourceInfo(new PostgresConnectorConfig(
Configuration.create()
.with(PostgresConnectorConfig.SERVER_NAME, TEST_SERVER)
.with(AbstractTopicNamingStrategy.TOPIC_PREFIX, TEST_SERVER)
.with(PostgresConnectorConfig.DATABASE_NAME, TEST_DATABASE)
.build()));
}

View File

@ -32,7 +32,6 @@
import io.debezium.relational.Tables.TableFilter;
import io.debezium.relational.history.HistoryRecordComparator;
import io.debezium.schema.AbstractTopicNamingStrategy;
import io.debezium.storage.kafka.history.KafkaStorageConfiguration;
/**
* The list of configuration options for SQL Server connector
@ -222,9 +221,6 @@ public static SnapshotIsolationMode parse(String value, String defaultValue) {
public static final Field PORT = RelationalDatabaseConnectorConfig.PORT
.withDefault(DEFAULT_PORT);
public static final Field SERVER_NAME = RelationalDatabaseConnectorConfig.SERVER_NAME
.withValidation(KafkaStorageConfiguration::validateServerNameIsDifferentFromHistoryTopicName);
public static final Field INSTANCE = Field.create(DATABASE_CONFIG_PREFIX + SqlServerConnection.INSTANCE_NAME)
.withDisplayName("Instance name")
.withType(Type.STRING)

View File

@ -18,6 +18,7 @@
import io.debezium.connector.AbstractSourceInfoStructMaker;
import io.debezium.connector.SnapshotRecord;
import io.debezium.relational.TableId;
import io.debezium.schema.AbstractTopicNamingStrategy;
public class SourceInfoTest {
@ -27,7 +28,7 @@ public class SourceInfoTest {
public void beforeEach() {
final SqlServerConnectorConfig connectorConfig = new SqlServerConnectorConfig(
Configuration.create()
.with(SqlServerConnectorConfig.SERVER_NAME, "serverX")
.with(AbstractTopicNamingStrategy.TOPIC_PREFIX, "serverX")
.build());
source = new SourceInfo(connectorConfig);
source.setChangeLsn(Lsn.valueOf(new byte[]{ 0x01 }));

View File

@ -13,6 +13,7 @@
import org.slf4j.LoggerFactory;
import io.debezium.config.Configuration;
import io.debezium.schema.AbstractTopicNamingStrategy;
import io.debezium.storage.kafka.history.KafkaDatabaseHistory;
public class SqlServerConnectorConfigTest {
@ -37,7 +38,7 @@ public void nonEmptyDatabaseNames() {
private Configuration.Builder defaultConfig() {
return Configuration.create()
.with(SqlServerConnectorConfig.SERVER_NAME, "server")
.with(AbstractTopicNamingStrategy.TOPIC_PREFIX, "server")
.with(SqlServerConnectorConfig.HOSTNAME, "localhost")
.with(SqlServerConnectorConfig.USER, "debezium")
.with(KafkaDatabaseHistory.BOOTSTRAP_SERVERS, "localhost:9092")

View File

@ -21,6 +21,7 @@
import org.junit.Test;
import io.debezium.config.CommonConnectorConfig;
import io.debezium.schema.AbstractTopicNamingStrategy;
public class SqlServerConnectorTest {
SqlServerConnector connector;
@ -33,7 +34,7 @@ public void before() {
@Test
public void testValidateUnableToConnectNoThrow() {
Map<String, String> config = new HashMap<>();
config.put(SqlServerConnectorConfig.SERVER_NAME.name(), "dbserver1");
config.put(AbstractTopicNamingStrategy.TOPIC_PREFIX.name(), "dbserver1");
config.put(SqlServerConnectorConfig.HOSTNAME.name(), "narnia");
config.put(SqlServerConnectorConfig.PORT.name(), "4321");
config.put(SqlServerConnectorConfig.DATABASE_NAMES.name(), "sqlserver");

View File

@ -41,6 +41,7 @@
import io.debezium.jdbc.TemporalPrecisionMode;
import io.debezium.relational.RelationalDatabaseConnectorConfig;
import io.debezium.relational.TableId;
import io.debezium.schema.AbstractTopicNamingStrategy;
import io.debezium.storage.file.history.FileDatabaseHistory;
import io.debezium.util.Collect;
import io.debezium.util.IoUtil;
@ -124,7 +125,7 @@ public static Configuration.Builder defaultConnectorConfig() {
jdbcConfiguration.forEach(
(field, value) -> builder.with(SqlServerConnectorConfig.DATABASE_CONFIG_PREFIX + field, value));
return builder.with(RelationalDatabaseConnectorConfig.SERVER_NAME, "server1")
return builder.with(AbstractTopicNamingStrategy.TOPIC_PREFIX, "server1")
.with(SqlServerConnectorConfig.DATABASE_HISTORY, FileDatabaseHistory.class)
.with(FileDatabaseHistory.FILE_PATH, DB_HISTORY_PATH)
.with(RelationalDatabaseConnectorConfig.INCLUDE_SCHEMA_CHANGES, false);

View File

@ -16,6 +16,7 @@
import io.debezium.config.Configuration;
import io.debezium.relational.RelationalDatabaseConnectorConfig;
import io.debezium.schema.AbstractTopicNamingStrategy;
import io.debezium.util.Strings;
/**
@ -33,7 +34,7 @@ public Config validate(Map<String, String> connectorConfigs) {
// Validate all of the individual fields, which is easy since don't make any of the fields invisible ...
Map<String, ConfigValue> results = validateAllFields(config);
ConfigValue logicalName = results.get(RelationalDatabaseConnectorConfig.SERVER_NAME.name());
ConfigValue logicalName = results.get(AbstractTopicNamingStrategy.TOPIC_PREFIX.name());
// Get the config values for each of the connection-related fields ...
ConfigValue hostnameValue = results.get(RelationalDatabaseConnectorConfig.HOSTNAME.name());
ConfigValue portValue = results.get(RelationalDatabaseConnectorConfig.PORT.name());

View File

@ -37,6 +37,7 @@
import io.debezium.relational.Tables.ColumnNameFilter;
import io.debezium.relational.Tables.ColumnNameFilterFactory;
import io.debezium.relational.Tables.TableFilter;
import io.debezium.schema.AbstractTopicNamingStrategy;
import io.debezium.spi.topic.TopicNamingStrategy;
import io.debezium.util.SchemaNameAdjuster;
import io.debezium.util.Strings;
@ -189,19 +190,6 @@ public static DecimalHandlingMode parse(String value, String defaultValue) {
.required()
.withDescription("The name of the database from which the connector should capture changes");
public static final Field SERVER_NAME = Field.create(DATABASE_CONFIG_PREFIX + "server.name")
.withDisplayName("Namespace")
.withType(Type.STRING)
.withGroup(Field.createGroupEntry(Field.Group.CONNECTION, 0))
.withWidth(Width.MEDIUM)
.withImportance(Importance.HIGH)
.required()
.withValidation(RelationalDatabaseConnectorConfig::validateServerName)
.withDescription("Unique name that identifies the database server and all "
+ "recorded offsets, and that is used as a prefix for all schemas and topics. "
+ "Each distinct installation should have a separate namespace and be monitored by "
+ "at most one Debezium connector.");
/**
* A comma-separated list of regular expressions that match the fully-qualified names of tables to be monitored.
* Fully-qualified names for tables are of the form {@code <databaseName>.<tableName>} or
@ -479,7 +467,7 @@ public static DecimalHandlingMode parse(String value, String defaultValue) {
protected static final ConfigDefinition CONFIG_DEFINITION = CommonConnectorConfig.CONFIG_DEFINITION.edit()
.type(
SERVER_NAME)
AbstractTopicNamingStrategy.TOPIC_PREFIX)
.connector(
DECIMAL_HANDLING_MODE,
TIME_PRECISION_MODE,
@ -741,16 +729,4 @@ private static int validateMessageKeyColumnsField(Configuration config, Field fi
}
return problemCount;
}
private static int validateServerName(Configuration config, Field field, Field.ValidationOutput problems) {
String serverName = config.getString(SERVER_NAME);
if (serverName != null) {
if (!SERVER_NAME_PATTERN.asPredicate().test(serverName)) {
problems.accept(SERVER_NAME, serverName, serverName + " has invalid format (only the underscore, hyphen, dot and alphanumeric characters are allowed)");
return 1;
}
}
return 0;
}
}

View File

@ -8,7 +8,7 @@
import static io.debezium.relational.RelationalDatabaseConnectorConfig.COLUMN_EXCLUDE_LIST;
import static io.debezium.relational.RelationalDatabaseConnectorConfig.COLUMN_INCLUDE_LIST;
import static io.debezium.relational.RelationalDatabaseConnectorConfig.MSG_KEY_COLUMNS;
import static io.debezium.relational.RelationalDatabaseConnectorConfig.SERVER_NAME;
import static io.debezium.schema.AbstractTopicNamingStrategy.TOPIC_PREFIX;
import static org.fest.assertions.Assertions.assertThat;
import java.util.List;
@ -307,18 +307,18 @@ public void testMsgKeyColumnsFieldRegexValidation() {
@FixFor("DBZ-3427")
public void testServerNameValidation() {
List<String> errorList;
config = Configuration.create().with(SERVER_NAME, "server_11").build();
assertThat(config.validate(Field.setOf(SERVER_NAME)).get(SERVER_NAME.name()).errorMessages()).isEmpty();
config = Configuration.create().with(TOPIC_PREFIX, "server_11").build();
assertThat(config.validate(Field.setOf(TOPIC_PREFIX)).get(TOPIC_PREFIX.name()).errorMessages()).isEmpty();
config = Configuration.create().with(SERVER_NAME, "server-12").build();
assertThat(config.validate(Field.setOf(SERVER_NAME)).get(SERVER_NAME.name()).errorMessages()).isEmpty();
config = Configuration.create().with(TOPIC_PREFIX, "server-12").build();
assertThat(config.validate(Field.setOf(TOPIC_PREFIX)).get(TOPIC_PREFIX.name()).errorMessages()).isEmpty();
config = Configuration.create().with(SERVER_NAME, "server.12").build();
assertThat(config.validate(Field.setOf(SERVER_NAME)).get(SERVER_NAME.name()).errorMessages()).isEmpty();
config = Configuration.create().with(TOPIC_PREFIX, "server.12").build();
assertThat(config.validate(Field.setOf(TOPIC_PREFIX)).get(TOPIC_PREFIX.name()).errorMessages()).isEmpty();
config = Configuration.create().with(SERVER_NAME, "server@X").build();
errorList = config.validate(Field.setOf(SERVER_NAME)).get(SERVER_NAME.name()).errorMessages();
config = Configuration.create().with(TOPIC_PREFIX, "server@X").build();
errorList = config.validate(Field.setOf(TOPIC_PREFIX)).get(TOPIC_PREFIX.name()).errorMessages();
assertThat(errorList.get(0))
.isEqualTo(Field.validationOutput(SERVER_NAME, "server@X has invalid format (only the underscore, hyphen, dot and alphanumeric characters are allowed)"));
.isEqualTo(Field.validationOutput(TOPIC_PREFIX, "server@X has invalid format (only the underscore, hyphen, dot and alphanumeric characters are allowed)"));
}
}

View File

@ -174,7 +174,7 @@ def main(argv):
print('tpc-connector deleted')
pass
databaseservername = config['config']['database.server.name']
databaseservername = config['config']['topic.prefix']
topicname = databaseservername + '.' + table
historybootstrapserver = config['config'].get('schema.history.kafka.bootstrap.servers')
if historybootstrapserver != None:

View File

@ -52,6 +52,7 @@
import io.debezium.connector.oracle.OracleConnectorConfig.SnapshotMode;
import io.debezium.embedded.EmbeddedEngine;
import io.debezium.jdbc.JdbcConfiguration;
import io.debezium.schema.AbstractTopicNamingStrategy;
import io.debezium.storage.file.history.FileDatabaseHistory;
import io.debezium.util.IoUtil;
@ -235,7 +236,7 @@ private Configuration.Builder defaultConnectorConfig() {
Configuration.Builder builder = Configuration.create();
jdbcConfiguration.forEach((f, v) -> builder.with(OracleConnectorConfig.DATABASE_CONFIG_PREFIX + f, v));
return builder.with(OracleConnectorConfig.SERVER_NAME, SERVER_NAME)
return builder.with(AbstractTopicNamingStrategy.TOPIC_PREFIX, SERVER_NAME)
.with(OracleConnectorConfig.PDB_NAME, "ORCLPDB1")
.with(OracleConnectorConfig.INCLUDE_SCHEMA_CHANGES, false)
.with(OracleConnectorConfig.CONNECTOR_ADAPTER, ConnectorAdapter.LOG_MINER)

View File

@ -36,7 +36,7 @@ public TestConfigSource() {
integrationTest.put("debezium.source.connector.class", "io.debezium.connector.postgresql.PostgresConnector");
integrationTest.put("debezium.source." + StandaloneConfig.OFFSET_STORAGE_FILE_FILENAME_CONFIG, OFFSET_STORE_PATH.toAbsolutePath().toString());
integrationTest.put("debezium.source.offset.flush.interval.ms", "0");
integrationTest.put("debezium.source.database.server.name", "testc");
integrationTest.put("debezium.source.topic.prefix", "testc");
integrationTest.put("debezium.source.schema.include.list", "inventory");
integrationTest.put("debezium.source.table.include.list", "inventory.customers");

View File

@ -8,6 +8,6 @@ debezium.source.database.port=5432
debezium.source.database.user=postgres
debezium.source.database.password=postgres
debezium.source.database.dbname=postgres
debezium.source.database.server.name=tutorial
debezium.source.topic.prefix=tutorial
debezium.source.schema.include.list=inventory
quarkus.log.console.json=false

View File

@ -33,7 +33,7 @@ public EventHubsTestConfigSource() {
eventHubsTest.put("debezium.source." + StandaloneConfig.OFFSET_STORAGE_FILE_FILENAME_CONFIG,
OFFSET_STORE_PATH.toAbsolutePath().toString());
eventHubsTest.put("debezium.source.offset.flush.interval.ms", "0");
eventHubsTest.put("debezium.source.database.server.name", "testc");
eventHubsTest.put("debezium.source.topic.prefix", "testc");
eventHubsTest.put("debezium.source.schema.include.list", "inventory");
eventHubsTest.put("debezium.source.table.include.list", "inventory.customers");

View File

@ -23,7 +23,7 @@ public HttpTestConfigSource() {
httpTest.put("debezium.source.connector.class", "io.debezium.connector.postgresql.PostgresConnector");
httpTest.put("debezium.source." + StandaloneConfig.OFFSET_STORAGE_FILE_FILENAME_CONFIG, OFFSET_STORE_PATH.toAbsolutePath().toString());
httpTest.put("debezium.source.offset.flush.interval.ms", "0");
httpTest.put("debezium.source.database.server.name", "testc");
httpTest.put("debezium.source.topic.prefix", "testc");
httpTest.put("debezium.source.schema.include.list", "inventory");
httpTest.put("debezium.source.table.include.list", "inventory.customers");

View File

@ -26,7 +26,7 @@ public KafkaTestConfigSource() {
kafkaConfig.put("debezium.source." + StandaloneConfig.OFFSET_STORAGE_FILE_FILENAME_CONFIG, OFFSET_STORE_PATH.toAbsolutePath().toString());
kafkaConfig.put("debezium.source.offset.flush.interval.ms", "0");
kafkaConfig.put("debezium.source.database.server.name", "testc");
kafkaConfig.put("debezium.source.topic.prefix", "testc");
kafkaConfig.put("debezium.source.schema.include.list", "inventory");
kafkaConfig.put("debezium.source.table.include.list", "inventory.customers");
// DBZ-5105

View File

@ -24,7 +24,7 @@ public KinesisTestConfigSource() {
kinesisTest.put("debezium.source.connector.class", "io.debezium.connector.postgresql.PostgresConnector");
kinesisTest.put("debezium.source." + StandaloneConfig.OFFSET_STORAGE_FILE_FILENAME_CONFIG, OFFSET_STORE_PATH.toAbsolutePath().toString());
kinesisTest.put("debezium.source.offset.flush.interval.ms", "0");
kinesisTest.put("debezium.source.database.server.name", "testc");
kinesisTest.put("debezium.source.topic.prefix", "testc");
kinesisTest.put("debezium.source.schema.include.list", "inventory");
kinesisTest.put("debezium.source.table.include.list", "inventory.customers");

View File

@ -23,7 +23,7 @@ public NatsStreamingTestConfigSource() {
natsStreamingTest.put("debezium.sink.nats-streaming.cluster.id", "debezium");
natsStreamingTest.put("debezium.sink.nats-streaming.client.id", "debezium-sink");
natsStreamingTest.put("debezium.source.connector.class", "io.debezium.connector.postgresql.PostgresConnector");
natsStreamingTest.put("debezium.source.database.server.name", "testc");
natsStreamingTest.put("debezium.source.topic.prefix", "testc");
natsStreamingTest.put("debezium.source.schema.include.list", "inventory");
natsStreamingTest.put("debezium.source.table.include.list", "inventory.customers");
natsStreamingTest.put("debezium.source." + StandaloneConfig.OFFSET_STORAGE_FILE_FILENAME_CONFIG,

View File

@ -22,7 +22,7 @@ public PubSubLiteTestConfigSource() {
pubsubLiteTest.put("debezium.source." + StandaloneConfig.OFFSET_STORAGE_FILE_FILENAME_CONFIG,
OFFSET_STORE_PATH.toAbsolutePath().toString());
pubsubLiteTest.put("debezium.source.offset.flush.interval.ms", "0");
pubsubLiteTest.put("debezium.source.database.server.name", "testc");
pubsubLiteTest.put("debezium.source.topic.prefix", "testc");
pubsubLiteTest.put("debezium.source.schema.include.list", "inventory");
pubsubLiteTest.put("debezium.source.table.include.list", "inventory.customers");

View File

@ -22,7 +22,7 @@ public PubSubTestConfigSource() {
pubsubTest.put("debezium.source." + StandaloneConfig.OFFSET_STORAGE_FILE_FILENAME_CONFIG,
OFFSET_STORE_PATH.toAbsolutePath().toString());
pubsubTest.put("debezium.source.offset.flush.interval.ms", "0");
pubsubTest.put("debezium.source.database.server.name", "testc");
pubsubTest.put("debezium.source.topic.prefix", "testc");
pubsubTest.put("debezium.source.schema.include.list", "inventory");
pubsubTest.put("debezium.source.table.include.list", "inventory.customers");

View File

@ -22,7 +22,7 @@ public PulsarTestConfigSource() {
pulsarTest.put("debezium.source." + StandaloneConfig.OFFSET_STORAGE_FILE_FILENAME_CONFIG,
OFFSET_STORE_PATH.toAbsolutePath().toString());
pulsarTest.put("debezium.source.offset.flush.interval.ms", "0");
pulsarTest.put("debezium.source.database.server.name", "testc");
pulsarTest.put("debezium.source.topic.prefix", "testc");
pulsarTest.put("debezium.source.schema.include.list", "inventory");
pulsarTest.put("debezium.source.table.include.list", "inventory.customers,inventory.nokey");

View File

@ -49,7 +49,7 @@ public Map<String, String> start() {
params.put("debezium.sink.redis.ssl.enabled", "true");
params.put("debezium.source.connector.class", "io.debezium.connector.postgresql.PostgresConnector");
params.put("debezium.source.offset.flush.interval.ms", "0");
params.put("debezium.source.database.server.name", "testc");
params.put("debezium.source.topic.prefix", "testc");
params.put("debezium.source.schema.include.list", "inventory");
params.put("debezium.source.table.include.list", "inventory.customers,inventory.redis_test,inventory.redis_test2");

View File

@ -42,7 +42,7 @@ public Map<String, String> start() {
params.put("debezium.sink.redis.address", RedisTestResourceLifecycleManager.getRedisContainerAddress());
params.put("debezium.source.connector.class", "io.debezium.connector.postgresql.PostgresConnector");
params.put("debezium.source.offset.flush.interval.ms", "0");
params.put("debezium.source.database.server.name", "testc");
params.put("debezium.source.topic.prefix", "testc");
params.put("debezium.source.schema.include.list", "inventory");
params.put("debezium.source.table.include.list", "inventory.customers,inventory.redis_test,inventory.redis_test2");

View File

@ -1,26 +0,0 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.storage.kafka.history;
import java.util.Objects;
import io.debezium.config.Configuration;
import io.debezium.config.Field;
public class KafkaStorageConfiguration {
public static int validateServerNameIsDifferentFromHistoryTopicName(Configuration config, Field field, Field.ValidationOutput problems) {
String serverName = config.getString(field);
String historyTopicName = config.getString(KafkaDatabaseHistory.TOPIC);
if (Objects.equals(serverName, historyTopicName)) {
problems.accept(field, serverName, "Must not have the same value as " + KafkaDatabaseHistory.TOPIC.name());
return 1;
}
return 0;
}
}

View File

@ -32,7 +32,7 @@ public ConnectorConfigBuilder mysql(SqlDatabaseController controller, String con
int dbPort = controller.getDatabasePort();
return cb
.put("database.server.name", cb.getDbServerName())
.put("topic.prefix", cb.getDbServerName())
.put("database.server.id", 5400 + random.nextInt(1000))
.put("connector.class", "io.debezium.connector.mysql.MySqlConnector")
.put("task.max", 1)
@ -52,7 +52,7 @@ public ConnectorConfigBuilder postgresql(SqlDatabaseController controller, Strin
int dbPort = controller.getDatabasePort();
return cb
.put("database.server.name", cb.getDbServerName())
.put("topic.prefix", cb.getDbServerName())
.put("connector.class", "io.debezium.connector.postgresql.PostgresConnector")
.put("task.max", 1)
.put("database.hostname", dbHost)
@ -72,7 +72,7 @@ public ConnectorConfigBuilder sqlserver(SqlDatabaseController controller, String
int dbPort = controller.getDatabasePort();
return cb
.put("database.server.name", cb.getDbServerName())
.put("topic.prefix", cb.getDbServerName())
.put("connector.class", "io.debezium.connector.sqlserver.SqlServerConnector")
.put("task.max", 1)
.put("database.hostname", dbHost)
@ -107,7 +107,7 @@ public ConnectorConfigBuilder db2(SqlDatabaseController controller, String conne
int dbPort = controller.getDatabasePort();
return cb
.put("database.server.name", cb.getDbServerName())
.put("topic.prefix", cb.getDbServerName())
.put("connector.class", "io.debezium.connector.db2.Db2Connector")
.put("task.max", 1)
.put("database.hostname", dbHost)
@ -127,7 +127,7 @@ public ConnectorConfigBuilder oracle(SqlDatabaseController controller, String co
int dbPort = controller.getDatabasePort();
return cb
.put("database.server.name", cb.getDbServerName())
.put("topic.prefix", cb.getDbServerName())
.put("connector.class", "io.debezium.connector.oracle.OracleConnector")
.put("task.max", 1)
.put("database.hostname", dbHost)

View File

@ -375,7 +375,7 @@ public void ensureConnectorConfigProperty(String connectorName, String propertyN
public static ConnectorConfiguration getPostgresConnectorConfiguration(PostgreSQLContainer<?> postgresContainer, int id, String... options) {
final ConnectorConfiguration config = ConnectorConfiguration.forJdbcContainer(postgresContainer)
.with("database.server.name", "dbserver" + id)
.with("topic.prefix", "dbserver" + id)
.with("slot.name", "debezium_" + id);
if (options != null && options.length > 0) {

View File

@ -158,7 +158,7 @@ public void shouldConvertToCloudEventWithDataAsAvro() throws Exception {
// host, database, user etc. are obtained from the container
final ConnectorConfiguration config = ConnectorConfiguration.forJdbcContainer(postgresContainer)
.with("database.server.name", "dbserver" + id)
.with("topic.prefix", "dbserver" + id)
.with("slot.name", "debezium_" + id)
.with("key.converter", "org.apache.kafka.connect.json.JsonConverter")
.with("value.converter", "io.debezium.converters.CloudEventsConverter")
@ -231,7 +231,7 @@ private ConnectorConfiguration getConfiguration(int id, String converter, String
// host, database, user etc. are obtained from the container
final ConnectorConfiguration config = ConnectorConfiguration.forJdbcContainer(postgresContainer)
.with("database.server.name", "dbserver" + id)
.with("topic.prefix", "dbserver" + id)
.with("slot.name", "debezium_" + id)
.with("key.converter", converter)
.with("key.converter.apicurio.registry.url", apicurioUrl)

View File

@ -158,7 +158,7 @@ private List<ConsumerRecord<String, String>> drain(KafkaConsumer<String, String>
private ConnectorConfiguration getConfiguration(int id) {
// host, database, user etc. are obtained from the container
return ConnectorConfiguration.forJdbcContainer(postgresContainer)
.with("database.server.name", "dbserver" + id)
.with("topic.prefix", "dbserver" + id)
.with("slot.name", "debezium_" + id);
}

View File

@ -554,7 +554,7 @@ node('Slave') {
"database.user": "debezium",
"database.password": "dbz",
"database.server.id": "184054",
"database.server.name": "dbserver1",
"topic.prefix": "dbserver1",
"database.include.list": "inventory",
"schema.history.kafka.bootstrap.servers": "kafka:9092",
"schema.history.kafka.topic": "schema-changes.inventory"

View File

@ -437,7 +437,7 @@ node('Slave') {
"database.user": "debezium",
"database.password": "dbz",
"database.server.id": "184054",
"database.server.name": "dbserver1",
"topic.prefix": "dbserver1",
"database.include.list": "inventory",
"schema.history.kafka.bootstrap.servers": "kafka:9092",
"schema.history.kafka.topic": "schema-changes.inventory"