diff --git a/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerConnectorConfig.java b/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerConnectorConfig.java index fc6258410..b85136142 100644 --- a/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerConnectorConfig.java +++ b/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerConnectorConfig.java @@ -9,7 +9,6 @@ import org.apache.kafka.common.config.ConfigDef.Importance; import org.apache.kafka.common.config.ConfigDef.Type; import org.apache.kafka.common.config.ConfigDef.Width; -import org.apache.kafka.connect.errors.ConnectException; import io.debezium.config.CommonConnectorConfig; import io.debezium.config.Configuration; @@ -18,10 +17,10 @@ import io.debezium.document.Document; import io.debezium.heartbeat.Heartbeat; import io.debezium.jdbc.JdbcConfiguration; +import io.debezium.relational.HistorizedRelationalDatabaseConnectorConfig; import io.debezium.relational.RelationalDatabaseConnectorConfig; import io.debezium.relational.TableId; import io.debezium.relational.Tables.TableFilter; -import io.debezium.relational.history.DatabaseHistory; import io.debezium.relational.history.HistoryRecordComparator; import io.debezium.relational.history.KafkaDatabaseHistory; @@ -30,7 +29,7 @@ * * @author Jiri Pechanec */ -public class SqlServerConnectorConfig extends RelationalDatabaseConnectorConfig { +public class SqlServerConnectorConfig extends HistorizedRelationalDatabaseConnectorConfig { /** * The set of predefined SnapshotMode options or aliases. @@ -184,21 +183,6 @@ public static SnapshotLockingMode parse(String value, String defaultValue) { .withDescription("The name of the database the connector should be monitoring. When working with a " + "multi-tenant set-up, must be set to the CDB name."); - /** - * The database history class is hidden in the {@link #configDef()} since that is designed to work with a user interface, - * and in these situations using Kafka is the only way to go. - */ - public static final Field DATABASE_HISTORY = Field.create("database.history") - .withDisplayName("Database history class") - .withType(Type.CLASS) - .withWidth(Width.LONG) - .withImportance(Importance.LOW) - .withInvisibleRecommender() - .withDescription("The name of the DatabaseHistory class that should be used to store and recover database schema changes. " - + "The configuration properties for the history are prefixed with the '" - + DatabaseHistory.CONFIGURATION_FIELD_PREFIX_STRING + "' string.") - .withDefault(KafkaDatabaseHistory.class.getName()); - public static final Field SNAPSHOT_MODE = Field.create("snapshot.mode") .withDisplayName("Snapshot mode") .withEnum(SnapshotMode.class, SnapshotMode.INITIAL) @@ -225,6 +209,7 @@ public static SnapshotLockingMode parse(String value, String defaultValue) { LOGICAL_NAME, DATABASE_NAME, SNAPSHOT_MODE, + HistorizedRelationalDatabaseConnectorConfig.DATABASE_HISTORY, RelationalDatabaseConnectorConfig.TABLE_WHITELIST, RelationalDatabaseConnectorConfig.TABLE_BLACKLIST, RelationalDatabaseConnectorConfig.TABLE_IGNORE_BUILTIN, @@ -238,6 +223,9 @@ public static ConfigDef configDef() { ConfigDef config = new ConfigDef(); Field.group(config, "SQL Server", LOGICAL_NAME, DATABASE_NAME, SNAPSHOT_MODE); + Field.group(config, "History Storage", KafkaDatabaseHistory.BOOTSTRAP_SERVERS, + KafkaDatabaseHistory.TOPIC, KafkaDatabaseHistory.RECOVERY_POLL_ATTEMPTS, + KafkaDatabaseHistory.RECOVERY_POLL_INTERVAL_MS, HistorizedRelationalDatabaseConnectorConfig.DATABASE_HISTORY); Field.group(config, "Events", RelationalDatabaseConnectorConfig.TABLE_WHITELIST, RelationalDatabaseConnectorConfig.TABLE_BLACKLIST, RelationalDatabaseConnectorConfig.TABLE_IGNORE_BUILTIN, @@ -264,36 +252,6 @@ public String getDatabaseName() { return databaseName; } - /** - * Returns a configured (but not yet started) instance of the database history. - */ - @Override - public DatabaseHistory getDatabaseHistory() { - Configuration config = getConfig(); - - DatabaseHistory databaseHistory = config.getInstance(SqlServerConnectorConfig.DATABASE_HISTORY, DatabaseHistory.class); - if (databaseHistory == null) { - throw new ConnectException("Unable to instantiate the database history class " + - config.getString(SqlServerConnectorConfig.DATABASE_HISTORY)); - } - - // Do not remove the prefix from the subset of config properties ... - Configuration dbHistoryConfig = config.subset(DatabaseHistory.CONFIGURATION_FIELD_PREFIX_STRING, false) - .edit() - .withDefault(DatabaseHistory.NAME, getLogicalName() + "-dbhistory") - .build(); - - HistoryRecordComparator historyComparator = new HistoryRecordComparator() { - @Override - protected boolean isPositionAtOrBefore(Document recorded, Document desired) { - return Lsn.valueOf(recorded.getString(SourceInfo.CHANGE_LSN_KEY)).compareTo(Lsn.valueOf(desired.getString(SourceInfo.CHANGE_LSN_KEY))) < 1; - } - }; - databaseHistory.configure(dbHistoryConfig, historyComparator); // validates - - return databaseHistory; - } - public SnapshotLockingMode getSnapshotLockingMode() { return this.snapshotLockingMode; } @@ -311,4 +269,15 @@ public boolean isIncluded(TableId t) { t.table().toLowerCase().equals("systranschemas")); } } + + @Override + protected HistoryRecordComparator getHistoryRecordComparator() { + return new HistoryRecordComparator() { + @Override + protected boolean isPositionAtOrBefore(Document recorded, Document desired) { + return Lsn.valueOf(recorded.getString(SourceInfo.CHANGE_LSN_KEY)) + .compareTo(Lsn.valueOf(desired.getString(SourceInfo.CHANGE_LSN_KEY))) < 1; + } + }; + } }