DBZ-40 First cut of SQL Server streaming support

This commit is contained in:
Jiri Pechanec 2018-07-10 13:36:56 +02:00 committed by Gunnar Morling
parent 2a3c58f069
commit da03fcf51d
27 changed files with 2306 additions and 58 deletions

View File

@ -0,0 +1,106 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.connector.sqlserver;
import java.util.Arrays;
/**
* A logical representation of SQL Server LSN (log sequence number) position.
*
* @author Jiri Pechanec
*
*/
public class Lsn implements Comparable<Lsn> {
private final byte[] binary;
private int[] unsignedBinary;
private String string;
public Lsn(byte[] binary) {
this.binary = binary;
}
public byte[] getBinary() {
return binary;
}
public boolean isAvailable() {
return binary != null;
}
private int[] getUnsignedBinary() {
if (unsignedBinary != null || binary == null) {
return unsignedBinary;
}
unsignedBinary = new int[binary.length];
for (int i = 0; i < binary.length; i++) {
unsignedBinary[i] = Byte.toUnsignedInt(binary[i]);
}
return unsignedBinary;
}
public String toString() {
if (string != null) {
return string;
}
final StringBuilder sb = new StringBuilder();
if (binary == null) {
return "NULL";
}
final int[] unsigned = getUnsignedBinary();
for (int i = 0; i < unsigned.length; i++) {
final String byteStr = Integer.toHexString(unsigned[i]);
if (byteStr.length() == 1) {
sb.append('0');
}
sb.append(byteStr);
if (i == 3 || i == 7) {
sb.append(':');
}
}
string = sb.toString();
return string;
}
@Override
public int hashCode() {
final int prime = 31;
int result = 1;
result = prime * result + Arrays.hashCode(binary);
return result;
}
@Override
public boolean equals(Object obj) {
if (this == obj)
return true;
if (obj == null)
return false;
if (getClass() != obj.getClass())
return false;
Lsn other = (Lsn) obj;
if (!Arrays.equals(binary, other.binary))
return false;
return true;
}
@Override
public int compareTo(Lsn o) {
if (this == o) {
return 0;
}
final int[] thisU = getUnsignedBinary();
final int[] thatU = o.getUnsignedBinary();
for (int i = 0; i < thisU.length; i++) {
final int diff = thisU[i] - thatU[i];
if (diff != 0) {
return diff;
}
}
return 0;
}
}

View File

@ -0,0 +1,24 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.connector.sqlserver;
import java.util.Properties;
import io.debezium.util.IoUtil;
/**
* Information about this module.
*
* @author Jiri Pechanec
*/
public final class Module {
private static final Properties INFO = IoUtil.loadProperties(Module.class, "io/debezium/connector/sqlserver/build.version");
public static String version() {
return INFO.getProperty("version");
}
}

View File

@ -0,0 +1,66 @@
package io.debezium.connector.sqlserver;
import java.time.Instant;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.Struct;
import io.debezium.connector.AbstractSourceInfo;
/**
* Coordinates from the database log to restart streaming from. Maps to {@code source} field in enevlope and
* to connector offsets.
*
* @author Jiri Pechanec
*
*/
public class SourceInfo extends AbstractSourceInfo {
public static final String SERVER_NAME_KEY = "name";
public static final String LOG_TIMESTAMP_KEY = "ts_ms";
public static final String CHANGE_LSN_KEY = "change_lsn";
public static final String SNAPSHOT_KEY = "snapshot";
public static final Schema SCHEMA = schemaBuilder()
.name("io.debezium.connector.sqlserver.Source")
.field(SERVER_NAME_KEY, Schema.STRING_SCHEMA)
.field(LOG_TIMESTAMP_KEY, Schema.OPTIONAL_INT64_SCHEMA)
.field(CHANGE_LSN_KEY, Schema.OPTIONAL_STRING_SCHEMA)
.field(SNAPSHOT_KEY, Schema.OPTIONAL_BOOLEAN_SCHEMA)
.build();
private final String serverName;
private Lsn changeLsn;
private Instant sourceTime;
protected SourceInfo(String serverName) {
super(Module.version());
this.serverName = serverName;
}
public void setChangeLsn(Lsn lsn) {
changeLsn = lsn;
}
public Lsn getChangeLsn() {
return changeLsn;
}
public void setSourceTime(Instant instant) {
sourceTime = instant;
}
@Override
protected Schema schema() {
return SCHEMA;
}
@Override
public Struct struct() {
return super.struct()
.put(SERVER_NAME_KEY, serverName)
.put(LOG_TIMESTAMP_KEY, sourceTime == null ? null : sourceTime.toEpochMilli())
.put(CHANGE_LSN_KEY, changeLsn.toString())
.put(SNAPSHOT_KEY, false);
}
}

View File

@ -0,0 +1,52 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.connector.sqlserver;
import io.debezium.pipeline.ErrorHandler;
import io.debezium.pipeline.EventDispatcher;
import io.debezium.pipeline.source.spi.ChangeEventSourceFactory;
import io.debezium.pipeline.source.spi.SnapshotChangeEventSource;
import io.debezium.pipeline.source.spi.StreamingChangeEventSource;
import io.debezium.pipeline.spi.OffsetContext;
import io.debezium.util.Clock;
public class SqlServerChangeEventSourceFactory implements ChangeEventSourceFactory {
private final SqlServerConnectorConfig configuration;
private final SqlServerConnection jdbcConnection;
private final ErrorHandler errorHandler;
private final EventDispatcher dispatcher;
private final Clock clock;
private final SqlServerDatabaseSchema schema;
public SqlServerChangeEventSourceFactory(SqlServerConnectorConfig configuration, SqlServerConnection jdbcConnection,
ErrorHandler errorHandler, EventDispatcher dispatcher, Clock clock, SqlServerDatabaseSchema schema) {
this.configuration = configuration;
this.jdbcConnection = jdbcConnection;
this.errorHandler = errorHandler;
this.dispatcher = dispatcher;
this.clock = clock;
this.schema = schema;
}
@Override
public SnapshotChangeEventSource getSnapshotChangeEventSource(OffsetContext offsetContext) {
return new SqlServerSnapshotChangeEventSource(configuration, (SqlServerOffsetContext) offsetContext, jdbcConnection, schema);
}
@Override
public StreamingChangeEventSource getStreamingChangeEventSource(OffsetContext offsetContext) {
return new SqlServerStreamingChangeEventSource(
configuration,
(SqlServerOffsetContext) offsetContext,
jdbcConnection,
dispatcher,
errorHandler,
clock,
schema
);
}
}

View File

@ -0,0 +1,74 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.connector.sqlserver;
import io.debezium.data.Envelope.Operation;
import io.debezium.pipeline.spi.OffsetContext;
import io.debezium.relational.RelationalChangeRecordEmitter;
import io.debezium.relational.Table;
import io.debezium.util.Clock;
/**
* Emits change data based on a a single CDC data row.
*
* @author Jiri Pechanec
*/
public class SqlServerChangeRecordEmitter extends RelationalChangeRecordEmitter {
public static final int OP_DELETE = 1;
public static final int OP_INSERT = 2;
public static final int OP_UPDATE_BEFORE = 3;
public static final int OP_UPDATE_AFTER = 4;
private final int operation;
private final Object[] data;
private final Object[] dataNext;
public SqlServerChangeRecordEmitter(OffsetContext offset, int operation, Object[] data, Object[] dataNext, Table table, Clock clock) {
super(offset, clock);
this.operation = operation;
this.data = data;
this.dataNext = dataNext;
}
@Override
protected Operation getOperation() {
if (operation == OP_DELETE) {
return Operation.DELETE;
}
else if (operation == OP_INSERT) {
return Operation.CREATE;
}
else if (operation == OP_UPDATE_BEFORE) {
return Operation.UPDATE;
}
throw new IllegalArgumentException("Received event of unexpected command type: " + operation);
}
@Override
protected Object[] getOldColumnValues() {
switch (getOperation()) {
case CREATE:
case READ:
return null;
default:
return data;
}
}
@Override
protected Object[] getNewColumnValues() {
switch (getOperation()) {
case CREATE:
case READ:
return data;
case UPDATE:
return dataNext;
default:
return null;
}
}
}

View File

@ -7,33 +7,35 @@
package io.debezium.connector.sqlserver;
import java.sql.SQLException;
import java.sql.Timestamp;
import java.time.Instant;
import java.util.Objects;
import java.util.Properties;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import io.debezium.config.Configuration;
import io.debezium.jdbc.JdbcConfiguration;
import io.debezium.jdbc.JdbcConnection;
import io.debezium.relational.TableId;
import io.debezium.util.IoUtil;
/**
* {@link JdbcConnection} extension to be used with Microsoft SQL Server
*
* @author Horia Chiorean (hchiorea@redhat.com)
* @author Horia Chiorean (hchiorea@redhat.com), Jiri Pechanec
*
*/
public class SqlServerConnection extends JdbcConnection {
private static final String URL_PATTERN = "jdbc:sqlserver://${" + JdbcConfiguration.HOSTNAME + "}:${"
+ JdbcConfiguration.PORT + "};databaseName=${" + JdbcConfiguration.DATABASE + "}";
protected static ConnectionFactory FACTORY = JdbcConnection.patternBasedFactory(URL_PATTERN,
com.microsoft.sqlserver.jdbc.SQLServerDriver.class.getName(), SqlServerConnection.class.getClassLoader());
private static Logger LOGGER = LoggerFactory.getLogger(SqlServerConnection.class);
private static final String STATEMENTS_PLACEHOLDER = "#";
private static final String ENABLE_DB_CDC;
private static final String DISABLE_DB_CDC;
private static final String ENABLE_TABLE_CDC;
private static final String CDC_WRAPPERS_DML;
private static final String GET_MAX_LSN;
static {
try {
@ -41,7 +43,9 @@ public class SqlServerConnection extends JdbcConnection {
ClassLoader classLoader = SqlServerConnection.class.getClassLoader();
statements.load(classLoader.getResourceAsStream("statements.properties"));
ENABLE_DB_CDC = statements.getProperty("enable_cdc_for_db");
DISABLE_DB_CDC = statements.getProperty("disable_cdc_for_db");
ENABLE_TABLE_CDC = statements.getProperty("enable_cdc_for_table");
GET_MAX_LSN = statements.getProperty("get_max_lsn");
CDC_WRAPPERS_DML = IoUtil.read(classLoader.getResourceAsStream("generate_cdc_wrappers.sql"));
}
catch (Exception e) {
@ -54,19 +58,10 @@ public class SqlServerConnection extends JdbcConnection {
*
* @param config
* {@link Configuration} instance, may not be null.
* @param factory a factory building the connection string
*/
public SqlServerConnection(Configuration config) {
super(config, FACTORY);
}
/**
* Returns a JDBC connection string for the current configuration.
*
* @return a {@code String} where the variables in {@code urlPattern} are
* replaced with values from the configuration
*/
public String connectionString() {
return connectionString(URL_PATTERN);
public SqlServerConnection(Configuration config, ConnectionFactory factory) {
super(config, factory);
}
/**
@ -82,6 +77,19 @@ public void enableDbCdc(String name) throws SQLException {
execute(ENABLE_DB_CDC.replace(STATEMENTS_PLACEHOLDER, name));
}
/**
* Disables CDC for a given database, if not already disabled.
*
* @param name
* the name of the DB, may not be {@code null}
* @throws SQLException
* if anything unexpected fails
*/
public void disableDbCdc(String name) throws SQLException {
Objects.requireNonNull(name);
execute(DISABLE_DB_CDC.replace(STATEMENTS_PLACEHOLDER, name));
}
/**
* Enables CDC for a table if not already enabled and generates the wrapper
* functions for that table.
@ -96,4 +104,86 @@ public void enableTableCdc(String name) throws SQLException {
String generateWrapperFunctionsStmts = CDC_WRAPPERS_DML.replaceAll(STATEMENTS_PLACEHOLDER, name);
execute(enableCdcForTableStmt, generateWrapperFunctionsStmts);
}
/**
* @return the current largest log sequence number
*/
public Lsn getMaxLsn() throws SQLException {
final String LSN_COUNT_ERROR = "Maximum LSN query must return exactly one value";
return queryAndMap(GET_MAX_LSN, rs -> {
if (rs.next()) {
final Lsn ret = new Lsn(rs.getBytes(1));
if (!rs.next()) {
return ret;
}
}
throw new IllegalStateException(LSN_COUNT_ERROR);
});
}
public void getChangesForTable(TableId tableId, Lsn fromLsn, Lsn toLsn, ResultSetConsumer consumer) throws SQLException {
final String cdcNameForTable = cdcNameForTable(tableId);
final String query = "SELECT * FROM cdc.fn_cdc_get_all_changes_" + cdcNameForTable + "(ISNULL(?,sys.fn_cdc_get_min_lsn('" + cdcNameForTable + "')), ?, N'all update old')";
prepareQuery(query, statement -> {
statement.setBytes(1, fromLsn.getBinary());
statement.setBytes(2, toLsn.getBinary());
}, consumer);
}
public void getChangesForTables(TableId[] tableIds, Lsn fromLsn, Lsn toLsn, MultiResultSetConsumer consumer) throws SQLException {
final String[] queries = new String[tableIds.length];
int idx = 0;
for (TableId tableId: tableIds) {
final String cdcNameForTable = cdcNameForTable(tableId);
final String query = "SELECT * FROM cdc.fn_cdc_get_all_changes_" + cdcNameForTable + "(ISNULL(?,sys.fn_cdc_get_min_lsn('" + cdcNameForTable + "')), ?, N'all update old')";
queries[idx++] = query;
}
prepareQuery(queries, statement -> {
statement.setBytes(1, fromLsn.getBinary());
statement.setBytes(2, toLsn.getBinary());
}, consumer);
}
public Lsn incrementLsn(Lsn lsn) throws SQLException {
final String LSN_INCREMENT_ERROR = "Increment LSN query must return exactly one value";
final String query = "SELECT sys.fn_cdc_increment_lsn(?)";
return prepareQueryAndMap(query, statement -> {
statement.setBytes(1, lsn.getBinary());
}, rs -> {
if (rs.next()) {
final Lsn ret = new Lsn(rs.getBytes(1));
if (!rs.next()) {
return ret;
}
}
throw new IllegalStateException(LSN_INCREMENT_ERROR);
});
}
public Instant timestampOfLsn(Lsn lsn) throws SQLException {
final String LSN_TIMESTAMP_ERROR = "LSN to timestamp query must return exactly one value";
final String query = "SELECT sys.fn_cdc_map_lsn_to_time(?)";
if (lsn.getBinary() == null) {
return null;
}
return prepareQueryAndMap(query, statement -> {
statement.setBytes(1, lsn.getBinary());
}, rs -> {
if (rs.next()) {
final Timestamp ts = rs.getTimestamp(1);
final Instant ret = ts == null ? null : ts.toInstant();
if (!rs.next()) {
return ret;
}
}
throw new IllegalStateException(LSN_TIMESTAMP_ERROR);
});
}
private String cdcNameForTable(TableId tableId) {
return tableId.schema() + '_' + tableId.table();
}
}

View File

@ -0,0 +1,29 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.connector.sqlserver;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import io.debezium.jdbc.JdbcConfiguration;
import io.debezium.jdbc.JdbcConnection.ConnectionFactory;
public class SqlServerConnectionFactory implements ConnectionFactory {
@Override
public Connection connect(JdbcConfiguration config) throws SQLException {
String hostName = config.getHostname();
int port = config.getPort();
String database = config.getDatabase();
String user = config.getUser();
String password = config.getPassword();
return DriverManager.getConnection(
"jdbc:sqlserver://" + hostName + ":" + port + ";databaseName=" + database, user, password
);
}
}

View File

@ -0,0 +1,59 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.connector.sqlserver;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.connector.Task;
import org.apache.kafka.connect.source.SourceConnector;
/**
* The main connector class used to instantiate configuration and execution classes
*
* @author Jiri Pechanec
*
*/
public class SqlServerConnector extends SourceConnector {
private Map<String, String> properties;
@Override
public String version() {
return Module.version();
}
@Override
public void start(Map<String, String> props) {
this.properties = Collections.unmodifiableMap(new HashMap<>(props));
}
@Override
public Class<? extends Task> taskClass() {
return SqlServerConnectorTask.class;
}
@Override
public List<Map<String, String>> taskConfigs(int maxTasks) {
if (maxTasks > 1) {
throw new IllegalArgumentException("Only a single connector task may be started");
}
return Collections.singletonList(properties);
}
@Override
public void stop() {
}
@Override
public ConfigDef config() {
return SqlServerConnectorConfig.configDef();
}
}

View File

@ -0,0 +1,151 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.connector.sqlserver;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigDef.Importance;
import org.apache.kafka.common.config.ConfigDef.Type;
import org.apache.kafka.common.config.ConfigDef.Width;
import org.apache.kafka.connect.errors.ConnectException;
import io.debezium.config.CommonConnectorConfig;
import io.debezium.config.Configuration;
import io.debezium.config.Field;
import io.debezium.document.Document;
import io.debezium.jdbc.JdbcConfiguration;
import io.debezium.relational.RelationalDatabaseConnectorConfig;
import io.debezium.relational.TableId;
import io.debezium.relational.Tables.TableFilter;
import io.debezium.relational.history.DatabaseHistory;
import io.debezium.relational.history.HistoryRecordComparator;
import io.debezium.relational.history.KafkaDatabaseHistory;
/**
* The list of configuration options for SQL Server connector
*
* @author Jiri Pechanec
*
*/
public class SqlServerConnectorConfig extends RelationalDatabaseConnectorConfig {
// TODO pull up to RelationalConnectorConfig
public static final String DATABASE_CONFIG_PREFIX = "database.";
public static final Field LOGICAL_NAME = Field.create("database.server.name")
.withDisplayName("Namespace")
.withType(Type.STRING)
.withWidth(Width.MEDIUM)
.withImportance(Importance.HIGH)
.withValidation(Field::isRequired)
// TODO
//.withValidation(Field::isRequired, MySqlConnectorConfig::validateServerNameIsDifferentFromHistoryTopicName)
.withDescription("Unique name that identifies the database server and all recorded offsets, and"
+ "that is used as a prefix for all schemas and topics. "
+ "Each distinct MySQL installation should have a separate namespace and monitored by "
+ "at most one Debezium connector.");
public static final Field DATABASE_NAME = Field.create(DATABASE_CONFIG_PREFIX + JdbcConfiguration.DATABASE)
.withDisplayName("Database name")
.withType(Type.STRING)
.withWidth(Width.MEDIUM)
.withImportance(Importance.HIGH)
.withValidation(Field::isRequired)
.withDescription("The name of the database the connector should be monitoring. When working with a "
+ "multi-tenant set-up, must be set to the CDB name.");
/**
* The database history class is hidden in the {@link #configDef()} since that is designed to work with a user interface,
* and in these situations using Kafka is the only way to go.
*/
public static final Field DATABASE_HISTORY = Field.create("database.history")
.withDisplayName("Database history class")
.withType(Type.CLASS)
.withWidth(Width.LONG)
.withImportance(Importance.LOW)
.withInvisibleRecommender()
.withDescription("The name of the DatabaseHistory class that should be used to store and recover database schema changes. "
+ "The configuration properties for the history are prefixed with the '"
+ DatabaseHistory.CONFIGURATION_FIELD_PREFIX_STRING + "' string.")
.withDefault(KafkaDatabaseHistory.class.getName());
/**
* The set of {@link Field}s defined as part of this configuration.
*/
public static Field.Set ALL_FIELDS = Field.setOf(
LOGICAL_NAME,
DATABASE_NAME,
RelationalDatabaseConnectorConfig.TABLE_WHITELIST,
RelationalDatabaseConnectorConfig.TABLE_BLACKLIST,
RelationalDatabaseConnectorConfig.TABLE_IGNORE_BUILTIN,
CommonConnectorConfig.POLL_INTERVAL_MS,
CommonConnectorConfig.MAX_BATCH_SIZE,
CommonConnectorConfig.MAX_QUEUE_SIZE
);
private final String databaseName;
public SqlServerConnectorConfig(Configuration config) {
super(config, config.getString(LOGICAL_NAME), new SystemTablesPredicate());
this.databaseName = config.getString(DATABASE_NAME);
}
public static ConfigDef configDef() {
ConfigDef config = new ConfigDef();
Field.group(config, "Oracle", LOGICAL_NAME, DATABASE_NAME);
Field.group(config, "Events", RelationalDatabaseConnectorConfig.TABLE_WHITELIST,
RelationalDatabaseConnectorConfig.TABLE_BLACKLIST,
RelationalDatabaseConnectorConfig.TABLE_IGNORE_BUILTIN
);
Field.group(config, "Connector", CommonConnectorConfig.POLL_INTERVAL_MS, CommonConnectorConfig.MAX_BATCH_SIZE, CommonConnectorConfig.MAX_QUEUE_SIZE);
return config;
}
public String getDatabaseName() {
return databaseName;
}
/**
* Returns a configured (but not yet started) instance of the database history.
*/
public DatabaseHistory getDatabaseHistory() {
Configuration config = getConfig();
DatabaseHistory databaseHistory = config.getInstance(SqlServerConnectorConfig.DATABASE_HISTORY, DatabaseHistory.class);
if (databaseHistory == null) {
throw new ConnectException("Unable to instantiate the database history class " +
config.getString(SqlServerConnectorConfig.DATABASE_HISTORY));
}
// Do not remove the prefix from the subset of config properties ...
Configuration dbHistoryConfig = config.subset(DatabaseHistory.CONFIGURATION_FIELD_PREFIX_STRING, false)
.edit()
.withDefault(DatabaseHistory.NAME, getLogicalName() + "-dbhistory")
.build();
HistoryRecordComparator historyComparator = new HistoryRecordComparator() {
@Override
protected boolean isPositionAtOrBefore(Document recorded, Document desired) {
return (recorded.getLong("scn")).compareTo(desired.getLong("scn")) < 1;
}
};
databaseHistory.configure(dbHistoryConfig, historyComparator); // validates
return databaseHistory;
}
private static class SystemTablesPredicate implements TableFilter {
@Override
public boolean isIncluded(TableId t) {
return !(t.schema().toLowerCase().equals("cdc") ||
t.schema().toLowerCase().equals("sys") ||
t.table().toLowerCase().equals("systranschemas"));
}
}
}

View File

@ -0,0 +1,200 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.connector.sqlserver;
import java.sql.SQLException;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.source.SourceRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import io.debezium.config.Configuration;
import io.debezium.config.Field;
import io.debezium.connector.base.ChangeEventQueue;
import io.debezium.connector.common.BaseSourceTask;
import io.debezium.pipeline.ChangeEventSourceCoordinator;
import io.debezium.pipeline.DataChangeEvent;
import io.debezium.pipeline.ErrorHandler;
import io.debezium.pipeline.EventDispatcher;
import io.debezium.relational.TableId;
import io.debezium.util.Clock;
import io.debezium.util.SchemaNameAdjuster;
/**
* The main task executing streaming from SQL Server.
* Responsible for lifecycle management the streaming code.
*
* @author Jiri Pechanec
*
*/
public class SqlServerConnectorTask extends BaseSourceTask {
private static final Logger LOGGER = LoggerFactory.getLogger(SqlServerConnectorTask.class);
private static final String CONTEXT_NAME = "sql-server-connector-task";
private static enum State {
RUNNING, STOPPED;
}
private final AtomicReference<State> state = new AtomicReference<State>(State.STOPPED);
private volatile SqlServerTaskContext taskContext;
private volatile ChangeEventQueue<Object> queue;
private volatile SqlServerConnection jdbcConnection;
private volatile ChangeEventSourceCoordinator coordinator;
private volatile ErrorHandler errorHandler;
private volatile SqlServerDatabaseSchema schema;
private volatile Map<String, ?> lastOffset;
@Override
public String version() {
return Module.version();
}
@Override
public void start(Configuration config) {
if (!state.compareAndSet(State.STOPPED, State.RUNNING)) {
LOGGER.info("Connector has already been started");
return;
}
final SqlServerConnectorConfig connectorConfig = new SqlServerConnectorConfig(config);
taskContext = new SqlServerTaskContext(connectorConfig);
final Clock clock = Clock.system();
// Set up the task record queue ...
this.queue = new ChangeEventQueue.Builder<Object>()
.pollInterval(connectorConfig.getPollInterval())
.maxBatchSize(connectorConfig.getMaxBatchSize())
.maxQueueSize(connectorConfig.getMaxQueueSize())
.loggingContextSupplier(() -> taskContext.configureLoggingContext(CONTEXT_NAME))
.build();
errorHandler = new ErrorHandler(SqlServerConnector.class, connectorConfig.getLogicalName(), queue, this::cleanupResources);
final SqlServerTopicSelector topicSelector = SqlServerTopicSelector.defaultSelector(connectorConfig.getLogicalName());
final Configuration jdbcConfig = config.subset("database.", true);
jdbcConnection = new SqlServerConnection(jdbcConfig, new SqlServerConnectionFactory());
final SchemaNameAdjuster schemaNameAdjuster = SchemaNameAdjuster.create(LOGGER);
this.schema = new SqlServerDatabaseSchema(connectorConfig, schemaNameAdjuster, topicSelector, jdbcConnection);
final SqlServerOffsetContext previousOffset = null;
// OracleOffsetContext previousOffset = getPreviousOffset(connectorConfig);
// if (previousOffset != null) {
// schema.recover(previousOffset);
// }
final EventDispatcher<TableId> dispatcher = new EventDispatcher<>(topicSelector, schema, queue,
connectorConfig.getTableFilters().dataCollectionFilter(), DataChangeEvent::new);
coordinator = new ChangeEventSourceCoordinator(
previousOffset,
errorHandler,
SqlServerConnector.class,
connectorConfig.getLogicalName(),
new SqlServerChangeEventSourceFactory(connectorConfig, jdbcConnection, errorHandler, dispatcher, clock, schema)
);
coordinator.start();
}
// private OracleOffsetContext getPreviousOffset(SqlServerConnectorConfig connectorConfig) {
// OracleOffsetContext offsetContext = new OracleOffsetContext(connectorConfig.getLogicalName());
//
// Map<String, Object> previousOffset = context.offsetStorageReader()
// .offsets(Collections.singleton(offsetContext.getPartition()))
// .get(offsetContext.getPartition());
//
// if (previousOffset != null) {
// long scn = (long) previousOffset.get(SourceInfo.SCN_KEY);
// offsetContext.setScn(scn);
// LOGGER.info("Found previous offset {}", offsetContext);
//
// return offsetContext;
// }
//
// return null;
// }
@Override
public List<SourceRecord> poll() throws InterruptedException {
// TODO
List records = queue.poll();
List<SourceRecord> sourceRecords = ((List<DataChangeEvent>)records).stream()
.map(DataChangeEvent::getRecord)
.collect(Collectors.toList());
if (!sourceRecords.isEmpty()) {
this.lastOffset = sourceRecords.get(sourceRecords.size() - 1).sourceOffset();
}
return sourceRecords;
}
@Override
public void commit() throws InterruptedException {
coordinator.commitOffset(lastOffset);
}
@Override
public void stop() {
cleanupResources();
}
private void cleanupResources() {
if (!state.compareAndSet(State.RUNNING, State.STOPPED)) {
LOGGER.info("Connector has already been stopped");
return;
}
try {
if (coordinator != null) {
coordinator.stop();
}
}
catch (InterruptedException e) {
Thread.interrupted();
LOGGER.error("Interrupted while stopping coordinator", e);
// XStream code can end in SIGSEGV so fail the task instead of JVM crash
throw new ConnectException("Interrupted while stopping coordinator, failing the task");
}
try {
if (errorHandler != null) {
errorHandler.stop();
}
}
catch (InterruptedException e) {
Thread.interrupted();
LOGGER.error("Interrupted while stopping", e);
}
try {
if (jdbcConnection != null) {
jdbcConnection.close();
}
}
catch (SQLException e) {
LOGGER.error("Exception while closing JDBC connection", e);
}
schema.close();
}
@Override
protected Iterable<Field> getAllConfigurationFields() {
return SqlServerConnectorConfig.ALL_FIELDS;
}
}

View File

@ -0,0 +1,111 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.connector.sqlserver;
import java.sql.SQLException;
import java.util.HashSet;
import java.util.Set;
import java.util.function.Predicate;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import io.debezium.pipeline.spi.OffsetContext;
import io.debezium.relational.HistorizedRelationalDatabaseSchema;
import io.debezium.relational.Table;
import io.debezium.relational.TableId;
import io.debezium.relational.TableSchemaBuilder;
import io.debezium.relational.history.DatabaseHistory;
import io.debezium.relational.history.TableChanges;
import io.debezium.schema.SchemaChangeEvent;
import io.debezium.schema.SchemaChangeEvent.SchemaChangeEventType;
import io.debezium.schema.TopicSelector;
import io.debezium.util.SchemaNameAdjuster;
/**
* Logical representation of Sql Server schema.
*
* @author Jiri Pechanec
*
*/
public class SqlServerDatabaseSchema extends HistorizedRelationalDatabaseSchema {
private static final Logger LOGGER = LoggerFactory.getLogger(SqlServerDatabaseSchema.class);
private final DatabaseHistory databaseHistory;
private final Set<TableId> capturedTables;
public SqlServerDatabaseSchema(SqlServerConnectorConfig connectorConfig, SchemaNameAdjuster schemaNameAdjuster, TopicSelector<TableId> topicSelector, SqlServerConnection connection) {
super(connectorConfig, topicSelector, connectorConfig.getTableFilters().dataCollectionFilter(), null,
new TableSchemaBuilder(new SqlServerValueConverters(), schemaNameAdjuster, SourceInfo.SCHEMA),
false);
this.databaseHistory = connectorConfig.getDatabaseHistory();
this.databaseHistory.start();
try {
this.capturedTables = determineCapturedTables(connectorConfig, connection);
}
catch (SQLException e) {
throw new IllegalStateException("Could not obtain the list of captured tables", e);
}
}
private static Predicate<TableId> getTableFilter(SqlServerConnectorConfig connectorConfig) {
return t -> connectorConfig.getTableFilters().dataCollectionFilter().isIncluded(t);
}
@Override
public void recover(OffsetContext offset) {
// databaseHistory.recover(offset.getPartition(), offset.getOffset(), tables(), new OracleDdlParser());
// for (TableId tableId : tableIds()) {
// buildAndRegisterSchema(tableFor(tableId));
// }
}
@Override
public void close() {
databaseHistory.stop();
}
@Override
public void applySchemaChange(SchemaChangeEvent schemaChange) {
LOGGER.debug("Applying schema change event {}", schemaChange);
// just a single table per DDL event for Oracle
Table table = schemaChange.getTables().iterator().next();
buildAndRegisterSchema(table);
tables().overwriteTable(table);
TableChanges tableChanges = null;
if (schemaChange.getType() == SchemaChangeEventType.CREATE && schemaChange.isFromSnapshot()) {
tableChanges = new TableChanges();
tableChanges.create(table);
}
// databaseHistory.record(schemaChange.getPartition(), schemaChange.getOffset(), schemaChange.getDatabase(),
// schemaChange.getSchema(), schemaChange.getDdl(), tableChanges);
}
public Set<TableId> getCapturedTables() {
return capturedTables;
}
private Set<TableId> determineCapturedTables(SqlServerConnectorConfig connectorConfig, SqlServerConnection connection) throws SQLException {
final Set<TableId> allTableIds = connection.readTableNames(connectorConfig.getDatabaseName(), null, null, new String[] {"TABLE"} );
final Set<TableId> capturedTables = new HashSet<>();
for (TableId tableId : allTableIds) {
if (connectorConfig.getTableFilters().dataCollectionFilter().isIncluded(tableId)) {
capturedTables.add(tableId);
}
else {
LOGGER.trace("Skipping table {} as it's not included in the filter configuration", tableId);
}
}
return capturedTables;
}
}

View File

@ -0,0 +1,105 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.connector.sqlserver;
import java.time.Instant;
import java.util.Collections;
import java.util.Map;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.Struct;
import io.debezium.pipeline.spi.OffsetContext;
import io.debezium.relational.TableId;
import io.debezium.util.Collect;
public class SqlServerOffsetContext implements OffsetContext {
private static final String SERVER_PARTITION_KEY = "server";
private static final String QUERY_FROM_LSN_KEY = "query_from_lsn";
private static final String QUERY_TO_LSN_KEY = "query_to_lsn";
private static final String QUERY_TABLE = "query_table";
private final Schema sourceInfoSchema;
private final SourceInfo sourceInfo;
private final Map<String, String> partition;
private Lsn queryFromLsn;
private Lsn queryToLsn;
private TableId queryTable;
public SqlServerOffsetContext(String serverName) {
partition = Collections.singletonMap(SERVER_PARTITION_KEY, serverName);
sourceInfo = new SourceInfo(serverName);
sourceInfoSchema = sourceInfo.schema();
}
@Override
public Map<String, ?> getPartition() {
return partition;
}
@Override
public Map<String, ?> getOffset() {
return Collect.hashMapOf(
QUERY_FROM_LSN_KEY, queryFromLsn == null ? null : queryFromLsn.toString(),
QUERY_TO_LSN_KEY, queryToLsn == null ? null : queryToLsn.toString(),
QUERY_TABLE, queryTable == null ? null : queryTable.toString(),
SourceInfo.CHANGE_LSN_KEY, sourceInfo.getChangeLsn() == null ? null : sourceInfo.getChangeLsn().toString()
);
}
@Override
public Schema getSourceInfoSchema() {
return sourceInfoSchema;
}
@Override
public Struct getSourceInfo() {
return sourceInfo.struct();
}
public void setChangeLsn(Lsn lsn) {
sourceInfo.setChangeLsn(lsn);
}
public void setSourceTime(Instant instant) {
sourceInfo.setSourceTime(instant);
}
public void setQueryFromLsn(Lsn queryFromLsn) {
this.queryFromLsn = queryFromLsn;
}
public void setQueryToLsn(Lsn queryToLsn) {
this.queryToLsn = queryToLsn;
}
public void setQueryTable(TableId queryTable) {
this.queryTable = queryTable;
}
@Override
public boolean isSnapshotRunning() {
// TODO Auto-generated method stub
return false;
}
@Override
public void preSnapshotStart() {
// TODO Auto-generated method stub
}
@Override
public void preSnapshotCompletion() {
// TODO Auto-generated method stub
}
@Override
public void postSnapshotCompletion() {
// TODO Auto-generated method stub
}
}

View File

@ -0,0 +1,67 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.connector.sqlserver;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import io.debezium.pipeline.spi.SchemaChangeEventEmitter;
import io.debezium.relational.TableId;
import io.debezium.schema.SchemaChangeEvent.SchemaChangeEventType;
/**
* {@link SchemaChangeEventEmitter} implementation based on SqlServer.
*
* @author Jiri Pechanec
*/
public class SqlServerSchemaChangeEventEmitter implements SchemaChangeEventEmitter {
private static final Logger LOGGER = LoggerFactory.getLogger(SqlServerSchemaChangeEventEmitter.class);
private final SqlServerOffsetContext offsetContext;
private final TableId tableId;
public SqlServerSchemaChangeEventEmitter(SqlServerOffsetContext offsetContext, TableId tableId) {
this.offsetContext = offsetContext;
this.tableId = tableId;
}
@Override
public void emitSchemaChangeEvent(Receiver receiver) throws InterruptedException {
// SchemaChangeEventType eventType = getSchemaChangeEventType();
// if (eventType == null) {
// return;
// }
//
// Tables tables = new Tables();
//
// SqlServerDdlParser parser = new SqlServerDdlParser();
// parser.setCurrentDatabase(ddlLcr.getSourceDatabaseName());
// parser.setCurrentSchema(ddlLcr.getObjectOwner());
// parser.parse(ddlLcr.getDDLText(), tables);
//
// Set<TableId> changedTableIds = tables.drainChanges();
// if (changedTableIds.isEmpty()) {
// throw new IllegalArgumentException("Couldn't parse DDL statement " + ddlLcr.getDDLText());
// }
//
// Table table = tables.forTable(tableId);
//
// receiver.schemaChangeEvent(new SchemaChangeEvent(offsetContext.getPartition(), offsetContext.getOffset(), ddlLcr.getSourceDatabaseName(), ddlLcr.getObjectOwner(), ddlLcr.getDDLText(), table, eventType, false));
}
private SchemaChangeEventType getSchemaChangeEventType() {
// switch(ddlLcr.getCommandType()) {
// case "CREATE TABLE": return SchemaChangeEventType.CREATE;
// case "ALTER TABLE": LOGGER.warn("ALTER TABLE not yet implemented");
// case "DROP TABLE": LOGGER.warn("DROP TABLE not yet implemented");
// default:
// LOGGER.debug("Ignoring DDL event of type {}", ddlLcr.getCommandType());
// return null;
// }
return null;
}
}

View File

@ -0,0 +1,182 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.connector.sqlserver;
import java.sql.Connection;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.Set;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import io.debezium.pipeline.source.spi.SnapshotChangeEventSource;
import io.debezium.pipeline.spi.SnapshotResult;
import io.debezium.relational.Table;
import io.debezium.relational.TableId;
import io.debezium.relational.Tables;
import io.debezium.schema.SchemaChangeEvent;
import io.debezium.schema.SchemaChangeEvent.SchemaChangeEventType;
public class SqlServerSnapshotChangeEventSource implements SnapshotChangeEventSource {
private static final Logger LOGGER = LoggerFactory.getLogger(SqlServerSnapshotChangeEventSource.class);
private final SqlServerConnectorConfig connectorConfig;
private final SqlServerOffsetContext previousOffset;
private final SqlServerConnection jdbcConnection;
private final SqlServerDatabaseSchema schema;
public SqlServerSnapshotChangeEventSource(SqlServerConnectorConfig connectorConfig, SqlServerOffsetContext previousOffset, SqlServerConnection jdbcConnection, SqlServerDatabaseSchema schema) {
this.connectorConfig = connectorConfig;
this.previousOffset = previousOffset;
this.jdbcConnection = jdbcConnection;
this.schema = schema;
}
@Override
public SnapshotResult execute(ChangeEventSourceContext context) throws InterruptedException {
// for now, just simple schema snapshotting is supported which just needs to be done once
if (previousOffset != null) {
LOGGER.debug("Found previous offset, skipping snapshotting");
return SnapshotResult.completed(previousOffset);
}
Connection connection = null;
SnapshotContext ctx = null;
try {
connection = jdbcConnection.connection();
connection.setAutoCommit(false);
ctx = new SnapshotContext(
context,
connection,
connectorConfig.getDatabaseName()
);
ctx.capturedTables = schema.getCapturedTables();
if (!lockDatabase(ctx)) {
return SnapshotResult.aborted();
}
determineOffsetContextWithLsn(ctx);
readTableStructure(ctx);
if (!createSchemaChangeEventsForTables(ctx)) {
return SnapshotResult.aborted();
}
return SnapshotResult.completed(ctx.offset);
}
catch(RuntimeException e) {
throw e;
}
catch(Exception e) {
throw new RuntimeException(e);
}
finally {
if (ctx != null) {
ctx.dispose();
}
rollbackTransaction(connection);
}
}
private boolean lockDatabase(SnapshotContext ctx) throws SQLException {
// TODO use SET SINGLE
return true;
}
private void determineOffsetContextWithLsn(SnapshotContext ctx) throws SQLException {
ctx.offset = new SqlServerOffsetContext(connectorConfig.getLogicalName());
final Lsn lsn = jdbcConnection.getMaxLsn();
}
private void readTableStructure(SnapshotContext ctx) throws SQLException {
ctx.tables = new Tables();
Set<String> schemas = ctx.capturedTables.stream()
.map(TableId::schema)
.collect(Collectors.toSet());
// reading info only for the schemas we're interested in as per the set of captured tables;
// while the passed table name filter alone would skip all non-included tables, reading the schema
// would take much longer that way
for (String schema : schemas) {
jdbcConnection.readSchema(
ctx.tables,
ctx.catalogName,
schema,
connectorConfig.getTableFilters().dataCollectionFilter(),
null,
false
);
}
}
private boolean createSchemaChangeEventsForTables(SnapshotContext ctx) throws SQLException {
for (TableId tableId : ctx.capturedTables) {
if (!ctx.changeEventSourceContext.isRunning()) {
return false;
}
LOGGER.debug("Capturing structure of table {}", tableId);
Table table = ctx.tables.forTable(tableId);
// TODO - use sp_help and sp_columns to build CREATE TABLE
final String ddl = "";
schema.applySchemaChange(new SchemaChangeEvent(ctx.offset.getPartition(), ctx.offset.getOffset(), ctx.catalogName,
tableId.schema(), ddl, table, SchemaChangeEventType.CREATE, true));
}
return true;
}
private void rollbackTransaction(Connection connection) {
if(connection != null) {
try {
connection.rollback();
}
catch (SQLException e) {
throw new RuntimeException(e);
}
}
}
/**
* Mutable context which is populated in the course of snapshotting.
*/
private static class SnapshotContext {
public final ChangeEventSourceContext changeEventSourceContext;
public final Statement statement;
public final String catalogName;
public Set<TableId> capturedTables;
public SqlServerOffsetContext offset;
public Tables tables;
public SnapshotContext(ChangeEventSourceContext changeEventSourceContext, Connection connection, String catalogName) throws SQLException {
this.changeEventSourceContext = changeEventSourceContext;
this.statement = connection.createStatement();
this.catalogName = catalogName;
}
public void dispose() {
try {
statement.close();
}
catch (SQLException e) {
LOGGER.error("Couldn't close statement", e);
}
}
}
}

View File

@ -0,0 +1,202 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.connector.sqlserver;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.time.Duration;
import java.util.Map;
import org.apache.kafka.connect.errors.ConnectException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import io.debezium.pipeline.ErrorHandler;
import io.debezium.pipeline.EventDispatcher;
import io.debezium.pipeline.source.spi.StreamingChangeEventSource;
import io.debezium.relational.TableId;
import io.debezium.util.Clock;
/**
* A {@link StreamingChangeEventSource} based on SQL Server change data capture functionality.
* A main polls database DDL change and change data tables and turns them into change events.
*
* @author Jiri Pechanec
*/
public class SqlServerStreamingChangeEventSource implements StreamingChangeEventSource {
private static final int COL_COMMIT_LSN = 1;
private static final int COL_ROW_LSN = 2;
private static final int COL_OPERATION = 3;
private static final int COL_DATA = 5;
private static final Logger LOGGER = LoggerFactory.getLogger(SqlServerStreamingChangeEventSource.class);
private final SqlServerConnection connection;
private final EventDispatcher<TableId> dispatcher;
private final ErrorHandler errorHandler;
private final Clock clock;
private final SqlServerDatabaseSchema schema;
private final SqlServerOffsetContext offsetContext;
private final Duration pollInterval;
public SqlServerStreamingChangeEventSource(SqlServerConnectorConfig connectorConfig, SqlServerOffsetContext offsetContext, SqlServerConnection connection, EventDispatcher<TableId> dispatcher, ErrorHandler errorHandler, Clock clock, SqlServerDatabaseSchema schema) {
this.connection = connection;
this.dispatcher = dispatcher;
this.errorHandler = errorHandler;
this.clock = clock;
this.schema = schema;
this.offsetContext = offsetContext;
this.pollInterval = Duration.ofSeconds(1);
}
@Override
public void execute(ChangeEventSourceContext context) throws InterruptedException {
try {
final TableId[] tables = schema.getCapturedTables().toArray(new TableId[schema.getCapturedTables().size()]);
Lsn lastProcessedLsn = new Lsn(null);
while (context.isRunning()) {
final Lsn currentMaxLsn = connection.getMaxLsn();
// Probably cannot happen but it is better to guard against such
// situation
if (!currentMaxLsn.isAvailable()) {
Thread.sleep(pollInterval.toMillis());
continue;
}
// There is no change in the database
if (currentMaxLsn.equals(lastProcessedLsn)) {
Thread.sleep(pollInterval.toMillis());
continue;
}
// Reading interval is inclusive so we need to move LSN forward
final Lsn fromLsn = lastProcessedLsn.isAvailable() ? connection.incrementLsn(lastProcessedLsn)
: lastProcessedLsn;
connection.getChangesForTables(tables, fromLsn, currentMaxLsn, resultSets -> {
final int tableCount = resultSets.length;
final ChangeTable[] changeTables = new ChangeTable[tableCount];
for (int i = 0; i < tableCount; i++) {
changeTables[i] = new ChangeTable(tables[i], resultSets[i]);
changeTables[i].next();
}
for (;;) {
ChangeTable tableSmallestLsn = null;
for (int i = 0; i < tableCount; i++) {
final ChangeTable changeTable = changeTables[i];
if (changeTable.isCompleted()) {
continue;
}
if (tableSmallestLsn == null || changeTable.compareTo(tableSmallestLsn) < 0) {
tableSmallestLsn = changeTable;
}
}
if (tableSmallestLsn == null) {
// No more LSNs available
break;
}
final TableId tableId = tableSmallestLsn.getTableId();
final Lsn commitLsn = tableSmallestLsn.getCommitLsn();
final Lsn rowLsn = tableSmallestLsn.getRowLsn();
final int operation = tableSmallestLsn.getOperation();
final Object[] data = tableSmallestLsn.getData();
// UPDATE consists of two consecutive events, first event contains
// the row before it was updated and the second the row after
// it was updated
if (operation == SqlServerChangeRecordEmitter.OP_UPDATE_BEFORE) {
if (!tableSmallestLsn.next() || tableSmallestLsn.getOperation() != SqlServerChangeRecordEmitter.OP_UPDATE_AFTER) {
throw new IllegalStateException("The update before event at " + rowLsn + " for table " + tableId + " was not followed by after event.\n Please report this as a bug together with a events around given LSN.");
}
}
final Object[] dataNext = (operation == SqlServerChangeRecordEmitter.OP_UPDATE_BEFORE) ? tableSmallestLsn.getData() : null;
offsetContext.setChangeLsn(rowLsn);
offsetContext.setSourceTime(connection.timestampOfLsn(commitLsn));
offsetContext.setQueryFromLsn(fromLsn);
offsetContext.setQueryToLsn(currentMaxLsn);
offsetContext.setQueryTable(tableId);
try {
dispatcher
.dispatchDataChangeEvent(
tableId, new SqlServerChangeRecordEmitter(offsetContext, operation,
data, dataNext, schema.tableFor(tableId), clock));
}
catch (InterruptedException e) {
break;
}
tableSmallestLsn.next();
}
});
lastProcessedLsn = currentMaxLsn;
}
}
catch (Exception e) {
throw new ConnectException(e);
}
}
@Override
public void commitOffset(Map<String, ?> offset) {
}
private static class ChangeTable {
private final TableId tableId;
private final ResultSet resultSet;
private boolean completed = false;
public ChangeTable(TableId tableId, ResultSet resultSet) {
super();
this.tableId = tableId;
this.resultSet = resultSet;
}
public TableId getTableId() {
return tableId;
}
public Lsn getCommitLsn() throws SQLException {
return new Lsn(resultSet.getBytes(COL_COMMIT_LSN));
}
public Lsn getRowLsn() throws SQLException {
return new Lsn(resultSet.getBytes(COL_ROW_LSN));
}
public int getOperation() throws SQLException {
return resultSet.getInt(COL_OPERATION);
}
public Object[] getData() throws SQLException {
final int dataColumnCount = resultSet.getMetaData().getColumnCount() - (COL_DATA - 1);
final Object[] data = new Object[dataColumnCount];
for (int i = 0; i < dataColumnCount; i++) {
data[i] = resultSet.getObject(COL_DATA + i);
}
return data;
}
public boolean next() throws SQLException {
completed = !resultSet.next();
return !completed;
}
public boolean isCompleted() {
return completed;
}
public int compareTo(ChangeTable o) throws SQLException {
return getRowLsn().compareTo(o.getRowLsn());
}
}
}

View File

@ -0,0 +1,21 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.connector.sqlserver;
import io.debezium.connector.common.CdcSourceTaskContext;
/**
* A state (context) associated with a SQL Server task
*
* @author Jiri Pechanec
*
*/
public class SqlServerTaskContext extends CdcSourceTaskContext {
public SqlServerTaskContext(SqlServerConnectorConfig config) {
super("Oracle", config.getLogicalName());
}
}

View File

@ -0,0 +1,33 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.connector.sqlserver;
import io.debezium.relational.TableId;
import io.debezium.schema.TopicSelector;
/**
* The topic naming strategy based on connector configuration and table name
*
* @author Jiri Pechanec
*
*/
public class SqlServerTopicSelector implements TopicSelector<TableId> {
private final String prefix;
public SqlServerTopicSelector(String prefix) {
this.prefix = prefix;
}
public static SqlServerTopicSelector defaultSelector(String prefix) {
return new SqlServerTopicSelector(prefix);
}
@Override
public String topicNameFor(TableId tableId) {
return String.join(".", prefix, tableId.schema(), tableId.table());
}
}

View File

@ -0,0 +1,94 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.connector.sqlserver;
import java.sql.Timestamp;
import java.sql.Types;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.SchemaBuilder;
import io.debezium.data.SpecialValueDecimal;
import io.debezium.jdbc.JdbcValueConverters;
import io.debezium.relational.Column;
import io.debezium.relational.ValueConverter;
import io.debezium.time.ZonedTimestamp;
import microsoft.sql.DateTimeOffset;
/**
* Conversion of SQL Server specific datatypes.
*
* @author Jiri Pechanec
*
*/
public class SqlServerValueConverters extends JdbcValueConverters {
public SqlServerValueConverters() {
}
@Override
public SchemaBuilder schemaBuilder(Column column) {
switch (column.jdbcType()) {
// Numeric integers
case Types.TINYINT:
// values are an 8-bit unsigned integer value between 0 and 255, we thus need to store it in short int
return SchemaBuilder.int16();
// Floating point
case microsoft.sql.Types.SMALLMONEY:
case microsoft.sql.Types.MONEY:
return SpecialValueDecimal.builder(decimalMode, column.length(), column.scale().get());
case microsoft.sql.Types.DATETIMEOFFSET:
return ZonedTimestamp.builder();
default:
return super.schemaBuilder(column);
}
}
@Override
public ValueConverter converter(Column column, Field fieldDefn) {
switch (column.jdbcType()) {
// Numeric integers
case Types.TINYINT:
// values are an 8-bit unsigned integer value between 0 and 255, we thus need to store it in short int
return (data) -> convertSmallInt(column, fieldDefn, data);
// Floating point
case microsoft.sql.Types.SMALLMONEY:
case microsoft.sql.Types.MONEY:
return (data) -> convertDecimal(column, fieldDefn, data);
case microsoft.sql.Types.DATETIMEOFFSET:
return (data) -> convertTimestampWithZone(column, fieldDefn, data);
// TODO Geometry and geography supported since 6.5.0
default:
return super.converter(column, fieldDefn);
}
}
/**
* Time precision in SQL Server is defined in scale, the default one is 7
*/
@Override
protected int getTimePrecision(Column column) {
return column.scale().get();
}
protected Object convertTimestampWithZone(Column column, Field fieldDefn, Object data) {
if (!(data instanceof DateTimeOffset)) {
return super.convertTimestampWithZone(column, fieldDefn, data);
}
final DateTimeOffset dto = (DateTimeOffset)data;
// Timestamp is provided in UTC time
final Timestamp utc = dto.getTimestamp();
final ZoneOffset offset = ZoneOffset.ofTotalSeconds(dto.getMinutesOffset() * 60);
return super.convertTimestampWithZone(column, fieldDefn, LocalDateTime.ofEpochSecond(utc.getTime() / 1000, utc.getNanos(), offset).atOffset(offset));
}
}

View File

@ -0,0 +1 @@
version=${project.version}

View File

@ -1,4 +1,7 @@
enable_cdc_for_db=IF EXISTS(select 1 from sys.databases where name='#' AND is_cdc_enabled=0)\n\
EXEC sys.sp_cdc_enable_db
disable_cdc_for_db=IF EXISTS(select 1 from sys.databases where name='#' AND is_cdc_enabled=1)\n\
EXEC sys.sp_cdc_disable_db
enable_cdc_for_table=IF EXISTS(select 1 from sys.tables where name = '#' AND is_tracked_by_cdc=0)\n\
EXEC sys.sp_cdc_enable_table @source_schema = N'dbo', @source_name = N'#', @role_name = NULL, @supports_net_changes = 0
get_max_lsn=SELECT sys.fn_cdc_get_max_lsn()

View File

@ -0,0 +1,253 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.connector.sqlserver;
import static org.fest.assertions.Assertions.assertThat;
import java.math.BigDecimal;
import java.sql.SQLException;
import java.util.Arrays;
import java.util.List;
import org.apache.kafka.connect.data.Decimal;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import io.debezium.connector.sqlserver.util.TestHelper;
import io.debezium.data.SchemaAndValueField;
import io.debezium.data.VerifyRecord;
import io.debezium.embedded.AbstractConnectorTest;
import io.debezium.time.Date;
import io.debezium.time.MicroTime;
import io.debezium.time.NanoTimestamp;
import io.debezium.time.Timestamp;
import io.debezium.time.ZonedTimestamp;
import io.debezium.util.Testing;
/**
* Integration test to verify different Oracle datatypes.
*
* @author Jiri Pechanec
*/
public abstract class AbstractSqlServerDatatypesTest extends AbstractConnectorTest {
/**
* Key for schema parameter used to store DECIMAL/NUMERIC columns' precision.
*/
static final String PRECISION_PARAMETER_KEY = "connect.decimal.precision";
private static final String DDL_STRING = "create table type_string (" +
" id int not null, " +
" val_char char(3), " +
" val_varchar varchar(1000), " +
" val_text text, " +
" val_nchar nchar(3), " +
" val_nvarchar nvarchar(1000), " +
" val_ntext ntext, " +
" primary key (id)" +
")";
private static final String DDL_FP = "create table type_fp (" +
" id int not null, " +
" val_decimal decimal(6,3), " +
" val_numeric numeric, " +
" val_float float, " +
" val_real real, " +
" val_smallmoney smallmoney, " +
" val_money money " +
" primary key (id)" +
")";
private static final String DDL_INT = "create table type_int (" +
" id int not null, " +
" val_bit bit, " +
" val_tinyint tinyint, " +
" val_smallint smallint, " +
" val_int int, " +
" val_bigint bigint, " +
" primary key (id)" +
")";
private static final String DDL_TIME = "create table type_time (" +
" id int not null, " +
" val_date date, " +
" val_time time(4), " +
" val_datetime2 datetime2, " +
" val_datetimeoffset datetimeoffset, " +
" val_datetime datetime, " +
" val_smalldatetime smalldatetime, " +
" primary key (id)" +
")";
private static final String DDL_XML = "create table type_xml (" +
" id int not null, " +
" val_xml xml, " +
" primary key (id)" +
")";
private static final List<SchemaAndValueField> EXPECTED_INT = Arrays.asList(
new SchemaAndValueField("val_bit", Schema.OPTIONAL_BOOLEAN_SCHEMA, true),
new SchemaAndValueField("val_tinyint", Schema.OPTIONAL_INT16_SCHEMA, (short)22),
new SchemaAndValueField("val_smallint", Schema.OPTIONAL_INT16_SCHEMA, (short)333),
new SchemaAndValueField("val_int", Schema.OPTIONAL_INT32_SCHEMA, 4444),
new SchemaAndValueField("val_bigint", Schema.OPTIONAL_INT64_SCHEMA, 55555l)
);
private static final List<SchemaAndValueField> EXPECTED_FP = Arrays.asList(
new SchemaAndValueField("val_decimal",Decimal.builder(3).parameter(PRECISION_PARAMETER_KEY, "6").optional().build(), new BigDecimal("1.123")),
new SchemaAndValueField("val_numeric", Decimal.builder(0).parameter(PRECISION_PARAMETER_KEY, "18").optional().build(), new BigDecimal("2")),
new SchemaAndValueField("val_float", Schema.OPTIONAL_FLOAT64_SCHEMA, 3.323),
new SchemaAndValueField("val_real", Schema.OPTIONAL_FLOAT32_SCHEMA, 4.323f),
new SchemaAndValueField("val_smallmoney", Decimal.builder(4).parameter(PRECISION_PARAMETER_KEY, "10").optional().build(), new BigDecimal("5.3230")),
new SchemaAndValueField("val_money", Decimal.builder(4).parameter(PRECISION_PARAMETER_KEY, "19").optional().build(), new BigDecimal("6.3230"))
);
private static final List<SchemaAndValueField> EXPECTED_STRING = Arrays.asList(
new SchemaAndValueField("val_char", Schema.OPTIONAL_STRING_SCHEMA, "cc "),
new SchemaAndValueField("val_varchar", Schema.OPTIONAL_STRING_SCHEMA, "vcc"),
new SchemaAndValueField("val_text", Schema.OPTIONAL_STRING_SCHEMA, "tc"),
new SchemaAndValueField("val_nchar", Schema.OPTIONAL_STRING_SCHEMA, "c\u010d "),
new SchemaAndValueField("val_nvarchar", Schema.OPTIONAL_STRING_SCHEMA, "vc\u010d"),
new SchemaAndValueField("val_ntext", Schema.OPTIONAL_STRING_SCHEMA, "t\u010d")
);
private static final List<SchemaAndValueField> EXPECTED_DATE_TIME = Arrays.asList(
new SchemaAndValueField("val_date", Date.builder().optional().build(), 17_725),
new SchemaAndValueField("val_time", MicroTime.builder().optional().build(), 37_425_000_000l),
new SchemaAndValueField("val_datetime2", NanoTimestamp.builder().optional().build(), 1_531_481_025_340_000_000l),
new SchemaAndValueField("val_datetimeoffset", ZonedTimestamp.builder().optional().build(), "2018-07-13T12:23:45.456+11:00"),
new SchemaAndValueField("val_datetime", Timestamp.builder().optional().build(), 1_531_488_225_780l),
new SchemaAndValueField("val_smalldatetime", Timestamp.builder().optional().build(), 1_531_491_840_000l)
);
private static final List<SchemaAndValueField> EXPECTED_XML = Arrays.asList(
new SchemaAndValueField("val_xml", Schema.OPTIONAL_STRING_SCHEMA, "<a>b</a>")
);
private static final String[] ALL_TABLES = {
"type_int",
"type_fp",
"type_string",
"type_time",
"type_xml"
};
private static final String[] ALL_DDLS = {
DDL_INT,
DDL_FP,
DDL_STRING,
DDL_TIME,
DDL_XML
};
private static final int EXPECTED_RECORD_COUNT = ALL_DDLS.length;
@AfterClass
public static void dropTables() throws SQLException {
TestHelper.dropTestDatabase();
}
@BeforeClass
public static void createTables() throws SQLException {
TestHelper.createTestDatabase();
try (SqlServerConnection connection = TestHelper.testConnection()) {
connection.execute(ALL_DDLS);
for (String table: ALL_TABLES) {
connection.enableTableCdc(table);
}
connection.execute(
"INSERT INTO type_int VALUES (0, 1, 22, 333, 4444, 55555)",
"INSERT INTO type_fp VALUES (0, 1.123, 2, 3.323, 4.323, 5.323, 6.323)",
"INSERT INTO type_string VALUES (0, 'c\u010d', 'vc\u010d', 't\u010d', N'c\u010d', N'vc\u010d', N't\u010d')",
"INSERT INTO type_time VALUES (0, '2018-07-13', '10:23:45', '2018-07-13 11:23:45.34', '2018-07-13 12:23:45.456+11:00', '2018-07-13 13:23:45.78', '2018-07-13 14:23:45')",
"INSERT INTO type_xml VALUES (0, '<a>b</a>')"
);
}
}
@Test
public void intTypes() throws Exception {
Testing.debug("Inserted");
final SourceRecords records = consumeRecordsByTopic(EXPECTED_RECORD_COUNT);
List<SourceRecord> testTableRecords = records.recordsForTopic("server1.dbo.type_int");
assertThat(testTableRecords).hasSize(1);
// insert
VerifyRecord.isValidInsert(testTableRecords.get(0));
Struct after = (Struct) ((Struct)testTableRecords.get(0).value()).get("after");
assertRecord(after, EXPECTED_INT);
}
@Test
public void fpTypes() throws Exception {
Testing.debug("Inserted");
final SourceRecords records = consumeRecordsByTopic(EXPECTED_RECORD_COUNT);
List<SourceRecord> testTableRecords = records.recordsForTopic("server1.dbo.type_fp");
assertThat(testTableRecords).hasSize(1);
// insert
VerifyRecord.isValidInsert(testTableRecords.get(0));
Struct after = (Struct) ((Struct)testTableRecords.get(0).value()).get("after");
assertRecord(after, EXPECTED_FP);
}
@Test
public void stringTypes() throws Exception {
Testing.debug("Inserted");
final SourceRecords records = consumeRecordsByTopic(EXPECTED_RECORD_COUNT);
List<SourceRecord> testTableRecords = records.recordsForTopic("server1.dbo.type_string");
assertThat(testTableRecords).hasSize(1);
// insert
VerifyRecord.isValidInsert(testTableRecords.get(0));
Struct after = (Struct) ((Struct)testTableRecords.get(0).value()).get("after");
assertRecord(after, EXPECTED_STRING);
}
@Test
public void dateTimeTypes() throws Exception {
Testing.debug("Inserted");
final SourceRecords records = consumeRecordsByTopic(EXPECTED_RECORD_COUNT);
List<SourceRecord> testTableRecords = records.recordsForTopic("server1.dbo.type_time");
assertThat(testTableRecords).hasSize(1);
// insert
VerifyRecord.isValidInsert(testTableRecords.get(0));
Struct after = (Struct) ((Struct)testTableRecords.get(0).value()).get("after");
assertRecord(after, EXPECTED_DATE_TIME);
}
@Test
public void otherTypes() throws Exception {
Testing.debug("Inserted");
final SourceRecords records = consumeRecordsByTopic(EXPECTED_RECORD_COUNT);
List<SourceRecord> testTableRecords = records.recordsForTopic("server1.dbo.type_xml");
assertThat(testTableRecords).hasSize(1);
// insert
VerifyRecord.isValidInsert(testTableRecords.get(0));
Struct after = (Struct) ((Struct)testTableRecords.get(0).value()).get("after");
assertRecord(after, EXPECTED_XML);
}
private void assertRecord(Struct record, List<SchemaAndValueField> expected) {
expected.forEach(schemaAndValueField -> schemaAndValueField.assertFor(record));
}
}

View File

@ -0,0 +1,40 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.connector.sqlserver;
import java.sql.SQLException;
import org.junit.Before;
import org.junit.BeforeClass;
import io.debezium.config.Configuration;
import io.debezium.connector.sqlserver.util.TestHelper;
import io.debezium.util.Testing;
/**
* Integration test to verify different Oracle datatypes.
*
* @author Jiri Pechanec
*/
public class SnapshotDatatypesIT extends AbstractSqlServerDatatypesTest {
@BeforeClass
public static void beforeClass() throws SQLException {
createTables();
}
@Before
public void before() throws Exception {
initializeConnectorTestFramework();
Testing.Debug.enable();
Testing.Files.delete(TestHelper.DB_HISTORY_PATH);
Configuration config = TestHelper.defaultConfig().build();
start(SqlServerConnector.class, config);
assertConnectorIsRunning();
Thread.sleep(1000);
}
}

View File

@ -6,11 +6,13 @@
package io.debezium.connector.sqlserver;
import java.math.BigInteger;
import java.sql.SQLException;
import org.junit.BeforeClass;
import org.junit.Test;
import io.debezium.connector.sqlserver.util.TestHelper;
import io.debezium.util.Testing;
/**
@ -22,19 +24,11 @@ public class SqlServerConnectionIT {
@BeforeClass
public static void beforeClass() throws SQLException {
// NOTE: you cannot enable CDC for the "master" db (the default one) so
// all tests must use a separate database...
try (SqlServerConnection connection = new SqlServerConnection(TestHelper.defaultJdbcConfig())) {
connection.connect();
String sql = "IF EXISTS(select 1 from sys.databases where name='testDB') DROP DATABASE testDB\n"
+ "CREATE DATABASE testDB\n";
connection.execute(sql);
}
}
@Test
public void shouldEnableCDCForDatabase() throws Exception {
try (SqlServerConnection connection = new SqlServerConnection(TestHelper.defaultJdbcConfig())) {
public void shouldEnableCdcForDatabase() throws Exception {
try (SqlServerConnection connection = TestHelper.adminConnection()) {
connection.connect();
connection.execute("USE testDB");
// NOTE: you cannot enable CDC on master
@ -43,8 +37,8 @@ public void shouldEnableCDCForDatabase() throws Exception {
}
@Test
public void shouldEnableCDCWithWrapperFunctionsForTable() throws Exception {
try (SqlServerConnection connection = new SqlServerConnection(TestHelper.defaultJdbcConfig())) {
public void shouldEnableCdcWithWrapperFunctionsForTable() throws Exception {
try (SqlServerConnection connection = TestHelper.adminConnection()) {
connection.connect();
connection.execute("USE testDB");
// NOTE: you cannot enable CDC on master
@ -71,7 +65,8 @@ public void shouldEnableCDCWithWrapperFunctionsForTable() throws Exception {
"select * from cdc.fn_cdc_get_all_changes_dbo_testTable(sys.fn_cdc_get_min_lsn('dbo_testTable'), sys.fn_cdc_get_max_lsn(), N'all')",
rs -> {
while (rs.next()) {
final StringBuilder sb = new StringBuilder();
final BigInteger lsn = new BigInteger(rs.getBytes(1));
final StringBuilder sb = new StringBuilder(lsn.toString());
for (int col = 1; col <= rs.getMetaData().getColumnCount(); col++) {
sb.append(rs.getObject(col)).append(' ');
}

View File

@ -0,0 +1,201 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.connector.sqlserver;
import static org.junit.Assert.assertNull;
import java.sql.SQLException;
import java.util.Arrays;
import java.util.List;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;
import org.fest.assertions.Assertions;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import io.debezium.config.Configuration;
import io.debezium.connector.sqlserver.util.TestHelper;
import io.debezium.data.SchemaAndValueField;
import io.debezium.embedded.AbstractConnectorTest;
import io.debezium.util.Testing;
/**
* Integration test for the Debezium SQL Server connector.
*
* @author Jiri Pechanec
*/
public class SqlServerConnectorIT extends AbstractConnectorTest {
private static SqlServerConnection connection;
@BeforeClass
public static void beforeClass() {
}
@AfterClass
public static void closeConnection() throws SQLException {
if (connection != null) {
connection.close();
}
}
@Before
public void before() throws SQLException {
TestHelper.createTestDatabase();
connection = TestHelper.testConnection();
connection.execute(
"CREATE TABLE tablea (id int primary key, cola varchar(30))",
"CREATE TABLE tableb (id int primary key, colb varchar(30))",
"INSERT INTO tablea VALUES(1, 'a')"
);
connection.enableTableCdc("tablea");
connection.enableTableCdc("tableb");
initializeConnectorTestFramework();
Testing.Files.delete(TestHelper.DB_HISTORY_PATH);
}
@After
public void after() throws SQLException {
connection.close();
TestHelper.dropTestDatabase();
}
@Test
public void createAndDelete() throws Exception {
final int RECORDS_PER_TABLE = 5;
final int TABLES = 2;
final int ID_START = 10;
final Configuration config = TestHelper.defaultConfig().build();
start(SqlServerConnector.class, config);
assertConnectorIsRunning();
for (int i = 0; i < RECORDS_PER_TABLE; i++) {
final int id = ID_START + i;
connection.execute(
"INSERT INTO tablea VALUES(" + id + ", 'a')"
);
connection.execute(
"INSERT INTO tableb VALUES(" + id + ", 'b')"
);
}
final SourceRecords records = consumeRecordsByTopic(RECORDS_PER_TABLE * TABLES);
final List<SourceRecord> tableA = records.recordsForTopic("server1.dbo.tablea");
final List<SourceRecord> tableB = records.recordsForTopic("server1.dbo.tableb");
Assertions.assertThat(tableA).hasSize(RECORDS_PER_TABLE);
Assertions.assertThat(tableB).hasSize(RECORDS_PER_TABLE);
for (int i = 0; i < RECORDS_PER_TABLE; i++) {
final SourceRecord recordA = tableA.get(i);
final SourceRecord recordB = tableB.get(i);
final List<SchemaAndValueField> expectedRowA = Arrays.asList(
new SchemaAndValueField("id", Schema.INT32_SCHEMA, i + ID_START),
new SchemaAndValueField("cola", Schema.OPTIONAL_STRING_SCHEMA, "a"));
final List<SchemaAndValueField> expectedRowB = Arrays.asList(
new SchemaAndValueField("id", Schema.INT32_SCHEMA, i + ID_START),
new SchemaAndValueField("colb", Schema.OPTIONAL_STRING_SCHEMA, "b"));
final Struct keyA = (Struct)recordA.key();
final Struct valueA = (Struct)recordA.value();
assertRecord((Struct)valueA.get("after"), expectedRowA);
assertNull(valueA.get("before"));
final Struct keyB = (Struct)recordB.key();
final Struct valueB = (Struct)recordB.value();
assertRecord((Struct)valueB.get("after"), expectedRowB);
assertNull(valueB.get("before"));
}
connection.execute("DELETE FROM tableB");
final SourceRecords deleteRecords = consumeRecordsByTopic(2 * RECORDS_PER_TABLE);
final List<SourceRecord> deleteTableA = deleteRecords.recordsForTopic("server1.dbo.tablea");
final List<SourceRecord> deleteTableB = deleteRecords.recordsForTopic("server1.dbo.tableb");
Assertions.assertThat(deleteTableA).isNullOrEmpty();
Assertions.assertThat(deleteTableB).hasSize(2 * RECORDS_PER_TABLE);
for (int i = 0; i < RECORDS_PER_TABLE; i++) {
final SourceRecord deleteRecord = deleteTableB.get(i * 2);
final SourceRecord tombstoneRecord = deleteTableB.get(i * 2 + 1);
final List<SchemaAndValueField> expectedDeleteRow = Arrays.asList(
new SchemaAndValueField("id", Schema.INT32_SCHEMA, i + ID_START),
new SchemaAndValueField("colb", Schema.OPTIONAL_STRING_SCHEMA, "b"));
final Struct deleteKey = (Struct)deleteRecord.key();
final Struct deleteValue = (Struct)deleteRecord.value();
assertRecord((Struct)deleteValue.get("before"), expectedDeleteRow);
assertNull(deleteValue.get("after"));
final Struct tombstoneKey = (Struct)tombstoneRecord.key();
final Struct tombstoneValue = (Struct)tombstoneRecord.value();
assertNull(tombstoneValue);
}
stopConnector();
}
@Test
public void update() throws Exception {
final int RECORDS_PER_TABLE = 5;
final int ID_START = 10;
final Configuration config = TestHelper.defaultConfig().build();
start(SqlServerConnector.class, config);
assertConnectorIsRunning();
connection.setAutoCommit(false);
final String[] tableBInserts = new String[RECORDS_PER_TABLE];
for (int i = 0; i < RECORDS_PER_TABLE; i++) {
final int id = ID_START + i;
tableBInserts[i] = "INSERT INTO tableb VALUES(" + id + ", 'b')";
}
connection.execute(tableBInserts);
connection.setAutoCommit(true);
connection.execute("UPDATE tableb SET colb='z'");
final SourceRecords records = consumeRecordsByTopic(RECORDS_PER_TABLE * 2);
final List<SourceRecord> tableB = records.recordsForTopic("server1.dbo.tableb");
Assertions.assertThat(tableB).hasSize(RECORDS_PER_TABLE * 2);
for (int i = 0; i < RECORDS_PER_TABLE; i++) {
final SourceRecord recordB = tableB.get(i);
final List<SchemaAndValueField> expectedRowB = Arrays.asList(
new SchemaAndValueField("id", Schema.INT32_SCHEMA, i + ID_START),
new SchemaAndValueField("colb", Schema.OPTIONAL_STRING_SCHEMA, "b"));
final Struct keyB = (Struct)recordB.key();
final Struct valueB = (Struct)recordB.value();
assertRecord((Struct)valueB.get("after"), expectedRowB);
assertNull(valueB.get("before"));
}
for (int i = 0; i < RECORDS_PER_TABLE; i++) {
final SourceRecord recordB = tableB.get(i + RECORDS_PER_TABLE);
final List<SchemaAndValueField> expectedBefore = Arrays.asList(
new SchemaAndValueField("id", Schema.INT32_SCHEMA, i + ID_START),
new SchemaAndValueField("colb", Schema.OPTIONAL_STRING_SCHEMA, "b"));
final List<SchemaAndValueField> expectedAfter = Arrays.asList(
new SchemaAndValueField("id", Schema.INT32_SCHEMA, i + ID_START),
new SchemaAndValueField("colb", Schema.OPTIONAL_STRING_SCHEMA, "z"));
final Struct keyB = (Struct)recordB.key();
final Struct valueB = (Struct)recordB.value();
assertRecord((Struct)valueB.get("before"), expectedBefore);
assertRecord((Struct)valueB.get("after"), expectedAfter);
}
stopConnector();
}
private void assertRecord(Struct record, List<SchemaAndValueField> expected) {
expected.forEach(schemaAndValueField -> schemaAndValueField.assertFor(record));
}
}

View File

@ -1,26 +0,0 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.connector.sqlserver;
import io.debezium.config.Configuration;
import io.debezium.jdbc.JdbcConfiguration;
/**
* @author Horia Chiorean (hchiorea@redhat.com)
*/
public class TestHelper {
protected static JdbcConfiguration defaultJdbcConfig() {
return JdbcConfiguration.copy(Configuration.fromSystemProperties("database."))
.withDefault(JdbcConfiguration.DATABASE, "master")
.withDefault(JdbcConfiguration.HOSTNAME, "localhost")
.withDefault(JdbcConfiguration.PORT, 1433)
.withDefault(JdbcConfiguration.USER, "sa")
.withDefault(JdbcConfiguration.PASSWORD, "Password!")
.build();
}
}

View File

@ -0,0 +1,115 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.connector.sqlserver.util;
import java.nio.file.Path;
import java.sql.SQLException;
import io.debezium.config.Configuration;
import io.debezium.connector.sqlserver.SqlServerConnection;
import io.debezium.connector.sqlserver.SqlServerConnectionFactory;
import io.debezium.connector.sqlserver.SqlServerConnectorConfig;
import io.debezium.jdbc.JdbcConfiguration;
import io.debezium.relational.history.FileDatabaseHistory;
import io.debezium.util.Testing;
/**
* @author Horia Chiorean (hchiorea@redhat.com)
*/
public class TestHelper {
public static final Path DB_HISTORY_PATH = Testing.Files.createTestingPath("file-db-history-connect.txt").toAbsolutePath();
public static final String TEST_DATABASE = "testDB";
public static JdbcConfiguration adminJdbcConfig() {
return JdbcConfiguration.copy(Configuration.fromSystemProperties("database."))
.withDefault(JdbcConfiguration.DATABASE, "master")
.withDefault(JdbcConfiguration.HOSTNAME, "localhost")
.withDefault(JdbcConfiguration.PORT, 1433)
.withDefault(JdbcConfiguration.USER, "sa")
.withDefault(JdbcConfiguration.PASSWORD, "Password!")
.build();
}
public static JdbcConfiguration defaultJdbcConfig() {
return JdbcConfiguration.copy(Configuration.fromSystemProperties("database."))
.withDefault(JdbcConfiguration.DATABASE, TEST_DATABASE)
.withDefault(JdbcConfiguration.HOSTNAME, "localhost")
.withDefault(JdbcConfiguration.PORT, 1433)
.withDefault(JdbcConfiguration.USER, "sa")
.withDefault(JdbcConfiguration.PASSWORD, "Password!")
.build();
}
/**
* Returns a default configuration suitable for most test cases. Can be amended/overridden in individual tests as
* needed.
*/
public static Configuration.Builder defaultConfig() {
JdbcConfiguration jdbcConfiguration = defaultJdbcConfig();
Configuration.Builder builder = Configuration.create();
jdbcConfiguration.forEach(
(field, value) -> builder.with(SqlServerConnectorConfig.DATABASE_CONFIG_PREFIX + field, value)
);
return builder.with(SqlServerConnectorConfig.LOGICAL_NAME, "server1")
.with(SqlServerConnectorConfig.DATABASE_HISTORY, FileDatabaseHistory.class)
.with(FileDatabaseHistory.FILE_PATH, DB_HISTORY_PATH);
}
public static void createTestDatabase() {
// NOTE: you cannot enable CDC for the "master" db (the default one) so
// all tests must use a separate database...
try (SqlServerConnection connection = adminConnection()) {
connection.connect();
try {
connection.execute("USE testDB");
connection.disableDbCdc("testDB");
}
catch (SQLException e) {
}
connection.execute("USE master");
String sql = "IF EXISTS(select 1 from sys.databases where name='testDB') DROP DATABASE testDB\n"
+ "CREATE DATABASE testDB\n";
connection.execute(sql);
connection.execute("USE testDB");
// NOTE: you cannot enable CDC on master
connection.enableDbCdc("testDB");
}
catch (SQLException e) {
throw new IllegalStateException("Error while initating test database", e);
}
}
public static void dropTestDatabase() {
try (SqlServerConnection connection = adminConnection()) {
connection.connect();
try {
connection.execute("USE testDB");
connection.disableDbCdc("testDB");
}
catch (SQLException e) {
}
connection.execute("USE master");
String sql = "IF EXISTS(select 1 from sys.databases where name='testDB') DROP DATABASE testDB";
connection.execute(sql);
}
catch (SQLException e) {
throw new IllegalStateException("Error while initating test database", e);
}
}
public static SqlServerConnection adminConnection() {
return new SqlServerConnection(TestHelper.adminJdbcConfig(), new SqlServerConnectionFactory());
}
public static SqlServerConnection testConnection() {
return new SqlServerConnection(TestHelper.defaultJdbcConfig(), new SqlServerConnectionFactory());
}
}

View File

@ -4,7 +4,7 @@
<parent>
<groupId>io.debezium</groupId>
<artifactId>debezium-parent</artifactId>
<version>0.8.0.Final</version>
<version>0.9.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>