diff --git a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlConnectorConfig.java b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlConnectorConfig.java index 995ddace4..0adf2c936 100644 --- a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlConnectorConfig.java +++ b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlConnectorConfig.java @@ -149,7 +149,7 @@ public static enum SnapshotMode implements EnumeratedValue { /** * Perform a snapshot of only the database schemas (without data) and then begin reading the binlog at the current binlog position. - * This can be used for recovery only if the connector has existing offsets and the schema.history.kafka.topic does not exist (deleted). + * This can be used for recovery only if the connector has existing offsets and the schema.history.internal.kafka.topic does not exist (deleted). * This recovery option should be used with care as it assumes there have been no schema changes since the connector last stopped, * otherwise some events during the gap may be processed with an incorrect schema and corrupted. */ @@ -736,7 +736,7 @@ public static SecureConnectionMode parse(String value, String defaultValue) { * The database schema history class is hidden in the {@link #configDef()} since that is designed to work with a user interface, * and in these situations using Kafka is the only way to go. */ - public static final Field SCHEMA_HISTORY = Field.create("schema.history") + public static final Field SCHEMA_HISTORY = Field.create("schema.history.internal") .withDisplayName("Database schema history class") .withType(Type.CLASS) .withWidth(Width.LONG) diff --git a/debezium-connector-mysql/src/test/java/io/debezium/relational/history/KafkaSchemaHistoryTest.java b/debezium-connector-mysql/src/test/java/io/debezium/relational/history/KafkaSchemaHistoryTest.java index b452a6371..893df8785 100644 --- a/debezium-connector-mysql/src/test/java/io/debezium/relational/history/KafkaSchemaHistoryTest.java +++ b/debezium-connector-mysql/src/test/java/io/debezium/relational/history/KafkaSchemaHistoryTest.java @@ -375,14 +375,14 @@ public void shouldValidateMandatoryValues() { final Map issues = config.validate(KafkaSchemaHistory.ALL_FIELDS); Assertions.assertThat(issues.keySet()).isEqualTo(Collect.unmodifiableSet( - "schema.history.name", - "schema.history.connector.class", - "schema.history.kafka.topic", - "schema.history.kafka.bootstrap.servers", - "schema.history.kafka.recovery.poll.interval.ms", - "schema.history.connector.id", - "schema.history.kafka.recovery.attempts", - "schema.history.kafka.query.timeout.ms")); + "schema.history.internal.name", + "schema.history.internal.connector.class", + "schema.history.internal.kafka.topic", + "schema.history.internal.kafka.bootstrap.servers", + "schema.history.internal.kafka.recovery.poll.interval.ms", + "schema.history.internal.connector.id", + "schema.history.internal.kafka.recovery.attempts", + "schema.history.internal.kafka.query.timeout.ms")); } @Test diff --git a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/OracleConnectorConfig.java b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/OracleConnectorConfig.java index 987fb82c6..0a8f407a6 100644 --- a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/OracleConnectorConfig.java +++ b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/OracleConnectorConfig.java @@ -740,7 +740,7 @@ public enum SnapshotMode implements EnumeratedValue { /** * Perform a snapshot of only the database schemas (without data) and then begin reading the redo log at the current redo log position. - * This can be used for recovery only if the connector has existing offsets and the schema.history.kafka.topic does not exist (deleted). + * This can be used for recovery only if the connector has existing offsets and the schema.history.internal.kafka.topic does not exist (deleted). * This recovery option should be used with care as it assumes there have been no schema changes since the connector last stopped, * otherwise some events during the gap may be processed with an incorrect schema and corrupted. */ diff --git a/debezium-connector-oracle/src/test/java/io/debezium/connector/oracle/logminer/LogMinerQueryBuilderTest.java b/debezium-connector-oracle/src/test/java/io/debezium/connector/oracle/logminer/LogMinerQueryBuilderTest.java index 3edb64ca6..dbaa2f77e 100644 --- a/debezium-connector-oracle/src/test/java/io/debezium/connector/oracle/logminer/LogMinerQueryBuilderTest.java +++ b/debezium-connector-oracle/src/test/java/io/debezium/connector/oracle/logminer/LogMinerQueryBuilderTest.java @@ -57,7 +57,7 @@ public class LogMinerQueryBuilderTest { /** * A template that defines the expected SQL output when the configuration specifies - * {@code schema.history.store.only.captured.tables.ddl} is {@code false}. + * {@code schema.history.internal.store.only.captured.tables.ddl} is {@code false}. */ private static final String LOG_MINER_CONTENT_QUERY_TEMPLATE1 = "SELECT SCN, SQL_REDO, OPERATION_CODE, TIMESTAMP, " + "XID, CSF, TABLE_NAME, SEG_OWNER, OPERATION, USERNAME, ROW_ID, ROLLBACK, RS_ID, STATUS, INFO, SSN, THREAD# " + @@ -78,7 +78,7 @@ public class LogMinerQueryBuilderTest { /** * A template that defines the expected SQL output when the configuration specifies - * {@code schema.history.store.only.captured.tables.ddl} is {@code true}. + * {@code schema.history.internal.store.only.captured.tables.ddl} is {@code true}. */ private static final String LOG_MINER_CONTENT_QUERY_TEMPLATE2 = "SELECT SCN, SQL_REDO, OPERATION_CODE, TIMESTAMP, " + "XID, CSF, TABLE_NAME, SEG_OWNER, OPERATION, USERNAME, ROW_ID, ROLLBACK, RS_ID, STATUS, INFO, SSN, THREAD# " + diff --git a/debezium-core/src/main/java/io/debezium/relational/HistorizedRelationalDatabaseConnectorConfig.java b/debezium-core/src/main/java/io/debezium/relational/HistorizedRelationalDatabaseConnectorConfig.java index a5a903917..60590ae59 100644 --- a/debezium-core/src/main/java/io/debezium/relational/HistorizedRelationalDatabaseConnectorConfig.java +++ b/debezium-core/src/main/java/io/debezium/relational/HistorizedRelationalDatabaseConnectorConfig.java @@ -41,7 +41,7 @@ public abstract class HistorizedRelationalDatabaseConnectorConfig extends Relati * The database schema history class is hidden in the {@link #configDef()} since that is designed to work with a user interface, * and in these situations using Kafka is the only way to go. */ - public static final Field SCHEMA_HISTORY = Field.create("schema.history") + public static final Field SCHEMA_HISTORY = Field.create("schema.history.internal") .withDisplayName("Database schema history class") .withType(Type.CLASS) .withWidth(Width.LONG) diff --git a/debezium-core/src/main/java/io/debezium/relational/history/SchemaHistory.java b/debezium-core/src/main/java/io/debezium/relational/history/SchemaHistory.java index 04d6d8096..1feff0488 100644 --- a/debezium-core/src/main/java/io/debezium/relational/history/SchemaHistory.java +++ b/debezium-core/src/main/java/io/debezium/relational/history/SchemaHistory.java @@ -33,7 +33,7 @@ */ public interface SchemaHistory { - public static final String CONFIGURATION_FIELD_PREFIX_STRING = "schema.history."; + public static final String CONFIGURATION_FIELD_PREFIX_STRING = "schema.history.internal."; public static final Field NAME = Field.create(CONFIGURATION_FIELD_PREFIX_STRING + "name") .withDisplayName("Logical name for the database schema history") diff --git a/debezium-e2e-benchmark/py/tpc-run-test.py b/debezium-e2e-benchmark/py/tpc-run-test.py index bea1a14e9..127fd3834 100755 --- a/debezium-e2e-benchmark/py/tpc-run-test.py +++ b/debezium-e2e-benchmark/py/tpc-run-test.py @@ -130,7 +130,7 @@ def main(argv): config['name'] = 'tpc-connector' print(config['name']) - config['config']['schema.history.kafka.topic'] = 'tpc-test' + config['config']['schema.history.internal.kafka.topic'] = 'tpc-test' databasetype = config['config']['connector.class'] connectiontype = config['config']['connector.class'].split('.')[3] @@ -176,7 +176,7 @@ def main(argv): databaseservername = config['config']['topic.prefix'] topicname = databaseservername + '.' + table - historybootstrapserver = config['config'].get('schema.history.kafka.bootstrap.servers') + historybootstrapserver = config['config'].get('schema.history.internal.kafka.bootstrap.servers') if historybootstrapserver != None: bootstrapserver = historybootstrapserver.split(",") @@ -196,13 +196,13 @@ def main(argv): if historybootstrapserver != None: try: kafkaadmin.delete_topics( - [config['config']['schema.history.kafka.topic']], 30) + [config['config']['schema.history.internal.kafka.topic']], 30) except: - print(config['config']['schema.history.kafka.topic'] + + print(config['config']['schema.history.internal.kafka.topic'] + ' TOPIC not exists') else: print(config['config'] - ['schema.history.kafka.topic'] + ' TOPIC deleted') + ['schema.history.internal.kafka.topic'] + ' TOPIC deleted') # start tpc connector print('start tpc connector') diff --git a/debezium-embedded/README.md b/debezium-embedded/README.md index 3bf414a98..e89a39002 100644 --- a/debezium-embedded/README.md +++ b/debezium-embedded/README.md @@ -47,8 +47,8 @@ Here's an example of code that configures and runs an embedded MySQL connector: .with("database.password", "mysqlpw") .with("server.id", 85744) .with("server.name", "my-app-connector") - .with("schema.history", "io.debezium.storage.file.history.FileSchemaHistory") - .with("schema.history.file.filename", "/path/to/storage/schemahistory.dat") + .with("schema.history.internal", "io.debezium.storage.file.history.FileSchemaHistory") + .with("schema.history.internal.file.filename", "/path/to/storage/schemahistory.dat") .build()) // Create the engine with this configuration ... @@ -88,8 +88,8 @@ The next few lines define the fields that are specific to the connector, which i .with("database.password", "mysqlpw") .with("server.id", 85744) .with("server.name", "products") - .with("schema.history", "io.debezium.storage.file.history.FileSchemaHistory") - .with("schema.history.file.filename", "/path/to/storage/schemahistory.dat") + .with("schema.history.internal", "io.debezium.storage.file.history.FileSchemaHistory") + .with("schema.history.internal.file.filename", "/path/to/storage/schemahistory.dat") .build()) Here, we set the name of the host machine and port number where the MySQL database server is running, and we define the username and password that will be used to connect to the MySQL database. Note that for MySQL the username and password should correspond to a MySQL database user that has been granted the [`REPLICATION SLAVE` privilege](http://dev.mysql.com/doc/refman/5.7/en/replication-howto-repuser.html), allowing the database to read the server's binlog that is normally used for MySQL replication. diff --git a/debezium-server/debezium-server-redis/src/test/java/io/debezium/server/redis/RedisSchemaHistoryTestProfile.java b/debezium-server/debezium-server-redis/src/test/java/io/debezium/server/redis/RedisSchemaHistoryTestProfile.java index 9e845861b..e11813fb1 100644 --- a/debezium-server/debezium-server-redis/src/test/java/io/debezium/server/redis/RedisSchemaHistoryTestProfile.java +++ b/debezium-server/debezium-server-redis/src/test/java/io/debezium/server/redis/RedisSchemaHistoryTestProfile.java @@ -28,7 +28,7 @@ public List testResources() { public Map getConfigOverrides() { Map config = new HashMap(); config.put("debezium.source." + OFFSET_STORAGE_FILE_FILENAME_CONFIG, OFFSET_STORE_PATH.toAbsolutePath().toString()); - config.put("debezium.source.schema.history", "io.debezium.server.redis.RedisSchemaHistory"); + config.put("debezium.source.schema.history.internal", "io.debezium.server.redis.RedisSchemaHistory"); config.put("debezium.source.database.server.id", "12345"); return config; } diff --git a/debezium-testing/debezium-testing-system/src/test/java/io/debezium/testing/system/resources/ConnectorFactories.java b/debezium-testing/debezium-testing-system/src/test/java/io/debezium/testing/system/resources/ConnectorFactories.java index 6fe76cfd9..96b12e242 100644 --- a/debezium-testing/debezium-testing-system/src/test/java/io/debezium/testing/system/resources/ConnectorFactories.java +++ b/debezium-testing/debezium-testing-system/src/test/java/io/debezium/testing/system/resources/ConnectorFactories.java @@ -40,8 +40,8 @@ public ConnectorConfigBuilder mysql(SqlDatabaseController controller, String con .put("database.port", dbPort) .put("database.user", ConfigProperties.DATABASE_MYSQL_DBZ_USERNAME) .put("database.password", ConfigProperties.DATABASE_MYSQL_DBZ_PASSWORD) - .put("schema.history.kafka.bootstrap.servers", kafka.getBootstrapAddress()) - .put("schema.history.kafka.topic", "schema-changes.inventory") + .put("schema.history.internal.kafka.bootstrap.servers", kafka.getBootstrapAddress()) + .put("schema.history.internal.kafka.topic", "schema-changes.inventory") .addOperationRouterForTable("u", "customers"); } @@ -81,8 +81,8 @@ public ConnectorConfigBuilder sqlserver(SqlDatabaseController controller, String .put("database.password", ConfigProperties.DATABASE_SQLSERVER_DBZ_PASSWORD) .put("database.names", ConfigProperties.DATABASE_SQLSERVER_DBZ_DBNAMES) .put("database.encrypt", false) - .put("schema.history.kafka.bootstrap.servers", kafka.getBootstrapAddress()) - .put("schema.history.kafka.topic", "schema-changes.inventory") + .put("schema.history.internal.kafka.bootstrap.servers", kafka.getBootstrapAddress()) + .put("schema.history.internal.kafka.topic", "schema-changes.inventory") .addOperationRouterForTable("u", "customers"); } @@ -116,8 +116,8 @@ public ConnectorConfigBuilder db2(SqlDatabaseController controller, String conne .put("database.password", ConfigProperties.DATABASE_DB2_DBZ_PASSWORD) .put("database.dbname", ConfigProperties.DATABASE_DB2_DBZ_DBNAME) .put("database.cdcschema", ConfigProperties.DATABASE_DB2_CDC_SCHEMA) - .put("schema.history.kafka.bootstrap.servers", kafka.getBootstrapAddress()) - .put("schema.history.kafka.topic", "schema-changes.inventory") + .put("schema.history.internal.kafka.bootstrap.servers", kafka.getBootstrapAddress()) + .put("schema.history.internal.kafka.topic", "schema-changes.inventory") .addOperationRouterForTable("u", "CUSTOMERS"); } @@ -139,8 +139,8 @@ public ConnectorConfigBuilder oracle(SqlDatabaseController controller, String co .put("schema.include.list", "DEBEZIUM") .put("table.include.list", "DEBEZIUM.CUSTOMERS") .put("database.pdb.name", ConfigProperties.DATABASE_ORACLE_PDBNAME) - .put("schema.history.kafka.bootstrap.servers", kafka.getBootstrapAddress()) - .put("schema.history.kafka.topic", "schema-changes.oracle") + .put("schema.history.internal.kafka.bootstrap.servers", kafka.getBootstrapAddress()) + .put("schema.history.internal.kafka.topic", "schema-changes.oracle") .put("log.mining.strategy", "online_catalog") .addOperationRouterForTable("u", "CUSTOMERS"); } diff --git a/jenkins-jobs/pipelines/release-pipeline.groovy b/jenkins-jobs/pipelines/release-pipeline.groovy index d0c7761fa..4c8c68267 100644 --- a/jenkins-jobs/pipelines/release-pipeline.groovy +++ b/jenkins-jobs/pipelines/release-pipeline.groovy @@ -556,8 +556,8 @@ node('Slave') { "database.server.id": "184054", "topic.prefix": "dbserver1", "database.include.list": "inventory", - "schema.history.kafka.bootstrap.servers": "kafka:9092", - "schema.history.kafka.topic": "schema-changes.inventory" + "schema.history.internal.kafka.bootstrap.servers": "kafka:9092", + "schema.history.internal.kafka.topic": "schema-changes.inventory" } } ' diff --git a/jenkins-jobs/scripts/release-pipeline.groovy b/jenkins-jobs/scripts/release-pipeline.groovy index 0e43b8a85..8f4770ff6 100644 --- a/jenkins-jobs/scripts/release-pipeline.groovy +++ b/jenkins-jobs/scripts/release-pipeline.groovy @@ -439,8 +439,8 @@ node('Slave') { "database.server.id": "184054", "topic.prefix": "dbserver1", "database.include.list": "inventory", - "schema.history.kafka.bootstrap.servers": "kafka:9092", - "schema.history.kafka.topic": "schema-changes.inventory" + "schema.history.internal.kafka.bootstrap.servers": "kafka:9092", + "schema.history.internal.kafka.topic": "schema-changes.inventory" } } '