diff --git a/debezium-connector-sqlserver/pom.xml b/debezium-connector-sqlserver/pom.xml index 99cad6442..4b2e1ad2a 100644 --- a/debezium-connector-sqlserver/pom.xml +++ b/debezium-connector-sqlserver/pom.xml @@ -19,7 +19,7 @@ 1433 sa Password! - master + testDB microsoft/mssql-server-linux:2017-latest false true @@ -171,7 +171,6 @@ ${sqlserver.port} ${sqlserver.user} ${sqlserver.password} - ${sqlserver.dbname} ${skipLongRunningTests} diff --git a/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/Lsn.java b/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/Lsn.java index dc0324031..644e35af9 100644 --- a/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/Lsn.java +++ b/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/Lsn.java @@ -7,6 +7,8 @@ import java.util.Arrays; +import io.debezium.util.Strings; + /** * A logical representation of SQL Server LSN (log sequence number) position. * @@ -14,6 +16,8 @@ * */ public class Lsn implements Comparable { + public static final Lsn NULL = new Lsn(null); + private final byte[] binary; private int[] unsignedBinary; @@ -66,6 +70,10 @@ public String toString() { return string; } + public static Lsn valueOf(String lsnString) { + return (lsnString == null) ? NULL : new Lsn(Strings.hexStringToByteArray(lsnString.replace(":", ""))); + } + @Override public int hashCode() { final int prime = 31; diff --git a/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SnapshotChangeRecordEmitter.java b/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SnapshotChangeRecordEmitter.java new file mode 100644 index 000000000..14364733b --- /dev/null +++ b/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SnapshotChangeRecordEmitter.java @@ -0,0 +1,42 @@ +/* + * Copyright Debezium Authors. + * + * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 + */ +package io.debezium.connector.sqlserver; + +import io.debezium.data.Envelope.Operation; +import io.debezium.pipeline.spi.OffsetContext; +import io.debezium.relational.RelationalChangeRecordEmitter; +import io.debezium.util.Clock; + +/** + * Emits change data based on a single row read via JDBC. + * + * @author Gunnar Morling + */ +public class SnapshotChangeRecordEmitter extends RelationalChangeRecordEmitter { + + private final Object[] row; + + public SnapshotChangeRecordEmitter(OffsetContext offset, Object[] row, Clock clock) { + super(offset, clock); + + this.row = row; + } + + @Override + protected Operation getOperation() { + return Operation.READ; + } + + @Override + protected Object[] getOldColumnValues() { + throw new UnsupportedOperationException("Can't get old row values for READ record"); + } + + @Override + protected Object[] getNewColumnValues() { + return row; + } +} diff --git a/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SourceInfo.java b/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SourceInfo.java index 95933951e..8fbaaf0b7 100644 --- a/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SourceInfo.java +++ b/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SourceInfo.java @@ -1,3 +1,8 @@ +/* + * Copyright Debezium Authors. + * + * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 + */ package io.debezium.connector.sqlserver; import java.time.Instant; @@ -19,6 +24,7 @@ public class SourceInfo extends AbstractSourceInfo { public static final String SERVER_NAME_KEY = "name"; public static final String LOG_TIMESTAMP_KEY = "ts_ms"; public static final String CHANGE_LSN_KEY = "change_lsn"; + public static final String COMMIT_LSN_KEY = "commit_lsn"; public static final String SNAPSHOT_KEY = "snapshot"; public static final Schema SCHEMA = schemaBuilder() @@ -26,11 +32,14 @@ public class SourceInfo extends AbstractSourceInfo { .field(SERVER_NAME_KEY, Schema.STRING_SCHEMA) .field(LOG_TIMESTAMP_KEY, Schema.OPTIONAL_INT64_SCHEMA) .field(CHANGE_LSN_KEY, Schema.OPTIONAL_STRING_SCHEMA) + .field(COMMIT_LSN_KEY, Schema.OPTIONAL_STRING_SCHEMA) .field(SNAPSHOT_KEY, Schema.OPTIONAL_BOOLEAN_SCHEMA) .build(); private final String serverName; private Lsn changeLsn; + private Lsn commitLsn; + private boolean snapshot; private Instant sourceTime; protected SourceInfo(String serverName) { @@ -46,10 +55,26 @@ public Lsn getChangeLsn() { return changeLsn; } + public Lsn getCommitLsn() { + return commitLsn; + } + + public void setCommitLsn(Lsn commitLsn) { + this.commitLsn = commitLsn; + } + public void setSourceTime(Instant instant) { sourceTime = instant; } + public boolean isSnapshot() { + return snapshot; + } + + public void setSnapshot(boolean snapshot) { + this.snapshot = snapshot; + } + @Override protected Schema schema() { return SCHEMA; @@ -57,10 +82,15 @@ protected Schema schema() { @Override public Struct struct() { - return super.struct() + final Struct ret = super.struct() .put(SERVER_NAME_KEY, serverName) .put(LOG_TIMESTAMP_KEY, sourceTime == null ? null : sourceTime.toEpochMilli()) .put(CHANGE_LSN_KEY, changeLsn.toString()) - .put(SNAPSHOT_KEY, false); + .put(SNAPSHOT_KEY, snapshot); + + if (commitLsn != null) { + ret.put(COMMIT_LSN_KEY, commitLsn.toString()); + } + return ret; } } diff --git a/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerChangeEventSourceFactory.java b/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerChangeEventSourceFactory.java index e2f6f9070..36adaa6f6 100644 --- a/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerChangeEventSourceFactory.java +++ b/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerChangeEventSourceFactory.java @@ -11,6 +11,7 @@ import io.debezium.pipeline.source.spi.SnapshotChangeEventSource; import io.debezium.pipeline.source.spi.StreamingChangeEventSource; import io.debezium.pipeline.spi.OffsetContext; +import io.debezium.relational.TableId; import io.debezium.util.Clock; public class SqlServerChangeEventSourceFactory implements ChangeEventSourceFactory { @@ -18,12 +19,12 @@ public class SqlServerChangeEventSourceFactory implements ChangeEventSourceFacto private final SqlServerConnectorConfig configuration; private final SqlServerConnection jdbcConnection; private final ErrorHandler errorHandler; - private final EventDispatcher dispatcher; + private final EventDispatcher dispatcher; private final Clock clock; private final SqlServerDatabaseSchema schema; public SqlServerChangeEventSourceFactory(SqlServerConnectorConfig configuration, SqlServerConnection jdbcConnection, - ErrorHandler errorHandler, EventDispatcher dispatcher, Clock clock, SqlServerDatabaseSchema schema) { + ErrorHandler errorHandler, EventDispatcher dispatcher, Clock clock, SqlServerDatabaseSchema schema) { this.configuration = configuration; this.jdbcConnection = jdbcConnection; this.errorHandler = errorHandler; @@ -34,7 +35,7 @@ public SqlServerChangeEventSourceFactory(SqlServerConnectorConfig configuration, @Override public SnapshotChangeEventSource getSnapshotChangeEventSource(OffsetContext offsetContext) { - return new SqlServerSnapshotChangeEventSource(configuration, (SqlServerOffsetContext) offsetContext, jdbcConnection, schema); + return new SqlServerSnapshotChangeEventSource(configuration, (SqlServerOffsetContext) offsetContext, jdbcConnection, schema, dispatcher, clock); } @Override diff --git a/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerConnection.java b/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerConnection.java index 48f8a6765..3685b8b0b 100644 --- a/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerConnection.java +++ b/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerConnection.java @@ -36,6 +36,7 @@ public class SqlServerConnection extends JdbcConnection { private static final String ENABLE_TABLE_CDC; private static final String CDC_WRAPPERS_DML; private static final String GET_MAX_LSN; + private static final String LOCK_TABLE; static { try { @@ -46,6 +47,7 @@ public class SqlServerConnection extends JdbcConnection { DISABLE_DB_CDC = statements.getProperty("disable_cdc_for_db"); ENABLE_TABLE_CDC = statements.getProperty("enable_cdc_for_table"); GET_MAX_LSN = statements.getProperty("get_max_lsn"); + LOCK_TABLE = statements.getProperty("lock_table"); CDC_WRAPPERS_DML = IoUtil.read(classLoader.getResourceAsStream("generate_cdc_wrappers.sql")); } catch (Exception e) { @@ -183,6 +185,11 @@ public Instant timestampOfLsn(Lsn lsn) throws SQLException { }); } + public void lockTable(TableId tableId) throws SQLException { + final String lockTableStmt = LOCK_TABLE.replace(STATEMENTS_PLACEHOLDER, tableId.table()); + execute(lockTableStmt); + } + private String cdcNameForTable(TableId tableId) { return tableId.schema() + '_' + tableId.table(); } diff --git a/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerConnectorConfig.java b/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerConnectorConfig.java index a740edc26..60db4d130 100644 --- a/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerConnectorConfig.java +++ b/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerConnectorConfig.java @@ -13,8 +13,11 @@ import io.debezium.config.CommonConnectorConfig; import io.debezium.config.Configuration; +import io.debezium.config.EnumeratedValue; import io.debezium.config.Field; +import io.debezium.config.Field.ValidationOutput; import io.debezium.document.Document; +import io.debezium.heartbeat.Heartbeat; import io.debezium.jdbc.JdbcConfiguration; import io.debezium.relational.RelationalDatabaseConnectorConfig; import io.debezium.relational.TableId; @@ -31,8 +34,136 @@ */ public class SqlServerConnectorConfig extends RelationalDatabaseConnectorConfig { - // TODO pull up to RelationalConnectorConfig - public static final String DATABASE_CONFIG_PREFIX = "database."; + /** + * The set of predefined SnapshotMode options or aliases. + */ + public static enum SnapshotMode implements EnumeratedValue { + + /** + * Perform a snapshot of data and schema upon initial startup of a connector. + */ + INITIAL("initial", true), + + /** + * Perform a snapshot of data and schema upon initial startup of a connector. + */ + INITIAL_SCHEMA_ONLY("initial_schema_only", false); + + private final String value; + private final boolean includeData; + + private SnapshotMode(String value, boolean includeData) { + this.value = value; + this.includeData = includeData; + } + + @Override + public String getValue() { + return value; + } + + /** + * Whether this snapshotting mode should include the actual data or just the + * schema of captured tables. + */ + public boolean includeData() { + return includeData; + } + + /** + * Determine if the supplied value is one of the predefined options. + * + * @param value the configuration property value; may not be null + * @return the matching option, or null if no match is found + */ + public static SnapshotMode parse(String value) { + if (value == null) { + return null; + } + value = value.trim(); + + for (SnapshotMode option : SnapshotMode.values()) { + if (option.getValue().equalsIgnoreCase(value)) return option; + } + + return null; + } + + /** + * Determine if the supplied value is one of the predefined options. + * + * @param value the configuration property value; may not be null + * @param defaultValue the default value; may be null + * @return the matching option, or null if no match is found and the non-null default is invalid + */ + public static SnapshotMode parse(String value, String defaultValue) { + SnapshotMode mode = parse(value); + + if (mode == null && defaultValue != null) { + mode = parse(defaultValue); + } + + return mode; + } + } + + /** + * The set of predefined Snapshot Locking Mode options. + */ + public static enum SnapshotLockingMode implements EnumeratedValue { + + /** + * This mode will block all reads and writes for the entire duration of the snapshot. + * + * The connector will execute {@code SELECT * FROM .. WITH (TABLOCKX)} + */ + EXCLUSIVE("exclusive"), + + /** + * This mode will avoid using ANY table locks during the snapshot process. This mode can only be used with SnapShotMode + * set to schema_only or schema_only_recovery. + */ + NONE("none"); + + private final String value; + + private SnapshotLockingMode(String value) { + this.value = value; + } + + @Override + public String getValue() { + return value; + } + + /** + * Determine if the supplied value is one of the predefined options. + * + * @param value the configuration property value; may not be null + * @return the matching option, or null if no match is found + */ + public static SnapshotLockingMode parse(String value) { + if (value == null) return null; + value = value.trim(); + for (SnapshotLockingMode option : SnapshotLockingMode.values()) { + if (option.getValue().equalsIgnoreCase(value)) return option; + } + return null; + } + + /** + * Determine if the supplied value is one of the predefined options. + * + * @param value the configuration property value; may not be null + * @param defaultValue the default value; may be null + * @return the matching option, or null if no match is found and the non-null default is invalid + */ + public static SnapshotLockingMode parse(String value, String defaultValue) { + SnapshotLockingMode mode = parse(value); + if (mode == null && defaultValue != null) mode = parse(defaultValue); + return mode; + } + } public static final Field LOGICAL_NAME = Field.create("database.server.name") .withDisplayName("Namespace") @@ -40,11 +171,10 @@ public class SqlServerConnectorConfig extends RelationalDatabaseConnectorConfig .withWidth(Width.MEDIUM) .withImportance(Importance.HIGH) .withValidation(Field::isRequired) - // TODO - //.withValidation(Field::isRequired, MySqlConnectorConfig::validateServerNameIsDifferentFromHistoryTopicName) + .withValidation(Field::isRequired, CommonConnectorConfig::validateServerNameIsDifferentFromHistoryTopicName) .withDescription("Unique name that identifies the database server and all recorded offsets, and" + "that is used as a prefix for all schemas and topics. " - + "Each distinct MySQL installation should have a separate namespace and monitored by " + + "Each distinct SQL Server installation should have a separate namespace and monitored by " + "at most one Debezium connector."); public static final Field DATABASE_NAME = Field.create(DATABASE_CONFIG_PREFIX + JdbcConfiguration.DATABASE) @@ -71,39 +201,68 @@ public class SqlServerConnectorConfig extends RelationalDatabaseConnectorConfig + DatabaseHistory.CONFIGURATION_FIELD_PREFIX_STRING + "' string.") .withDefault(KafkaDatabaseHistory.class.getName()); + public static final Field SNAPSHOT_MODE = Field.create("snapshot.mode") + .withDisplayName("Snapshot mode") + .withEnum(SnapshotMode.class, SnapshotMode.INITIAL) + .withWidth(Width.SHORT) + .withImportance(Importance.LOW) + .withDescription("The criteria for running a snapshot upon startup of the connector. " + + "Options include: " + + "'initial' (the default) to specify the connector should run a snapshot only when no offsets are available for the logical server name; " + + "'initial_schema_only' to specify the connector should run a snapshot of the schema when no offsets are available for the logical server name. "); + + public static final Field SNAPSHOT_LOCKING_MODE = Field.create("snapshot.locking.mode") + .withDisplayName("Snapshot locking mode") + .withEnum(SnapshotLockingMode.class, SnapshotLockingMode.EXCLUSIVE) + .withWidth(Width.SHORT) + .withImportance(Importance.LOW) + .withDescription("Controls how long the connector locks the montiored tables for snapshot execution. The default is '" + SnapshotLockingMode.EXCLUSIVE.getValue() + "', " + + "which means that the connector holds the exlusive lock (and thus prevents any reads and updates) for all monitored tables " + + "while the database schemas, other metadata and the data itself are being read. Using a value of '" + SnapshotLockingMode.NONE.getValue() + "' will prevent the connector from acquiring any " + + "table locks during the snapshot process. This mode can only be used in combination with snapshot.mode values of '" + SnapshotMode.INITIAL_SCHEMA_ONLY.getValue() + "' or " + + "'schema_only_recovery' and is only safe to use if no schema changes are happening while the snapshot is taken.") + .withValidation(SqlServerConnectorConfig::validateSnapshotLockingMode); + /** * The set of {@link Field}s defined as part of this configuration. */ public static Field.Set ALL_FIELDS = Field.setOf( LOGICAL_NAME, DATABASE_NAME, + SNAPSHOT_MODE, RelationalDatabaseConnectorConfig.TABLE_WHITELIST, RelationalDatabaseConnectorConfig.TABLE_BLACKLIST, RelationalDatabaseConnectorConfig.TABLE_IGNORE_BUILTIN, CommonConnectorConfig.POLL_INTERVAL_MS, CommonConnectorConfig.MAX_BATCH_SIZE, - CommonConnectorConfig.MAX_QUEUE_SIZE + CommonConnectorConfig.MAX_QUEUE_SIZE, + Heartbeat.HEARTBEAT_INTERVAL, Heartbeat.HEARTBEAT_TOPICS_PREFIX ); + public static ConfigDef configDef() { + ConfigDef config = new ConfigDef(); + + Field.group(config, "SQL Server", LOGICAL_NAME, DATABASE_NAME, SNAPSHOT_MODE); + Field.group(config, "Events", RelationalDatabaseConnectorConfig.TABLE_WHITELIST, + RelationalDatabaseConnectorConfig.TABLE_BLACKLIST, + RelationalDatabaseConnectorConfig.TABLE_IGNORE_BUILTIN, + Heartbeat.HEARTBEAT_INTERVAL, Heartbeat.HEARTBEAT_TOPICS_PREFIX + ); + Field.group(config, "Connector", CommonConnectorConfig.POLL_INTERVAL_MS, CommonConnectorConfig.MAX_BATCH_SIZE, CommonConnectorConfig.MAX_QUEUE_SIZE); + + return config; + } + private final String databaseName; + private final SnapshotMode snapshotMode; + private final SnapshotLockingMode snapshotLockingMode; public SqlServerConnectorConfig(Configuration config) { super(config, config.getString(LOGICAL_NAME), new SystemTablesPredicate()); this.databaseName = config.getString(DATABASE_NAME); - } - - public static ConfigDef configDef() { - ConfigDef config = new ConfigDef(); - - Field.group(config, "Oracle", LOGICAL_NAME, DATABASE_NAME); - Field.group(config, "Events", RelationalDatabaseConnectorConfig.TABLE_WHITELIST, - RelationalDatabaseConnectorConfig.TABLE_BLACKLIST, - RelationalDatabaseConnectorConfig.TABLE_IGNORE_BUILTIN - ); - Field.group(config, "Connector", CommonConnectorConfig.POLL_INTERVAL_MS, CommonConnectorConfig.MAX_BATCH_SIZE, CommonConnectorConfig.MAX_QUEUE_SIZE); - - return config; + this.snapshotMode = SnapshotMode.parse(config.getString(SNAPSHOT_MODE), SNAPSHOT_MODE.defaultValueAsString()); + this.snapshotLockingMode = SnapshotLockingMode.parse(config.getString(SNAPSHOT_LOCKING_MODE), SNAPSHOT_LOCKING_MODE.defaultValueAsString()); } public String getDatabaseName() { @@ -139,6 +298,41 @@ protected boolean isPositionAtOrBefore(Document recorded, Document desired) { return databaseHistory; } + public SnapshotLockingMode getSnapshotLockingMode() { + return this.snapshotLockingMode; + } + + public SnapshotMode getSnapshotMode() { + return snapshotMode; + } + + /** + * Validate the snapshot.locking.mode configuration + * The {@link SnapshotLockingMode.NONE} is allowed only for snapshot mode {@link SnapshotMode.INITIAL_SCHEMA_ONLY} + * + * @param config connector configuration + * @param field validated field (snapshot locking mode) + * @param problems the list of violated validations + * + * @return 0 for valid configuration + */ + private static int validateSnapshotLockingMode(Configuration config, Field field, ValidationOutput problems) { + final SnapshotMode snapshotMode = SnapshotMode.parse(config.getString(SNAPSHOT_MODE), SNAPSHOT_MODE.defaultValueAsString()); + final SnapshotLockingMode snapshotLockingMode = SnapshotLockingMode.parse(config.getString(SNAPSHOT_LOCKING_MODE), SNAPSHOT_LOCKING_MODE.defaultValueAsString()); + + if (snapshotLockingMode == SnapshotLockingMode.NONE) { + if (snapshotMode != SnapshotMode.INITIAL_SCHEMA_ONLY) { + problems.accept( + field, + snapshotLockingMode, + "Snapshot locking mode '" + snapshotLockingMode.getValue() + "' is not allowed for snapshot mode '" + snapshotMode.getValue() + "'" + ); + } + } + // Everything checks out ok. + return 0; + } + private static class SystemTablesPredicate implements TableFilter { @Override diff --git a/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerConnectorTask.java b/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerConnectorTask.java index 2cd003a50..27f68801e 100644 --- a/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerConnectorTask.java +++ b/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerConnectorTask.java @@ -6,6 +6,7 @@ package io.debezium.connector.sqlserver; import java.sql.SQLException; +import java.util.Collections; import java.util.List; import java.util.Map; import java.util.concurrent.atomic.AtomicReference; @@ -24,7 +25,9 @@ import io.debezium.pipeline.DataChangeEvent; import io.debezium.pipeline.ErrorHandler; import io.debezium.pipeline.EventDispatcher; +import io.debezium.pipeline.spi.OffsetContext; import io.debezium.relational.TableId; +import io.debezium.schema.TopicSelector; import io.debezium.util.Clock; import io.debezium.util.SchemaNameAdjuster; @@ -47,7 +50,7 @@ private static enum State { private final AtomicReference state = new AtomicReference(State.STOPPED); private volatile SqlServerTaskContext taskContext; - private volatile ChangeEventQueue queue; + private volatile ChangeEventQueue queue; private volatile SqlServerConnection jdbcConnection; private volatile ChangeEventSourceCoordinator coordinator; private volatile ErrorHandler errorHandler; @@ -72,7 +75,7 @@ public void start(Configuration config) { final Clock clock = Clock.system(); // Set up the task record queue ... - this.queue = new ChangeEventQueue.Builder() + this.queue = new ChangeEventQueue.Builder() .pollInterval(connectorConfig.getPollInterval()) .maxBatchSize(connectorConfig.getMaxBatchSize()) .maxQueueSize(connectorConfig.getMaxQueueSize()) @@ -80,7 +83,7 @@ public void start(Configuration config) { .build(); errorHandler = new ErrorHandler(SqlServerConnector.class, connectorConfig.getLogicalName(), queue, this::cleanupResources); - final SqlServerTopicSelector topicSelector = SqlServerTopicSelector.defaultSelector(connectorConfig.getLogicalName()); + final TopicSelector topicSelector = SqlServerTopicSelector.defaultSelector(connectorConfig); final Configuration jdbcConfig = config.subset("database.", true); @@ -89,11 +92,10 @@ public void start(Configuration config) { this.schema = new SqlServerDatabaseSchema(connectorConfig, schemaNameAdjuster, topicSelector, jdbcConnection); - final SqlServerOffsetContext previousOffset = null; -// OracleOffsetContext previousOffset = getPreviousOffset(connectorConfig); -// if (previousOffset != null) { -// schema.recover(previousOffset); -// } + final OffsetContext previousOffset = getPreviousOffset(new SqlServerOffsetContext.Loader(connectorConfig.getLogicalName())); + if (previousOffset != null) { + schema.recover(previousOffset); + } final EventDispatcher dispatcher = new EventDispatcher<>(topicSelector, schema, queue, connectorConfig.getTableFilters().dataCollectionFilter(), DataChangeEvent::new); @@ -109,30 +111,31 @@ public void start(Configuration config) { coordinator.start(); } -// private OracleOffsetContext getPreviousOffset(SqlServerConnectorConfig connectorConfig) { -// OracleOffsetContext offsetContext = new OracleOffsetContext(connectorConfig.getLogicalName()); -// -// Map previousOffset = context.offsetStorageReader() -// .offsets(Collections.singleton(offsetContext.getPartition())) -// .get(offsetContext.getPartition()); -// -// if (previousOffset != null) { -// long scn = (long) previousOffset.get(SourceInfo.SCN_KEY); -// offsetContext.setScn(scn); -// LOGGER.info("Found previous offset {}", offsetContext); -// -// return offsetContext; -// } -// -// return null; -// } + /** + * Loads the connector's persistent offset (if present) via the given loader. + */ + protected OffsetContext getPreviousOffset(OffsetContext.Loader loader) { + Map partition = loader.getPartition(); + + Map previousOffset = context.offsetStorageReader() + .offsets(Collections.singleton(partition)) + .get(partition); + + if (previousOffset != null) { + OffsetContext offsetContext = loader.load(previousOffset); + LOGGER.info("Found previous offset {}", offsetContext); + return offsetContext; + } + else { + return null; + } + } @Override public List poll() throws InterruptedException { - // TODO - List records = queue.poll(); + final List records = queue.poll(); - List sourceRecords = ((List)records).stream() + final List sourceRecords = records.stream() .map(DataChangeEvent::getRecord) .collect(Collectors.toList()); @@ -167,7 +170,6 @@ private void cleanupResources() { catch (InterruptedException e) { Thread.interrupted(); LOGGER.error("Interrupted while stopping coordinator", e); - // XStream code can end in SIGSEGV so fail the task instead of JVM crash throw new ConnectException("Interrupted while stopping coordinator, failing the task"); } diff --git a/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerDatabaseSchema.java b/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerDatabaseSchema.java index 8844f6b53..161aa456e 100644 --- a/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerDatabaseSchema.java +++ b/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerDatabaseSchema.java @@ -8,17 +8,15 @@ import java.sql.SQLException; import java.util.HashSet; import java.util.Set; -import java.util.function.Predicate; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import io.debezium.pipeline.spi.OffsetContext; import io.debezium.relational.HistorizedRelationalDatabaseSchema; import io.debezium.relational.Table; import io.debezium.relational.TableId; import io.debezium.relational.TableSchemaBuilder; -import io.debezium.relational.history.DatabaseHistory; +import io.debezium.relational.ddl.DdlParser; import io.debezium.relational.history.TableChanges; import io.debezium.schema.SchemaChangeEvent; import io.debezium.schema.SchemaChangeEvent.SchemaChangeEventType; @@ -26,7 +24,7 @@ import io.debezium.util.SchemaNameAdjuster; /** - * Logical representation of Sql Server schema. + * Logical representation of SQL Server schema. * * @author Jiri Pechanec * @@ -35,15 +33,12 @@ public class SqlServerDatabaseSchema extends HistorizedRelationalDatabaseSchema private static final Logger LOGGER = LoggerFactory.getLogger(SqlServerDatabaseSchema.class); - private final DatabaseHistory databaseHistory; private final Set capturedTables; public SqlServerDatabaseSchema(SqlServerConnectorConfig connectorConfig, SchemaNameAdjuster schemaNameAdjuster, TopicSelector topicSelector, SqlServerConnection connection) { super(connectorConfig, topicSelector, connectorConfig.getTableFilters().dataCollectionFilter(), null, new TableSchemaBuilder(new SqlServerValueConverters(), schemaNameAdjuster, SourceInfo.SCHEMA), false); - this.databaseHistory = connectorConfig.getDatabaseHistory(); - this.databaseHistory.start(); try { this.capturedTables = determineCapturedTables(connectorConfig, connection); } @@ -52,23 +47,6 @@ public SqlServerDatabaseSchema(SqlServerConnectorConfig connectorConfig, SchemaN } } - private static Predicate getTableFilter(SqlServerConnectorConfig connectorConfig) { - return t -> connectorConfig.getTableFilters().dataCollectionFilter().isIncluded(t); - } - - @Override - public void recover(OffsetContext offset) { -// databaseHistory.recover(offset.getPartition(), offset.getOffset(), tables(), new OracleDdlParser()); -// for (TableId tableId : tableIds()) { -// buildAndRegisterSchema(tableFor(tableId)); -// } - } - - @Override - public void close() { - databaseHistory.stop(); - } - @Override public void applySchemaChange(SchemaChangeEvent schemaChange) { LOGGER.debug("Applying schema change event {}", schemaChange); @@ -84,8 +62,7 @@ public void applySchemaChange(SchemaChangeEvent schemaChange) { tableChanges.create(table); } -// databaseHistory.record(schemaChange.getPartition(), schemaChange.getOffset(), schemaChange.getDatabase(), -// schemaChange.getSchema(), schemaChange.getDdl(), tableChanges); + record(schemaChange, tableChanges); } public Set getCapturedTables() { @@ -108,4 +85,9 @@ private Set determineCapturedTables(SqlServerConnectorConfig connectorC return capturedTables; } + + @Override + protected DdlParser getDdlParser() { + return null; + } } diff --git a/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerOffsetContext.java b/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerOffsetContext.java index afb584e0e..4ad2ad881 100644 --- a/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerOffsetContext.java +++ b/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerOffsetContext.java @@ -13,28 +13,27 @@ import org.apache.kafka.connect.data.Struct; import io.debezium.pipeline.spi.OffsetContext; -import io.debezium.relational.TableId; import io.debezium.util.Collect; public class SqlServerOffsetContext implements OffsetContext { private static final String SERVER_PARTITION_KEY = "server"; - private static final String QUERY_FROM_LSN_KEY = "query_from_lsn"; - private static final String QUERY_TO_LSN_KEY = "query_to_lsn"; - private static final String QUERY_TABLE = "query_table"; + private static final String SNAPSHOT_COMPLETED_KEY = "snapshot_completed"; private final Schema sourceInfoSchema; private final SourceInfo sourceInfo; private final Map partition; + private boolean snapshotCompleted; - private Lsn queryFromLsn; - private Lsn queryToLsn; - private TableId queryTable; - - public SqlServerOffsetContext(String serverName) { + public SqlServerOffsetContext(String serverName, Lsn lsn, boolean snapshot, boolean snapshotCompleted) { partition = Collections.singletonMap(SERVER_PARTITION_KEY, serverName); sourceInfo = new SourceInfo(serverName); + + sourceInfo.setChangeLsn(lsn); + sourceInfo.setSnapshot(snapshot); sourceInfoSchema = sourceInfo.schema(); + + this.snapshotCompleted = snapshotCompleted; } @Override @@ -44,12 +43,15 @@ public SqlServerOffsetContext(String serverName) { @Override public Map getOffset() { - return Collect.hashMapOf( - QUERY_FROM_LSN_KEY, queryFromLsn == null ? null : queryFromLsn.toString(), - QUERY_TO_LSN_KEY, queryToLsn == null ? null : queryToLsn.toString(), - QUERY_TABLE, queryTable == null ? null : queryTable.toString(), - SourceInfo.CHANGE_LSN_KEY, sourceInfo.getChangeLsn() == null ? null : sourceInfo.getChangeLsn().toString() - ); + if (sourceInfo.isSnapshot()) { + return Collect.hashMapOf( + SourceInfo.SNAPSHOT_KEY, true, + SNAPSHOT_COMPLETED_KEY, snapshotCompleted + ); + } + else { + return Collections.singletonMap(SourceInfo.CHANGE_LSN_KEY, sourceInfo.getChangeLsn().toString()); + } } @Override @@ -66,40 +68,55 @@ public void setChangeLsn(Lsn lsn) { sourceInfo.setChangeLsn(lsn); } + public void setCommitLsn(Lsn lsn) { + sourceInfo.setCommitLsn(lsn); + } + public void setSourceTime(Instant instant) { sourceInfo.setSourceTime(instant); } - public void setQueryFromLsn(Lsn queryFromLsn) { - this.queryFromLsn = queryFromLsn; - } - - public void setQueryToLsn(Lsn queryToLsn) { - this.queryToLsn = queryToLsn; - } - - public void setQueryTable(TableId queryTable) { - this.queryTable = queryTable; - } - @Override public boolean isSnapshotRunning() { - // TODO Auto-generated method stub - return false; + return sourceInfo.isSnapshot() && !snapshotCompleted; } @Override public void preSnapshotStart() { - // TODO Auto-generated method stub + sourceInfo.setSnapshot(true); + snapshotCompleted = false; } @Override public void preSnapshotCompletion() { - // TODO Auto-generated method stub + snapshotCompleted = true; } @Override public void postSnapshotCompletion() { - // TODO Auto-generated method stub + sourceInfo.setSnapshot(false); + } + + public static class Loader implements OffsetContext.Loader { + + private final String logicalName; + + public Loader(String logicalName) { + this.logicalName = logicalName; + } + + @Override + public Map getPartition() { + return Collections.singletonMap(SERVER_PARTITION_KEY, logicalName); + } + + @Override + public OffsetContext load(Map offset) { + final Lsn lsn = Lsn.valueOf((String)offset.get(SourceInfo.CHANGE_LSN_KEY)); + boolean snapshot = Boolean.TRUE.equals(offset.get(SourceInfo.SNAPSHOT_KEY)); + boolean snapshotCompleted = Boolean.TRUE.equals(offset.get(SNAPSHOT_COMPLETED_KEY)); + + return new SqlServerOffsetContext(logicalName, lsn, snapshot, snapshotCompleted); + } } } diff --git a/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerSchemaChangeEventEmitter.java b/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerSchemaChangeEventEmitter.java index 3be0b91eb..0eb0f2a6a 100644 --- a/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerSchemaChangeEventEmitter.java +++ b/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerSchemaChangeEventEmitter.java @@ -10,7 +10,6 @@ import io.debezium.pipeline.spi.SchemaChangeEventEmitter; import io.debezium.relational.TableId; -import io.debezium.schema.SchemaChangeEvent.SchemaChangeEventType; /** * {@link SchemaChangeEventEmitter} implementation based on SqlServer. @@ -31,37 +30,6 @@ public SqlServerSchemaChangeEventEmitter(SqlServerOffsetContext offsetContext, T @Override public void emitSchemaChangeEvent(Receiver receiver) throws InterruptedException { -// SchemaChangeEventType eventType = getSchemaChangeEventType(); -// if (eventType == null) { -// return; -// } -// -// Tables tables = new Tables(); -// -// SqlServerDdlParser parser = new SqlServerDdlParser(); -// parser.setCurrentDatabase(ddlLcr.getSourceDatabaseName()); -// parser.setCurrentSchema(ddlLcr.getObjectOwner()); -// parser.parse(ddlLcr.getDDLText(), tables); -// -// Set changedTableIds = tables.drainChanges(); -// if (changedTableIds.isEmpty()) { -// throw new IllegalArgumentException("Couldn't parse DDL statement " + ddlLcr.getDDLText()); -// } -// -// Table table = tables.forTable(tableId); -// -// receiver.schemaChangeEvent(new SchemaChangeEvent(offsetContext.getPartition(), offsetContext.getOffset(), ddlLcr.getSourceDatabaseName(), ddlLcr.getObjectOwner(), ddlLcr.getDDLText(), table, eventType, false)); - } - - private SchemaChangeEventType getSchemaChangeEventType() { -// switch(ddlLcr.getCommandType()) { -// case "CREATE TABLE": return SchemaChangeEventType.CREATE; -// case "ALTER TABLE": LOGGER.warn("ALTER TABLE not yet implemented"); -// case "DROP TABLE": LOGGER.warn("DROP TABLE not yet implemented"); -// default: -// LOGGER.debug("Ignoring DDL event of type {}", ddlLcr.getCommandType()); -// return null; -// } - return null; + throw new UnsupportedOperationException("Schema evolution is not supported by the connector"); } } diff --git a/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerSnapshotChangeEventSource.java b/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerSnapshotChangeEventSource.java index 8dd621d64..7b9812746 100644 --- a/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerSnapshotChangeEventSource.java +++ b/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerSnapshotChangeEventSource.java @@ -5,104 +5,102 @@ */ package io.debezium.connector.sqlserver; -import java.sql.Connection; +import java.sql.ResultSet; import java.sql.SQLException; +import java.sql.Savepoint; import java.sql.Statement; +import java.time.Instant; import java.util.Set; import java.util.stream.Collectors; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import io.debezium.pipeline.source.spi.SnapshotChangeEventSource; -import io.debezium.pipeline.spi.SnapshotResult; +import io.debezium.connector.sqlserver.SqlServerConnectorConfig.SnapshotLockingMode; +import io.debezium.pipeline.EventDispatcher; +import io.debezium.pipeline.spi.ChangeRecordEmitter; +import io.debezium.pipeline.spi.OffsetContext; +import io.debezium.relational.HistorizedRelationalSnapshotChangeEventSource; import io.debezium.relational.Table; import io.debezium.relational.TableId; -import io.debezium.relational.Tables; import io.debezium.schema.SchemaChangeEvent; import io.debezium.schema.SchemaChangeEvent.SchemaChangeEventType; +import io.debezium.util.Clock; -public class SqlServerSnapshotChangeEventSource implements SnapshotChangeEventSource { +public class SqlServerSnapshotChangeEventSource extends HistorizedRelationalSnapshotChangeEventSource { private static final Logger LOGGER = LoggerFactory.getLogger(SqlServerSnapshotChangeEventSource.class); private final SqlServerConnectorConfig connectorConfig; - private final SqlServerOffsetContext previousOffset; private final SqlServerConnection jdbcConnection; - private final SqlServerDatabaseSchema schema; - public SqlServerSnapshotChangeEventSource(SqlServerConnectorConfig connectorConfig, SqlServerOffsetContext previousOffset, SqlServerConnection jdbcConnection, SqlServerDatabaseSchema schema) { + public SqlServerSnapshotChangeEventSource(SqlServerConnectorConfig connectorConfig, SqlServerOffsetContext previousOffset, SqlServerConnection jdbcConnection, SqlServerDatabaseSchema schema, EventDispatcher dispatcher, Clock clock) { + super(connectorConfig, previousOffset, jdbcConnection, schema, dispatcher, clock); this.connectorConfig = connectorConfig; - this.previousOffset = previousOffset; this.jdbcConnection = jdbcConnection; - this.schema = schema; } @Override - public SnapshotResult execute(ChangeEventSourceContext context) throws InterruptedException { - // for now, just simple schema snapshotting is supported which just needs to be done once - if (previousOffset != null) { - LOGGER.debug("Found previous offset, skipping snapshotting"); - return SnapshotResult.completed(previousOffset); + protected SnapshottingTask getSnapshottingTask(OffsetContext previousOffset) { + boolean snapshotSchema = true; + boolean snapshotData = true; + + // found a previous offset and the earlier snapshot has completed + if (previousOffset != null && !previousOffset.isSnapshotRunning()) { + snapshotSchema = false; + snapshotData = false; + } + else { + snapshotData = connectorConfig.getSnapshotMode().includeData(); } - Connection connection = null; - SnapshotContext ctx = null; + return new SnapshottingTask(snapshotSchema, snapshotData); + } - try { - connection = jdbcConnection.connection(); - connection.setAutoCommit(false); + @Override + protected SnapshotContext prepare(ChangeEventSourceContext context) throws Exception { + return new SqlServerSnapshotContext(connectorConfig.getDatabaseName()); + } - ctx = new SnapshotContext( - context, - connection, - connectorConfig.getDatabaseName() - ); + @Override + protected Set getAllTableIds(SnapshotContext ctx) throws Exception { + return jdbcConnection.readTableNames(ctx.catalogName, null, null, new String[] {"TABLE"}); + } - ctx.capturedTables = schema.getCapturedTables(); + @Override + protected void lockTablesForSchemaSnapshot(ChangeEventSourceContext sourceContext, SnapshotContext snapshotContext) throws SQLException, InterruptedException { + if (connectorConfig.getSnapshotLockingMode() == SnapshotLockingMode.NONE) { + return; + } - if (!lockDatabase(ctx)) { - return SnapshotResult.aborted(); + ((SqlServerSnapshotContext)snapshotContext).preSchemaSnapshotSavepoint = jdbcConnection.connection().setSavepoint("dbz_schema_snapshot"); + + try (Statement statement = jdbcConnection.connection().createStatement(ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY)) { + for (TableId tableId : snapshotContext.capturedTables) { + if (!sourceContext.isRunning()) { + throw new InterruptedException("Interrupted while locking table " + tableId); + } + + LOGGER.info("Locking table {}", tableId); + + statement.executeQuery("SELECT * FROM " + tableId.table() + " WITH (TABLOCKX)").close(); } - - determineOffsetContextWithLsn(ctx); - readTableStructure(ctx); - - if (!createSchemaChangeEventsForTables(ctx)) { - return SnapshotResult.aborted(); - } - - return SnapshotResult.completed(ctx.offset); - } - catch(RuntimeException e) { - throw e; - } - catch(Exception e) { - throw new RuntimeException(e); - } - finally { - if (ctx != null) { - ctx.dispose(); - } - - rollbackTransaction(connection); } } - private boolean lockDatabase(SnapshotContext ctx) throws SQLException { - // TODO use SET SINGLE - return true; + @Override + protected void releaseSchemaSnapshotLocks(SnapshotContext snapshotContext) throws SQLException { + jdbcConnection.connection().rollback(((SqlServerSnapshotContext)snapshotContext).preSchemaSnapshotSavepoint); } - private void determineOffsetContextWithLsn(SnapshotContext ctx) throws SQLException { - ctx.offset = new SqlServerOffsetContext(connectorConfig.getLogicalName()); - final Lsn lsn = jdbcConnection.getMaxLsn(); + @Override + protected void determineSnapshotOffset(SnapshotContext ctx) throws Exception { + ctx.offset = new SqlServerOffsetContext(connectorConfig.getLogicalName(), jdbcConnection.getMaxLsn(), true, false); } - private void readTableStructure(SnapshotContext ctx) throws SQLException { - ctx.tables = new Tables(); - - Set schemas = ctx.capturedTables.stream() + @Override + protected void readTableStructure(ChangeEventSourceContext sourceContext, SnapshotContext snapshotContext) throws SQLException, InterruptedException { + Set schemas = snapshotContext.capturedTables.stream() .map(TableId::schema) .collect(Collectors.toSet()); @@ -110,9 +108,13 @@ private void readTableStructure(SnapshotContext ctx) throws SQLException { // while the passed table name filter alone would skip all non-included tables, reading the schema // would take much longer that way for (String schema : schemas) { + if (!sourceContext.isRunning()) { + throw new InterruptedException("Interrupted while reading structure of schema " + schema); + } + jdbcConnection.readSchema( - ctx.tables, - ctx.catalogName, + snapshotContext.tables, + snapshotContext.catalogName, schema, connectorConfig.getTableFilters().dataCollectionFilter(), null, @@ -121,62 +123,37 @@ private void readTableStructure(SnapshotContext ctx) throws SQLException { } } - private boolean createSchemaChangeEventsForTables(SnapshotContext ctx) throws SQLException { - for (TableId tableId : ctx.capturedTables) { - if (!ctx.changeEventSourceContext.isRunning()) { - return false; - } - - LOGGER.debug("Capturing structure of table {}", tableId); - Table table = ctx.tables.forTable(tableId); - - // TODO - use sp_help and sp_columns to build CREATE TABLE - final String ddl = ""; - - schema.applySchemaChange(new SchemaChangeEvent(ctx.offset.getPartition(), ctx.offset.getOffset(), ctx.catalogName, - tableId.schema(), ddl, table, SchemaChangeEventType.CREATE, true)); - } - - return true; + @Override + protected SchemaChangeEvent getCreateTableEvent(SnapshotContext snapshotContext, Table table) throws SQLException { + return new SchemaChangeEvent(snapshotContext.offset.getPartition(), snapshotContext.offset.getOffset(), snapshotContext.catalogName, + table.id().schema(), null, table, SchemaChangeEventType.CREATE, true); } - private void rollbackTransaction(Connection connection) { - if(connection != null) { - try { - connection.rollback(); - } - catch (SQLException e) { - throw new RuntimeException(e); - } - } + @Override + protected void complete() { + } + + @Override + protected String getSnapshotSelect(SnapshotContext snapshotContext, TableId tableId) { + return "SELECT * FROM " + tableId.schema() + "." + tableId.table(); + } + + @Override + protected ChangeRecordEmitter getChangeRecordEmitter(SnapshotContext snapshotContext, Object[] row) { + ((SqlServerOffsetContext) snapshotContext.offset).setSourceTime(Instant.ofEpochMilli(getClock().currentTimeInMillis())); + return new SnapshotChangeRecordEmitter(snapshotContext.offset, row, getClock()); } /** * Mutable context which is populated in the course of snapshotting. */ - private static class SnapshotContext { + private static class SqlServerSnapshotContext extends SnapshotContext { - public final ChangeEventSourceContext changeEventSourceContext; - public final Statement statement; - public final String catalogName; + private Savepoint preSchemaSnapshotSavepoint; - public Set capturedTables; - public SqlServerOffsetContext offset; - public Tables tables; - - public SnapshotContext(ChangeEventSourceContext changeEventSourceContext, Connection connection, String catalogName) throws SQLException { - this.changeEventSourceContext = changeEventSourceContext; - this.statement = connection.createStatement(); - this.catalogName = catalogName; - } - - public void dispose() { - try { - statement.close(); - } - catch (SQLException e) { - LOGGER.error("Couldn't close statement", e); - } + public SqlServerSnapshotContext(String catalogName) throws SQLException { + super(catalogName); } } + } diff --git a/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerStreamingChangeEventSource.java b/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerStreamingChangeEventSource.java index 0969cb265..a9da18546 100644 --- a/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerStreamingChangeEventSource.java +++ b/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerStreamingChangeEventSource.java @@ -19,6 +19,7 @@ import io.debezium.pipeline.source.spi.StreamingChangeEventSource; import io.debezium.relational.TableId; import io.debezium.util.Clock; +import io.debezium.util.Metronome; /** * A {@link StreamingChangeEventSource} based on SQL Server change data capture functionality. @@ -55,6 +56,7 @@ public SqlServerStreamingChangeEventSource(SqlServerConnectorConfig connectorCon @Override public void execute(ChangeEventSourceContext context) throws InterruptedException { + final Metronome metronome = Metronome.sleeper(pollInterval, clock); try { final TableId[] tables = schema.getCapturedTables().toArray(new TableId[schema.getCapturedTables().size()]); Lsn lastProcessedLsn = new Lsn(null); @@ -64,12 +66,12 @@ public void execute(ChangeEventSourceContext context) throws InterruptedExceptio // Probably cannot happen but it is better to guard against such // situation if (!currentMaxLsn.isAvailable()) { - Thread.sleep(pollInterval.toMillis()); + metronome.pause(); continue; } // There is no change in the database if (currentMaxLsn.equals(lastProcessedLsn)) { - Thread.sleep(pollInterval.toMillis()); + metronome.pause(); continue; } @@ -119,16 +121,22 @@ public void execute(ChangeEventSourceContext context) throws InterruptedExceptio final Object[] dataNext = (operation == SqlServerChangeRecordEmitter.OP_UPDATE_BEFORE) ? tableSmallestLsn.getData() : null; offsetContext.setChangeLsn(rowLsn); + offsetContext.setCommitLsn(commitLsn); offsetContext.setSourceTime(connection.timestampOfLsn(commitLsn)); - offsetContext.setQueryFromLsn(fromLsn); - offsetContext.setQueryToLsn(currentMaxLsn); - offsetContext.setQueryTable(tableId); try { dispatcher .dispatchDataChangeEvent( - tableId, new SqlServerChangeRecordEmitter(offsetContext, operation, - data, dataNext, schema.tableFor(tableId), clock)); + tableId, + new SqlServerChangeRecordEmitter( + offsetContext, + operation, + data, + dataNext, + schema.tableFor(tableId), + clock + ) + ); } catch (InterruptedException e) { break; diff --git a/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerTopicSelector.java b/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerTopicSelector.java index 8ce324042..94221a398 100644 --- a/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerTopicSelector.java +++ b/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerTopicSelector.java @@ -14,20 +14,10 @@ * @author Jiri Pechanec * */ -public class SqlServerTopicSelector implements TopicSelector { +public class SqlServerTopicSelector { - private final String prefix; - - public SqlServerTopicSelector(String prefix) { - this.prefix = prefix; - } - - public static SqlServerTopicSelector defaultSelector(String prefix) { - return new SqlServerTopicSelector(prefix); - } - - @Override - public String topicNameFor(TableId tableId) { - return String.join(".", prefix, tableId.schema(), tableId.table()); + public static TopicSelector defaultSelector(SqlServerConnectorConfig connectorConfig) { + return TopicSelector.defaultSelector(connectorConfig, + (tableId, prefix, delimiter) -> String.join(delimiter, prefix, tableId.schema(), tableId.table())); } } diff --git a/debezium-connector-sqlserver/src/main/resources/statements.properties b/debezium-connector-sqlserver/src/main/resources/statements.properties index 8001dc305..7075bb2ea 100644 --- a/debezium-connector-sqlserver/src/main/resources/statements.properties +++ b/debezium-connector-sqlserver/src/main/resources/statements.properties @@ -5,3 +5,4 @@ disable_cdc_for_db=IF EXISTS(select 1 from sys.databases where name='#' AND is_c enable_cdc_for_table=IF EXISTS(select 1 from sys.tables where name = '#' AND is_tracked_by_cdc=0)\n\ EXEC sys.sp_cdc_enable_table @source_schema = N'dbo', @source_name = N'#', @role_name = NULL, @supports_net_changes = 0 get_max_lsn=SELECT sys.fn_cdc_get_max_lsn() +lock_table=SELECT * FROM # WITH (TABLOCKX) diff --git a/debezium-connector-sqlserver/src/test/java/io/debezium/connector/sqlserver/SnapshotDatatypesIT.java b/debezium-connector-sqlserver/src/test/java/io/debezium/connector/sqlserver/DatatypesFromSnapshotIT.java similarity index 70% rename from debezium-connector-sqlserver/src/test/java/io/debezium/connector/sqlserver/SnapshotDatatypesIT.java rename to debezium-connector-sqlserver/src/test/java/io/debezium/connector/sqlserver/DatatypesFromSnapshotIT.java index b770877ee..e63e523a2 100644 --- a/debezium-connector-sqlserver/src/test/java/io/debezium/connector/sqlserver/SnapshotDatatypesIT.java +++ b/debezium-connector-sqlserver/src/test/java/io/debezium/connector/sqlserver/DatatypesFromSnapshotIT.java @@ -11,15 +11,17 @@ import org.junit.BeforeClass; import io.debezium.config.Configuration; +import io.debezium.connector.sqlserver.SqlServerConnectorConfig.SnapshotMode; import io.debezium.connector.sqlserver.util.TestHelper; import io.debezium.util.Testing; /** * Integration test to verify different Oracle datatypes. + * The types are discovered during snapshotting phase. * * @author Jiri Pechanec */ -public class SnapshotDatatypesIT extends AbstractSqlServerDatatypesTest { +public class DatatypesFromSnapshotIT extends AbstractSqlServerDatatypesTest { @BeforeClass public static void beforeClass() throws SQLException { @@ -32,7 +34,9 @@ public void before() throws Exception { Testing.Debug.enable(); Testing.Files.delete(TestHelper.DB_HISTORY_PATH); - Configuration config = TestHelper.defaultConfig().build(); + Configuration config = TestHelper.defaultConfig() + .with(SqlServerConnectorConfig.SNAPSHOT_MODE, SnapshotMode.INITIAL_SCHEMA_ONLY) + .build(); start(SqlServerConnector.class, config); assertConnectorIsRunning(); Thread.sleep(1000); diff --git a/debezium-connector-sqlserver/src/test/java/io/debezium/connector/sqlserver/SqlServerConnectionIT.java b/debezium-connector-sqlserver/src/test/java/io/debezium/connector/sqlserver/SqlServerConnectionIT.java index df5971f48..856306603 100644 --- a/debezium-connector-sqlserver/src/test/java/io/debezium/connector/sqlserver/SqlServerConnectionIT.java +++ b/debezium-connector-sqlserver/src/test/java/io/debezium/connector/sqlserver/SqlServerConnectionIT.java @@ -9,7 +9,7 @@ import java.math.BigInteger; import java.sql.SQLException; -import org.junit.BeforeClass; +import org.junit.Before; import org.junit.Test; import io.debezium.connector.sqlserver.util.TestHelper; @@ -22,14 +22,16 @@ */ public class SqlServerConnectionIT { - @BeforeClass - public static void beforeClass() throws SQLException { + @Before + public void before() throws SQLException { + TestHelper.dropTestDatabase(); } @Test public void shouldEnableCdcForDatabase() throws Exception { try (SqlServerConnection connection = TestHelper.adminConnection()) { connection.connect(); + connection.execute("CREATE DATABASE testDB"); connection.execute("USE testDB"); // NOTE: you cannot enable CDC on master connection.enableDbCdc("testDB"); @@ -40,6 +42,7 @@ public void shouldEnableCdcForDatabase() throws Exception { public void shouldEnableCdcWithWrapperFunctionsForTable() throws Exception { try (SqlServerConnection connection = TestHelper.adminConnection()) { connection.connect(); + connection.execute("CREATE DATABASE testDB"); connection.execute("USE testDB"); // NOTE: you cannot enable CDC on master connection.enableDbCdc("testDB"); diff --git a/debezium-connector-sqlserver/src/test/java/io/debezium/connector/sqlserver/SqlServerConnectorIT.java b/debezium-connector-sqlserver/src/test/java/io/debezium/connector/sqlserver/SqlServerConnectorIT.java index c51350b43..3513467ce 100644 --- a/debezium-connector-sqlserver/src/test/java/io/debezium/connector/sqlserver/SqlServerConnectorIT.java +++ b/debezium-connector-sqlserver/src/test/java/io/debezium/connector/sqlserver/SqlServerConnectorIT.java @@ -16,12 +16,11 @@ import org.apache.kafka.connect.source.SourceRecord; import org.fest.assertions.Assertions; import org.junit.After; -import org.junit.AfterClass; import org.junit.Before; -import org.junit.BeforeClass; import org.junit.Test; import io.debezium.config.Configuration; +import io.debezium.connector.sqlserver.SqlServerConnectorConfig.SnapshotMode; import io.debezium.connector.sqlserver.util.TestHelper; import io.debezium.data.SchemaAndValueField; import io.debezium.embedded.AbstractConnectorTest; @@ -34,18 +33,7 @@ */ public class SqlServerConnectorIT extends AbstractConnectorTest { - private static SqlServerConnection connection; - - @BeforeClass - public static void beforeClass() { - } - - @AfterClass - public static void closeConnection() throws SQLException { - if (connection != null) { - connection.close(); - } - } + private SqlServerConnection connection; @Before public void before() throws SQLException { @@ -65,7 +53,9 @@ public void before() throws SQLException { @After public void after() throws SQLException { - connection.close(); + if (connection != null) { + connection.close(); + } TestHelper.dropTestDatabase(); } @@ -74,7 +64,9 @@ public void createAndDelete() throws Exception { final int RECORDS_PER_TABLE = 5; final int TABLES = 2; final int ID_START = 10; - final Configuration config = TestHelper.defaultConfig().build(); + final Configuration config = TestHelper.defaultConfig() + .with(SqlServerConnectorConfig.SNAPSHOT_MODE, SnapshotMode.INITIAL_SCHEMA_ONLY) + .build(); start(SqlServerConnector.class, config); assertConnectorIsRunning(); @@ -146,7 +138,9 @@ public void createAndDelete() throws Exception { public void update() throws Exception { final int RECORDS_PER_TABLE = 5; final int ID_START = 10; - final Configuration config = TestHelper.defaultConfig().build(); + final Configuration config = TestHelper.defaultConfig() + .with(SqlServerConnectorConfig.SNAPSHOT_MODE, SnapshotMode.INITIAL_SCHEMA_ONLY) + .build(); start(SqlServerConnector.class, config); assertConnectorIsRunning(); diff --git a/debezium-connector-sqlserver/src/test/java/io/debezium/connector/sqlserver/util/TestHelper.java b/debezium-connector-sqlserver/src/test/java/io/debezium/connector/sqlserver/util/TestHelper.java index 6f8f502ec..f8b98de1d 100644 --- a/debezium-connector-sqlserver/src/test/java/io/debezium/connector/sqlserver/util/TestHelper.java +++ b/debezium-connector-sqlserver/src/test/java/io/debezium/connector/sqlserver/util/TestHelper.java @@ -21,7 +21,6 @@ * @author Horia Chiorean (hchiorea@redhat.com) */ public class TestHelper { - public static final Path DB_HISTORY_PATH = Testing.Files.createTestingPath("file-db-history-connect.txt").toAbsolutePath(); public static final String TEST_DATABASE = "testDB"; @@ -100,7 +99,7 @@ public static void dropTestDatabase() { connection.execute(sql); } catch (SQLException e) { - throw new IllegalStateException("Error while initating test database", e); + throw new IllegalStateException("Error while dropping test database", e); } }