DBZ-5043 Change namespace from database.history to schema.history

Vojtech Juranek 2022-08-17 14:52:27 +02:00 committed by Jiri Pechanec
parent 48d37857b6
commit 25aa6c4acc
12 changed files with 37 additions and 37 deletions


@@ -149,7 +149,7 @@ public static enum SnapshotMode implements EnumeratedValue {
/**
* Perform a snapshot of only the database schemas (without data) and then begin reading the binlog at the current binlog position.
-* This can be used for recovery only if the connector has existing offsets and the database.history.kafka.topic does not exist (deleted).
+* This can be used for recovery only if the connector has existing offsets and the schema.history.kafka.topic does not exist (deleted).
* This recovery option should be used with care as it assumes there have been no schema changes since the connector last stopped,
* otherwise some events during the gap may be processed with an incorrect schema and corrupted.
*/
@@ -739,7 +739,7 @@ public static SecureConnectionMode parse(String value, String defaultValue) {
* The database history class is hidden in the {@link #configDef()} since that is designed to work with a user interface,
* and in these situations using Kafka is the only way to go.
*/
-public static final Field DATABASE_HISTORY = Field.create("database.history")
+public static final Field DATABASE_HISTORY = Field.create("schema.history")
.withDisplayName("Database history class")
.withType(Type.CLASS)
.withWidth(Width.LONG)
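
For orientation, not part of the diff itself: with the field renamed, a connector configuration selects the history implementation through the new top-level key. A minimal sketch, assuming the io.debezium.config.Configuration builder used elsewhere in this commit and the KafkaDatabaseHistory implementation exercised in the test below (its fully qualified class name is an assumption here):

import io.debezium.config.Configuration;

// Sketch only: the top-level key is now "schema.history" instead of
// "database.history"; the nested schema.history.* keys follow the same rename.
Configuration config = Configuration.create()
        .with("schema.history", "io.debezium.relational.history.KafkaDatabaseHistory")
        .with("schema.history.kafka.bootstrap.servers", "kafka:9092")
        .with("schema.history.kafka.topic", "schema-changes.inventory")
        .build();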


@@ -375,14 +375,14 @@ public void shouldValidateMandatoryValues() {
final Map<String, ConfigValue> issues = config.validate(KafkaDatabaseHistory.ALL_FIELDS);
Assertions.assertThat(issues.keySet()).isEqualTo(Collect.unmodifiableSet(
"database.history.name",
"database.history.connector.class",
"database.history.kafka.topic",
"database.history.kafka.bootstrap.servers",
"database.history.kafka.recovery.poll.interval.ms",
"database.history.connector.id",
"database.history.kafka.recovery.attempts",
"database.history.kafka.query.timeout.ms"));
"schema.history.name",
"schema.history.connector.class",
"schema.history.kafka.topic",
"schema.history.kafka.bootstrap.servers",
"schema.history.kafka.recovery.poll.interval.ms",
"schema.history.connector.id",
"schema.history.kafka.recovery.attempts",
"schema.history.kafka.query.timeout.ms"));
}
@Test
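
To illustrate what the updated assertion exercises, a sketch under the same assumptions as the snippet above: validating a configuration that leaves the mandatory fields unset should now report its issues under the renamed keys.

import java.util.Map;
import org.apache.kafka.common.config.ConfigValue;
import io.debezium.config.Configuration;

Configuration config = Configuration.create().build(); // nothing configured
Map<String, ConfigValue> issues = config.validate(KafkaDatabaseHistory.ALL_FIELDS);
// Per the test above, issues.keySet() now contains entries such as
// "schema.history.kafka.topic" and "schema.history.kafka.bootstrap.servers".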


@@ -745,7 +745,7 @@ public enum SnapshotMode implements EnumeratedValue {
/**
* Perform a snapshot of only the database schemas (without data) and then begin reading the redo log at the current redo log position.
-* This can be used for recovery only if the connector has existing offsets and the database.history.kafka.topic does not exist (deleted).
+* This can be used for recovery only if the connector has existing offsets and the schema.history.kafka.topic does not exist (deleted).
* This recovery option should be used with care as it assumes there have been no schema changes since the connector last stopped,
* otherwise some events during the gap may be processed with an incorrect schema and corrupted.
*/


@@ -57,7 +57,7 @@ public class LogMinerQueryBuilderTest {
/**
* A template that defines the expected SQL output when the configuration specifies
-* {@code database.history.store.only.captured.tables.ddl} is {@code false}.
+* {@code schema.history.store.only.captured.tables.ddl} is {@code false}.
*/
private static final String LOG_MINER_CONTENT_QUERY_TEMPLATE1 = "SELECT SCN, SQL_REDO, OPERATION_CODE, TIMESTAMP, " +
"XID, CSF, TABLE_NAME, SEG_OWNER, OPERATION, USERNAME, ROW_ID, ROLLBACK, RS_ID, STATUS, INFO, SSN, THREAD# " +
@@ -78,7 +78,7 @@ public class LogMinerQueryBuilderTest {
/**
* A template that defines the expected SQL output when the configuration specifies
-* {@code database.history.store.only.captured.tables.ddl} is {@code true}.
+* {@code schema.history.store.only.captured.tables.ddl} is {@code true}.
*/
private static final String LOG_MINER_CONTENT_QUERY_TEMPLATE2 = "SELECT SCN, SQL_REDO, OPERATION_CODE, TIMESTAMP, " +
"XID, CSF, TABLE_NAME, SEG_OWNER, OPERATION, USERNAME, ROW_ID, ROLLBACK, RS_ID, STATUS, INFO, SSN, THREAD# " +


@@ -41,7 +41,7 @@ public abstract class HistorizedRelationalDatabaseConnectorConfig extends Relati
* The database history class is hidden in the {@link #configDef()} since that is designed to work with a user interface,
* and in these situations using Kafka is the only way to go.
*/
-public static final Field DATABASE_HISTORY = Field.create("database.history")
+public static final Field DATABASE_HISTORY = Field.create("schema.history")
.withDisplayName("Database history class")
.withType(Type.CLASS)
.withWidth(Width.LONG)


@@ -33,7 +33,7 @@
*/
public interface DatabaseHistory {
-public static final String CONFIGURATION_FIELD_PREFIX_STRING = "database.history.";
+public static final String CONFIGURATION_FIELD_PREFIX_STRING = "schema.history.";
public static final Field NAME = Field.create(CONFIGURATION_FIELD_PREFIX_STRING + "name")
.withDisplayName("Logical name for the database history")


@@ -130,7 +130,7 @@ def main(argv):
config['name'] = 'tpc-connector'
print(config['name'])
-config['config']['database.history.kafka.topic'] = 'tpc-test'
+config['config']['schema.history.kafka.topic'] = 'tpc-test'
databasetype = config['config']['connector.class']
connectiontype = config['config']['connector.class'].split('.')[3]
@@ -176,7 +176,7 @@ def main(argv):
databaseservername = config['config']['database.server.name']
topicname = databaseservername + '.' + table
-historybootstrapserver = config['config'].get('database.history.kafka.bootstrap.servers')
+historybootstrapserver = config['config'].get('schema.history.kafka.bootstrap.servers')
if historybootstrapserver is not None:
bootstrapserver = historybootstrapserver.split(",")
@@ -196,13 +196,13 @@ def main(argv):
if historybootstrapserver is not None:
try:
kafkaadmin.delete_topics(
-[config['config']['database.history.kafka.topic']], 30)
+[config['config']['schema.history.kafka.topic']], 30)
except Exception:
-print(config['config']['database.history.kafka.topic'] +
+print(config['config']['schema.history.kafka.topic'] +
' TOPIC does not exist')
else:
print(config['config']
-['database.history.kafka.topic'] + ' TOPIC deleted')
+['schema.history.kafka.topic'] + ' TOPIC deleted')
# start tpc connector
print('start tpc connector')
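
For comparison, the same cleanup in Java with Kafka's AdminClient might look like the sketch below; the topic name 'tpc-test' and the 30-second timeout mirror the script above, the bootstrap address is a placeholder, and checked exceptions are omitted:

import java.util.List;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;

Properties props = new Properties();
props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka:9092"); // placeholder address
// Delete the topic configured under the renamed key "schema.history.kafka.topic".
try (AdminClient admin = AdminClient.create(props)) {
    admin.deleteTopics(List.of("tpc-test")).all().get(30, TimeUnit.SECONDS);
}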


@@ -47,8 +47,8 @@ Here's an example of code that configures and runs an embedded MySQL connector:
.with("database.password", "mysqlpw")
.with("server.id", 85744)
.with("server.name", "my-app-connector")
.with("database.history", "io.debezium.storage.file.history.FileDatabaseHistory")
.with("database.history.file.filename", "/path/to/storage/dbhistory.dat")
.with("schema.history", "io.debezium.storage.file.history.FileDatabaseHistory")
.with("schema.history.file.filename", "/path/to/storage/dbhistory.dat")
.build())
// Create the engine with this configuration ...
@@ -88,8 +88,8 @@ The next few lines define the fields that are specific to the connector, which i
.with("database.password", "mysqlpw")
.with("server.id", 85744)
.with("server.name", "products")
.with("database.history", "io.debezium.storage.file.history.FileDatabaseHistory")
.with("database.history.file.filename", "/path/to/storage/dbhistory.dat")
.with("schema.history", "io.debezium.storage.file.history.FileDatabaseHistory")
.with("schema.history.file.filename", "/path/to/storage/dbhistory.dat")
.build())
Here, we set the name of the host machine and port number where the MySQL database server is running, and we define the username and password that will be used to connect to the MySQL database. Note that for MySQL the username and password should correspond to a MySQL database user that has been granted the [`REPLICATION SLAVE` privilege](http://dev.mysql.com/doc/refman/5.7/en/replication-howto-repuser.html), allowing the connector to read the server's binlog that is normally used for MySQL replication.
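
To round out the snippet (this part is not in the diff): once the Configuration is built, the embedded engine of this era is typically constructed and run roughly as sketched below; the printing consumer is a placeholder for real event handling, so treat the exact builder calls as an assumption:

import java.util.concurrent.Executors;
import io.debezium.embedded.EmbeddedEngine;

// Sketch: run the embedded connector with the configuration built above.
EmbeddedEngine engine = EmbeddedEngine.create()
        .using(config)
        .notifying(record -> System.out.println(record)) // placeholder handler
        .build();
Executors.newSingleThreadExecutor().execute(engine); // EmbeddedEngine is a Runnable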


@@ -28,7 +28,7 @@ public List<TestResourceEntry> testResources() {
public Map<String, String> getConfigOverrides() {
Map<String, String> config = new HashMap<String, String>();
config.put("debezium.source." + OFFSET_STORAGE_FILE_FILENAME_CONFIG, OFFSET_STORE_PATH.toAbsolutePath().toString());
config.put("debezium.source.database.history", "io.debezium.server.redis.RedisDatabaseHistory");
config.put("debezium.source.schema.history", "io.debezium.server.redis.RedisDatabaseHistory");
config.put("debezium.source.database.server.id", "12345");
return config;
}


@@ -40,8 +40,8 @@ public ConnectorConfigBuilder mysql(SqlDatabaseController controller, String con
.put("database.port", dbPort)
.put("database.user", ConfigProperties.DATABASE_MYSQL_DBZ_USERNAME)
.put("database.password", ConfigProperties.DATABASE_MYSQL_DBZ_PASSWORD)
.put("database.history.kafka.bootstrap.servers", kafka.getBootstrapAddress())
.put("database.history.kafka.topic", "schema-changes.inventory")
.put("schema.history.kafka.bootstrap.servers", kafka.getBootstrapAddress())
.put("schema.history.kafka.topic", "schema-changes.inventory")
.addOperationRouterForTable("u", "customers");
}
@@ -81,8 +81,8 @@ public ConnectorConfigBuilder sqlserver(SqlDatabaseController controller, String
.put("database.password", ConfigProperties.DATABASE_SQLSERVER_DBZ_PASSWORD)
.put("database.names", ConfigProperties.DATABASE_SQLSERVER_DBZ_DBNAMES)
.put("database.encrypt", false)
.put("database.history.kafka.bootstrap.servers", kafka.getBootstrapAddress())
.put("database.history.kafka.topic", "schema-changes.inventory")
.put("schema.history.kafka.bootstrap.servers", kafka.getBootstrapAddress())
.put("schema.history.kafka.topic", "schema-changes.inventory")
.addOperationRouterForTable("u", "customers");
}
@@ -116,8 +116,8 @@ public ConnectorConfigBuilder db2(SqlDatabaseController controller, String conne
.put("database.password", ConfigProperties.DATABASE_DB2_DBZ_PASSWORD)
.put("database.dbname", ConfigProperties.DATABASE_DB2_DBZ_DBNAME)
.put("database.cdcschema", ConfigProperties.DATABASE_DB2_CDC_SCHEMA)
.put("database.history.kafka.bootstrap.servers", kafka.getBootstrapAddress())
.put("database.history.kafka.topic", "schema-changes.inventory")
.put("schema.history.kafka.bootstrap.servers", kafka.getBootstrapAddress())
.put("schema.history.kafka.topic", "schema-changes.inventory")
.addOperationRouterForTable("u", "CUSTOMERS");
}
@@ -139,8 +139,8 @@ public ConnectorConfigBuilder oracle(SqlDatabaseController controller, String co
.put("schema.include.list", "DEBEZIUM")
.put("table.include.list", "DEBEZIUM.CUSTOMERS")
.put("database.pdb.name", ConfigProperties.DATABASE_ORACLE_PDBNAME)
.put("database.history.kafka.bootstrap.servers", kafka.getBootstrapAddress())
.put("database.history.kafka.topic", "schema-changes.oracle")
.put("schema.history.kafka.bootstrap.servers", kafka.getBootstrapAddress())
.put("schema.history.kafka.topic", "schema-changes.oracle")
.put("log.mining.strategy", "online_catalog")
.addOperationRouterForTable("u", "CUSTOMERS");
}


@@ -556,8 +556,8 @@ node('Slave') {
"database.server.id": "184054",
"database.server.name": "dbserver1",
"database.include.list": "inventory",
"database.history.kafka.bootstrap.servers": "kafka:9092",
"database.history.kafka.topic": "schema-changes.inventory"
"schema.history.kafka.bootstrap.servers": "kafka:9092",
"schema.history.kafka.topic": "schema-changes.inventory"
}
}
'


@@ -439,8 +439,8 @@ node('Slave') {
"database.server.id": "184054",
"database.server.name": "dbserver1",
"database.include.list": "inventory",
"database.history.kafka.bootstrap.servers": "kafka:9092",
"database.history.kafka.topic": "schema-changes.inventory"
"schema.history.kafka.bootstrap.servers": "kafka:9092",
"schema.history.kafka.topic": "schema-changes.inventory"
}
}
'