DBZ-40 Snapshotting and connector restart

This commit is contained in:
Jiri Pechanec 2018-07-17 12:51:56 +02:00 committed by Gunnar Morling
parent 9c8508adb1
commit 5bbd3bc2ea
19 changed files with 526 additions and 300 deletions

View File

@ -19,7 +19,7 @@
<sqlserver.port>1433</sqlserver.port>
<sqlserver.user>sa</sqlserver.user>
<sqlserver.password>Password!</sqlserver.password>
<sqlserver.dbname>master</sqlserver.dbname>
<sqlserver.dbname>testDB</sqlserver.dbname>
<docker.filter>microsoft/mssql-server-linux:2017-latest</docker.filter>
<docker.skip>false</docker.skip>
<docker.showLogs>true</docker.showLogs>
@ -171,7 +171,6 @@
<database.port>${sqlserver.port}</database.port>
<database.user>${sqlserver.user}</database.user>
<database.password>${sqlserver.password}</database.password>
<database.dbname>${sqlserver.dbname}</database.dbname>
<skipLongRunningTests>${skipLongRunningTests}</skipLongRunningTests>
</systemPropertyVariables>
</configuration>

View File

@ -7,6 +7,8 @@
import java.util.Arrays;
import io.debezium.util.Strings;
/**
* A logical representation of SQL Server LSN (log sequence number) position.
*
@ -14,6 +16,8 @@
*
*/
public class Lsn implements Comparable<Lsn> {
public static final Lsn NULL = new Lsn(null);
private final byte[] binary;
private int[] unsignedBinary;
@ -66,6 +70,10 @@ public String toString() {
return string;
}
public static Lsn valueOf(String lsnString) {
return (lsnString == null) ? NULL : new Lsn(Strings.hexStringToByteArray(lsnString.replace(":", "")));
}
@Override
public int hashCode() {
final int prime = 31;
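
For reference, valueOf above simply strips the colon separators SQL Server uses when rendering an LSN (three hex groups, ten bytes total) and hex-decodes the rest. A minimal standalone sketch of that normalization, with a local hexToBytes standing in for Strings.hexStringToByteArray (assumed behavior):

    public class LsnParseDemo {

        // stand-in for io.debezium.util.Strings.hexStringToByteArray (assumed behavior)
        static byte[] hexToBytes(String hex) {
            final byte[] out = new byte[hex.length() / 2];
            for (int i = 0; i < out.length; i++) {
                out[i] = (byte) Integer.parseInt(hex.substring(2 * i, 2 * i + 2), 16);
            }
            return out;
        }

        public static void main(String[] args) {
            final String lsnString = "00000025:00000448:0003"; // as printed by SQL Server
            final byte[] binary = hexToBytes(lsnString.replace(":", ""));
            System.out.println(binary.length); // 10 bytes backing the Lsn
        }
    }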

View File

@ -0,0 +1,42 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.connector.sqlserver;
import io.debezium.data.Envelope.Operation;
import io.debezium.pipeline.spi.OffsetContext;
import io.debezium.relational.RelationalChangeRecordEmitter;
import io.debezium.util.Clock;
/**
* Emits change data based on a single row read via JDBC.
*
* @author Gunnar Morling
*/
public class SnapshotChangeRecordEmitter extends RelationalChangeRecordEmitter {
private final Object[] row;
public SnapshotChangeRecordEmitter(OffsetContext offset, Object[] row, Clock clock) {
super(offset, clock);
this.row = row;
}
@Override
protected Operation getOperation() {
return Operation.READ;
}
@Override
protected Object[] getOldColumnValues() {
throw new UnsupportedOperationException("Can't get old row values for READ record");
}
@Override
protected Object[] getNewColumnValues() {
return row;
}
}

View File

@ -1,3 +1,8 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.connector.sqlserver;
import java.time.Instant;
@ -19,6 +24,7 @@ public class SourceInfo extends AbstractSourceInfo {
public static final String SERVER_NAME_KEY = "name";
public static final String LOG_TIMESTAMP_KEY = "ts_ms";
public static final String CHANGE_LSN_KEY = "change_lsn";
public static final String COMMIT_LSN_KEY = "commit_lsn";
public static final String SNAPSHOT_KEY = "snapshot";
public static final Schema SCHEMA = schemaBuilder()
@ -26,11 +32,14 @@ public class SourceInfo extends AbstractSourceInfo {
.field(SERVER_NAME_KEY, Schema.STRING_SCHEMA)
.field(LOG_TIMESTAMP_KEY, Schema.OPTIONAL_INT64_SCHEMA)
.field(CHANGE_LSN_KEY, Schema.OPTIONAL_STRING_SCHEMA)
.field(COMMIT_LSN_KEY, Schema.OPTIONAL_STRING_SCHEMA)
.field(SNAPSHOT_KEY, Schema.OPTIONAL_BOOLEAN_SCHEMA)
.build();
private final String serverName;
private Lsn changeLsn;
private Lsn commitLsn;
private boolean snapshot;
private Instant sourceTime;
protected SourceInfo(String serverName) {
@ -46,10 +55,26 @@ public Lsn getChangeLsn() {
return changeLsn;
}
public Lsn getCommitLsn() {
return commitLsn;
}
public void setCommitLsn(Lsn commitLsn) {
this.commitLsn = commitLsn;
}
public void setSourceTime(Instant instant) {
sourceTime = instant;
}
public boolean isSnapshot() {
return snapshot;
}
public void setSnapshot(boolean snapshot) {
this.snapshot = snapshot;
}
@Override
protected Schema schema() {
return SCHEMA;
@ -57,10 +82,15 @@ protected Schema schema() {
@Override
public Struct struct() {
return super.struct()
final Struct ret = super.struct()
.put(SERVER_NAME_KEY, serverName)
.put(LOG_TIMESTAMP_KEY, sourceTime == null ? null : sourceTime.toEpochMilli())
.put(CHANGE_LSN_KEY, changeLsn.toString())
.put(SNAPSHOT_KEY, false);
.put(SNAPSHOT_KEY, snapshot);
if (commitLsn != null) {
ret.put(COMMIT_LSN_KEY, commitLsn.toString());
}
return ret;
}
}
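
The conditional put above means a snapshot record's source block carries snapshot=true and no commit_lsn, while streamed records set both LSNs. A sketch of the snapshot shape (same-package code, since the constructor is protected; server name is hypothetical):

    package io.debezium.connector.sqlserver;

    import org.apache.kafka.connect.data.Struct;

    public class SourceInfoDemo {
        public static void main(String[] args) {
            SourceInfo source = new SourceInfo("server1");
            source.setChangeLsn(Lsn.valueOf("00000025:00000448:0003"));
            source.setSnapshot(true);
            Struct struct = source.struct();
            System.out.println(struct.get(SourceInfo.SNAPSHOT_KEY));   // true
            System.out.println(struct.get(SourceInfo.COMMIT_LSN_KEY)); // null - not set while snapshotting
        }
    }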

View File

@ -11,6 +11,7 @@
import io.debezium.pipeline.source.spi.SnapshotChangeEventSource;
import io.debezium.pipeline.source.spi.StreamingChangeEventSource;
import io.debezium.pipeline.spi.OffsetContext;
import io.debezium.relational.TableId;
import io.debezium.util.Clock;
public class SqlServerChangeEventSourceFactory implements ChangeEventSourceFactory {
@ -18,12 +19,12 @@ public class SqlServerChangeEventSourceFactory implements ChangeEventSourceFacto
private final SqlServerConnectorConfig configuration;
private final SqlServerConnection jdbcConnection;
private final ErrorHandler errorHandler;
private final EventDispatcher dispatcher;
private final EventDispatcher<TableId> dispatcher;
private final Clock clock;
private final SqlServerDatabaseSchema schema;
public SqlServerChangeEventSourceFactory(SqlServerConnectorConfig configuration, SqlServerConnection jdbcConnection,
ErrorHandler errorHandler, EventDispatcher dispatcher, Clock clock, SqlServerDatabaseSchema schema) {
ErrorHandler errorHandler, EventDispatcher<TableId> dispatcher, Clock clock, SqlServerDatabaseSchema schema) {
this.configuration = configuration;
this.jdbcConnection = jdbcConnection;
this.errorHandler = errorHandler;
@ -34,7 +35,7 @@ public SqlServerChangeEventSourceFactory(SqlServerConnectorConfig configuration,
@Override
public SnapshotChangeEventSource getSnapshotChangeEventSource(OffsetContext offsetContext) {
return new SqlServerSnapshotChangeEventSource(configuration, (SqlServerOffsetContext) offsetContext, jdbcConnection, schema);
return new SqlServerSnapshotChangeEventSource(configuration, (SqlServerOffsetContext) offsetContext, jdbcConnection, schema, dispatcher, clock);
}
@Override

View File

@ -36,6 +36,7 @@ public class SqlServerConnection extends JdbcConnection {
private static final String ENABLE_TABLE_CDC;
private static final String CDC_WRAPPERS_DML;
private static final String GET_MAX_LSN;
private static final String LOCK_TABLE;
static {
try {
@ -46,6 +47,7 @@ public class SqlServerConnection extends JdbcConnection {
DISABLE_DB_CDC = statements.getProperty("disable_cdc_for_db");
ENABLE_TABLE_CDC = statements.getProperty("enable_cdc_for_table");
GET_MAX_LSN = statements.getProperty("get_max_lsn");
LOCK_TABLE = statements.getProperty("lock_table");
CDC_WRAPPERS_DML = IoUtil.read(classLoader.getResourceAsStream("generate_cdc_wrappers.sql"));
}
catch (Exception e) {
@ -183,6 +185,11 @@ public Instant timestampOfLsn(Lsn lsn) throws SQLException {
});
}
public void lockTable(TableId tableId) throws SQLException {
final String lockTableStmt = LOCK_TABLE.replace(STATEMENTS_PLACEHOLDER, tableId.table());
execute(lockTableStmt);
}
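
The lock_table template (see the statements resource near the end of this commit) is SELECT * FROM # WITH (TABLOCKX), and STATEMENTS_PLACEHOLDER is the '#' character, so the substitution above is a plain string replace. A sketch with a hypothetical table name:

    public class LockStatementDemo {
        private static final String STATEMENTS_PLACEHOLDER = "#";
        private static final String LOCK_TABLE = "SELECT * FROM # WITH (TABLOCKX)";

        public static void main(String[] args) {
            // lockTable(TableId) does the same with tableId.table()
            String lockTableStmt = LOCK_TABLE.replace(STATEMENTS_PLACEHOLDER, "customers");
            System.out.println(lockTableStmt); // SELECT * FROM customers WITH (TABLOCKX)
        }
    }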
private String cdcNameForTable(TableId tableId) {
return tableId.schema() + '_' + tableId.table();
}

View File

@ -13,8 +13,11 @@
import io.debezium.config.CommonConnectorConfig;
import io.debezium.config.Configuration;
import io.debezium.config.EnumeratedValue;
import io.debezium.config.Field;
import io.debezium.config.Field.ValidationOutput;
import io.debezium.document.Document;
import io.debezium.heartbeat.Heartbeat;
import io.debezium.jdbc.JdbcConfiguration;
import io.debezium.relational.RelationalDatabaseConnectorConfig;
import io.debezium.relational.TableId;
@ -31,8 +34,136 @@
*/
public class SqlServerConnectorConfig extends RelationalDatabaseConnectorConfig {
// TODO pull up to RelationalConnectorConfig
public static final String DATABASE_CONFIG_PREFIX = "database.";
/**
* The set of predefined SnapshotMode options or aliases.
*/
public static enum SnapshotMode implements EnumeratedValue {
/**
* Perform a snapshot of data and schema upon initial startup of a connector.
*/
INITIAL("initial", true),
/**
* Perform a snapshot of the schema only upon initial startup of a connector.
*/
INITIAL_SCHEMA_ONLY("initial_schema_only", false);
private final String value;
private final boolean includeData;
private SnapshotMode(String value, boolean includeData) {
this.value = value;
this.includeData = includeData;
}
@Override
public String getValue() {
return value;
}
/**
* Whether this snapshotting mode should include the actual data or just the
* schema of captured tables.
*/
public boolean includeData() {
return includeData;
}
/**
* Determine if the supplied value is one of the predefined options.
*
* @param value the configuration property value; may not be null
* @return the matching option, or null if no match is found
*/
public static SnapshotMode parse(String value) {
if (value == null) {
return null;
}
value = value.trim();
for (SnapshotMode option : SnapshotMode.values()) {
if (option.getValue().equalsIgnoreCase(value)) return option;
}
return null;
}
/**
* Determine if the supplied value is one of the predefined options.
*
* @param value the configuration property value; may not be null
* @param defaultValue the default value; may be null
* @return the matching option, or null if no match is found and the non-null default is invalid
*/
public static SnapshotMode parse(String value, String defaultValue) {
SnapshotMode mode = parse(value);
if (mode == null && defaultValue != null) {
mode = parse(defaultValue);
}
return mode;
}
}
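
A quick usage sketch of the parse overloads above (assuming the enum as defined):

    import io.debezium.connector.sqlserver.SqlServerConnectorConfig.SnapshotMode;

    public class SnapshotModeParseDemo {
        public static void main(String[] args) {
            System.out.println(SnapshotMode.parse("INITIAL"));                      // matches case-insensitively
            System.out.println(SnapshotMode.parse("bogus", "initial_schema_only")); // unknown value, falls back to the default
            System.out.println(SnapshotMode.INITIAL_SCHEMA_ONLY.includeData());     // false - schema only
        }
    }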
/**
* The set of predefined Snapshot Locking Mode options.
*/
public static enum SnapshotLockingMode implements EnumeratedValue {
/**
* This mode will block all reads and writes for the entire duration of the snapshot.
*
* The connector will execute {@code SELECT * FROM .. WITH (TABLOCKX)}
*/
EXCLUSIVE("exclusive"),
/**
* This mode will avoid using any table locks during the snapshot process. It can only be used when the snapshot
* mode is set to 'initial_schema_only'.
*/
NONE("none");
private final String value;
private SnapshotLockingMode(String value) {
this.value = value;
}
@Override
public String getValue() {
return value;
}
/**
* Determine if the supplied value is one of the predefined options.
*
* @param value the configuration property value; may not be null
* @return the matching option, or null if no match is found
*/
public static SnapshotLockingMode parse(String value) {
if (value == null) return null;
value = value.trim();
for (SnapshotLockingMode option : SnapshotLockingMode.values()) {
if (option.getValue().equalsIgnoreCase(value)) return option;
}
return null;
}
/**
* Determine if the supplied value is one of the predefined options.
*
* @param value the configuration property value; may not be null
* @param defaultValue the default value; may be null
* @return the matching option, or null if no match is found and the non-null default is invalid
*/
public static SnapshotLockingMode parse(String value, String defaultValue) {
SnapshotLockingMode mode = parse(value);
if (mode == null && defaultValue != null) mode = parse(defaultValue);
return mode;
}
}
public static final Field LOGICAL_NAME = Field.create("database.server.name")
.withDisplayName("Namespace")
@ -40,11 +171,10 @@ public class SqlServerConnectorConfig extends RelationalDatabaseConnectorConfig
.withWidth(Width.MEDIUM)
.withImportance(Importance.HIGH)
.withValidation(Field::isRequired)
// TODO
//.withValidation(Field::isRequired, MySqlConnectorConfig::validateServerNameIsDifferentFromHistoryTopicName)
.withValidation(Field::isRequired, CommonConnectorConfig::validateServerNameIsDifferentFromHistoryTopicName)
.withDescription("Unique name that identifies the database server and all recorded offsets, and"
+ "that is used as a prefix for all schemas and topics. "
+ "Each distinct MySQL installation should have a separate namespace and monitored by "
+ "Each distinct SQL Server installation should have a separate namespace and monitored by "
+ "at most one Debezium connector.");
public static final Field DATABASE_NAME = Field.create(DATABASE_CONFIG_PREFIX + JdbcConfiguration.DATABASE)
@ -71,39 +201,68 @@ public class SqlServerConnectorConfig extends RelationalDatabaseConnectorConfig
+ DatabaseHistory.CONFIGURATION_FIELD_PREFIX_STRING + "' string.")
.withDefault(KafkaDatabaseHistory.class.getName());
public static final Field SNAPSHOT_MODE = Field.create("snapshot.mode")
.withDisplayName("Snapshot mode")
.withEnum(SnapshotMode.class, SnapshotMode.INITIAL)
.withWidth(Width.SHORT)
.withImportance(Importance.LOW)
.withDescription("The criteria for running a snapshot upon startup of the connector. "
+ "Options include: "
+ "'initial' (the default) to specify the connector should run a snapshot only when no offsets are available for the logical server name; "
+ "'initial_schema_only' to specify the connector should run a snapshot of the schema when no offsets are available for the logical server name. ");
public static final Field SNAPSHOT_LOCKING_MODE = Field.create("snapshot.locking.mode")
.withDisplayName("Snapshot locking mode")
.withEnum(SnapshotLockingMode.class, SnapshotLockingMode.EXCLUSIVE)
.withWidth(Width.SHORT)
.withImportance(Importance.LOW)
.withDescription("Controls how long the connector locks the montiored tables for snapshot execution. The default is '" + SnapshotLockingMode.EXCLUSIVE.getValue() + "', "
+ "which means that the connector holds the exlusive lock (and thus prevents any reads and updates) for all monitored tables "
+ "while the database schemas, other metadata and the data itself are being read. Using a value of '" + SnapshotLockingMode.NONE.getValue() + "' will prevent the connector from acquiring any "
+ "table locks during the snapshot process. This mode can only be used in combination with snapshot.mode values of '" + SnapshotMode.INITIAL_SCHEMA_ONLY.getValue() + "' or "
+ "'schema_only_recovery' and is only safe to use if no schema changes are happening while the snapshot is taken.")
.withValidation(SqlServerConnectorConfig::validateSnapshotLockingMode);
/**
* The set of {@link Field}s defined as part of this configuration.
*/
public static Field.Set ALL_FIELDS = Field.setOf(
LOGICAL_NAME,
DATABASE_NAME,
SNAPSHOT_MODE,
RelationalDatabaseConnectorConfig.TABLE_WHITELIST,
RelationalDatabaseConnectorConfig.TABLE_BLACKLIST,
RelationalDatabaseConnectorConfig.TABLE_IGNORE_BUILTIN,
CommonConnectorConfig.POLL_INTERVAL_MS,
CommonConnectorConfig.MAX_BATCH_SIZE,
CommonConnectorConfig.MAX_QUEUE_SIZE
CommonConnectorConfig.MAX_QUEUE_SIZE,
Heartbeat.HEARTBEAT_INTERVAL, Heartbeat.HEARTBEAT_TOPICS_PREFIX
);
public static ConfigDef configDef() {
ConfigDef config = new ConfigDef();
Field.group(config, "SQL Server", LOGICAL_NAME, DATABASE_NAME, SNAPSHOT_MODE);
Field.group(config, "Events", RelationalDatabaseConnectorConfig.TABLE_WHITELIST,
RelationalDatabaseConnectorConfig.TABLE_BLACKLIST,
RelationalDatabaseConnectorConfig.TABLE_IGNORE_BUILTIN,
Heartbeat.HEARTBEAT_INTERVAL, Heartbeat.HEARTBEAT_TOPICS_PREFIX
);
Field.group(config, "Connector", CommonConnectorConfig.POLL_INTERVAL_MS, CommonConnectorConfig.MAX_BATCH_SIZE, CommonConnectorConfig.MAX_QUEUE_SIZE);
return config;
}
private final String databaseName;
private final SnapshotMode snapshotMode;
private final SnapshotLockingMode snapshotLockingMode;
public SqlServerConnectorConfig(Configuration config) {
super(config, config.getString(LOGICAL_NAME), new SystemTablesPredicate());
this.databaseName = config.getString(DATABASE_NAME);
}
public static ConfigDef configDef() {
ConfigDef config = new ConfigDef();
Field.group(config, "Oracle", LOGICAL_NAME, DATABASE_NAME);
Field.group(config, "Events", RelationalDatabaseConnectorConfig.TABLE_WHITELIST,
RelationalDatabaseConnectorConfig.TABLE_BLACKLIST,
RelationalDatabaseConnectorConfig.TABLE_IGNORE_BUILTIN
);
Field.group(config, "Connector", CommonConnectorConfig.POLL_INTERVAL_MS, CommonConnectorConfig.MAX_BATCH_SIZE, CommonConnectorConfig.MAX_QUEUE_SIZE);
return config;
this.snapshotMode = SnapshotMode.parse(config.getString(SNAPSHOT_MODE), SNAPSHOT_MODE.defaultValueAsString());
this.snapshotLockingMode = SnapshotLockingMode.parse(config.getString(SNAPSHOT_LOCKING_MODE), SNAPSHOT_LOCKING_MODE.defaultValueAsString());
}
public String getDatabaseName() {
@ -139,6 +298,41 @@ protected boolean isPositionAtOrBefore(Document recorded, Document desired) {
return databaseHistory;
}
public SnapshotLockingMode getSnapshotLockingMode() {
return this.snapshotLockingMode;
}
public SnapshotMode getSnapshotMode() {
return snapshotMode;
}
/**
* Validates the snapshot.locking.mode configuration.
* {@link SnapshotLockingMode#NONE} is allowed only for snapshot mode {@link SnapshotMode#INITIAL_SCHEMA_ONLY}.
*
* @param config connector configuration
* @param field validated field (snapshot locking mode)
* @param problems the list of violated validations
*
* @return 0 for valid configuration
*/
private static int validateSnapshotLockingMode(Configuration config, Field field, ValidationOutput problems) {
final SnapshotMode snapshotMode = SnapshotMode.parse(config.getString(SNAPSHOT_MODE), SNAPSHOT_MODE.defaultValueAsString());
final SnapshotLockingMode snapshotLockingMode = SnapshotLockingMode.parse(config.getString(SNAPSHOT_LOCKING_MODE), SNAPSHOT_LOCKING_MODE.defaultValueAsString());
if (snapshotLockingMode == SnapshotLockingMode.NONE) {
if (snapshotMode != SnapshotMode.INITIAL_SCHEMA_ONLY) {
problems.accept(
field,
snapshotLockingMode,
"Snapshot locking mode '" + snapshotLockingMode.getValue() + "' is not allowed for snapshot mode '" + snapshotMode.getValue() + "'"
);
}
}
// Everything checks out ok.
return 0;
}
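
The disallowed combination checked above can be reproduced directly; a sketch that mirrors the validation logic (configuration keys as defined by the fields above):

    import io.debezium.config.Configuration;
    import io.debezium.connector.sqlserver.SqlServerConnectorConfig.SnapshotLockingMode;
    import io.debezium.connector.sqlserver.SqlServerConnectorConfig.SnapshotMode;

    public class LockingModeCheckDemo {
        public static void main(String[] args) {
            Configuration config = Configuration.create()
                    .with("snapshot.mode", "initial")      // full data snapshot...
                    .with("snapshot.locking.mode", "none") // ...but no table locks: invalid
                    .build();

            SnapshotMode mode = SnapshotMode.parse(config.getString("snapshot.mode"));
            SnapshotLockingMode locking = SnapshotLockingMode.parse(config.getString("snapshot.locking.mode"));

            // mirrors validateSnapshotLockingMode: 'none' requires the schema-only snapshot mode
            if (locking == SnapshotLockingMode.NONE && mode != SnapshotMode.INITIAL_SCHEMA_ONLY) {
                System.out.println("invalid: snapshot.locking.mode=none requires snapshot.mode=initial_schema_only");
            }
        }
    }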
private static class SystemTablesPredicate implements TableFilter {
@Override

View File

@ -6,6 +6,7 @@
package io.debezium.connector.sqlserver;
import java.sql.SQLException;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;
@ -24,7 +25,9 @@
import io.debezium.pipeline.DataChangeEvent;
import io.debezium.pipeline.ErrorHandler;
import io.debezium.pipeline.EventDispatcher;
import io.debezium.pipeline.spi.OffsetContext;
import io.debezium.relational.TableId;
import io.debezium.schema.TopicSelector;
import io.debezium.util.Clock;
import io.debezium.util.SchemaNameAdjuster;
@ -47,7 +50,7 @@ private static enum State {
private final AtomicReference<State> state = new AtomicReference<State>(State.STOPPED);
private volatile SqlServerTaskContext taskContext;
private volatile ChangeEventQueue<Object> queue;
private volatile ChangeEventQueue<DataChangeEvent> queue;
private volatile SqlServerConnection jdbcConnection;
private volatile ChangeEventSourceCoordinator coordinator;
private volatile ErrorHandler errorHandler;
@ -72,7 +75,7 @@ public void start(Configuration config) {
final Clock clock = Clock.system();
// Set up the task record queue ...
this.queue = new ChangeEventQueue.Builder<Object>()
this.queue = new ChangeEventQueue.Builder<DataChangeEvent>()
.pollInterval(connectorConfig.getPollInterval())
.maxBatchSize(connectorConfig.getMaxBatchSize())
.maxQueueSize(connectorConfig.getMaxQueueSize())
@ -80,7 +83,7 @@ public void start(Configuration config) {
.build();
errorHandler = new ErrorHandler(SqlServerConnector.class, connectorConfig.getLogicalName(), queue, this::cleanupResources);
final SqlServerTopicSelector topicSelector = SqlServerTopicSelector.defaultSelector(connectorConfig.getLogicalName());
final TopicSelector<TableId> topicSelector = SqlServerTopicSelector.defaultSelector(connectorConfig);
final Configuration jdbcConfig = config.subset("database.", true);
@ -89,11 +92,10 @@ public void start(Configuration config) {
this.schema = new SqlServerDatabaseSchema(connectorConfig, schemaNameAdjuster, topicSelector, jdbcConnection);
final SqlServerOffsetContext previousOffset = null;
// OracleOffsetContext previousOffset = getPreviousOffset(connectorConfig);
// if (previousOffset != null) {
// schema.recover(previousOffset);
// }
final OffsetContext previousOffset = getPreviousOffset(new SqlServerOffsetContext.Loader(connectorConfig.getLogicalName()));
if (previousOffset != null) {
schema.recover(previousOffset);
}
final EventDispatcher<TableId> dispatcher = new EventDispatcher<>(topicSelector, schema, queue,
connectorConfig.getTableFilters().dataCollectionFilter(), DataChangeEvent::new);
@ -109,30 +111,31 @@ public void start(Configuration config) {
coordinator.start();
}
// private OracleOffsetContext getPreviousOffset(SqlServerConnectorConfig connectorConfig) {
// OracleOffsetContext offsetContext = new OracleOffsetContext(connectorConfig.getLogicalName());
//
// Map<String, Object> previousOffset = context.offsetStorageReader()
// .offsets(Collections.singleton(offsetContext.getPartition()))
// .get(offsetContext.getPartition());
//
// if (previousOffset != null) {
// long scn = (long) previousOffset.get(SourceInfo.SCN_KEY);
// offsetContext.setScn(scn);
// LOGGER.info("Found previous offset {}", offsetContext);
//
// return offsetContext;
// }
//
// return null;
// }
/**
* Loads the connector's persistent offset (if present) via the given loader.
*/
protected OffsetContext getPreviousOffset(OffsetContext.Loader loader) {
Map<String, ?> partition = loader.getPartition();
Map<String, Object> previousOffset = context.offsetStorageReader()
.offsets(Collections.singleton(partition))
.get(partition);
if (previousOffset != null) {
OffsetContext offsetContext = loader.load(previousOffset);
LOGGER.info("Found previous offset {}", offsetContext);
return offsetContext;
}
else {
return null;
}
}
@Override
public List<SourceRecord> poll() throws InterruptedException {
// TODO
List records = queue.poll();
final List<DataChangeEvent> records = queue.poll();
List<SourceRecord> sourceRecords = ((List<DataChangeEvent>)records).stream()
final List<SourceRecord> sourceRecords = records.stream()
.map(DataChangeEvent::getRecord)
.collect(Collectors.toList());
@ -167,7 +170,6 @@ private void cleanupResources() {
catch (InterruptedException e) {
Thread.interrupted();
LOGGER.error("Interrupted while stopping coordinator", e);
// XStream code can end in SIGSEGV so fail the task instead of JVM crash
throw new ConnectException("Interrupted while stopping coordinator, failing the task");
}

View File

@ -8,17 +8,15 @@
import java.sql.SQLException;
import java.util.HashSet;
import java.util.Set;
import java.util.function.Predicate;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import io.debezium.pipeline.spi.OffsetContext;
import io.debezium.relational.HistorizedRelationalDatabaseSchema;
import io.debezium.relational.Table;
import io.debezium.relational.TableId;
import io.debezium.relational.TableSchemaBuilder;
import io.debezium.relational.history.DatabaseHistory;
import io.debezium.relational.ddl.DdlParser;
import io.debezium.relational.history.TableChanges;
import io.debezium.schema.SchemaChangeEvent;
import io.debezium.schema.SchemaChangeEvent.SchemaChangeEventType;
@ -26,7 +24,7 @@
import io.debezium.util.SchemaNameAdjuster;
/**
* Logical representation of Sql Server schema.
* Logical representation of SQL Server schema.
*
* @author Jiri Pechanec
*
@ -35,15 +33,12 @@ public class SqlServerDatabaseSchema extends HistorizedRelationalDatabaseSchema
private static final Logger LOGGER = LoggerFactory.getLogger(SqlServerDatabaseSchema.class);
private final DatabaseHistory databaseHistory;
private final Set<TableId> capturedTables;
public SqlServerDatabaseSchema(SqlServerConnectorConfig connectorConfig, SchemaNameAdjuster schemaNameAdjuster, TopicSelector<TableId> topicSelector, SqlServerConnection connection) {
super(connectorConfig, topicSelector, connectorConfig.getTableFilters().dataCollectionFilter(), null,
new TableSchemaBuilder(new SqlServerValueConverters(), schemaNameAdjuster, SourceInfo.SCHEMA),
false);
this.databaseHistory = connectorConfig.getDatabaseHistory();
this.databaseHistory.start();
try {
this.capturedTables = determineCapturedTables(connectorConfig, connection);
}
@ -52,23 +47,6 @@ public SqlServerDatabaseSchema(SqlServerConnectorConfig connectorConfig, SchemaN
}
}
private static Predicate<TableId> getTableFilter(SqlServerConnectorConfig connectorConfig) {
return t -> connectorConfig.getTableFilters().dataCollectionFilter().isIncluded(t);
}
@Override
public void recover(OffsetContext offset) {
// databaseHistory.recover(offset.getPartition(), offset.getOffset(), tables(), new OracleDdlParser());
// for (TableId tableId : tableIds()) {
// buildAndRegisterSchema(tableFor(tableId));
// }
}
@Override
public void close() {
databaseHistory.stop();
}
@Override
public void applySchemaChange(SchemaChangeEvent schemaChange) {
LOGGER.debug("Applying schema change event {}", schemaChange);
@ -84,8 +62,7 @@ public void applySchemaChange(SchemaChangeEvent schemaChange) {
tableChanges.create(table);
}
// databaseHistory.record(schemaChange.getPartition(), schemaChange.getOffset(), schemaChange.getDatabase(),
// schemaChange.getSchema(), schemaChange.getDdl(), tableChanges);
record(schemaChange, tableChanges);
}
public Set<TableId> getCapturedTables() {
@ -108,4 +85,9 @@ private Set<TableId> determineCapturedTables(SqlServerConnectorConfig connectorC
return capturedTables;
}
@Override
protected DdlParser getDdlParser() {
return null;
}
}

View File

@ -13,28 +13,27 @@
import org.apache.kafka.connect.data.Struct;
import io.debezium.pipeline.spi.OffsetContext;
import io.debezium.relational.TableId;
import io.debezium.util.Collect;
public class SqlServerOffsetContext implements OffsetContext {
private static final String SERVER_PARTITION_KEY = "server";
private static final String QUERY_FROM_LSN_KEY = "query_from_lsn";
private static final String QUERY_TO_LSN_KEY = "query_to_lsn";
private static final String QUERY_TABLE = "query_table";
private static final String SNAPSHOT_COMPLETED_KEY = "snapshot_completed";
private final Schema sourceInfoSchema;
private final SourceInfo sourceInfo;
private final Map<String, String> partition;
private boolean snapshotCompleted;
private Lsn queryFromLsn;
private Lsn queryToLsn;
private TableId queryTable;
public SqlServerOffsetContext(String serverName) {
public SqlServerOffsetContext(String serverName, Lsn lsn, boolean snapshot, boolean snapshotCompleted) {
partition = Collections.singletonMap(SERVER_PARTITION_KEY, serverName);
sourceInfo = new SourceInfo(serverName);
sourceInfo.setChangeLsn(lsn);
sourceInfo.setSnapshot(snapshot);
sourceInfoSchema = sourceInfo.schema();
this.snapshotCompleted = snapshotCompleted;
}
@Override
@ -44,12 +43,15 @@ public SqlServerOffsetContext(String serverName) {
@Override
public Map<String, ?> getOffset() {
return Collect.hashMapOf(
QUERY_FROM_LSN_KEY, queryFromLsn == null ? null : queryFromLsn.toString(),
QUERY_TO_LSN_KEY, queryToLsn == null ? null : queryToLsn.toString(),
QUERY_TABLE, queryTable == null ? null : queryTable.toString(),
SourceInfo.CHANGE_LSN_KEY, sourceInfo.getChangeLsn() == null ? null : sourceInfo.getChangeLsn().toString()
);
if (sourceInfo.isSnapshot()) {
return Collect.hashMapOf(
SourceInfo.SNAPSHOT_KEY, true,
SNAPSHOT_COMPLETED_KEY, snapshotCompleted
);
}
else {
return Collections.singletonMap(SourceInfo.CHANGE_LSN_KEY, sourceInfo.getChangeLsn().toString());
}
}
@Override
@ -66,40 +68,55 @@ public void setChangeLsn(Lsn lsn) {
sourceInfo.setChangeLsn(lsn);
}
public void setCommitLsn(Lsn lsn) {
sourceInfo.setCommitLsn(lsn);
}
public void setSourceTime(Instant instant) {
sourceInfo.setSourceTime(instant);
}
public void setQueryFromLsn(Lsn queryFromLsn) {
this.queryFromLsn = queryFromLsn;
}
public void setQueryToLsn(Lsn queryToLsn) {
this.queryToLsn = queryToLsn;
}
public void setQueryTable(TableId queryTable) {
this.queryTable = queryTable;
}
@Override
public boolean isSnapshotRunning() {
// TODO Auto-generated method stub
return false;
return sourceInfo.isSnapshot() && !snapshotCompleted;
}
@Override
public void preSnapshotStart() {
// TODO Auto-generated method stub
sourceInfo.setSnapshot(true);
snapshotCompleted = false;
}
@Override
public void preSnapshotCompletion() {
// TODO Auto-generated method stub
snapshotCompleted = true;
}
@Override
public void postSnapshotCompletion() {
// TODO Auto-generated method stub
sourceInfo.setSnapshot(false);
}
public static class Loader implements OffsetContext.Loader {
private final String logicalName;
public Loader(String logicalName) {
this.logicalName = logicalName;
}
@Override
public Map<String, ?> getPartition() {
return Collections.singletonMap(SERVER_PARTITION_KEY, logicalName);
}
@Override
public OffsetContext load(Map<String, ?> offset) {
final Lsn lsn = Lsn.valueOf((String)offset.get(SourceInfo.CHANGE_LSN_KEY));
boolean snapshot = Boolean.TRUE.equals(offset.get(SourceInfo.SNAPSHOT_KEY));
boolean snapshotCompleted = Boolean.TRUE.equals(offset.get(SNAPSHOT_COMPLETED_KEY));
return new SqlServerOffsetContext(logicalName, lsn, snapshot, snapshotCompleted);
}
}
}
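
On a clean shutdown Kafka Connect persists the map returned by getOffset(), and on restart SqlServerConnectorTask feeds it back through Loader.load() (see getPreviousOffset above). A sketch of that round trip, done in memory with a hypothetical server name:

    import java.util.Map;

    import io.debezium.connector.sqlserver.Lsn;
    import io.debezium.connector.sqlserver.SqlServerOffsetContext;
    import io.debezium.pipeline.spi.OffsetContext;

    public class OffsetRoundTripDemo {
        public static void main(String[] args) {
            // streaming position: only the change LSN is stored
            SqlServerOffsetContext offset = new SqlServerOffsetContext(
                    "server1", Lsn.valueOf("00000025:00000448:0003"), false, false);
            Map<String, ?> stored = offset.getOffset(); // {change_lsn=00000025:00000448:0003}

            // after a restart, the Loader rebuilds the context from the stored map
            OffsetContext restored = new SqlServerOffsetContext.Loader("server1").load(stored);
            System.out.println(restored.isSnapshotRunning()); // false - resume streaming
        }
    }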

View File

@ -10,7 +10,6 @@
import io.debezium.pipeline.spi.SchemaChangeEventEmitter;
import io.debezium.relational.TableId;
import io.debezium.schema.SchemaChangeEvent.SchemaChangeEventType;
/**
* {@link SchemaChangeEventEmitter} implementation for SQL Server.
@ -31,37 +30,6 @@ public SqlServerSchemaChangeEventEmitter(SqlServerOffsetContext offsetContext, T
@Override
public void emitSchemaChangeEvent(Receiver receiver) throws InterruptedException {
// SchemaChangeEventType eventType = getSchemaChangeEventType();
// if (eventType == null) {
// return;
// }
//
// Tables tables = new Tables();
//
// SqlServerDdlParser parser = new SqlServerDdlParser();
// parser.setCurrentDatabase(ddlLcr.getSourceDatabaseName());
// parser.setCurrentSchema(ddlLcr.getObjectOwner());
// parser.parse(ddlLcr.getDDLText(), tables);
//
// Set<TableId> changedTableIds = tables.drainChanges();
// if (changedTableIds.isEmpty()) {
// throw new IllegalArgumentException("Couldn't parse DDL statement " + ddlLcr.getDDLText());
// }
//
// Table table = tables.forTable(tableId);
//
// receiver.schemaChangeEvent(new SchemaChangeEvent(offsetContext.getPartition(), offsetContext.getOffset(), ddlLcr.getSourceDatabaseName(), ddlLcr.getObjectOwner(), ddlLcr.getDDLText(), table, eventType, false));
}
private SchemaChangeEventType getSchemaChangeEventType() {
// switch(ddlLcr.getCommandType()) {
// case "CREATE TABLE": return SchemaChangeEventType.CREATE;
// case "ALTER TABLE": LOGGER.warn("ALTER TABLE not yet implemented");
// case "DROP TABLE": LOGGER.warn("DROP TABLE not yet implemented");
// default:
// LOGGER.debug("Ignoring DDL event of type {}", ddlLcr.getCommandType());
// return null;
// }
return null;
throw new UnsupportedOperationException("Schema evolution is not supported by the connector");
}
}

View File

@ -5,104 +5,102 @@
*/
package io.debezium.connector.sqlserver;
import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Savepoint;
import java.sql.Statement;
import java.time.Instant;
import java.util.Set;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import io.debezium.pipeline.source.spi.SnapshotChangeEventSource;
import io.debezium.pipeline.spi.SnapshotResult;
import io.debezium.connector.sqlserver.SqlServerConnectorConfig.SnapshotLockingMode;
import io.debezium.pipeline.EventDispatcher;
import io.debezium.pipeline.spi.ChangeRecordEmitter;
import io.debezium.pipeline.spi.OffsetContext;
import io.debezium.relational.HistorizedRelationalSnapshotChangeEventSource;
import io.debezium.relational.Table;
import io.debezium.relational.TableId;
import io.debezium.relational.Tables;
import io.debezium.schema.SchemaChangeEvent;
import io.debezium.schema.SchemaChangeEvent.SchemaChangeEventType;
import io.debezium.util.Clock;
public class SqlServerSnapshotChangeEventSource implements SnapshotChangeEventSource {
public class SqlServerSnapshotChangeEventSource extends HistorizedRelationalSnapshotChangeEventSource {
private static final Logger LOGGER = LoggerFactory.getLogger(SqlServerSnapshotChangeEventSource.class);
private final SqlServerConnectorConfig connectorConfig;
private final SqlServerOffsetContext previousOffset;
private final SqlServerConnection jdbcConnection;
private final SqlServerDatabaseSchema schema;
public SqlServerSnapshotChangeEventSource(SqlServerConnectorConfig connectorConfig, SqlServerOffsetContext previousOffset, SqlServerConnection jdbcConnection, SqlServerDatabaseSchema schema) {
public SqlServerSnapshotChangeEventSource(SqlServerConnectorConfig connectorConfig, SqlServerOffsetContext previousOffset, SqlServerConnection jdbcConnection, SqlServerDatabaseSchema schema, EventDispatcher<TableId> dispatcher, Clock clock) {
super(connectorConfig, previousOffset, jdbcConnection, schema, dispatcher, clock);
this.connectorConfig = connectorConfig;
this.previousOffset = previousOffset;
this.jdbcConnection = jdbcConnection;
this.schema = schema;
}
@Override
public SnapshotResult execute(ChangeEventSourceContext context) throws InterruptedException {
// for now, just simple schema snapshotting is supported which just needs to be done once
if (previousOffset != null) {
LOGGER.debug("Found previous offset, skipping snapshotting");
return SnapshotResult.completed(previousOffset);
protected SnapshottingTask getSnapshottingTask(OffsetContext previousOffset) {
boolean snapshotSchema = true;
boolean snapshotData = true;
// found a previous offset and the earlier snapshot has completed
if (previousOffset != null && !previousOffset.isSnapshotRunning()) {
snapshotSchema = false;
snapshotData = false;
}
else {
snapshotData = connectorConfig.getSnapshotMode().includeData();
}
Connection connection = null;
SnapshotContext ctx = null;
return new SnapshottingTask(snapshotSchema, snapshotData);
}
try {
connection = jdbcConnection.connection();
connection.setAutoCommit(false);
@Override
protected SnapshotContext prepare(ChangeEventSourceContext context) throws Exception {
return new SqlServerSnapshotContext(connectorConfig.getDatabaseName());
}
ctx = new SnapshotContext(
context,
connection,
connectorConfig.getDatabaseName()
);
@Override
protected Set<TableId> getAllTableIds(SnapshotContext ctx) throws Exception {
return jdbcConnection.readTableNames(ctx.catalogName, null, null, new String[] {"TABLE"});
}
ctx.capturedTables = schema.getCapturedTables();
@Override
protected void lockTablesForSchemaSnapshot(ChangeEventSourceContext sourceContext, SnapshotContext snapshotContext) throws SQLException, InterruptedException {
if (connectorConfig.getSnapshotLockingMode() == SnapshotLockingMode.NONE) {
return;
}
if (!lockDatabase(ctx)) {
return SnapshotResult.aborted();
((SqlServerSnapshotContext)snapshotContext).preSchemaSnapshotSavepoint = jdbcConnection.connection().setSavepoint("dbz_schema_snapshot");
try (Statement statement = jdbcConnection.connection().createStatement(ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY)) {
for (TableId tableId : snapshotContext.capturedTables) {
if (!sourceContext.isRunning()) {
throw new InterruptedException("Interrupted while locking table " + tableId);
}
LOGGER.info("Locking table {}", tableId);
statement.executeQuery("SELECT * FROM " + tableId.table() + " WITH (TABLOCKX)").close();
}
determineOffsetContextWithLsn(ctx);
readTableStructure(ctx);
if (!createSchemaChangeEventsForTables(ctx)) {
return SnapshotResult.aborted();
}
return SnapshotResult.completed(ctx.offset);
}
catch(RuntimeException e) {
throw e;
}
catch(Exception e) {
throw new RuntimeException(e);
}
finally {
if (ctx != null) {
ctx.dispose();
}
rollbackTransaction(connection);
}
}
private boolean lockDatabase(SnapshotContext ctx) throws SQLException {
// TODO use SET SINGLE
return true;
@Override
protected void releaseSchemaSnapshotLocks(SnapshotContext snapshotContext) throws SQLException {
jdbcConnection.connection().rollback(((SqlServerSnapshotContext)snapshotContext).preSchemaSnapshotSavepoint);
}
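
Lock acquisition and release above form a JDBC savepoint bracket: a savepoint is taken before the TABLOCKX selects, and rolling back to it afterwards releases the table locks without ending the snapshot transaction. A generic sketch of the pattern, with a hypothetical table name:

    import java.sql.Connection;
    import java.sql.Savepoint;
    import java.sql.Statement;

    public class SavepointLockSketch {

        static void snapshotSchemaUnderLock(Connection connection) throws Exception {
            connection.setAutoCommit(false);
            Savepoint beforeLocks = connection.setSavepoint("dbz_schema_snapshot");
            try (Statement statement = connection.createStatement()) {
                // exclusive lock, held only while the schema is read
                statement.executeQuery("SELECT * FROM customers WITH (TABLOCKX)").close();
                // ... read table structure here ...
            }
            // releases the locks but keeps the surrounding transaction open
            connection.rollback(beforeLocks);
        }
    }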
private void determineOffsetContextWithLsn(SnapshotContext ctx) throws SQLException {
ctx.offset = new SqlServerOffsetContext(connectorConfig.getLogicalName());
final Lsn lsn = jdbcConnection.getMaxLsn();
@Override
protected void determineSnapshotOffset(SnapshotContext ctx) throws Exception {
ctx.offset = new SqlServerOffsetContext(connectorConfig.getLogicalName(), jdbcConnection.getMaxLsn(), true, false);
}
private void readTableStructure(SnapshotContext ctx) throws SQLException {
ctx.tables = new Tables();
Set<String> schemas = ctx.capturedTables.stream()
@Override
protected void readTableStructure(ChangeEventSourceContext sourceContext, SnapshotContext snapshotContext) throws SQLException, InterruptedException {
Set<String> schemas = snapshotContext.capturedTables.stream()
.map(TableId::schema)
.collect(Collectors.toSet());
@ -110,9 +108,13 @@ private void readTableStructure(SnapshotContext ctx) throws SQLException {
// while the passed table name filter alone would skip all non-included tables, reading the schema
// would take much longer that way
for (String schema : schemas) {
if (!sourceContext.isRunning()) {
throw new InterruptedException("Interrupted while reading structure of schema " + schema);
}
jdbcConnection.readSchema(
ctx.tables,
ctx.catalogName,
snapshotContext.tables,
snapshotContext.catalogName,
schema,
connectorConfig.getTableFilters().dataCollectionFilter(),
null,
@ -121,62 +123,37 @@ private void readTableStructure(SnapshotContext ctx) throws SQLException {
}
}
private boolean createSchemaChangeEventsForTables(SnapshotContext ctx) throws SQLException {
for (TableId tableId : ctx.capturedTables) {
if (!ctx.changeEventSourceContext.isRunning()) {
return false;
}
LOGGER.debug("Capturing structure of table {}", tableId);
Table table = ctx.tables.forTable(tableId);
// TODO - use sp_help and sp_columns to build CREATE TABLE
final String ddl = "";
schema.applySchemaChange(new SchemaChangeEvent(ctx.offset.getPartition(), ctx.offset.getOffset(), ctx.catalogName,
tableId.schema(), ddl, table, SchemaChangeEventType.CREATE, true));
}
return true;
@Override
protected SchemaChangeEvent getCreateTableEvent(SnapshotContext snapshotContext, Table table) throws SQLException {
return new SchemaChangeEvent(snapshotContext.offset.getPartition(), snapshotContext.offset.getOffset(), snapshotContext.catalogName,
table.id().schema(), null, table, SchemaChangeEventType.CREATE, true);
}
private void rollbackTransaction(Connection connection) {
if(connection != null) {
try {
connection.rollback();
}
catch (SQLException e) {
throw new RuntimeException(e);
}
}
@Override
protected void complete() {
}
@Override
protected String getSnapshotSelect(SnapshotContext snapshotContext, TableId tableId) {
return "SELECT * FROM " + tableId.schema() + "." + tableId.table();
}
@Override
protected ChangeRecordEmitter getChangeRecordEmitter(SnapshotContext snapshotContext, Object[] row) {
((SqlServerOffsetContext) snapshotContext.offset).setSourceTime(Instant.ofEpochMilli(getClock().currentTimeInMillis()));
return new SnapshotChangeRecordEmitter(snapshotContext.offset, row, getClock());
}
/**
* Mutable context which is populated in the course of snapshotting.
*/
private static class SnapshotContext {
private static class SqlServerSnapshotContext extends SnapshotContext {
public final ChangeEventSourceContext changeEventSourceContext;
public final Statement statement;
public final String catalogName;
private Savepoint preSchemaSnapshotSavepoint;
public Set<TableId> capturedTables;
public SqlServerOffsetContext offset;
public Tables tables;
public SnapshotContext(ChangeEventSourceContext changeEventSourceContext, Connection connection, String catalogName) throws SQLException {
this.changeEventSourceContext = changeEventSourceContext;
this.statement = connection.createStatement();
this.catalogName = catalogName;
}
public void dispose() {
try {
statement.close();
}
catch (SQLException e) {
LOGGER.error("Couldn't close statement", e);
}
public SqlServerSnapshotContext(String catalogName) throws SQLException {
super(catalogName);
}
}
}

View File

@ -19,6 +19,7 @@
import io.debezium.pipeline.source.spi.StreamingChangeEventSource;
import io.debezium.relational.TableId;
import io.debezium.util.Clock;
import io.debezium.util.Metronome;
/**
* A {@link StreamingChangeEventSource} based on SQL Server change data capture functionality.
@ -55,6 +56,7 @@ public SqlServerStreamingChangeEventSource(SqlServerConnectorConfig connectorCon
@Override
public void execute(ChangeEventSourceContext context) throws InterruptedException {
final Metronome metronome = Metronome.sleeper(pollInterval, clock);
try {
final TableId[] tables = schema.getCapturedTables().toArray(new TableId[schema.getCapturedTables().size()]);
Lsn lastProcessedLsn = new Lsn(null);
@ -64,12 +66,12 @@ public void execute(ChangeEventSourceContext context) throws InterruptedExceptio
// Probably cannot happen, but it is better to guard against such a situation
if (!currentMaxLsn.isAvailable()) {
Thread.sleep(pollInterval.toMillis());
metronome.pause();
continue;
}
// There is no change in the database
if (currentMaxLsn.equals(lastProcessedLsn)) {
Thread.sleep(pollInterval.toMillis());
metronome.pause();
continue;
}
@ -119,16 +121,22 @@ public void execute(ChangeEventSourceContext context) throws InterruptedExceptio
final Object[] dataNext = (operation == SqlServerChangeRecordEmitter.OP_UPDATE_BEFORE) ? tableSmallestLsn.getData() : null;
offsetContext.setChangeLsn(rowLsn);
offsetContext.setCommitLsn(commitLsn);
offsetContext.setSourceTime(connection.timestampOfLsn(commitLsn));
offsetContext.setQueryFromLsn(fromLsn);
offsetContext.setQueryToLsn(currentMaxLsn);
offsetContext.setQueryTable(tableId);
try {
dispatcher
.dispatchDataChangeEvent(
tableId, new SqlServerChangeRecordEmitter(offsetContext, operation,
data, dataNext, schema.tableFor(tableId), clock));
tableId,
new SqlServerChangeRecordEmitter(
offsetContext,
operation,
data,
dataNext,
schema.tableFor(tableId),
clock
)
);
}
catch (InterruptedException e) {
break;

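Replacing Thread.sleep with Metronome above gives the polling loop a fixed period rather than a fixed delay between iterations. A minimal sketch of the same pattern (Metronome.sleeper(Duration, Clock) as used above):

    import java.time.Duration;

    import io.debezium.util.Clock;
    import io.debezium.util.Metronome;

    public class PollLoopSketch {
        public static void main(String[] args) throws InterruptedException {
            final Metronome metronome = Metronome.sleeper(Duration.ofMillis(500), Clock.system());
            for (int i = 0; i < 3; i++) {
                // ... query the CDC tables for new changes here ...
                metronome.pause(); // sleeps out the remainder of the 500 ms period
            }
        }
    }
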
View File

@ -14,20 +14,10 @@
* @author Jiri Pechanec
*
*/
public class SqlServerTopicSelector implements TopicSelector<TableId> {
public class SqlServerTopicSelector {
private final String prefix;
public SqlServerTopicSelector(String prefix) {
this.prefix = prefix;
}
public static SqlServerTopicSelector defaultSelector(String prefix) {
return new SqlServerTopicSelector(prefix);
}
@Override
public String topicNameFor(TableId tableId) {
return String.join(".", prefix, tableId.schema(), tableId.table());
public static TopicSelector<TableId> defaultSelector(SqlServerConnectorConfig connectorConfig) {
return TopicSelector.defaultSelector(connectorConfig,
(tableId, prefix, delimiter) -> String.join(delimiter, prefix, tableId.schema(), tableId.table()));
}
}
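
The lambda passed to TopicSelector.defaultSelector keeps the <serverName>.<schemaName>.<tableName> topic convention; a sketch with hypothetical names:

    import io.debezium.relational.TableId;

    public class TopicNameDemo {
        public static void main(String[] args) {
            // mirrors the (tableId, prefix, delimiter) lambda above
            TableId tableId = new TableId("testDB", "dbo", "customers");
            String topic = String.join(".", "server1", tableId.schema(), tableId.table());
            System.out.println(topic); // server1.dbo.customers
        }
    }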

View File

@ -5,3 +5,4 @@ disable_cdc_for_db=IF EXISTS(select 1 from sys.databases where name='#' AND is_c
enable_cdc_for_table=IF EXISTS(select 1 from sys.tables where name = '#' AND is_tracked_by_cdc=0)\n\
EXEC sys.sp_cdc_enable_table @source_schema = N'dbo', @source_name = N'#', @role_name = NULL, @supports_net_changes = 0
get_max_lsn=SELECT sys.fn_cdc_get_max_lsn()
lock_table=SELECT * FROM # WITH (TABLOCKX)

View File

@ -11,15 +11,17 @@
import org.junit.BeforeClass;
import io.debezium.config.Configuration;
import io.debezium.connector.sqlserver.SqlServerConnectorConfig.SnapshotMode;
import io.debezium.connector.sqlserver.util.TestHelper;
import io.debezium.util.Testing;
/**
* Integration test to verify different SQL Server datatypes.
* The types are discovered during the snapshotting phase.
*
* @author Jiri Pechanec
*/
public class SnapshotDatatypesIT extends AbstractSqlServerDatatypesTest {
public class DatatypesFromSnapshotIT extends AbstractSqlServerDatatypesTest {
@BeforeClass
public static void beforeClass() throws SQLException {
@ -32,7 +34,9 @@ public void before() throws Exception {
Testing.Debug.enable();
Testing.Files.delete(TestHelper.DB_HISTORY_PATH);
Configuration config = TestHelper.defaultConfig().build();
Configuration config = TestHelper.defaultConfig()
.with(SqlServerConnectorConfig.SNAPSHOT_MODE, SnapshotMode.INITIAL_SCHEMA_ONLY)
.build();
start(SqlServerConnector.class, config);
assertConnectorIsRunning();
Thread.sleep(1000);

View File

@ -9,7 +9,7 @@
import java.math.BigInteger;
import java.sql.SQLException;
import org.junit.BeforeClass;
import org.junit.Before;
import org.junit.Test;
import io.debezium.connector.sqlserver.util.TestHelper;
@ -22,14 +22,16 @@
*/
public class SqlServerConnectionIT {
@BeforeClass
public static void beforeClass() throws SQLException {
@Before
public void before() throws SQLException {
TestHelper.dropTestDatabase();
}
@Test
public void shouldEnableCdcForDatabase() throws Exception {
try (SqlServerConnection connection = TestHelper.adminConnection()) {
connection.connect();
connection.execute("CREATE DATABASE testDB");
connection.execute("USE testDB");
// NOTE: you cannot enable CDC on master
connection.enableDbCdc("testDB");
@ -40,6 +42,7 @@ public void shouldEnableCdcForDatabase() throws Exception {
public void shouldEnableCdcWithWrapperFunctionsForTable() throws Exception {
try (SqlServerConnection connection = TestHelper.adminConnection()) {
connection.connect();
connection.execute("CREATE DATABASE testDB");
connection.execute("USE testDB");
// NOTE: you cannot enable CDC on master
connection.enableDbCdc("testDB");

View File

@ -16,12 +16,11 @@
import org.apache.kafka.connect.source.SourceRecord;
import org.fest.assertions.Assertions;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import io.debezium.config.Configuration;
import io.debezium.connector.sqlserver.SqlServerConnectorConfig.SnapshotMode;
import io.debezium.connector.sqlserver.util.TestHelper;
import io.debezium.data.SchemaAndValueField;
import io.debezium.embedded.AbstractConnectorTest;
@ -34,18 +33,7 @@
*/
public class SqlServerConnectorIT extends AbstractConnectorTest {
private static SqlServerConnection connection;
@BeforeClass
public static void beforeClass() {
}
@AfterClass
public static void closeConnection() throws SQLException {
if (connection != null) {
connection.close();
}
}
private SqlServerConnection connection;
@Before
public void before() throws SQLException {
@ -65,7 +53,9 @@ public void before() throws SQLException {
@After
public void after() throws SQLException {
connection.close();
if (connection != null) {
connection.close();
}
TestHelper.dropTestDatabase();
}
@ -74,7 +64,9 @@ public void createAndDelete() throws Exception {
final int RECORDS_PER_TABLE = 5;
final int TABLES = 2;
final int ID_START = 10;
final Configuration config = TestHelper.defaultConfig().build();
final Configuration config = TestHelper.defaultConfig()
.with(SqlServerConnectorConfig.SNAPSHOT_MODE, SnapshotMode.INITIAL_SCHEMA_ONLY)
.build();
start(SqlServerConnector.class, config);
assertConnectorIsRunning();
@ -146,7 +138,9 @@ public void createAndDelete() throws Exception {
public void update() throws Exception {
final int RECORDS_PER_TABLE = 5;
final int ID_START = 10;
final Configuration config = TestHelper.defaultConfig().build();
final Configuration config = TestHelper.defaultConfig()
.with(SqlServerConnectorConfig.SNAPSHOT_MODE, SnapshotMode.INITIAL_SCHEMA_ONLY)
.build();
start(SqlServerConnector.class, config);
assertConnectorIsRunning();

View File

@ -21,7 +21,6 @@
* @author Horia Chiorean (hchiorea@redhat.com)
*/
public class TestHelper {
public static final Path DB_HISTORY_PATH = Testing.Files.createTestingPath("file-db-history-connect.txt").toAbsolutePath();
public static final String TEST_DATABASE = "testDB";
@ -100,7 +99,7 @@ public static void dropTestDatabase() {
connection.execute(sql);
}
catch (SQLException e) {
throw new IllegalStateException("Error while initating test database", e);
throw new IllegalStateException("Error while dropping test database", e);
}
}