DBZ-5043 Replace 'database history' with 'database schema history'

Reflect the renaming of 'database history' to 'database schema history' in comments,
configuration descriptions, and log and error messages.
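
For reference, one of the affected definitions is the SCHEMA_HISTORY field, whose display name changes from "Database history class" to "Database schema history class". Below is a minimal sketch of that Field builder usage as it reads after this commit; the wrapper class and the imports are illustrative assumptions and are not part of the commit itself.

import io.debezium.config.Field;
import org.apache.kafka.common.config.ConfigDef.Importance;
import org.apache.kafka.common.config.ConfigDef.Type;
import org.apache.kafka.common.config.ConfigDef.Width;

// Hypothetical holder class; only the SCHEMA_HISTORY definition mirrors the diff below.
public class SchemaHistoryFieldSketch {

    // The display name now reads "Database schema history class"; the property key
    // itself ("schema.history") is unchanged by this commit.
    public static final Field SCHEMA_HISTORY = Field.create("schema.history")
            .withDisplayName("Database schema history class")
            .withType(Type.CLASS)
            .withWidth(Width.LONG)
            .withImportance(Importance.LOW);
}
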
Vojtech Juranek 2022-09-06 13:51:04 +02:00 committed by Jiri Pechanec
parent 5bffd7839a
commit b6c31f3da5
20 changed files with 63 additions and 60 deletions

@@ -211,7 +211,7 @@ public boolean shouldStream() {
 }
 /**
-* Whether the schema can be recovered if database history is corrupted.
+* Whether the schema can be recovered if database schema history is corrupted.
 */
 public boolean shouldSnapshotOnSchemaError() {
 return shouldSnapshotOnSchemaError;
@@ -733,11 +733,11 @@ public static SecureConnectionMode parse(String value, String defaultValue) {
 .withValidation(Field::isNonNegativeInteger);
 /**
-* The database history class is hidden in the {@link #configDef()} since that is designed to work with a user interface,
+* The database schema history class is hidden in the {@link #configDef()} since that is designed to work with a user interface,
 * and in these situations using Kafka is the only way to go.
 */
 public static final Field SCHEMA_HISTORY = Field.create("schema.history")
-.withDisplayName("Database history class")
+.withDisplayName("Database schema history class")
 .withType(Type.CLASS)
 .withWidth(Width.LONG)
 .withImportance(Importance.LOW)

@@ -321,12 +321,12 @@ private boolean validateAndLoadSchemaHistory(MySqlConnectorConfig config, MySqlP
 // would like to also verify binlog position exists, but it defaults to 0 which is technically valid
 throw new DebeziumException("Could not find existing binlog information while attempting schema only recovery snapshot");
 }
-LOGGER.info("Connector started for the first time, database history recovery will not be executed");
+LOGGER.info("Connector started for the first time, database schema history recovery will not be executed");
 schema.initializeStorage();
 return false;
 }
 if (!schema.historyExists()) {
-LOGGER.warn("Database history was not found but was expected");
+LOGGER.warn("Database schema history was not found but was expected");
 if (config.getSnapshotMode().shouldSnapshotOnSchemaError()) {
 // But check to see if the server still has those binlog coordinates ...
 if (!isBinlogAvailable(config, offset)) {

@@ -283,7 +283,7 @@ else if (event instanceof SetVariableEvent) {
 }
 }
 else {
-LOGGER.debug("Changes for DDL '{}' were filtered and not recorded in database history", ddlStatements);
+LOGGER.debug("Changes for DDL '{}' were filtered and not recorded in database schema history", ddlStatements);
 }
 return schemaChangeEvents;
 }
@@ -340,7 +340,7 @@ protected DdlParser getDdlParser() {
 }
 /**
-* Return true if the database history entity exists
+* Return true if the database schema history entity exists
 */
 public boolean historyExists() {
 return schemaHistory.exists();

@@ -684,7 +684,7 @@ private void informAboutUnknownTableIfRequired(MySqlPartition partition, MySqlOf
 if (inconsistentSchemaHandlingMode == EventProcessingFailureHandlingMode.FAIL) {
 LOGGER.error(
-"Encountered change event '{}' at offset {} for table {} whose schema isn't known to this connector. One possible cause is an incomplete database history topic. Take a new snapshot in this case.{}"
+"Encountered change event '{}' at offset {} for table {} whose schema isn't known to this connector. One possible cause is an incomplete database schema history topic. Take a new snapshot in this case.{}"
 + "Use the mysqlbinlog tool to view the problematic event: mysqlbinlog --start-position={} --stop-position={} --verbose {}",
 event, offsetContext.getOffset(), tableId, System.lineSeparator(), eventHeader.getPosition(),
 eventHeader.getNextPosition(), offsetContext.getSource().binlogFilename());
@@ -693,7 +693,7 @@ private void informAboutUnknownTableIfRequired(MySqlPartition partition, MySqlOf
 }
 else if (inconsistentSchemaHandlingMode == EventProcessingFailureHandlingMode.WARN) {
 LOGGER.warn(
-"Encountered change event '{}' at offset {} for table {} whose schema isn't known to this connector. One possible cause is an incomplete database history topic. Take a new snapshot in this case.{}"
+"Encountered change event '{}' at offset {} for table {} whose schema isn't known to this connector. One possible cause is an incomplete database schema history topic. Take a new snapshot in this case.{}"
 + "The event will be ignored.{}"
 + "Use the mysqlbinlog tool to view the problematic event: mysqlbinlog --start-position={} --stop-position={} --verbose {}",
 event, offsetContext.getOffset(), tableId, System.lineSeparator(), System.lineSeparator(),
@@ -701,7 +701,7 @@ else if (inconsistentSchemaHandlingMode == EventProcessingFailureHandlingMode.WA
 }
 else {
 LOGGER.debug(
-"Encountered change event '{}' at offset {} for table {} whose schema isn't known to this connector. One possible cause is an incomplete database history topic. Take a new snapshot in this case.{}"
+"Encountered change event '{}' at offset {} for table {} whose schema isn't known to this connector. One possible cause is an incomplete database schema history topic. Take a new snapshot in this case.{}"
 + "The event will be ignored.{}"
 + "Use the mysqlbinlog tool to view the problematic event: mysqlbinlog --start-position={} --stop-position={} --verbose {}",
 event, offsetContext.getOffset(), tableId, System.lineSeparator(), System.lineSeparator(),

@@ -23,7 +23,7 @@
 /**
 *
-* The test to verify whether DDL is stored correctly in database history.
+* The test to verify whether DDL is stored correctly in database schema history.
 *
 * @author Jiri Pechanec
 */

@@ -779,7 +779,7 @@ public boolean shouldStream() {
 }
 /**
-* Whether the schema can be recovered if database history is corrupted.
+* Whether the schema can be recovered if database schema history is corrupted.
 */
 public boolean shouldSnapshotOnSchemaError() {
 return shouldSnapshotOnSchemaError;

@@ -183,12 +183,12 @@ private void validateAndLoadSchemaHistory(OracleConnectorConfig config, OraclePa
 // would like to also verify redo log position exists, but it defaults to 0 which is technically valid
 throw new DebeziumException("Could not find existing redo log information while attempting schema only recovery snapshot");
 }
-LOGGER.info("Connector started for the first time, database history recovery will not be executed");
+LOGGER.info("Connector started for the first time, database schema history recovery will not be executed");
 schema.initializeStorage();
 return;
 }
 if (!schema.historyExists()) {
-LOGGER.warn("Database history was not found but was expected");
+LOGGER.warn("Database schema history was not found but was expected");
 if (config.getSnapshotMode().shouldSnapshotOnSchemaError()) {
 LOGGER.info("The db-history topic is missing but we are in {} snapshot mode. " +
 "Attempting to snapshot the current schema and then begin reading the redo log from the last recorded offset.",

@@ -119,7 +119,7 @@ public boolean isStorageInitializationExecuted() {
 }
 /**
-* Return true if the database history entity exists
+* Return true if the database schema history entity exists
 */
 public boolean historyExists() {
 return schemaHistory.exists();

@@ -444,7 +444,7 @@ private void shouldHandleDefaultValuesCommon(List<ColumnDefinition> columnDefini
 }
 /**
-* Restarts the connector and verifies when the database history topic is loaded that we can parse
+* Restarts the connector and verifies when the database schema history topic is loaded that we can parse
 * all the loaded history statements without failures.
 */
 private void TestDefaultValuesByRestartAndLoadingHistoryTopic() throws Exception {

@@ -29,7 +29,7 @@
 import oracle.jdbc.OracleTypes;
 /**
-* Unit tests for Oracle's database history.
+* Unit tests for Oracle's database schema history.
 *
 * @author Chris Cranford
 */

@@ -2074,7 +2074,7 @@ public void shouldDetectPurgedHistory() throws Exception {
 start(SqlServerConnector.class, config);
 assertConnectorNotRunning();
 assertThat(logInterceptor.containsStacktraceElement(
-"The db history topic or its content is fully or partially missing. Please check database history topic configuration and re-execute the snapshot."))
+"The db history topic or its content is fully or partially missing. Please check database schema history topic configuration and re-execute the snapshot."))
 .isTrue();
 }

@@ -38,11 +38,11 @@ public abstract class HistorizedRelationalDatabaseConnectorConfig extends Relati
 private final boolean multiPartitionMode;
 /**
-* The database history class is hidden in the {@link #configDef()} since that is designed to work with a user interface,
+* The database schema history class is hidden in the {@link #configDef()} since that is designed to work with a user interface,
 * and in these situations using Kafka is the only way to go.
 */
 public static final Field SCHEMA_HISTORY = Field.create("schema.history")
-.withDisplayName("Database history class")
+.withDisplayName("Database schema history class")
 .withType(Type.CLASS)
 .withWidth(Width.LONG)
 .withImportance(Importance.LOW)
@@ -85,14 +85,14 @@ protected HistorizedRelationalDatabaseConnectorConfig(Class<? extends SourceConn
 }
 /**
-* Returns a configured (but not yet started) instance of the database history.
+* Returns a configured (but not yet started) instance of the database schema history.
 */
 public SchemaHistory getSchemaHistory() {
 Configuration config = getConfig();
 SchemaHistory schemaHistory = config.getInstance(HistorizedRelationalDatabaseConnectorConfig.SCHEMA_HISTORY, SchemaHistory.class);
 if (schemaHistory == null) {
-throw new ConnectException("Unable to instantiate the database history class " +
+throw new ConnectException("Unable to instantiate the database schema history class " +
 config.getString(HistorizedRelationalDatabaseConnectorConfig.SCHEMA_HISTORY));
 }

@@ -55,7 +55,7 @@ public void recover(Offsets<?, ?> offsets) {
 }
 if (!schemaHistory.exists()) {
-String msg = "The db history topic or its content is fully or partially missing. Please check database history topic configuration and re-execute the snapshot.";
+String msg = "The db history topic or its content is fully or partially missing. Please check database schema history topic configuration and re-execute the snapshot.";
 throw new DebeziumException(msg);
 }

@@ -388,7 +388,7 @@ public static DecimalHandlingMode parse(String value, String defaultValue) {
 .withDescription("Whether the connector should publish changes in the database schema to a Kafka topic with "
 + "the same name as the database server ID. Each schema change will be recorded using a key that "
 + "contains the database name and whose value include logical description of the new schema and optionally the DDL statement(s). "
-+ "The default is 'true'. This is independent of how the connector internally records database history.")
++ "The default is 'true'. This is independent of how the connector internally records database schema history.")
 .withDefault(true);
 public static final Field INCLUDE_SCHEMA_COMMENTS = Field.create("include.schema.comments")

@@ -177,7 +177,7 @@ else if (ddl != null && ddlParser != null) {
 }
 catch (final ParsingException | MultipleParsingExceptions e) {
 if (skipUnparseableDDL) {
-logger.warn("Ignoring unparseable statements '{}' stored in database history: {}", ddl, e);
+logger.warn("Ignoring unparseable statements '{}' stored in database schema history: {}", ddl, e);
 }
 else {
 throw e;

@@ -36,11 +36,11 @@ public interface SchemaHistory {
 public static final String CONFIGURATION_FIELD_PREFIX_STRING = "schema.history.";
 public static final Field NAME = Field.create(CONFIGURATION_FIELD_PREFIX_STRING + "name")
-.withDisplayName("Logical name for the database history")
+.withDisplayName("Logical name for the database schema history")
 .withType(Type.STRING)
 .withWidth(Width.MEDIUM)
 .withImportance(Importance.LOW)
-.withDescription("The name used for the database history, perhaps differently by each implementation.")
+.withDescription("The name used for the database schema history, perhaps differently by each implementation.")
 .withValidation(Field::isOptional);
 public static final Field SKIP_UNPARSEABLE_DDL_STATEMENTS = Field.create(CONFIGURATION_FIELD_PREFIX_STRING + "skip.unparseable.ddl")
@@ -58,7 +58,7 @@ public interface SchemaHistory {
 .withType(Type.BOOLEAN)
 .withWidth(Width.SHORT)
 .withImportance(Importance.LOW)
-.withDescription("Controls what DDL will Debezium store in database history. "
+.withDescription("Controls what DDL will Debezium store in database schema history. "
 + "By default (false) Debezium will store all incoming DDL statements. If set to true, "
 + "then only DDL that manipulates a captured table will be stored.")
 .withDefault(false);
@@ -164,7 +164,7 @@ default void recover(Offsets<?, ?> offsets, Tables schema, DdlParser ddlParser)
 void stop();
 /**
-* Determines if the database history entity exists; i.e. the storage must have
+* Determines if the database schema history entity exists; i.e. the storage must have
 * been initialized and the history must have been populated.
 */
 boolean exists();

@@ -112,13 +112,14 @@ public void stopped() {
 public void recoveryStarted() {
 status = SchemaHistoryStatus.RECOVERING;
 recoveryStartTime = Instant.now();
-LOGGER.info("Started database history recovery");
+LOGGER.info("Started database schema history recovery");
 }
 @Override
 public void recoveryStopped() {
 status = SchemaHistoryStatus.RUNNING;
-LOGGER.info("Finished database history recovery of {} change(s) in {} ms", changesRecovered.get(), Duration.between(recoveryStartTime, Instant.now()).toMillis());
+LOGGER.info("Finished database schema history recovery of {} change(s) in {} ms", changesRecovered.get(),
+        Duration.between(recoveryStartTime, Instant.now()).toMillis());
 }
 @Override
@@ -126,7 +127,7 @@ public void onChangeFromHistory(HistoryRecord record) {
 lastRecoveredChange = record;
 changesRecovered.incrementAndGet();
 if (lastChangeRecoveredLogDelay.hasElapsed()) {
-LOGGER.info("Database history recovery in progress, recovered {} records", changesRecovered);
+LOGGER.info("Database schema history recovery in progress, recovered {} records", changesRecovered);
 }
 lastChangeRecoveredTimestamp = Instant.now();
 }

@@ -24,7 +24,7 @@
 import io.debezium.relational.ddl.DdlParser;
 /**
-* An abstract database history class, allowing each connector to extend to offer a common set of tests
+* An abstract database schema history class, allowing each connector to extend to offer a common set of tests
 *
 * @author Chris Cranford
 */

@@ -44,7 +44,7 @@
 public final class FileSchemaHistory extends AbstractSchemaHistory {
 public static final Field FILE_PATH = Field.create(SchemaHistory.CONFIGURATION_FIELD_PREFIX_STRING + "file.filename")
-.withDescription("The path to the file that will be used to record the database history")
+.withDescription("The path to the file that will be used to record the database schema history")
 .required();
 public static Collection<Field> ALL_FIELDS = Collect.arrayListOf(FILE_PATH);
@@ -64,7 +64,7 @@ public void configure(Configuration config, HistoryRecordComparator comparator,
 }
 config.validateAndRecord(ALL_FIELDS, logger::error);
 if (running.get()) {
-throw new IllegalStateException("Database history file already initialized to " + path);
+throw new IllegalStateException("Database schema history file already initialized to " + path);
 }
 super.configure(config, comparator, listener, useCatalogBeforeSchema);
 path = Paths.get(config.getString(FILE_PATH));

@@ -104,7 +104,7 @@ public class KafkaSchemaHistory extends AbstractSchemaHistory {
 private static final short DEFAULT_TOPIC_REPLICATION_FACTOR = 1;
 public static final Field TOPIC = Field.create(CONFIGURATION_FIELD_PREFIX_STRING + "kafka.topic")
-.withDisplayName("Database history topic name")
+.withDisplayName("Database schema history topic name")
 .withType(Type.STRING)
 .withGroup(Field.createGroupEntry(Field.Group.CONNECTION, 32))
 .withWidth(Width.LONG)
@@ -126,7 +126,7 @@ public class KafkaSchemaHistory extends AbstractSchemaHistory {
 public static final Field RECOVERY_POLL_INTERVAL_MS = Field.create(CONFIGURATION_FIELD_PREFIX_STRING
 + "kafka.recovery.poll.interval.ms")
-.withDisplayName("Poll interval during database history recovery (ms)")
+.withDisplayName("Poll interval during database schema history recovery (ms)")
 .withType(Type.INT)
 .withGroup(Field.createGroupEntry(Field.Group.ADVANCED, 1))
 .withWidth(Width.SHORT)
@@ -136,7 +136,7 @@ public class KafkaSchemaHistory extends AbstractSchemaHistory {
 .withValidation(Field::isNonNegativeInteger);
 public static final Field RECOVERY_POLL_ATTEMPTS = Field.create(CONFIGURATION_FIELD_PREFIX_STRING + "kafka.recovery.attempts")
-.withDisplayName("Max attempts to recovery database history")
+.withDisplayName("Max attempts to recovery database schema history")
 .withType(Type.INT)
 .withGroup(Field.createGroupEntry(Field.Group.ADVANCED, 0))
 .withWidth(Width.SHORT)
@@ -269,9 +269,9 @@ public synchronized void start() {
 @Override
 protected void storeRecord(HistoryRecord record) throws SchemaHistoryException {
 if (this.producer == null) {
-throw new IllegalStateException("No producer is available. Ensure that 'start()' is called before storing database history records.");
+throw new IllegalStateException("No producer is available. Ensure that 'start()' is called before storing database schema history records.");
 }
-LOGGER.trace("Storing record into database history: {}", record);
+LOGGER.trace("Storing record into database schema history: {}", record);
 try {
 ProducerRecord<String, String> produced = new ProducerRecord<>(topicName, PARTITION, null, record.toString());
 Future<RecordMetadata> future = this.producer.send(produced);
@@ -284,7 +284,7 @@ protected void storeRecord(HistoryRecord record) throws SchemaHistoryException {
 }
 }
 catch (InterruptedException e) {
-LOGGER.trace("Interrupted before record was written into database history: {}", record);
+LOGGER.trace("Interrupted before record was written into database schema history: {}", record);
 Thread.currentThread().interrupt();
 throw new SchemaHistoryException(e);
 }
@@ -297,7 +297,7 @@ protected void storeRecord(HistoryRecord record) throws SchemaHistoryException {
 protected void recoverRecords(Consumer<HistoryRecord> records) {
 try (KafkaConsumer<String, String> historyConsumer = new KafkaConsumer<>(consumerConfig.asProperties())) {
 // Subscribe to the only partition for this topic, and seek to the beginning of that partition ...
-LOGGER.debug("Subscribing to database history topic '{}'", topicName);
+LOGGER.debug("Subscribing to database schema history topic '{}'", topicName);
 historyConsumer.subscribe(Collect.arrayListOf(topicName));
 // Read all messages in the topic ...
@@ -308,11 +308,12 @@ protected void recoverRecords(Consumer<HistoryRecord> records) {
 // read the topic until the end
 do {
 if (recoveryAttempts > maxRecoveryAttempts) {
-throw new IllegalStateException("The database history couldn't be recovered. Consider to increase the value for " + RECOVERY_POLL_INTERVAL_MS.name());
+throw new IllegalStateException(
+        "The database schema history couldn't be recovered. Consider to increase the value for " + RECOVERY_POLL_INTERVAL_MS.name());
 }
 endOffset = getEndOffsetOfDbHistoryTopic(endOffset, historyConsumer);
-LOGGER.debug("End offset of database history topic is {}", endOffset);
+LOGGER.debug("End offset of database schema history topic is {}", endOffset);
 // DBZ-1361 not using poll(Duration) to keep compatibility with AK 1.x
 ConsumerRecords<String, String> recoveredRecords = historyConsumer.poll(this.pollInterval.toMillis());
@@ -322,20 +323,20 @@ protected void recoverRecords(Consumer<HistoryRecord> records) {
 try {
 if (lastProcessedOffset < record.offset()) {
 if (record.value() == null) {
-LOGGER.warn("Skipping null database history record. " +
+LOGGER.warn("Skipping null database schema history record. " +
 "This is often not an issue, but if it happens repeatedly please check the '{}' topic.", topicName);
 }
 else {
 HistoryRecord recordObj = new HistoryRecord(reader.read(record.value()));
-LOGGER.trace("Recovering database history: {}", recordObj);
+LOGGER.trace("Recovering database schema history: {}", recordObj);
 if (recordObj == null || !recordObj.isValid()) {
-LOGGER.warn("Skipping invalid database history record '{}'. " +
+LOGGER.warn("Skipping invalid database schema history record '{}'. " +
 "This is often not an issue, but if it happens repeatedly please check the '{}' topic.",
 recordObj, topicName);
 }
 else {
 records.accept(recordObj);
-LOGGER.trace("Recovered database history: {}", recordObj);
+LOGGER.trace("Recovered database schema history: {}", recordObj);
 }
 }
 lastProcessedOffset = record.offset();
@@ -351,11 +352,11 @@ protected void recoverRecords(Consumer<HistoryRecord> records) {
 }
 }
 if (numRecordsProcessed == 0) {
-LOGGER.debug("No new records found in the database history; will retry");
+LOGGER.debug("No new records found in the database schema history; will retry");
 recoveryAttempts++;
 }
 else {
-LOGGER.debug("Processed {} records from database history", numRecordsProcessed);
+LOGGER.debug("Processed {} records from database schema history", numRecordsProcessed);
 }
 } while (lastProcessedOffset < endOffset - 1);
 }
@@ -368,7 +369,7 @@ private Long getEndOffsetOfDbHistoryTopic(Long previousEndOffset, KafkaConsumer<
 // The end offset should never change during recovery; doing this check here just as - a rather weak - attempt
 // to spot other connectors that share the same history topic accidentally
 if (previousEndOffset != null && !previousEndOffset.equals(endOffset)) {
-throw new IllegalStateException("Detected changed end offset of database history topic (previous: "
+throw new IllegalStateException("Detected changed end offset of database schema history topic (previous: "
 + previousEndOffset + ", current: " + endOffset
 + "). Make sure that the same history topic isn't shared by multiple connector instances.");
 }
@@ -432,20 +433,21 @@ private void checkTopicSettings(String topicName) {
 final String cleanupPolicy = topic.get(CLEANUP_POLICY_NAME).value();
 if (!CLEANUP_POLICY_VALUE.equals(cleanupPolicy)) {
-LOGGER.warn("Database history topic '{}' option '{}' should be '{}' but is '{}'", topicName, CLEANUP_POLICY_NAME, CLEANUP_POLICY_VALUE,
+LOGGER.warn("Database schema history topic '{}' option '{}' should be '{}' but is '{}'", topicName, CLEANUP_POLICY_NAME, CLEANUP_POLICY_VALUE,
 cleanupPolicy);
 return;
 }
 final String retentionBytes = topic.get(RETENTION_BYTES_NAME).value();
 if (retentionBytes != null && Long.parseLong(retentionBytes) != UNLIMITED_VALUE) {
-LOGGER.warn("Database history topic '{}' option '{}' should be '{}' but is '{}'", topicName, RETENTION_BYTES_NAME, UNLIMITED_VALUE, retentionBytes);
+LOGGER.warn("Database schema history topic '{}' option '{}' should be '{}' but is '{}'", topicName, RETENTION_BYTES_NAME, UNLIMITED_VALUE,
+        retentionBytes);
 return;
 }
 final String retentionMs = topic.get(RETENTION_MS_NAME).value();
 if (retentionMs != null && (Long.parseLong(retentionMs) != UNLIMITED_VALUE && Long.parseLong(retentionMs) < RETENTION_MS_MIN)) {
-LOGGER.warn("Database history topic '{}' option '{}' should be '{}' or greater than '{}' (5 years) but is '{}'", topicName, RETENTION_MS_NAME,
+LOGGER.warn("Database schema history topic '{}' option '{}' should be '{}' or greater than '{}' (5 years) but is '{}'", topicName, RETENTION_MS_NAME,
 UNLIMITED_VALUE, RETENTION_MS_MIN, retentionMs);
 return;
 }
@@ -463,14 +465,14 @@ private void checkTopicSettings(String topicName) {
 final int partitions = topicDesc.partitions().size();
 if (partitions != PARTITION_COUNT) {
-LOGGER.warn("Database history topic '{}' should have one partiton but has '{}'", topicName, partitions);
+LOGGER.warn("Database schema history topic '{}' should have one partiton but has '{}'", topicName, partitions);
 return;
 }
-LOGGER.info("Database history topic '{}' has correct settings", topicName);
+LOGGER.info("Database schema history topic '{}' has correct settings", topicName);
 }
 catch (Throwable e) {
-LOGGER.info("Attempted to validate database history topic but failed", e);
+LOGGER.info("Attempted to validate database schema history topic but failed", e);
 }
 stopCheckTopicSettingsExecutor();
 });
@@ -549,11 +551,11 @@ public void initializeStorage() {
 try {
 CreateTopicsResult result = admin.createTopics(Collections.singleton(topic));
 result.all().get(kafkaCreateTimeout.toMillis(), TimeUnit.MILLISECONDS);
-LOGGER.info("Database history topic '{}' created", topic);
+LOGGER.info("Database schema history topic '{}' created", topic);
 }
 catch (ExecutionException e) {
 if (e.getCause() instanceof TopicExistsException) {
-LOGGER.info("Database history topic '{}' already exist", topic);
+LOGGER.info("Database schema history topic '{}' already exist", topic);
 }
 else {
 throw e;
@@ -561,7 +563,7 @@ public void initializeStorage() {
 }
 }
 catch (Exception e) {
-throw new ConnectException("Creation of database history topic failed, please create the topic manually", e);
+throw new ConnectException("Creation of database schema history topic failed, please create the topic manually", e);
 }
 }