DBZ-5043 Rename schema.history to schema.history.internal

Vojtech Juranek 2022-09-06 16:39:07 +02:00 committed by Jiri Pechanec
parent 0235d4aad5
commit 402df78381
12 changed files with 37 additions and 37 deletions
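
Every configuration option that previously began with the schema.history. prefix now begins with schema.history.internal., as the hunks below show. As a quick orientation, here is a minimal sketch (not part of the commit) of an embedded-connector configuration written against the renamed keys; the connection details are illustrative placeholders, and the builder mirrors the documentation example changed further down:

import io.debezium.config.Configuration;

public class SchemaHistoryRenameExample {

    // A minimal sketch using the renamed schema-history keys; connection
    // details are placeholder values, not taken from this commit.
    public static Configuration build() {
        return Configuration.create()
                .with("connector.class", "io.debezium.connector.mysql.MySqlConnector")
                .with("database.hostname", "localhost")
                .with("database.port", 3306)
                .with("database.user", "mysqluser")
                .with("database.password", "mysqlpw")
                // formerly "schema.history"
                .with("schema.history.internal", "io.debezium.storage.file.history.FileSchemaHistory")
                // formerly "schema.history.file.filename"
                .with("schema.history.internal.file.filename", "/path/to/storage/schemahistory.dat")
                .build();
    }
}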


@@ -149,7 +149,7 @@ public static enum SnapshotMode implements EnumeratedValue {
/**
* Perform a snapshot of only the database schemas (without data) and then begin reading the binlog at the current binlog position.
- * This can be used for recovery only if the connector has existing offsets and the schema.history.kafka.topic does not exist (deleted).
+ * This can be used for recovery only if the connector has existing offsets and the schema.history.internal.kafka.topic does not exist (deleted).
* This recovery option should be used with care as it assumes there have been no schema changes since the connector last stopped,
* otherwise some events during the gap may be processed with an incorrect schema and corrupted.
*/
@@ -736,7 +736,7 @@ public static SecureConnectionMode parse(String value, String defaultValue) {
* The database schema history class is hidden in the {@link #configDef()} since that is designed to work with a user interface,
* and in these situations using Kafka is the only way to go.
*/
- public static final Field SCHEMA_HISTORY = Field.create("schema.history")
+ public static final Field SCHEMA_HISTORY = Field.create("schema.history.internal")
.withDisplayName("Database schema history class")
.withType(Type.CLASS)
.withWidth(Width.LONG)


@@ -375,14 +375,14 @@ public void shouldValidateMandatoryValues() {
final Map<String, ConfigValue> issues = config.validate(KafkaSchemaHistory.ALL_FIELDS);
Assertions.assertThat(issues.keySet()).isEqualTo(Collect.unmodifiableSet(
- "schema.history.name",
- "schema.history.connector.class",
- "schema.history.kafka.topic",
- "schema.history.kafka.bootstrap.servers",
- "schema.history.kafka.recovery.poll.interval.ms",
- "schema.history.connector.id",
- "schema.history.kafka.recovery.attempts",
- "schema.history.kafka.query.timeout.ms"));
+ "schema.history.internal.name",
+ "schema.history.internal.connector.class",
+ "schema.history.internal.kafka.topic",
+ "schema.history.internal.kafka.bootstrap.servers",
+ "schema.history.internal.kafka.recovery.poll.interval.ms",
+ "schema.history.internal.connector.id",
+ "schema.history.internal.kafka.recovery.attempts",
+ "schema.history.internal.kafka.query.timeout.ms"));
}
@Test


@@ -740,7 +740,7 @@ public enum SnapshotMode implements EnumeratedValue {
/**
* Perform a snapshot of only the database schemas (without data) and then begin reading the redo log at the current redo log position.
- * This can be used for recovery only if the connector has existing offsets and the schema.history.kafka.topic does not exist (deleted).
+ * This can be used for recovery only if the connector has existing offsets and the schema.history.internal.kafka.topic does not exist (deleted).
* This recovery option should be used with care as it assumes there have been no schema changes since the connector last stopped,
* otherwise some events during the gap may be processed with an incorrect schema and corrupted.
*/


@@ -57,7 +57,7 @@ public class LogMinerQueryBuilderTest {
/**
* A template that defines the expected SQL output when the configuration specifies
- * {@code schema.history.store.only.captured.tables.ddl} is {@code false}.
+ * {@code schema.history.internal.store.only.captured.tables.ddl} is {@code false}.
*/
private static final String LOG_MINER_CONTENT_QUERY_TEMPLATE1 = "SELECT SCN, SQL_REDO, OPERATION_CODE, TIMESTAMP, " +
"XID, CSF, TABLE_NAME, SEG_OWNER, OPERATION, USERNAME, ROW_ID, ROLLBACK, RS_ID, STATUS, INFO, SSN, THREAD# " +
@@ -78,7 +78,7 @@ public class LogMinerQueryBuilderTest {
/**
* A template that defines the expected SQL output when the configuration specifies
- * {@code schema.history.store.only.captured.tables.ddl} is {@code true}.
+ * {@code schema.history.internal.store.only.captured.tables.ddl} is {@code true}.
*/
private static final String LOG_MINER_CONTENT_QUERY_TEMPLATE2 = "SELECT SCN, SQL_REDO, OPERATION_CODE, TIMESTAMP, " +
"XID, CSF, TABLE_NAME, SEG_OWNER, OPERATION, USERNAME, ROW_ID, ROLLBACK, RS_ID, STATUS, INFO, SSN, THREAD# " +


@@ -41,7 +41,7 @@ public abstract class HistorizedRelationalDatabaseConnectorConfig extends Relati
* The database schema history class is hidden in the {@link #configDef()} since that is designed to work with a user interface,
* and in these situations using Kafka is the only way to go.
*/
- public static final Field SCHEMA_HISTORY = Field.create("schema.history")
+ public static final Field SCHEMA_HISTORY = Field.create("schema.history.internal")
.withDisplayName("Database schema history class")
.withType(Type.CLASS)
.withWidth(Width.LONG)


@@ -33,7 +33,7 @@
*/
public interface SchemaHistory {
- public static final String CONFIGURATION_FIELD_PREFIX_STRING = "schema.history.";
+ public static final String CONFIGURATION_FIELD_PREFIX_STRING = "schema.history.internal.";
public static final Field NAME = Field.create(CONFIGURATION_FIELD_PREFIX_STRING + "name")
.withDisplayName("Logical name for the database schema history")


@@ -130,7 +130,7 @@ def main(argv):
config['name'] = 'tpc-connector'
print(config['name'])
- config['config']['schema.history.kafka.topic'] = 'tpc-test'
+ config['config']['schema.history.internal.kafka.topic'] = 'tpc-test'
databasetype = config['config']['connector.class']
connectiontype = config['config']['connector.class'].split('.')[3]
@@ -176,7 +176,7 @@ def main(argv):
databaseservername = config['config']['topic.prefix']
topicname = databaseservername + '.' + table
- historybootstrapserver = config['config'].get('schema.history.kafka.bootstrap.servers')
+ historybootstrapserver = config['config'].get('schema.history.internal.kafka.bootstrap.servers')
if historybootstrapserver != None:
bootstrapserver = historybootstrapserver.split(",")
@@ -196,13 +196,13 @@ def main(argv):
if historybootstrapserver != None:
try:
kafkaadmin.delete_topics(
- [config['config']['schema.history.kafka.topic']], 30)
+ [config['config']['schema.history.internal.kafka.topic']], 30)
except:
- print(config['config']['schema.history.kafka.topic'] +
+ print(config['config']['schema.history.internal.kafka.topic'] +
' TOPIC not exists')
else:
print(config['config']
- ['schema.history.kafka.topic'] + ' TOPIC deleted')
+ ['schema.history.internal.kafka.topic'] + ' TOPIC deleted')
# start tpc connector
print('start tpc connector')


@@ -47,8 +47,8 @@ Here's an example of code that configures and runs an embedded MySQL connector:
.with("database.password", "mysqlpw")
.with("server.id", 85744)
.with("server.name", "my-app-connector")
- .with("schema.history", "io.debezium.storage.file.history.FileSchemaHistory")
- .with("schema.history.file.filename", "/path/to/storage/schemahistory.dat")
+ .with("schema.history.internal", "io.debezium.storage.file.history.FileSchemaHistory")
+ .with("schema.history.internal.file.filename", "/path/to/storage/schemahistory.dat")
.build())
// Create the engine with this configuration ...
@@ -88,8 +88,8 @@ The next few lines define the fields that are specific to the connector, which i
.with("database.password", "mysqlpw")
.with("server.id", 85744)
.with("server.name", "products")
- .with("schema.history", "io.debezium.storage.file.history.FileSchemaHistory")
- .with("schema.history.file.filename", "/path/to/storage/schemahistory.dat")
+ .with("schema.history.internal", "io.debezium.storage.file.history.FileSchemaHistory")
+ .with("schema.history.internal.file.filename", "/path/to/storage/schemahistory.dat")
.build())
Here, we set the name of the host machine and port number where the MySQL database server is running, and we define the username and password that will be used to connect to the MySQL database. Note that for MySQL the username and password should correspond to a MySQL database user that has been granted the [`REPLICATION SLAVE` privilege](http://dev.mysql.com/doc/refman/5.7/en/replication-howto-repuser.html), allowing the database to read the server's binlog that is normally used for MySQL replication.


@@ -28,7 +28,7 @@ public List<TestResourceEntry> testResources() {
public Map<String, String> getConfigOverrides() {
Map<String, String> config = new HashMap<String, String>();
config.put("debezium.source." + OFFSET_STORAGE_FILE_FILENAME_CONFIG, OFFSET_STORE_PATH.toAbsolutePath().toString());
- config.put("debezium.source.schema.history", "io.debezium.server.redis.RedisSchemaHistory");
+ config.put("debezium.source.schema.history.internal", "io.debezium.server.redis.RedisSchemaHistory");
config.put("debezium.source.database.server.id", "12345");
return config;
}


@@ -40,8 +40,8 @@ public ConnectorConfigBuilder mysql(SqlDatabaseController controller, String con
.put("database.port", dbPort)
.put("database.user", ConfigProperties.DATABASE_MYSQL_DBZ_USERNAME)
.put("database.password", ConfigProperties.DATABASE_MYSQL_DBZ_PASSWORD)
- .put("schema.history.kafka.bootstrap.servers", kafka.getBootstrapAddress())
- .put("schema.history.kafka.topic", "schema-changes.inventory")
+ .put("schema.history.internal.kafka.bootstrap.servers", kafka.getBootstrapAddress())
+ .put("schema.history.internal.kafka.topic", "schema-changes.inventory")
.addOperationRouterForTable("u", "customers");
}
@@ -81,8 +81,8 @@ public ConnectorConfigBuilder sqlserver(SqlDatabaseController controller, String
.put("database.password", ConfigProperties.DATABASE_SQLSERVER_DBZ_PASSWORD)
.put("database.names", ConfigProperties.DATABASE_SQLSERVER_DBZ_DBNAMES)
.put("database.encrypt", false)
- .put("schema.history.kafka.bootstrap.servers", kafka.getBootstrapAddress())
- .put("schema.history.kafka.topic", "schema-changes.inventory")
+ .put("schema.history.internal.kafka.bootstrap.servers", kafka.getBootstrapAddress())
+ .put("schema.history.internal.kafka.topic", "schema-changes.inventory")
.addOperationRouterForTable("u", "customers");
}
@@ -116,8 +116,8 @@ public ConnectorConfigBuilder db2(SqlDatabaseController controller, String conne
.put("database.password", ConfigProperties.DATABASE_DB2_DBZ_PASSWORD)
.put("database.dbname", ConfigProperties.DATABASE_DB2_DBZ_DBNAME)
.put("database.cdcschema", ConfigProperties.DATABASE_DB2_CDC_SCHEMA)
- .put("schema.history.kafka.bootstrap.servers", kafka.getBootstrapAddress())
- .put("schema.history.kafka.topic", "schema-changes.inventory")
+ .put("schema.history.internal.kafka.bootstrap.servers", kafka.getBootstrapAddress())
+ .put("schema.history.internal.kafka.topic", "schema-changes.inventory")
.addOperationRouterForTable("u", "CUSTOMERS");
}
@@ -139,8 +139,8 @@ public ConnectorConfigBuilder oracle(SqlDatabaseController controller, String co
.put("schema.include.list", "DEBEZIUM")
.put("table.include.list", "DEBEZIUM.CUSTOMERS")
.put("database.pdb.name", ConfigProperties.DATABASE_ORACLE_PDBNAME)
- .put("schema.history.kafka.bootstrap.servers", kafka.getBootstrapAddress())
- .put("schema.history.kafka.topic", "schema-changes.oracle")
+ .put("schema.history.internal.kafka.bootstrap.servers", kafka.getBootstrapAddress())
+ .put("schema.history.internal.kafka.topic", "schema-changes.oracle")
.put("log.mining.strategy", "online_catalog")
.addOperationRouterForTable("u", "CUSTOMERS");
}


@@ -556,8 +556,8 @@ node('Slave') {
"database.server.id": "184054",
"topic.prefix": "dbserver1",
"database.include.list": "inventory",
- "schema.history.kafka.bootstrap.servers": "kafka:9092",
- "schema.history.kafka.topic": "schema-changes.inventory"
+ "schema.history.internal.kafka.bootstrap.servers": "kafka:9092",
+ "schema.history.internal.kafka.topic": "schema-changes.inventory"
}
}
'


@@ -439,8 +439,8 @@ node('Slave') {
"database.server.id": "184054",
"topic.prefix": "dbserver1",
"database.include.list": "inventory",
- "schema.history.kafka.bootstrap.servers": "kafka:9092",
- "schema.history.kafka.topic": "schema-changes.inventory"
+ "schema.history.internal.kafka.bootstrap.servers": "kafka:9092",
+ "schema.history.internal.kafka.topic": "schema-changes.inventory"
}
}
'
' '