DBZ-40 Database history initi is pulled up

This commit is contained in:
Jiri Pechanec 2018-07-25 13:26:39 +02:00 committed by Gunnar Morling
parent aa0e5a1f3c
commit 197fa41c64

View File

@ -9,7 +9,6 @@
import org.apache.kafka.common.config.ConfigDef.Importance;
import org.apache.kafka.common.config.ConfigDef.Type;
import org.apache.kafka.common.config.ConfigDef.Width;
import org.apache.kafka.connect.errors.ConnectException;
import io.debezium.config.CommonConnectorConfig;
import io.debezium.config.Configuration;
@ -18,10 +17,10 @@
import io.debezium.document.Document;
import io.debezium.heartbeat.Heartbeat;
import io.debezium.jdbc.JdbcConfiguration;
import io.debezium.relational.HistorizedRelationalDatabaseConnectorConfig;
import io.debezium.relational.RelationalDatabaseConnectorConfig;
import io.debezium.relational.TableId;
import io.debezium.relational.Tables.TableFilter;
import io.debezium.relational.history.DatabaseHistory;
import io.debezium.relational.history.HistoryRecordComparator;
import io.debezium.relational.history.KafkaDatabaseHistory;
@ -30,7 +29,7 @@
*
* @author Jiri Pechanec
*/
public class SqlServerConnectorConfig extends RelationalDatabaseConnectorConfig {
public class SqlServerConnectorConfig extends HistorizedRelationalDatabaseConnectorConfig {
/**
* The set of predefined SnapshotMode options or aliases.
@ -184,21 +183,6 @@ public static SnapshotLockingMode parse(String value, String defaultValue) {
.withDescription("The name of the database the connector should be monitoring. When working with a "
+ "multi-tenant set-up, must be set to the CDB name.");
/**
* The database history class is hidden in the {@link #configDef()} since that is designed to work with a user interface,
* and in these situations using Kafka is the only way to go.
*/
public static final Field DATABASE_HISTORY = Field.create("database.history")
.withDisplayName("Database history class")
.withType(Type.CLASS)
.withWidth(Width.LONG)
.withImportance(Importance.LOW)
.withInvisibleRecommender()
.withDescription("The name of the DatabaseHistory class that should be used to store and recover database schema changes. "
+ "The configuration properties for the history are prefixed with the '"
+ DatabaseHistory.CONFIGURATION_FIELD_PREFIX_STRING + "' string.")
.withDefault(KafkaDatabaseHistory.class.getName());
public static final Field SNAPSHOT_MODE = Field.create("snapshot.mode")
.withDisplayName("Snapshot mode")
.withEnum(SnapshotMode.class, SnapshotMode.INITIAL)
@ -225,6 +209,7 @@ public static SnapshotLockingMode parse(String value, String defaultValue) {
LOGICAL_NAME,
DATABASE_NAME,
SNAPSHOT_MODE,
HistorizedRelationalDatabaseConnectorConfig.DATABASE_HISTORY,
RelationalDatabaseConnectorConfig.TABLE_WHITELIST,
RelationalDatabaseConnectorConfig.TABLE_BLACKLIST,
RelationalDatabaseConnectorConfig.TABLE_IGNORE_BUILTIN,
@ -238,6 +223,9 @@ public static ConfigDef configDef() {
ConfigDef config = new ConfigDef();
Field.group(config, "SQL Server", LOGICAL_NAME, DATABASE_NAME, SNAPSHOT_MODE);
Field.group(config, "History Storage", KafkaDatabaseHistory.BOOTSTRAP_SERVERS,
KafkaDatabaseHistory.TOPIC, KafkaDatabaseHistory.RECOVERY_POLL_ATTEMPTS,
KafkaDatabaseHistory.RECOVERY_POLL_INTERVAL_MS, HistorizedRelationalDatabaseConnectorConfig.DATABASE_HISTORY);
Field.group(config, "Events", RelationalDatabaseConnectorConfig.TABLE_WHITELIST,
RelationalDatabaseConnectorConfig.TABLE_BLACKLIST,
RelationalDatabaseConnectorConfig.TABLE_IGNORE_BUILTIN,
@ -264,36 +252,6 @@ public String getDatabaseName() {
return databaseName;
}
/**
* Returns a configured (but not yet started) instance of the database history.
*/
@Override
public DatabaseHistory getDatabaseHistory() {
Configuration config = getConfig();
DatabaseHistory databaseHistory = config.getInstance(SqlServerConnectorConfig.DATABASE_HISTORY, DatabaseHistory.class);
if (databaseHistory == null) {
throw new ConnectException("Unable to instantiate the database history class " +
config.getString(SqlServerConnectorConfig.DATABASE_HISTORY));
}
// Do not remove the prefix from the subset of config properties ...
Configuration dbHistoryConfig = config.subset(DatabaseHistory.CONFIGURATION_FIELD_PREFIX_STRING, false)
.edit()
.withDefault(DatabaseHistory.NAME, getLogicalName() + "-dbhistory")
.build();
HistoryRecordComparator historyComparator = new HistoryRecordComparator() {
@Override
protected boolean isPositionAtOrBefore(Document recorded, Document desired) {
return Lsn.valueOf(recorded.getString(SourceInfo.CHANGE_LSN_KEY)).compareTo(Lsn.valueOf(desired.getString(SourceInfo.CHANGE_LSN_KEY))) < 1;
}
};
databaseHistory.configure(dbHistoryConfig, historyComparator); // validates
return databaseHistory;
}
public SnapshotLockingMode getSnapshotLockingMode() {
return this.snapshotLockingMode;
}
@ -311,4 +269,15 @@ public boolean isIncluded(TableId t) {
t.table().toLowerCase().equals("systranschemas"));
}
}
@Override
protected HistoryRecordComparator getHistoryRecordComparator() {
return new HistoryRecordComparator() {
@Override
protected boolean isPositionAtOrBefore(Document recorded, Document desired) {
return Lsn.valueOf(recorded.getString(SourceInfo.CHANGE_LSN_KEY))
.compareTo(Lsn.valueOf(desired.getString(SourceInfo.CHANGE_LSN_KEY))) < 1;
}
};
}
}