diff --git a/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/Lsn.java b/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/Lsn.java new file mode 100644 index 000000000..dc0324031 --- /dev/null +++ b/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/Lsn.java @@ -0,0 +1,106 @@ +/* + * Copyright Debezium Authors. + * + * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 + */ +package io.debezium.connector.sqlserver; + +import java.util.Arrays; + +/** + * A logical representation of SQL Server LSN (log sequence number) position. + * + * @author Jiri Pechanec + * + */ +public class Lsn implements Comparable { + private final byte[] binary; + private int[] unsignedBinary; + + private String string; + + public Lsn(byte[] binary) { + this.binary = binary; + } + + public byte[] getBinary() { + return binary; + } + + public boolean isAvailable() { + return binary != null; + } + + private int[] getUnsignedBinary() { + if (unsignedBinary != null || binary == null) { + return unsignedBinary; + } + + unsignedBinary = new int[binary.length]; + for (int i = 0; i < binary.length; i++) { + unsignedBinary[i] = Byte.toUnsignedInt(binary[i]); + } + return unsignedBinary; + } + + public String toString() { + if (string != null) { + return string; + } + final StringBuilder sb = new StringBuilder(); + if (binary == null) { + return "NULL"; + } + final int[] unsigned = getUnsignedBinary(); + for (int i = 0; i < unsigned.length; i++) { + final String byteStr = Integer.toHexString(unsigned[i]); + if (byteStr.length() == 1) { + sb.append('0'); + } + sb.append(byteStr); + if (i == 3 || i == 7) { + sb.append(':'); + } + } + string = sb.toString(); + return string; + } + + @Override + public int hashCode() { + final int prime = 31; + int result = 1; + result = prime * result + Arrays.hashCode(binary); + return result; + } + + @Override + public boolean equals(Object obj) { + if (this == obj) + return true; + if (obj == null) + return false; + if (getClass() != obj.getClass()) + return false; + Lsn other = (Lsn) obj; + if (!Arrays.equals(binary, other.binary)) + return false; + return true; + } + + @Override + public int compareTo(Lsn o) { + if (this == o) { + return 0; + } + final int[] thisU = getUnsignedBinary(); + final int[] thatU = o.getUnsignedBinary(); + for (int i = 0; i < thisU.length; i++) { + final int diff = thisU[i] - thatU[i]; + if (diff != 0) { + return diff; + } + } + return 0; + } +} diff --git a/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/Module.java b/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/Module.java new file mode 100644 index 000000000..7ca01d218 --- /dev/null +++ b/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/Module.java @@ -0,0 +1,24 @@ +/* + * Copyright Debezium Authors. + * + * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 + */ +package io.debezium.connector.sqlserver; + +import java.util.Properties; + +import io.debezium.util.IoUtil; + +/** + * Information about this module. + * + * @author Jiri Pechanec + */ +public final class Module { + + private static final Properties INFO = IoUtil.loadProperties(Module.class, "io/debezium/connector/sqlserver/build.version"); + + public static String version() { + return INFO.getProperty("version"); + } +} diff --git a/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SourceInfo.java b/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SourceInfo.java new file mode 100644 index 000000000..544eababe --- /dev/null +++ b/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SourceInfo.java @@ -0,0 +1,66 @@ +package io.debezium.connector.sqlserver; + +import java.time.Instant; + +import org.apache.kafka.connect.data.Schema; +import org.apache.kafka.connect.data.Struct; + +import io.debezium.connector.AbstractSourceInfo; + +/** + * Coordinates from the database log to restart streaming from. Maps to {@code source} field in enevlope and + * to connector offsets. + * + * @author Jiri Pechanec + * + */ +public class SourceInfo extends AbstractSourceInfo { + + public static final String SERVER_NAME_KEY = "name"; + public static final String LOG_TIMESTAMP_KEY = "ts_ms"; + public static final String CHANGE_LSN_KEY = "change_lsn"; + public static final String SNAPSHOT_KEY = "snapshot"; + + public static final Schema SCHEMA = schemaBuilder() + .name("io.debezium.connector.sqlserver.Source") + .field(SERVER_NAME_KEY, Schema.STRING_SCHEMA) + .field(LOG_TIMESTAMP_KEY, Schema.OPTIONAL_INT64_SCHEMA) + .field(CHANGE_LSN_KEY, Schema.OPTIONAL_STRING_SCHEMA) + .field(SNAPSHOT_KEY, Schema.OPTIONAL_BOOLEAN_SCHEMA) + .build(); + + private final String serverName; + private Lsn changeLsn; + private Instant sourceTime; + + protected SourceInfo(String serverName) { + super(Module.version()); + this.serverName = serverName; + } + + public void setChangeLsn(Lsn lsn) { + changeLsn = lsn; + } + + public Lsn getChangeLsn() { + return changeLsn; + } + + public void setSourceTime(Instant instant) { + sourceTime = instant; + } + + @Override + protected Schema schema() { + return SCHEMA; + } + + @Override + public Struct struct() { + return super.struct() + .put(SERVER_NAME_KEY, serverName) + .put(LOG_TIMESTAMP_KEY, sourceTime == null ? null : sourceTime.toEpochMilli()) + .put(CHANGE_LSN_KEY, changeLsn.toString()) + .put(SNAPSHOT_KEY, false); + } +} diff --git a/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerChangeEventSourceFactory.java b/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerChangeEventSourceFactory.java new file mode 100644 index 000000000..e2f6f9070 --- /dev/null +++ b/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerChangeEventSourceFactory.java @@ -0,0 +1,52 @@ +/* + * Copyright Debezium Authors. + * + * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 + */ +package io.debezium.connector.sqlserver; + +import io.debezium.pipeline.ErrorHandler; +import io.debezium.pipeline.EventDispatcher; +import io.debezium.pipeline.source.spi.ChangeEventSourceFactory; +import io.debezium.pipeline.source.spi.SnapshotChangeEventSource; +import io.debezium.pipeline.source.spi.StreamingChangeEventSource; +import io.debezium.pipeline.spi.OffsetContext; +import io.debezium.util.Clock; + +public class SqlServerChangeEventSourceFactory implements ChangeEventSourceFactory { + + private final SqlServerConnectorConfig configuration; + private final SqlServerConnection jdbcConnection; + private final ErrorHandler errorHandler; + private final EventDispatcher dispatcher; + private final Clock clock; + private final SqlServerDatabaseSchema schema; + + public SqlServerChangeEventSourceFactory(SqlServerConnectorConfig configuration, SqlServerConnection jdbcConnection, + ErrorHandler errorHandler, EventDispatcher dispatcher, Clock clock, SqlServerDatabaseSchema schema) { + this.configuration = configuration; + this.jdbcConnection = jdbcConnection; + this.errorHandler = errorHandler; + this.dispatcher = dispatcher; + this.clock = clock; + this.schema = schema; + } + + @Override + public SnapshotChangeEventSource getSnapshotChangeEventSource(OffsetContext offsetContext) { + return new SqlServerSnapshotChangeEventSource(configuration, (SqlServerOffsetContext) offsetContext, jdbcConnection, schema); + } + + @Override + public StreamingChangeEventSource getStreamingChangeEventSource(OffsetContext offsetContext) { + return new SqlServerStreamingChangeEventSource( + configuration, + (SqlServerOffsetContext) offsetContext, + jdbcConnection, + dispatcher, + errorHandler, + clock, + schema + ); + } +} diff --git a/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerChangeRecordEmitter.java b/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerChangeRecordEmitter.java new file mode 100644 index 000000000..644926e1f --- /dev/null +++ b/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerChangeRecordEmitter.java @@ -0,0 +1,74 @@ +/* + * Copyright Debezium Authors. + * + * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 + */ +package io.debezium.connector.sqlserver; + +import io.debezium.data.Envelope.Operation; +import io.debezium.pipeline.spi.OffsetContext; +import io.debezium.relational.RelationalChangeRecordEmitter; +import io.debezium.relational.Table; +import io.debezium.util.Clock; + +/** + * Emits change data based on a a single CDC data row. + * + * @author Jiri Pechanec + */ +public class SqlServerChangeRecordEmitter extends RelationalChangeRecordEmitter { + public static final int OP_DELETE = 1; + public static final int OP_INSERT = 2; + public static final int OP_UPDATE_BEFORE = 3; + public static final int OP_UPDATE_AFTER = 4; + + private final int operation; + private final Object[] data; + private final Object[] dataNext; + + public SqlServerChangeRecordEmitter(OffsetContext offset, int operation, Object[] data, Object[] dataNext, Table table, Clock clock) { + super(offset, clock); + + this.operation = operation; + this.data = data; + this.dataNext = dataNext; + } + + @Override + protected Operation getOperation() { + if (operation == OP_DELETE) { + return Operation.DELETE; + } + else if (operation == OP_INSERT) { + return Operation.CREATE; + } + else if (operation == OP_UPDATE_BEFORE) { + return Operation.UPDATE; + } + throw new IllegalArgumentException("Received event of unexpected command type: " + operation); + } + + @Override + protected Object[] getOldColumnValues() { + switch (getOperation()) { + case CREATE: + case READ: + return null; + default: + return data; + } + } + + @Override + protected Object[] getNewColumnValues() { + switch (getOperation()) { + case CREATE: + case READ: + return data; + case UPDATE: + return dataNext; + default: + return null; + } + } +} diff --git a/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerConnection.java b/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerConnection.java index cee126e5c..48f8a6765 100644 --- a/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerConnection.java +++ b/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerConnection.java @@ -7,33 +7,35 @@ package io.debezium.connector.sqlserver; import java.sql.SQLException; +import java.sql.Timestamp; +import java.time.Instant; import java.util.Objects; import java.util.Properties; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; import io.debezium.config.Configuration; -import io.debezium.jdbc.JdbcConfiguration; import io.debezium.jdbc.JdbcConnection; +import io.debezium.relational.TableId; import io.debezium.util.IoUtil; /** * {@link JdbcConnection} extension to be used with Microsoft SQL Server * - * @author Horia Chiorean (hchiorea@redhat.com) + * @author Horia Chiorean (hchiorea@redhat.com), Jiri Pechanec + * */ public class SqlServerConnection extends JdbcConnection { - private static final String URL_PATTERN = "jdbc:sqlserver://${" + JdbcConfiguration.HOSTNAME + "}:${" - + JdbcConfiguration.PORT + "};databaseName=${" + JdbcConfiguration.DATABASE + "}"; - protected static ConnectionFactory FACTORY = JdbcConnection.patternBasedFactory(URL_PATTERN, - com.microsoft.sqlserver.jdbc.SQLServerDriver.class.getName(), SqlServerConnection.class.getClassLoader()); private static Logger LOGGER = LoggerFactory.getLogger(SqlServerConnection.class); private static final String STATEMENTS_PLACEHOLDER = "#"; private static final String ENABLE_DB_CDC; + private static final String DISABLE_DB_CDC; private static final String ENABLE_TABLE_CDC; private static final String CDC_WRAPPERS_DML; + private static final String GET_MAX_LSN; static { try { @@ -41,7 +43,9 @@ public class SqlServerConnection extends JdbcConnection { ClassLoader classLoader = SqlServerConnection.class.getClassLoader(); statements.load(classLoader.getResourceAsStream("statements.properties")); ENABLE_DB_CDC = statements.getProperty("enable_cdc_for_db"); + DISABLE_DB_CDC = statements.getProperty("disable_cdc_for_db"); ENABLE_TABLE_CDC = statements.getProperty("enable_cdc_for_table"); + GET_MAX_LSN = statements.getProperty("get_max_lsn"); CDC_WRAPPERS_DML = IoUtil.read(classLoader.getResourceAsStream("generate_cdc_wrappers.sql")); } catch (Exception e) { @@ -54,19 +58,10 @@ public class SqlServerConnection extends JdbcConnection { * * @param config * {@link Configuration} instance, may not be null. + * @param factory a factory building the connection string */ - public SqlServerConnection(Configuration config) { - super(config, FACTORY); - } - - /** - * Returns a JDBC connection string for the current configuration. - * - * @return a {@code String} where the variables in {@code urlPattern} are - * replaced with values from the configuration - */ - public String connectionString() { - return connectionString(URL_PATTERN); + public SqlServerConnection(Configuration config, ConnectionFactory factory) { + super(config, factory); } /** @@ -82,6 +77,19 @@ public void enableDbCdc(String name) throws SQLException { execute(ENABLE_DB_CDC.replace(STATEMENTS_PLACEHOLDER, name)); } + /** + * Disables CDC for a given database, if not already disabled. + * + * @param name + * the name of the DB, may not be {@code null} + * @throws SQLException + * if anything unexpected fails + */ + public void disableDbCdc(String name) throws SQLException { + Objects.requireNonNull(name); + execute(DISABLE_DB_CDC.replace(STATEMENTS_PLACEHOLDER, name)); + } + /** * Enables CDC for a table if not already enabled and generates the wrapper * functions for that table. @@ -96,4 +104,86 @@ public void enableTableCdc(String name) throws SQLException { String generateWrapperFunctionsStmts = CDC_WRAPPERS_DML.replaceAll(STATEMENTS_PLACEHOLDER, name); execute(enableCdcForTableStmt, generateWrapperFunctionsStmts); } + + /** + * @return the current largest log sequence number + */ + public Lsn getMaxLsn() throws SQLException { + final String LSN_COUNT_ERROR = "Maximum LSN query must return exactly one value"; + return queryAndMap(GET_MAX_LSN, rs -> { + if (rs.next()) { + final Lsn ret = new Lsn(rs.getBytes(1)); + if (!rs.next()) { + return ret; + } + } + throw new IllegalStateException(LSN_COUNT_ERROR); + }); + } + + public void getChangesForTable(TableId tableId, Lsn fromLsn, Lsn toLsn, ResultSetConsumer consumer) throws SQLException { + final String cdcNameForTable = cdcNameForTable(tableId); + final String query = "SELECT * FROM cdc.fn_cdc_get_all_changes_" + cdcNameForTable + "(ISNULL(?,sys.fn_cdc_get_min_lsn('" + cdcNameForTable + "')), ?, N'all update old')"; + prepareQuery(query, statement -> { + statement.setBytes(1, fromLsn.getBinary()); + statement.setBytes(2, toLsn.getBinary()); + }, consumer); + } + + public void getChangesForTables(TableId[] tableIds, Lsn fromLsn, Lsn toLsn, MultiResultSetConsumer consumer) throws SQLException { + final String[] queries = new String[tableIds.length]; + + int idx = 0; + for (TableId tableId: tableIds) { + final String cdcNameForTable = cdcNameForTable(tableId); + final String query = "SELECT * FROM cdc.fn_cdc_get_all_changes_" + cdcNameForTable + "(ISNULL(?,sys.fn_cdc_get_min_lsn('" + cdcNameForTable + "')), ?, N'all update old')"; + queries[idx++] = query; + } + prepareQuery(queries, statement -> { + statement.setBytes(1, fromLsn.getBinary()); + statement.setBytes(2, toLsn.getBinary()); + }, consumer); + } + + public Lsn incrementLsn(Lsn lsn) throws SQLException { + final String LSN_INCREMENT_ERROR = "Increment LSN query must return exactly one value"; + final String query = "SELECT sys.fn_cdc_increment_lsn(?)"; + return prepareQueryAndMap(query, statement -> { + statement.setBytes(1, lsn.getBinary()); + }, rs -> { + if (rs.next()) { + final Lsn ret = new Lsn(rs.getBytes(1)); + if (!rs.next()) { + return ret; + } + } + throw new IllegalStateException(LSN_INCREMENT_ERROR); + }); + } + + public Instant timestampOfLsn(Lsn lsn) throws SQLException { + final String LSN_TIMESTAMP_ERROR = "LSN to timestamp query must return exactly one value"; + final String query = "SELECT sys.fn_cdc_map_lsn_to_time(?)"; + + if (lsn.getBinary() == null) { + return null; + } + + return prepareQueryAndMap(query, statement -> { + statement.setBytes(1, lsn.getBinary()); + }, rs -> { + if (rs.next()) { + final Timestamp ts = rs.getTimestamp(1); + final Instant ret = ts == null ? null : ts.toInstant(); + if (!rs.next()) { + return ret; + } + } + throw new IllegalStateException(LSN_TIMESTAMP_ERROR); + }); + } + + private String cdcNameForTable(TableId tableId) { + return tableId.schema() + '_' + tableId.table(); + } } diff --git a/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerConnectionFactory.java b/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerConnectionFactory.java new file mode 100644 index 000000000..a80e60684 --- /dev/null +++ b/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerConnectionFactory.java @@ -0,0 +1,29 @@ +/* + * Copyright Debezium Authors. + * + * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 + */ +package io.debezium.connector.sqlserver; + +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.SQLException; + +import io.debezium.jdbc.JdbcConfiguration; +import io.debezium.jdbc.JdbcConnection.ConnectionFactory; + +public class SqlServerConnectionFactory implements ConnectionFactory { + + @Override + public Connection connect(JdbcConfiguration config) throws SQLException { + String hostName = config.getHostname(); + int port = config.getPort(); + String database = config.getDatabase(); + String user = config.getUser(); + String password = config.getPassword(); + + return DriverManager.getConnection( + "jdbc:sqlserver://" + hostName + ":" + port + ";databaseName=" + database, user, password + ); + } +} diff --git a/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerConnector.java b/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerConnector.java new file mode 100644 index 000000000..bcf3c99fc --- /dev/null +++ b/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerConnector.java @@ -0,0 +1,59 @@ +/* + * Copyright Debezium Authors. + * + * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 + */ +package io.debezium.connector.sqlserver; + +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import org.apache.kafka.common.config.ConfigDef; +import org.apache.kafka.connect.connector.Task; +import org.apache.kafka.connect.source.SourceConnector; + +/** + * The main connector class used to instantiate configuration and execution classes + * + * @author Jiri Pechanec + * + */ +public class SqlServerConnector extends SourceConnector { + + private Map properties; + + @Override + public String version() { + return Module.version(); + } + + @Override + public void start(Map props) { + this.properties = Collections.unmodifiableMap(new HashMap<>(props)); + } + + @Override + public Class taskClass() { + return SqlServerConnectorTask.class; + } + + @Override + public List> taskConfigs(int maxTasks) { + if (maxTasks > 1) { + throw new IllegalArgumentException("Only a single connector task may be started"); + } + + return Collections.singletonList(properties); + } + + @Override + public void stop() { + } + + @Override + public ConfigDef config() { + return SqlServerConnectorConfig.configDef(); + } +} diff --git a/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerConnectorConfig.java b/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerConnectorConfig.java new file mode 100644 index 000000000..a740edc26 --- /dev/null +++ b/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerConnectorConfig.java @@ -0,0 +1,151 @@ +/* + * Copyright Debezium Authors. + * + * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 + */ +package io.debezium.connector.sqlserver; + +import org.apache.kafka.common.config.ConfigDef; +import org.apache.kafka.common.config.ConfigDef.Importance; +import org.apache.kafka.common.config.ConfigDef.Type; +import org.apache.kafka.common.config.ConfigDef.Width; +import org.apache.kafka.connect.errors.ConnectException; + +import io.debezium.config.CommonConnectorConfig; +import io.debezium.config.Configuration; +import io.debezium.config.Field; +import io.debezium.document.Document; +import io.debezium.jdbc.JdbcConfiguration; +import io.debezium.relational.RelationalDatabaseConnectorConfig; +import io.debezium.relational.TableId; +import io.debezium.relational.Tables.TableFilter; +import io.debezium.relational.history.DatabaseHistory; +import io.debezium.relational.history.HistoryRecordComparator; +import io.debezium.relational.history.KafkaDatabaseHistory; + +/** + * The list of configuration options for SQL Server connector + * + * @author Jiri Pechanec + * + */ +public class SqlServerConnectorConfig extends RelationalDatabaseConnectorConfig { + + // TODO pull up to RelationalConnectorConfig + public static final String DATABASE_CONFIG_PREFIX = "database."; + + public static final Field LOGICAL_NAME = Field.create("database.server.name") + .withDisplayName("Namespace") + .withType(Type.STRING) + .withWidth(Width.MEDIUM) + .withImportance(Importance.HIGH) + .withValidation(Field::isRequired) + // TODO + //.withValidation(Field::isRequired, MySqlConnectorConfig::validateServerNameIsDifferentFromHistoryTopicName) + .withDescription("Unique name that identifies the database server and all recorded offsets, and" + + "that is used as a prefix for all schemas and topics. " + + "Each distinct MySQL installation should have a separate namespace and monitored by " + + "at most one Debezium connector."); + + public static final Field DATABASE_NAME = Field.create(DATABASE_CONFIG_PREFIX + JdbcConfiguration.DATABASE) + .withDisplayName("Database name") + .withType(Type.STRING) + .withWidth(Width.MEDIUM) + .withImportance(Importance.HIGH) + .withValidation(Field::isRequired) + .withDescription("The name of the database the connector should be monitoring. When working with a " + + "multi-tenant set-up, must be set to the CDB name."); + + /** + * The database history class is hidden in the {@link #configDef()} since that is designed to work with a user interface, + * and in these situations using Kafka is the only way to go. + */ + public static final Field DATABASE_HISTORY = Field.create("database.history") + .withDisplayName("Database history class") + .withType(Type.CLASS) + .withWidth(Width.LONG) + .withImportance(Importance.LOW) + .withInvisibleRecommender() + .withDescription("The name of the DatabaseHistory class that should be used to store and recover database schema changes. " + + "The configuration properties for the history are prefixed with the '" + + DatabaseHistory.CONFIGURATION_FIELD_PREFIX_STRING + "' string.") + .withDefault(KafkaDatabaseHistory.class.getName()); + + /** + * The set of {@link Field}s defined as part of this configuration. + */ + public static Field.Set ALL_FIELDS = Field.setOf( + LOGICAL_NAME, + DATABASE_NAME, + RelationalDatabaseConnectorConfig.TABLE_WHITELIST, + RelationalDatabaseConnectorConfig.TABLE_BLACKLIST, + RelationalDatabaseConnectorConfig.TABLE_IGNORE_BUILTIN, + CommonConnectorConfig.POLL_INTERVAL_MS, + CommonConnectorConfig.MAX_BATCH_SIZE, + CommonConnectorConfig.MAX_QUEUE_SIZE + ); + + private final String databaseName; + + public SqlServerConnectorConfig(Configuration config) { + super(config, config.getString(LOGICAL_NAME), new SystemTablesPredicate()); + + this.databaseName = config.getString(DATABASE_NAME); + } + + public static ConfigDef configDef() { + ConfigDef config = new ConfigDef(); + + Field.group(config, "Oracle", LOGICAL_NAME, DATABASE_NAME); + Field.group(config, "Events", RelationalDatabaseConnectorConfig.TABLE_WHITELIST, + RelationalDatabaseConnectorConfig.TABLE_BLACKLIST, + RelationalDatabaseConnectorConfig.TABLE_IGNORE_BUILTIN + ); + Field.group(config, "Connector", CommonConnectorConfig.POLL_INTERVAL_MS, CommonConnectorConfig.MAX_BATCH_SIZE, CommonConnectorConfig.MAX_QUEUE_SIZE); + + return config; + } + + public String getDatabaseName() { + return databaseName; + } + + /** + * Returns a configured (but not yet started) instance of the database history. + */ + public DatabaseHistory getDatabaseHistory() { + Configuration config = getConfig(); + + DatabaseHistory databaseHistory = config.getInstance(SqlServerConnectorConfig.DATABASE_HISTORY, DatabaseHistory.class); + if (databaseHistory == null) { + throw new ConnectException("Unable to instantiate the database history class " + + config.getString(SqlServerConnectorConfig.DATABASE_HISTORY)); + } + + // Do not remove the prefix from the subset of config properties ... + Configuration dbHistoryConfig = config.subset(DatabaseHistory.CONFIGURATION_FIELD_PREFIX_STRING, false) + .edit() + .withDefault(DatabaseHistory.NAME, getLogicalName() + "-dbhistory") + .build(); + + HistoryRecordComparator historyComparator = new HistoryRecordComparator() { + @Override + protected boolean isPositionAtOrBefore(Document recorded, Document desired) { + return (recorded.getLong("scn")).compareTo(desired.getLong("scn")) < 1; + } + }; + databaseHistory.configure(dbHistoryConfig, historyComparator); // validates + + return databaseHistory; + } + + private static class SystemTablesPredicate implements TableFilter { + + @Override + public boolean isIncluded(TableId t) { + return !(t.schema().toLowerCase().equals("cdc") || + t.schema().toLowerCase().equals("sys") || + t.table().toLowerCase().equals("systranschemas")); + } + } +} diff --git a/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerConnectorTask.java b/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerConnectorTask.java new file mode 100644 index 000000000..2cd003a50 --- /dev/null +++ b/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerConnectorTask.java @@ -0,0 +1,200 @@ +/* + * Copyright Debezium Authors. + * + * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 + */ +package io.debezium.connector.sqlserver; + +import java.sql.SQLException; +import java.util.List; +import java.util.Map; +import java.util.concurrent.atomic.AtomicReference; +import java.util.stream.Collectors; + +import org.apache.kafka.connect.errors.ConnectException; +import org.apache.kafka.connect.source.SourceRecord; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import io.debezium.config.Configuration; +import io.debezium.config.Field; +import io.debezium.connector.base.ChangeEventQueue; +import io.debezium.connector.common.BaseSourceTask; +import io.debezium.pipeline.ChangeEventSourceCoordinator; +import io.debezium.pipeline.DataChangeEvent; +import io.debezium.pipeline.ErrorHandler; +import io.debezium.pipeline.EventDispatcher; +import io.debezium.relational.TableId; +import io.debezium.util.Clock; +import io.debezium.util.SchemaNameAdjuster; + +/** + * The main task executing streaming from SQL Server. + * Responsible for lifecycle management the streaming code. + * + * @author Jiri Pechanec + * + */ +public class SqlServerConnectorTask extends BaseSourceTask { + + private static final Logger LOGGER = LoggerFactory.getLogger(SqlServerConnectorTask.class); + private static final String CONTEXT_NAME = "sql-server-connector-task"; + + private static enum State { + RUNNING, STOPPED; + } + + private final AtomicReference state = new AtomicReference(State.STOPPED); + + private volatile SqlServerTaskContext taskContext; + private volatile ChangeEventQueue queue; + private volatile SqlServerConnection jdbcConnection; + private volatile ChangeEventSourceCoordinator coordinator; + private volatile ErrorHandler errorHandler; + private volatile SqlServerDatabaseSchema schema; + private volatile Map lastOffset; + + @Override + public String version() { + return Module.version(); + } + + @Override + public void start(Configuration config) { + if (!state.compareAndSet(State.STOPPED, State.RUNNING)) { + LOGGER.info("Connector has already been started"); + return; + } + + final SqlServerConnectorConfig connectorConfig = new SqlServerConnectorConfig(config); + taskContext = new SqlServerTaskContext(connectorConfig); + + final Clock clock = Clock.system(); + + // Set up the task record queue ... + this.queue = new ChangeEventQueue.Builder() + .pollInterval(connectorConfig.getPollInterval()) + .maxBatchSize(connectorConfig.getMaxBatchSize()) + .maxQueueSize(connectorConfig.getMaxQueueSize()) + .loggingContextSupplier(() -> taskContext.configureLoggingContext(CONTEXT_NAME)) + .build(); + + errorHandler = new ErrorHandler(SqlServerConnector.class, connectorConfig.getLogicalName(), queue, this::cleanupResources); + final SqlServerTopicSelector topicSelector = SqlServerTopicSelector.defaultSelector(connectorConfig.getLogicalName()); + + final Configuration jdbcConfig = config.subset("database.", true); + + jdbcConnection = new SqlServerConnection(jdbcConfig, new SqlServerConnectionFactory()); + final SchemaNameAdjuster schemaNameAdjuster = SchemaNameAdjuster.create(LOGGER); + + this.schema = new SqlServerDatabaseSchema(connectorConfig, schemaNameAdjuster, topicSelector, jdbcConnection); + + final SqlServerOffsetContext previousOffset = null; +// OracleOffsetContext previousOffset = getPreviousOffset(connectorConfig); +// if (previousOffset != null) { +// schema.recover(previousOffset); +// } + + final EventDispatcher dispatcher = new EventDispatcher<>(topicSelector, schema, queue, + connectorConfig.getTableFilters().dataCollectionFilter(), DataChangeEvent::new); + + coordinator = new ChangeEventSourceCoordinator( + previousOffset, + errorHandler, + SqlServerConnector.class, + connectorConfig.getLogicalName(), + new SqlServerChangeEventSourceFactory(connectorConfig, jdbcConnection, errorHandler, dispatcher, clock, schema) + ); + + coordinator.start(); + } + +// private OracleOffsetContext getPreviousOffset(SqlServerConnectorConfig connectorConfig) { +// OracleOffsetContext offsetContext = new OracleOffsetContext(connectorConfig.getLogicalName()); +// +// Map previousOffset = context.offsetStorageReader() +// .offsets(Collections.singleton(offsetContext.getPartition())) +// .get(offsetContext.getPartition()); +// +// if (previousOffset != null) { +// long scn = (long) previousOffset.get(SourceInfo.SCN_KEY); +// offsetContext.setScn(scn); +// LOGGER.info("Found previous offset {}", offsetContext); +// +// return offsetContext; +// } +// +// return null; +// } + + @Override + public List poll() throws InterruptedException { + // TODO + List records = queue.poll(); + + List sourceRecords = ((List)records).stream() + .map(DataChangeEvent::getRecord) + .collect(Collectors.toList()); + + if (!sourceRecords.isEmpty()) { + this.lastOffset = sourceRecords.get(sourceRecords.size() - 1).sourceOffset(); + } + + return sourceRecords; + } + + @Override + public void commit() throws InterruptedException { + coordinator.commitOffset(lastOffset); + } + + @Override + public void stop() { + cleanupResources(); + } + + private void cleanupResources() { + if (!state.compareAndSet(State.RUNNING, State.STOPPED)) { + LOGGER.info("Connector has already been stopped"); + return; + } + + try { + if (coordinator != null) { + coordinator.stop(); + } + } + catch (InterruptedException e) { + Thread.interrupted(); + LOGGER.error("Interrupted while stopping coordinator", e); + // XStream code can end in SIGSEGV so fail the task instead of JVM crash + throw new ConnectException("Interrupted while stopping coordinator, failing the task"); + } + + try { + if (errorHandler != null) { + errorHandler.stop(); + } + } + catch (InterruptedException e) { + Thread.interrupted(); + LOGGER.error("Interrupted while stopping", e); + } + + try { + if (jdbcConnection != null) { + jdbcConnection.close(); + } + } + catch (SQLException e) { + LOGGER.error("Exception while closing JDBC connection", e); + } + + schema.close(); + } + + @Override + protected Iterable getAllConfigurationFields() { + return SqlServerConnectorConfig.ALL_FIELDS; + } +} diff --git a/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerDatabaseSchema.java b/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerDatabaseSchema.java new file mode 100644 index 000000000..8844f6b53 --- /dev/null +++ b/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerDatabaseSchema.java @@ -0,0 +1,111 @@ +/* + * Copyright Debezium Authors. + * + * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 + */ +package io.debezium.connector.sqlserver; + +import java.sql.SQLException; +import java.util.HashSet; +import java.util.Set; +import java.util.function.Predicate; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import io.debezium.pipeline.spi.OffsetContext; +import io.debezium.relational.HistorizedRelationalDatabaseSchema; +import io.debezium.relational.Table; +import io.debezium.relational.TableId; +import io.debezium.relational.TableSchemaBuilder; +import io.debezium.relational.history.DatabaseHistory; +import io.debezium.relational.history.TableChanges; +import io.debezium.schema.SchemaChangeEvent; +import io.debezium.schema.SchemaChangeEvent.SchemaChangeEventType; +import io.debezium.schema.TopicSelector; +import io.debezium.util.SchemaNameAdjuster; + +/** + * Logical representation of Sql Server schema. + * + * @author Jiri Pechanec + * + */ +public class SqlServerDatabaseSchema extends HistorizedRelationalDatabaseSchema { + + private static final Logger LOGGER = LoggerFactory.getLogger(SqlServerDatabaseSchema.class); + + private final DatabaseHistory databaseHistory; + private final Set capturedTables; + + public SqlServerDatabaseSchema(SqlServerConnectorConfig connectorConfig, SchemaNameAdjuster schemaNameAdjuster, TopicSelector topicSelector, SqlServerConnection connection) { + super(connectorConfig, topicSelector, connectorConfig.getTableFilters().dataCollectionFilter(), null, + new TableSchemaBuilder(new SqlServerValueConverters(), schemaNameAdjuster, SourceInfo.SCHEMA), + false); + this.databaseHistory = connectorConfig.getDatabaseHistory(); + this.databaseHistory.start(); + try { + this.capturedTables = determineCapturedTables(connectorConfig, connection); + } + catch (SQLException e) { + throw new IllegalStateException("Could not obtain the list of captured tables", e); + } + } + + private static Predicate getTableFilter(SqlServerConnectorConfig connectorConfig) { + return t -> connectorConfig.getTableFilters().dataCollectionFilter().isIncluded(t); + } + + @Override + public void recover(OffsetContext offset) { +// databaseHistory.recover(offset.getPartition(), offset.getOffset(), tables(), new OracleDdlParser()); +// for (TableId tableId : tableIds()) { +// buildAndRegisterSchema(tableFor(tableId)); +// } + } + + @Override + public void close() { + databaseHistory.stop(); + } + + @Override + public void applySchemaChange(SchemaChangeEvent schemaChange) { + LOGGER.debug("Applying schema change event {}", schemaChange); + + // just a single table per DDL event for Oracle + Table table = schemaChange.getTables().iterator().next(); + buildAndRegisterSchema(table); + tables().overwriteTable(table); + + TableChanges tableChanges = null; + if (schemaChange.getType() == SchemaChangeEventType.CREATE && schemaChange.isFromSnapshot()) { + tableChanges = new TableChanges(); + tableChanges.create(table); + } + +// databaseHistory.record(schemaChange.getPartition(), schemaChange.getOffset(), schemaChange.getDatabase(), +// schemaChange.getSchema(), schemaChange.getDdl(), tableChanges); + } + + public Set getCapturedTables() { + return capturedTables; + } + + private Set determineCapturedTables(SqlServerConnectorConfig connectorConfig, SqlServerConnection connection) throws SQLException { + final Set allTableIds = connection.readTableNames(connectorConfig.getDatabaseName(), null, null, new String[] {"TABLE"} ); + + final Set capturedTables = new HashSet<>(); + + for (TableId tableId : allTableIds) { + if (connectorConfig.getTableFilters().dataCollectionFilter().isIncluded(tableId)) { + capturedTables.add(tableId); + } + else { + LOGGER.trace("Skipping table {} as it's not included in the filter configuration", tableId); + } + } + + return capturedTables; + } +} diff --git a/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerOffsetContext.java b/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerOffsetContext.java new file mode 100644 index 000000000..afb584e0e --- /dev/null +++ b/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerOffsetContext.java @@ -0,0 +1,105 @@ +/* + * Copyright Debezium Authors. + * + * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 + */ +package io.debezium.connector.sqlserver; + +import java.time.Instant; +import java.util.Collections; +import java.util.Map; + +import org.apache.kafka.connect.data.Schema; +import org.apache.kafka.connect.data.Struct; + +import io.debezium.pipeline.spi.OffsetContext; +import io.debezium.relational.TableId; +import io.debezium.util.Collect; + +public class SqlServerOffsetContext implements OffsetContext { + + private static final String SERVER_PARTITION_KEY = "server"; + private static final String QUERY_FROM_LSN_KEY = "query_from_lsn"; + private static final String QUERY_TO_LSN_KEY = "query_to_lsn"; + private static final String QUERY_TABLE = "query_table"; + + private final Schema sourceInfoSchema; + private final SourceInfo sourceInfo; + private final Map partition; + + private Lsn queryFromLsn; + private Lsn queryToLsn; + private TableId queryTable; + + public SqlServerOffsetContext(String serverName) { + partition = Collections.singletonMap(SERVER_PARTITION_KEY, serverName); + sourceInfo = new SourceInfo(serverName); + sourceInfoSchema = sourceInfo.schema(); + } + + @Override + public Map getPartition() { + return partition; + } + + @Override + public Map getOffset() { + return Collect.hashMapOf( + QUERY_FROM_LSN_KEY, queryFromLsn == null ? null : queryFromLsn.toString(), + QUERY_TO_LSN_KEY, queryToLsn == null ? null : queryToLsn.toString(), + QUERY_TABLE, queryTable == null ? null : queryTable.toString(), + SourceInfo.CHANGE_LSN_KEY, sourceInfo.getChangeLsn() == null ? null : sourceInfo.getChangeLsn().toString() + ); + } + + @Override + public Schema getSourceInfoSchema() { + return sourceInfoSchema; + } + + @Override + public Struct getSourceInfo() { + return sourceInfo.struct(); + } + + public void setChangeLsn(Lsn lsn) { + sourceInfo.setChangeLsn(lsn); + } + + public void setSourceTime(Instant instant) { + sourceInfo.setSourceTime(instant); + } + + public void setQueryFromLsn(Lsn queryFromLsn) { + this.queryFromLsn = queryFromLsn; + } + + public void setQueryToLsn(Lsn queryToLsn) { + this.queryToLsn = queryToLsn; + } + + public void setQueryTable(TableId queryTable) { + this.queryTable = queryTable; + } + + @Override + public boolean isSnapshotRunning() { + // TODO Auto-generated method stub + return false; + } + + @Override + public void preSnapshotStart() { + // TODO Auto-generated method stub + } + + @Override + public void preSnapshotCompletion() { + // TODO Auto-generated method stub + } + + @Override + public void postSnapshotCompletion() { + // TODO Auto-generated method stub + } +} diff --git a/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerSchemaChangeEventEmitter.java b/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerSchemaChangeEventEmitter.java new file mode 100644 index 000000000..3be0b91eb --- /dev/null +++ b/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerSchemaChangeEventEmitter.java @@ -0,0 +1,67 @@ +/* + * Copyright Debezium Authors. + * + * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 + */ +package io.debezium.connector.sqlserver; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import io.debezium.pipeline.spi.SchemaChangeEventEmitter; +import io.debezium.relational.TableId; +import io.debezium.schema.SchemaChangeEvent.SchemaChangeEventType; + +/** + * {@link SchemaChangeEventEmitter} implementation based on SqlServer. + * + * @author Jiri Pechanec + */ +public class SqlServerSchemaChangeEventEmitter implements SchemaChangeEventEmitter { + + private static final Logger LOGGER = LoggerFactory.getLogger(SqlServerSchemaChangeEventEmitter.class); + + private final SqlServerOffsetContext offsetContext; + private final TableId tableId; + + public SqlServerSchemaChangeEventEmitter(SqlServerOffsetContext offsetContext, TableId tableId) { + this.offsetContext = offsetContext; + this.tableId = tableId; + } + + @Override + public void emitSchemaChangeEvent(Receiver receiver) throws InterruptedException { +// SchemaChangeEventType eventType = getSchemaChangeEventType(); +// if (eventType == null) { +// return; +// } +// +// Tables tables = new Tables(); +// +// SqlServerDdlParser parser = new SqlServerDdlParser(); +// parser.setCurrentDatabase(ddlLcr.getSourceDatabaseName()); +// parser.setCurrentSchema(ddlLcr.getObjectOwner()); +// parser.parse(ddlLcr.getDDLText(), tables); +// +// Set changedTableIds = tables.drainChanges(); +// if (changedTableIds.isEmpty()) { +// throw new IllegalArgumentException("Couldn't parse DDL statement " + ddlLcr.getDDLText()); +// } +// +// Table table = tables.forTable(tableId); +// +// receiver.schemaChangeEvent(new SchemaChangeEvent(offsetContext.getPartition(), offsetContext.getOffset(), ddlLcr.getSourceDatabaseName(), ddlLcr.getObjectOwner(), ddlLcr.getDDLText(), table, eventType, false)); + } + + private SchemaChangeEventType getSchemaChangeEventType() { +// switch(ddlLcr.getCommandType()) { +// case "CREATE TABLE": return SchemaChangeEventType.CREATE; +// case "ALTER TABLE": LOGGER.warn("ALTER TABLE not yet implemented"); +// case "DROP TABLE": LOGGER.warn("DROP TABLE not yet implemented"); +// default: +// LOGGER.debug("Ignoring DDL event of type {}", ddlLcr.getCommandType()); +// return null; +// } + return null; + } +} diff --git a/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerSnapshotChangeEventSource.java b/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerSnapshotChangeEventSource.java new file mode 100644 index 000000000..8dd621d64 --- /dev/null +++ b/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerSnapshotChangeEventSource.java @@ -0,0 +1,182 @@ +/* + * Copyright Debezium Authors. + * + * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 + */ +package io.debezium.connector.sqlserver; + +import java.sql.Connection; +import java.sql.SQLException; +import java.sql.Statement; +import java.util.Set; +import java.util.stream.Collectors; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import io.debezium.pipeline.source.spi.SnapshotChangeEventSource; +import io.debezium.pipeline.spi.SnapshotResult; +import io.debezium.relational.Table; +import io.debezium.relational.TableId; +import io.debezium.relational.Tables; +import io.debezium.schema.SchemaChangeEvent; +import io.debezium.schema.SchemaChangeEvent.SchemaChangeEventType; + +public class SqlServerSnapshotChangeEventSource implements SnapshotChangeEventSource { + + private static final Logger LOGGER = LoggerFactory.getLogger(SqlServerSnapshotChangeEventSource.class); + + private final SqlServerConnectorConfig connectorConfig; + private final SqlServerOffsetContext previousOffset; + private final SqlServerConnection jdbcConnection; + private final SqlServerDatabaseSchema schema; + + public SqlServerSnapshotChangeEventSource(SqlServerConnectorConfig connectorConfig, SqlServerOffsetContext previousOffset, SqlServerConnection jdbcConnection, SqlServerDatabaseSchema schema) { + this.connectorConfig = connectorConfig; + this.previousOffset = previousOffset; + this.jdbcConnection = jdbcConnection; + this.schema = schema; + } + + @Override + public SnapshotResult execute(ChangeEventSourceContext context) throws InterruptedException { + // for now, just simple schema snapshotting is supported which just needs to be done once + if (previousOffset != null) { + LOGGER.debug("Found previous offset, skipping snapshotting"); + return SnapshotResult.completed(previousOffset); + } + + Connection connection = null; + SnapshotContext ctx = null; + + try { + connection = jdbcConnection.connection(); + connection.setAutoCommit(false); + + ctx = new SnapshotContext( + context, + connection, + connectorConfig.getDatabaseName() + ); + + ctx.capturedTables = schema.getCapturedTables(); + + if (!lockDatabase(ctx)) { + return SnapshotResult.aborted(); + } + + determineOffsetContextWithLsn(ctx); + readTableStructure(ctx); + + if (!createSchemaChangeEventsForTables(ctx)) { + return SnapshotResult.aborted(); + } + + return SnapshotResult.completed(ctx.offset); + } + catch(RuntimeException e) { + throw e; + } + catch(Exception e) { + throw new RuntimeException(e); + } + finally { + if (ctx != null) { + ctx.dispose(); + } + + rollbackTransaction(connection); + } + } + + private boolean lockDatabase(SnapshotContext ctx) throws SQLException { + // TODO use SET SINGLE + return true; + } + + private void determineOffsetContextWithLsn(SnapshotContext ctx) throws SQLException { + ctx.offset = new SqlServerOffsetContext(connectorConfig.getLogicalName()); + final Lsn lsn = jdbcConnection.getMaxLsn(); + } + + private void readTableStructure(SnapshotContext ctx) throws SQLException { + ctx.tables = new Tables(); + + Set schemas = ctx.capturedTables.stream() + .map(TableId::schema) + .collect(Collectors.toSet()); + + // reading info only for the schemas we're interested in as per the set of captured tables; + // while the passed table name filter alone would skip all non-included tables, reading the schema + // would take much longer that way + for (String schema : schemas) { + jdbcConnection.readSchema( + ctx.tables, + ctx.catalogName, + schema, + connectorConfig.getTableFilters().dataCollectionFilter(), + null, + false + ); + } + } + + private boolean createSchemaChangeEventsForTables(SnapshotContext ctx) throws SQLException { + for (TableId tableId : ctx.capturedTables) { + if (!ctx.changeEventSourceContext.isRunning()) { + return false; + } + + LOGGER.debug("Capturing structure of table {}", tableId); + Table table = ctx.tables.forTable(tableId); + + // TODO - use sp_help and sp_columns to build CREATE TABLE + final String ddl = ""; + + schema.applySchemaChange(new SchemaChangeEvent(ctx.offset.getPartition(), ctx.offset.getOffset(), ctx.catalogName, + tableId.schema(), ddl, table, SchemaChangeEventType.CREATE, true)); + } + + return true; + } + + private void rollbackTransaction(Connection connection) { + if(connection != null) { + try { + connection.rollback(); + } + catch (SQLException e) { + throw new RuntimeException(e); + } + } + } + + /** + * Mutable context which is populated in the course of snapshotting. + */ + private static class SnapshotContext { + + public final ChangeEventSourceContext changeEventSourceContext; + public final Statement statement; + public final String catalogName; + + public Set capturedTables; + public SqlServerOffsetContext offset; + public Tables tables; + + public SnapshotContext(ChangeEventSourceContext changeEventSourceContext, Connection connection, String catalogName) throws SQLException { + this.changeEventSourceContext = changeEventSourceContext; + this.statement = connection.createStatement(); + this.catalogName = catalogName; + } + + public void dispose() { + try { + statement.close(); + } + catch (SQLException e) { + LOGGER.error("Couldn't close statement", e); + } + } + } +} diff --git a/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerStreamingChangeEventSource.java b/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerStreamingChangeEventSource.java new file mode 100644 index 000000000..e9d1d9562 --- /dev/null +++ b/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerStreamingChangeEventSource.java @@ -0,0 +1,202 @@ +/* + * Copyright Debezium Authors. + * + * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 + */ +package io.debezium.connector.sqlserver; + +import java.sql.ResultSet; +import java.sql.SQLException; +import java.time.Duration; +import java.util.Map; + +import org.apache.kafka.connect.errors.ConnectException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import io.debezium.pipeline.ErrorHandler; +import io.debezium.pipeline.EventDispatcher; +import io.debezium.pipeline.source.spi.StreamingChangeEventSource; +import io.debezium.relational.TableId; +import io.debezium.util.Clock; + +/** + * A {@link StreamingChangeEventSource} based on SQL Server change data capture functionality. + * A main polls database DDL change and change data tables and turns them into change events. + * + * @author Jiri Pechanec + */ +public class SqlServerStreamingChangeEventSource implements StreamingChangeEventSource { + + private static final int COL_COMMIT_LSN = 1; + private static final int COL_ROW_LSN = 2; + private static final int COL_OPERATION = 3; + private static final int COL_DATA = 5; + + private static final Logger LOGGER = LoggerFactory.getLogger(SqlServerStreamingChangeEventSource.class); + + private final SqlServerConnection connection; + private final EventDispatcher dispatcher; + private final ErrorHandler errorHandler; + private final Clock clock; + private final SqlServerDatabaseSchema schema; + private final SqlServerOffsetContext offsetContext; + private final Duration pollInterval; + + public SqlServerStreamingChangeEventSource(SqlServerConnectorConfig connectorConfig, SqlServerOffsetContext offsetContext, SqlServerConnection connection, EventDispatcher dispatcher, ErrorHandler errorHandler, Clock clock, SqlServerDatabaseSchema schema) { + this.connection = connection; + this.dispatcher = dispatcher; + this.errorHandler = errorHandler; + this.clock = clock; + this.schema = schema; + this.offsetContext = offsetContext; + this.pollInterval = Duration.ofSeconds(1); + } + + @Override + public void execute(ChangeEventSourceContext context) throws InterruptedException { + try { + final TableId[] tables = schema.getCapturedTables().toArray(new TableId[schema.getCapturedTables().size()]); + Lsn lastProcessedLsn = new Lsn(null); + while (context.isRunning()) { + final Lsn currentMaxLsn = connection.getMaxLsn(); + + // Probably cannot happen but it is better to guard against such + // situation + if (!currentMaxLsn.isAvailable()) { + Thread.sleep(pollInterval.toMillis()); + continue; + } + // There is no change in the database + if (currentMaxLsn.equals(lastProcessedLsn)) { + Thread.sleep(pollInterval.toMillis()); + continue; + } + + // Reading interval is inclusive so we need to move LSN forward + final Lsn fromLsn = lastProcessedLsn.isAvailable() ? connection.incrementLsn(lastProcessedLsn) + : lastProcessedLsn; + + connection.getChangesForTables(tables, fromLsn, currentMaxLsn, resultSets -> { + final int tableCount = resultSets.length; + final ChangeTable[] changeTables = new ChangeTable[tableCount]; + + for (int i = 0; i < tableCount; i++) { + changeTables[i] = new ChangeTable(tables[i], resultSets[i]); + changeTables[i].next(); + } + + for (;;) { + ChangeTable tableSmallestLsn = null; + for (int i = 0; i < tableCount; i++) { + final ChangeTable changeTable = changeTables[i]; + if (changeTable.isCompleted()) { + continue; + } + if (tableSmallestLsn == null || changeTable.compareTo(tableSmallestLsn) < 0) { + tableSmallestLsn = changeTable; + } + } + if (tableSmallestLsn == null) { + // No more LSNs available + break; + } + + final TableId tableId = tableSmallestLsn.getTableId(); + final Lsn commitLsn = tableSmallestLsn.getCommitLsn(); + final Lsn rowLsn = tableSmallestLsn.getRowLsn(); + final int operation = tableSmallestLsn.getOperation(); + final Object[] data = tableSmallestLsn.getData(); + + // UPDATE consists of two consecutive events, first event contains + // the row before it was updated and the second the row after + // it was updated + if (operation == SqlServerChangeRecordEmitter.OP_UPDATE_BEFORE) { + if (!tableSmallestLsn.next() || tableSmallestLsn.getOperation() != SqlServerChangeRecordEmitter.OP_UPDATE_AFTER) { + throw new IllegalStateException("The update before event at " + rowLsn + " for table " + tableId + " was not followed by after event.\n Please report this as a bug together with a events around given LSN."); + } + } + final Object[] dataNext = (operation == SqlServerChangeRecordEmitter.OP_UPDATE_BEFORE) ? tableSmallestLsn.getData() : null; + + offsetContext.setChangeLsn(rowLsn); + offsetContext.setSourceTime(connection.timestampOfLsn(commitLsn)); + offsetContext.setQueryFromLsn(fromLsn); + offsetContext.setQueryToLsn(currentMaxLsn); + offsetContext.setQueryTable(tableId); + + try { + dispatcher + .dispatchDataChangeEvent( + tableId, new SqlServerChangeRecordEmitter(offsetContext, operation, + data, dataNext, schema.tableFor(tableId), clock)); + } + catch (InterruptedException e) { + break; + } + tableSmallestLsn.next(); + } + }); + lastProcessedLsn = currentMaxLsn; + } + } + catch (Exception e) { + throw new ConnectException(e); + } + } + + @Override + public void commitOffset(Map offset) { + } + + private static class ChangeTable { + + private final TableId tableId; + private final ResultSet resultSet; + private boolean completed = false; + + public ChangeTable(TableId tableId, ResultSet resultSet) { + super(); + this.tableId = tableId; + this.resultSet = resultSet; + } + + public TableId getTableId() { + return tableId; + } + + public Lsn getCommitLsn() throws SQLException { + return new Lsn(resultSet.getBytes(COL_COMMIT_LSN)); + } + + public Lsn getRowLsn() throws SQLException { + return new Lsn(resultSet.getBytes(COL_ROW_LSN)); + } + + public int getOperation() throws SQLException { + return resultSet.getInt(COL_OPERATION); + } + + public Object[] getData() throws SQLException { + final int dataColumnCount = resultSet.getMetaData().getColumnCount() - (COL_DATA - 1); + final Object[] data = new Object[dataColumnCount]; + for (int i = 0; i < dataColumnCount; i++) { + data[i] = resultSet.getObject(COL_DATA + i); + } + return data; + } + + public boolean next() throws SQLException { + completed = !resultSet.next(); + return !completed; + } + + public boolean isCompleted() { + return completed; + } + + public int compareTo(ChangeTable o) throws SQLException { + return getRowLsn().compareTo(o.getRowLsn()); + } + } + +} diff --git a/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerTaskContext.java b/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerTaskContext.java new file mode 100644 index 000000000..3c397054b --- /dev/null +++ b/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerTaskContext.java @@ -0,0 +1,21 @@ +/* + * Copyright Debezium Authors. + * + * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 + */ +package io.debezium.connector.sqlserver; + +import io.debezium.connector.common.CdcSourceTaskContext; + +/** + * A state (context) associated with a SQL Server task + * + * @author Jiri Pechanec + * + */ +public class SqlServerTaskContext extends CdcSourceTaskContext { + + public SqlServerTaskContext(SqlServerConnectorConfig config) { + super("Oracle", config.getLogicalName()); + } +} diff --git a/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerTopicSelector.java b/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerTopicSelector.java new file mode 100644 index 000000000..8ce324042 --- /dev/null +++ b/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerTopicSelector.java @@ -0,0 +1,33 @@ +/* + * Copyright Debezium Authors. + * + * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 + */ +package io.debezium.connector.sqlserver; + +import io.debezium.relational.TableId; +import io.debezium.schema.TopicSelector; + +/** + * The topic naming strategy based on connector configuration and table name + * + * @author Jiri Pechanec + * + */ +public class SqlServerTopicSelector implements TopicSelector { + + private final String prefix; + + public SqlServerTopicSelector(String prefix) { + this.prefix = prefix; + } + + public static SqlServerTopicSelector defaultSelector(String prefix) { + return new SqlServerTopicSelector(prefix); + } + + @Override + public String topicNameFor(TableId tableId) { + return String.join(".", prefix, tableId.schema(), tableId.table()); + } +} diff --git a/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerValueConverters.java b/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerValueConverters.java new file mode 100644 index 000000000..1b47774a0 --- /dev/null +++ b/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerValueConverters.java @@ -0,0 +1,94 @@ +/* + * Copyright Debezium Authors. + * + * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 + */ +package io.debezium.connector.sqlserver; + +import java.sql.Timestamp; +import java.sql.Types; +import java.time.LocalDateTime; +import java.time.ZoneOffset; + +import org.apache.kafka.connect.data.Field; +import org.apache.kafka.connect.data.SchemaBuilder; + +import io.debezium.data.SpecialValueDecimal; +import io.debezium.jdbc.JdbcValueConverters; +import io.debezium.relational.Column; +import io.debezium.relational.ValueConverter; +import io.debezium.time.ZonedTimestamp; +import microsoft.sql.DateTimeOffset; + +/** + * Conversion of SQL Server specific datatypes. + * + * @author Jiri Pechanec + * + */ +public class SqlServerValueConverters extends JdbcValueConverters { + + public SqlServerValueConverters() { + } + + @Override + public SchemaBuilder schemaBuilder(Column column) { + switch (column.jdbcType()) { + // Numeric integers + case Types.TINYINT: + // values are an 8-bit unsigned integer value between 0 and 255, we thus need to store it in short int + return SchemaBuilder.int16(); + + // Floating point + case microsoft.sql.Types.SMALLMONEY: + case microsoft.sql.Types.MONEY: + return SpecialValueDecimal.builder(decimalMode, column.length(), column.scale().get()); + case microsoft.sql.Types.DATETIMEOFFSET: + return ZonedTimestamp.builder(); + default: + return super.schemaBuilder(column); + } + } + + @Override + public ValueConverter converter(Column column, Field fieldDefn) { + switch (column.jdbcType()) { + // Numeric integers + case Types.TINYINT: + // values are an 8-bit unsigned integer value between 0 and 255, we thus need to store it in short int + return (data) -> convertSmallInt(column, fieldDefn, data); + + // Floating point + case microsoft.sql.Types.SMALLMONEY: + case microsoft.sql.Types.MONEY: + return (data) -> convertDecimal(column, fieldDefn, data); + case microsoft.sql.Types.DATETIMEOFFSET: + return (data) -> convertTimestampWithZone(column, fieldDefn, data); + + // TODO Geometry and geography supported since 6.5.0 + default: + return super.converter(column, fieldDefn); + } + } + + /** + * Time precision in SQL Server is defined in scale, the default one is 7 + */ + @Override + protected int getTimePrecision(Column column) { + return column.scale().get(); + } + + protected Object convertTimestampWithZone(Column column, Field fieldDefn, Object data) { + if (!(data instanceof DateTimeOffset)) { + return super.convertTimestampWithZone(column, fieldDefn, data); + } + final DateTimeOffset dto = (DateTimeOffset)data; + + // Timestamp is provided in UTC time + final Timestamp utc = dto.getTimestamp(); + final ZoneOffset offset = ZoneOffset.ofTotalSeconds(dto.getMinutesOffset() * 60); + return super.convertTimestampWithZone(column, fieldDefn, LocalDateTime.ofEpochSecond(utc.getTime() / 1000, utc.getNanos(), offset).atOffset(offset)); + } + +} \ No newline at end of file diff --git a/debezium-connector-sqlserver/src/main/resources/io/debezium/connector/sqlserver/build.version b/debezium-connector-sqlserver/src/main/resources/io/debezium/connector/sqlserver/build.version new file mode 100644 index 000000000..e5683df88 --- /dev/null +++ b/debezium-connector-sqlserver/src/main/resources/io/debezium/connector/sqlserver/build.version @@ -0,0 +1 @@ +version=${project.version} \ No newline at end of file diff --git a/debezium-connector-sqlserver/src/main/resources/statements.properties b/debezium-connector-sqlserver/src/main/resources/statements.properties index 4253b9d42..8001dc305 100644 --- a/debezium-connector-sqlserver/src/main/resources/statements.properties +++ b/debezium-connector-sqlserver/src/main/resources/statements.properties @@ -1,4 +1,7 @@ enable_cdc_for_db=IF EXISTS(select 1 from sys.databases where name='#' AND is_cdc_enabled=0)\n\ EXEC sys.sp_cdc_enable_db +disable_cdc_for_db=IF EXISTS(select 1 from sys.databases where name='#' AND is_cdc_enabled=1)\n\ + EXEC sys.sp_cdc_disable_db enable_cdc_for_table=IF EXISTS(select 1 from sys.tables where name = '#' AND is_tracked_by_cdc=0)\n\ EXEC sys.sp_cdc_enable_table @source_schema = N'dbo', @source_name = N'#', @role_name = NULL, @supports_net_changes = 0 +get_max_lsn=SELECT sys.fn_cdc_get_max_lsn() diff --git a/debezium-connector-sqlserver/src/test/java/io/debezium/connector/sqlserver/AbstractSqlServerDatatypesTest.java b/debezium-connector-sqlserver/src/test/java/io/debezium/connector/sqlserver/AbstractSqlServerDatatypesTest.java new file mode 100644 index 000000000..5ea30e332 --- /dev/null +++ b/debezium-connector-sqlserver/src/test/java/io/debezium/connector/sqlserver/AbstractSqlServerDatatypesTest.java @@ -0,0 +1,253 @@ +/* + * Copyright Debezium Authors. + * + * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 + */ +package io.debezium.connector.sqlserver; + +import static org.fest.assertions.Assertions.assertThat; + +import java.math.BigDecimal; +import java.sql.SQLException; +import java.util.Arrays; +import java.util.List; + +import org.apache.kafka.connect.data.Decimal; +import org.apache.kafka.connect.data.Schema; +import org.apache.kafka.connect.data.Struct; +import org.apache.kafka.connect.source.SourceRecord; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; + +import io.debezium.connector.sqlserver.util.TestHelper; +import io.debezium.data.SchemaAndValueField; +import io.debezium.data.VerifyRecord; +import io.debezium.embedded.AbstractConnectorTest; +import io.debezium.time.Date; +import io.debezium.time.MicroTime; +import io.debezium.time.NanoTimestamp; +import io.debezium.time.Timestamp; +import io.debezium.time.ZonedTimestamp; +import io.debezium.util.Testing; + +/** + * Integration test to verify different Oracle datatypes. + * + * @author Jiri Pechanec + */ +public abstract class AbstractSqlServerDatatypesTest extends AbstractConnectorTest { + + /** + * Key for schema parameter used to store DECIMAL/NUMERIC columns' precision. + */ + static final String PRECISION_PARAMETER_KEY = "connect.decimal.precision"; + + private static final String DDL_STRING = "create table type_string (" + + " id int not null, " + + " val_char char(3), " + + " val_varchar varchar(1000), " + + " val_text text, " + + " val_nchar nchar(3), " + + " val_nvarchar nvarchar(1000), " + + " val_ntext ntext, " + + " primary key (id)" + + ")"; + + private static final String DDL_FP = "create table type_fp (" + + " id int not null, " + + " val_decimal decimal(6,3), " + + " val_numeric numeric, " + + " val_float float, " + + " val_real real, " + + " val_smallmoney smallmoney, " + + " val_money money " + + " primary key (id)" + + ")"; + + private static final String DDL_INT = "create table type_int (" + + " id int not null, " + + " val_bit bit, " + + " val_tinyint tinyint, " + + " val_smallint smallint, " + + " val_int int, " + + " val_bigint bigint, " + + " primary key (id)" + + ")"; + + private static final String DDL_TIME = "create table type_time (" + + " id int not null, " + + " val_date date, " + + " val_time time(4), " + + " val_datetime2 datetime2, " + + " val_datetimeoffset datetimeoffset, " + + " val_datetime datetime, " + + " val_smalldatetime smalldatetime, " + + " primary key (id)" + + ")"; + + private static final String DDL_XML = "create table type_xml (" + + " id int not null, " + + " val_xml xml, " + + " primary key (id)" + + ")"; + + private static final List EXPECTED_INT = Arrays.asList( + new SchemaAndValueField("val_bit", Schema.OPTIONAL_BOOLEAN_SCHEMA, true), + new SchemaAndValueField("val_tinyint", Schema.OPTIONAL_INT16_SCHEMA, (short)22), + new SchemaAndValueField("val_smallint", Schema.OPTIONAL_INT16_SCHEMA, (short)333), + new SchemaAndValueField("val_int", Schema.OPTIONAL_INT32_SCHEMA, 4444), + new SchemaAndValueField("val_bigint", Schema.OPTIONAL_INT64_SCHEMA, 55555l) + ); + + private static final List EXPECTED_FP = Arrays.asList( + new SchemaAndValueField("val_decimal",Decimal.builder(3).parameter(PRECISION_PARAMETER_KEY, "6").optional().build(), new BigDecimal("1.123")), + new SchemaAndValueField("val_numeric", Decimal.builder(0).parameter(PRECISION_PARAMETER_KEY, "18").optional().build(), new BigDecimal("2")), + new SchemaAndValueField("val_float", Schema.OPTIONAL_FLOAT64_SCHEMA, 3.323), + new SchemaAndValueField("val_real", Schema.OPTIONAL_FLOAT32_SCHEMA, 4.323f), + new SchemaAndValueField("val_smallmoney", Decimal.builder(4).parameter(PRECISION_PARAMETER_KEY, "10").optional().build(), new BigDecimal("5.3230")), + new SchemaAndValueField("val_money", Decimal.builder(4).parameter(PRECISION_PARAMETER_KEY, "19").optional().build(), new BigDecimal("6.3230")) + ); + + private static final List EXPECTED_STRING = Arrays.asList( + new SchemaAndValueField("val_char", Schema.OPTIONAL_STRING_SCHEMA, "cc "), + new SchemaAndValueField("val_varchar", Schema.OPTIONAL_STRING_SCHEMA, "vcc"), + new SchemaAndValueField("val_text", Schema.OPTIONAL_STRING_SCHEMA, "tc"), + new SchemaAndValueField("val_nchar", Schema.OPTIONAL_STRING_SCHEMA, "c\u010d "), + new SchemaAndValueField("val_nvarchar", Schema.OPTIONAL_STRING_SCHEMA, "vc\u010d"), + new SchemaAndValueField("val_ntext", Schema.OPTIONAL_STRING_SCHEMA, "t\u010d") + ); + + private static final List EXPECTED_DATE_TIME = Arrays.asList( + new SchemaAndValueField("val_date", Date.builder().optional().build(), 17_725), + new SchemaAndValueField("val_time", MicroTime.builder().optional().build(), 37_425_000_000l), + new SchemaAndValueField("val_datetime2", NanoTimestamp.builder().optional().build(), 1_531_481_025_340_000_000l), + new SchemaAndValueField("val_datetimeoffset", ZonedTimestamp.builder().optional().build(), "2018-07-13T12:23:45.456+11:00"), + new SchemaAndValueField("val_datetime", Timestamp.builder().optional().build(), 1_531_488_225_780l), + new SchemaAndValueField("val_smalldatetime", Timestamp.builder().optional().build(), 1_531_491_840_000l) + ); + + private static final List EXPECTED_XML = Arrays.asList( + new SchemaAndValueField("val_xml", Schema.OPTIONAL_STRING_SCHEMA, "b") + ); + + private static final String[] ALL_TABLES = { + "type_int", + "type_fp", + "type_string", + "type_time", + "type_xml" + }; + + private static final String[] ALL_DDLS = { + DDL_INT, + DDL_FP, + DDL_STRING, + DDL_TIME, + DDL_XML + }; + + private static final int EXPECTED_RECORD_COUNT = ALL_DDLS.length; + + @AfterClass + public static void dropTables() throws SQLException { + TestHelper.dropTestDatabase(); + } + + @BeforeClass + public static void createTables() throws SQLException { + TestHelper.createTestDatabase(); + try (SqlServerConnection connection = TestHelper.testConnection()) { + connection.execute(ALL_DDLS); + for (String table: ALL_TABLES) { + connection.enableTableCdc(table); + } + connection.execute( + "INSERT INTO type_int VALUES (0, 1, 22, 333, 4444, 55555)", + "INSERT INTO type_fp VALUES (0, 1.123, 2, 3.323, 4.323, 5.323, 6.323)", + "INSERT INTO type_string VALUES (0, 'c\u010d', 'vc\u010d', 't\u010d', N'c\u010d', N'vc\u010d', N't\u010d')", + "INSERT INTO type_time VALUES (0, '2018-07-13', '10:23:45', '2018-07-13 11:23:45.34', '2018-07-13 12:23:45.456+11:00', '2018-07-13 13:23:45.78', '2018-07-13 14:23:45')", + "INSERT INTO type_xml VALUES (0, 'b')" + ); + } + } + + @Test + public void intTypes() throws Exception { + Testing.debug("Inserted"); + + final SourceRecords records = consumeRecordsByTopic(EXPECTED_RECORD_COUNT); + + List testTableRecords = records.recordsForTopic("server1.dbo.type_int"); + assertThat(testTableRecords).hasSize(1); + + // insert + VerifyRecord.isValidInsert(testTableRecords.get(0)); + Struct after = (Struct) ((Struct)testTableRecords.get(0).value()).get("after"); + assertRecord(after, EXPECTED_INT); + } + + @Test + public void fpTypes() throws Exception { + Testing.debug("Inserted"); + + final SourceRecords records = consumeRecordsByTopic(EXPECTED_RECORD_COUNT); + + List testTableRecords = records.recordsForTopic("server1.dbo.type_fp"); + assertThat(testTableRecords).hasSize(1); + + // insert + VerifyRecord.isValidInsert(testTableRecords.get(0)); + Struct after = (Struct) ((Struct)testTableRecords.get(0).value()).get("after"); + assertRecord(after, EXPECTED_FP); + } + + @Test + public void stringTypes() throws Exception { + Testing.debug("Inserted"); + + final SourceRecords records = consumeRecordsByTopic(EXPECTED_RECORD_COUNT); + + List testTableRecords = records.recordsForTopic("server1.dbo.type_string"); + assertThat(testTableRecords).hasSize(1); + + // insert + VerifyRecord.isValidInsert(testTableRecords.get(0)); + Struct after = (Struct) ((Struct)testTableRecords.get(0).value()).get("after"); + assertRecord(after, EXPECTED_STRING); + } + + @Test + public void dateTimeTypes() throws Exception { + Testing.debug("Inserted"); + + final SourceRecords records = consumeRecordsByTopic(EXPECTED_RECORD_COUNT); + + List testTableRecords = records.recordsForTopic("server1.dbo.type_time"); + assertThat(testTableRecords).hasSize(1); + + // insert + VerifyRecord.isValidInsert(testTableRecords.get(0)); + Struct after = (Struct) ((Struct)testTableRecords.get(0).value()).get("after"); + assertRecord(after, EXPECTED_DATE_TIME); + } + + @Test + public void otherTypes() throws Exception { + Testing.debug("Inserted"); + + final SourceRecords records = consumeRecordsByTopic(EXPECTED_RECORD_COUNT); + + List testTableRecords = records.recordsForTopic("server1.dbo.type_xml"); + assertThat(testTableRecords).hasSize(1); + + // insert + VerifyRecord.isValidInsert(testTableRecords.get(0)); + Struct after = (Struct) ((Struct)testTableRecords.get(0).value()).get("after"); + assertRecord(after, EXPECTED_XML); + } + + private void assertRecord(Struct record, List expected) { + expected.forEach(schemaAndValueField -> schemaAndValueField.assertFor(record)); + } +} diff --git a/debezium-connector-sqlserver/src/test/java/io/debezium/connector/sqlserver/SnapshotDatatypesIT.java b/debezium-connector-sqlserver/src/test/java/io/debezium/connector/sqlserver/SnapshotDatatypesIT.java new file mode 100644 index 000000000..b770877ee --- /dev/null +++ b/debezium-connector-sqlserver/src/test/java/io/debezium/connector/sqlserver/SnapshotDatatypesIT.java @@ -0,0 +1,40 @@ +/* + * Copyright Debezium Authors. + * + * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 + */ +package io.debezium.connector.sqlserver; + +import java.sql.SQLException; + +import org.junit.Before; +import org.junit.BeforeClass; + +import io.debezium.config.Configuration; +import io.debezium.connector.sqlserver.util.TestHelper; +import io.debezium.util.Testing; + +/** + * Integration test to verify different Oracle datatypes. + * + * @author Jiri Pechanec + */ +public class SnapshotDatatypesIT extends AbstractSqlServerDatatypesTest { + + @BeforeClass + public static void beforeClass() throws SQLException { + createTables(); + } + + @Before + public void before() throws Exception { + initializeConnectorTestFramework(); + Testing.Debug.enable(); + Testing.Files.delete(TestHelper.DB_HISTORY_PATH); + + Configuration config = TestHelper.defaultConfig().build(); + start(SqlServerConnector.class, config); + assertConnectorIsRunning(); + Thread.sleep(1000); + } +} diff --git a/debezium-connector-sqlserver/src/test/java/io/debezium/connector/sqlserver/SqlServerConnectionIT.java b/debezium-connector-sqlserver/src/test/java/io/debezium/connector/sqlserver/SqlServerConnectionIT.java index 1f7d439c5..df5971f48 100644 --- a/debezium-connector-sqlserver/src/test/java/io/debezium/connector/sqlserver/SqlServerConnectionIT.java +++ b/debezium-connector-sqlserver/src/test/java/io/debezium/connector/sqlserver/SqlServerConnectionIT.java @@ -6,11 +6,13 @@ package io.debezium.connector.sqlserver; +import java.math.BigInteger; import java.sql.SQLException; import org.junit.BeforeClass; import org.junit.Test; +import io.debezium.connector.sqlserver.util.TestHelper; import io.debezium.util.Testing; /** @@ -22,19 +24,11 @@ public class SqlServerConnectionIT { @BeforeClass public static void beforeClass() throws SQLException { - // NOTE: you cannot enable CDC for the "master" db (the default one) so - // all tests must use a separate database... - try (SqlServerConnection connection = new SqlServerConnection(TestHelper.defaultJdbcConfig())) { - connection.connect(); - String sql = "IF EXISTS(select 1 from sys.databases where name='testDB') DROP DATABASE testDB\n" - + "CREATE DATABASE testDB\n"; - connection.execute(sql); - } } @Test - public void shouldEnableCDCForDatabase() throws Exception { - try (SqlServerConnection connection = new SqlServerConnection(TestHelper.defaultJdbcConfig())) { + public void shouldEnableCdcForDatabase() throws Exception { + try (SqlServerConnection connection = TestHelper.adminConnection()) { connection.connect(); connection.execute("USE testDB"); // NOTE: you cannot enable CDC on master @@ -43,8 +37,8 @@ public void shouldEnableCDCForDatabase() throws Exception { } @Test - public void shouldEnableCDCWithWrapperFunctionsForTable() throws Exception { - try (SqlServerConnection connection = new SqlServerConnection(TestHelper.defaultJdbcConfig())) { + public void shouldEnableCdcWithWrapperFunctionsForTable() throws Exception { + try (SqlServerConnection connection = TestHelper.adminConnection()) { connection.connect(); connection.execute("USE testDB"); // NOTE: you cannot enable CDC on master @@ -71,7 +65,8 @@ public void shouldEnableCDCWithWrapperFunctionsForTable() throws Exception { "select * from cdc.fn_cdc_get_all_changes_dbo_testTable(sys.fn_cdc_get_min_lsn('dbo_testTable'), sys.fn_cdc_get_max_lsn(), N'all')", rs -> { while (rs.next()) { - final StringBuilder sb = new StringBuilder(); + final BigInteger lsn = new BigInteger(rs.getBytes(1)); + final StringBuilder sb = new StringBuilder(lsn.toString()); for (int col = 1; col <= rs.getMetaData().getColumnCount(); col++) { sb.append(rs.getObject(col)).append(' '); } diff --git a/debezium-connector-sqlserver/src/test/java/io/debezium/connector/sqlserver/SqlServerConnectorIT.java b/debezium-connector-sqlserver/src/test/java/io/debezium/connector/sqlserver/SqlServerConnectorIT.java new file mode 100644 index 000000000..c51350b43 --- /dev/null +++ b/debezium-connector-sqlserver/src/test/java/io/debezium/connector/sqlserver/SqlServerConnectorIT.java @@ -0,0 +1,201 @@ +/* + * Copyright Debezium Authors. + * + * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 + */ +package io.debezium.connector.sqlserver; + +import static org.junit.Assert.assertNull; + +import java.sql.SQLException; +import java.util.Arrays; +import java.util.List; + +import org.apache.kafka.connect.data.Schema; +import org.apache.kafka.connect.data.Struct; +import org.apache.kafka.connect.source.SourceRecord; +import org.fest.assertions.Assertions; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; + +import io.debezium.config.Configuration; +import io.debezium.connector.sqlserver.util.TestHelper; +import io.debezium.data.SchemaAndValueField; +import io.debezium.embedded.AbstractConnectorTest; +import io.debezium.util.Testing; + +/** + * Integration test for the Debezium SQL Server connector. + * + * @author Jiri Pechanec + */ +public class SqlServerConnectorIT extends AbstractConnectorTest { + + private static SqlServerConnection connection; + + @BeforeClass + public static void beforeClass() { + } + + @AfterClass + public static void closeConnection() throws SQLException { + if (connection != null) { + connection.close(); + } + } + + @Before + public void before() throws SQLException { + TestHelper.createTestDatabase(); + connection = TestHelper.testConnection(); + connection.execute( + "CREATE TABLE tablea (id int primary key, cola varchar(30))", + "CREATE TABLE tableb (id int primary key, colb varchar(30))", + "INSERT INTO tablea VALUES(1, 'a')" + ); + connection.enableTableCdc("tablea"); + connection.enableTableCdc("tableb"); + + initializeConnectorTestFramework(); + Testing.Files.delete(TestHelper.DB_HISTORY_PATH); + } + + @After + public void after() throws SQLException { + connection.close(); + TestHelper.dropTestDatabase(); + } + + @Test + public void createAndDelete() throws Exception { + final int RECORDS_PER_TABLE = 5; + final int TABLES = 2; + final int ID_START = 10; + final Configuration config = TestHelper.defaultConfig().build(); + + start(SqlServerConnector.class, config); + assertConnectorIsRunning(); + + for (int i = 0; i < RECORDS_PER_TABLE; i++) { + final int id = ID_START + i; + connection.execute( + "INSERT INTO tablea VALUES(" + id + ", 'a')" + ); + connection.execute( + "INSERT INTO tableb VALUES(" + id + ", 'b')" + ); + } + + final SourceRecords records = consumeRecordsByTopic(RECORDS_PER_TABLE * TABLES); + final List tableA = records.recordsForTopic("server1.dbo.tablea"); + final List tableB = records.recordsForTopic("server1.dbo.tableb"); + Assertions.assertThat(tableA).hasSize(RECORDS_PER_TABLE); + Assertions.assertThat(tableB).hasSize(RECORDS_PER_TABLE); + for (int i = 0; i < RECORDS_PER_TABLE; i++) { + final SourceRecord recordA = tableA.get(i); + final SourceRecord recordB = tableB.get(i); + final List expectedRowA = Arrays.asList( + new SchemaAndValueField("id", Schema.INT32_SCHEMA, i + ID_START), + new SchemaAndValueField("cola", Schema.OPTIONAL_STRING_SCHEMA, "a")); + final List expectedRowB = Arrays.asList( + new SchemaAndValueField("id", Schema.INT32_SCHEMA, i + ID_START), + new SchemaAndValueField("colb", Schema.OPTIONAL_STRING_SCHEMA, "b")); + + final Struct keyA = (Struct)recordA.key(); + final Struct valueA = (Struct)recordA.value(); + assertRecord((Struct)valueA.get("after"), expectedRowA); + assertNull(valueA.get("before")); + + final Struct keyB = (Struct)recordB.key(); + final Struct valueB = (Struct)recordB.value(); + assertRecord((Struct)valueB.get("after"), expectedRowB); + assertNull(valueB.get("before")); + } + + connection.execute("DELETE FROM tableB"); + final SourceRecords deleteRecords = consumeRecordsByTopic(2 * RECORDS_PER_TABLE); + final List deleteTableA = deleteRecords.recordsForTopic("server1.dbo.tablea"); + final List deleteTableB = deleteRecords.recordsForTopic("server1.dbo.tableb"); + Assertions.assertThat(deleteTableA).isNullOrEmpty(); + Assertions.assertThat(deleteTableB).hasSize(2 * RECORDS_PER_TABLE); + + for (int i = 0; i < RECORDS_PER_TABLE; i++) { + final SourceRecord deleteRecord = deleteTableB.get(i * 2); + final SourceRecord tombstoneRecord = deleteTableB.get(i * 2 + 1); + final List expectedDeleteRow = Arrays.asList( + new SchemaAndValueField("id", Schema.INT32_SCHEMA, i + ID_START), + new SchemaAndValueField("colb", Schema.OPTIONAL_STRING_SCHEMA, "b")); + + final Struct deleteKey = (Struct)deleteRecord.key(); + final Struct deleteValue = (Struct)deleteRecord.value(); + assertRecord((Struct)deleteValue.get("before"), expectedDeleteRow); + assertNull(deleteValue.get("after")); + + final Struct tombstoneKey = (Struct)tombstoneRecord.key(); + final Struct tombstoneValue = (Struct)tombstoneRecord.value(); + assertNull(tombstoneValue); + } + + stopConnector(); + } + + @Test + public void update() throws Exception { + final int RECORDS_PER_TABLE = 5; + final int ID_START = 10; + final Configuration config = TestHelper.defaultConfig().build(); + + start(SqlServerConnector.class, config); + assertConnectorIsRunning(); + + connection.setAutoCommit(false); + final String[] tableBInserts = new String[RECORDS_PER_TABLE]; + for (int i = 0; i < RECORDS_PER_TABLE; i++) { + final int id = ID_START + i; + tableBInserts[i] = "INSERT INTO tableb VALUES(" + id + ", 'b')"; + } + connection.execute(tableBInserts); + connection.setAutoCommit(true); + + connection.execute("UPDATE tableb SET colb='z'"); + + final SourceRecords records = consumeRecordsByTopic(RECORDS_PER_TABLE * 2); + final List tableB = records.recordsForTopic("server1.dbo.tableb"); + Assertions.assertThat(tableB).hasSize(RECORDS_PER_TABLE * 2); + for (int i = 0; i < RECORDS_PER_TABLE; i++) { + final SourceRecord recordB = tableB.get(i); + final List expectedRowB = Arrays.asList( + new SchemaAndValueField("id", Schema.INT32_SCHEMA, i + ID_START), + new SchemaAndValueField("colb", Schema.OPTIONAL_STRING_SCHEMA, "b")); + + final Struct keyB = (Struct)recordB.key(); + final Struct valueB = (Struct)recordB.value(); + assertRecord((Struct)valueB.get("after"), expectedRowB); + assertNull(valueB.get("before")); + } + + for (int i = 0; i < RECORDS_PER_TABLE; i++) { + final SourceRecord recordB = tableB.get(i + RECORDS_PER_TABLE); + final List expectedBefore = Arrays.asList( + new SchemaAndValueField("id", Schema.INT32_SCHEMA, i + ID_START), + new SchemaAndValueField("colb", Schema.OPTIONAL_STRING_SCHEMA, "b")); + final List expectedAfter = Arrays.asList( + new SchemaAndValueField("id", Schema.INT32_SCHEMA, i + ID_START), + new SchemaAndValueField("colb", Schema.OPTIONAL_STRING_SCHEMA, "z")); + + final Struct keyB = (Struct)recordB.key(); + final Struct valueB = (Struct)recordB.value(); + assertRecord((Struct)valueB.get("before"), expectedBefore); + assertRecord((Struct)valueB.get("after"), expectedAfter); + } + + stopConnector(); + } + + private void assertRecord(Struct record, List expected) { + expected.forEach(schemaAndValueField -> schemaAndValueField.assertFor(record)); + } + } diff --git a/debezium-connector-sqlserver/src/test/java/io/debezium/connector/sqlserver/TestHelper.java b/debezium-connector-sqlserver/src/test/java/io/debezium/connector/sqlserver/TestHelper.java deleted file mode 100644 index 0a9a932e5..000000000 --- a/debezium-connector-sqlserver/src/test/java/io/debezium/connector/sqlserver/TestHelper.java +++ /dev/null @@ -1,26 +0,0 @@ -/* - * Copyright Debezium Authors. - * - * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 - */ - -package io.debezium.connector.sqlserver; - -import io.debezium.config.Configuration; -import io.debezium.jdbc.JdbcConfiguration; - -/** - * @author Horia Chiorean (hchiorea@redhat.com) - */ -public class TestHelper { - - protected static JdbcConfiguration defaultJdbcConfig() { - return JdbcConfiguration.copy(Configuration.fromSystemProperties("database.")) - .withDefault(JdbcConfiguration.DATABASE, "master") - .withDefault(JdbcConfiguration.HOSTNAME, "localhost") - .withDefault(JdbcConfiguration.PORT, 1433) - .withDefault(JdbcConfiguration.USER, "sa") - .withDefault(JdbcConfiguration.PASSWORD, "Password!") - .build(); - } -} diff --git a/debezium-connector-sqlserver/src/test/java/io/debezium/connector/sqlserver/util/TestHelper.java b/debezium-connector-sqlserver/src/test/java/io/debezium/connector/sqlserver/util/TestHelper.java new file mode 100644 index 000000000..6f8f502ec --- /dev/null +++ b/debezium-connector-sqlserver/src/test/java/io/debezium/connector/sqlserver/util/TestHelper.java @@ -0,0 +1,115 @@ +/* + * Copyright Debezium Authors. + * + * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 + */ + +package io.debezium.connector.sqlserver.util; + +import java.nio.file.Path; +import java.sql.SQLException; + +import io.debezium.config.Configuration; +import io.debezium.connector.sqlserver.SqlServerConnection; +import io.debezium.connector.sqlserver.SqlServerConnectionFactory; +import io.debezium.connector.sqlserver.SqlServerConnectorConfig; +import io.debezium.jdbc.JdbcConfiguration; +import io.debezium.relational.history.FileDatabaseHistory; +import io.debezium.util.Testing; + +/** + * @author Horia Chiorean (hchiorea@redhat.com) + */ +public class TestHelper { + + public static final Path DB_HISTORY_PATH = Testing.Files.createTestingPath("file-db-history-connect.txt").toAbsolutePath(); + public static final String TEST_DATABASE = "testDB"; + + public static JdbcConfiguration adminJdbcConfig() { + return JdbcConfiguration.copy(Configuration.fromSystemProperties("database.")) + .withDefault(JdbcConfiguration.DATABASE, "master") + .withDefault(JdbcConfiguration.HOSTNAME, "localhost") + .withDefault(JdbcConfiguration.PORT, 1433) + .withDefault(JdbcConfiguration.USER, "sa") + .withDefault(JdbcConfiguration.PASSWORD, "Password!") + .build(); + } + + public static JdbcConfiguration defaultJdbcConfig() { + return JdbcConfiguration.copy(Configuration.fromSystemProperties("database.")) + .withDefault(JdbcConfiguration.DATABASE, TEST_DATABASE) + .withDefault(JdbcConfiguration.HOSTNAME, "localhost") + .withDefault(JdbcConfiguration.PORT, 1433) + .withDefault(JdbcConfiguration.USER, "sa") + .withDefault(JdbcConfiguration.PASSWORD, "Password!") + .build(); + } + + /** + * Returns a default configuration suitable for most test cases. Can be amended/overridden in individual tests as + * needed. + */ + public static Configuration.Builder defaultConfig() { + JdbcConfiguration jdbcConfiguration = defaultJdbcConfig(); + Configuration.Builder builder = Configuration.create(); + + jdbcConfiguration.forEach( + (field, value) -> builder.with(SqlServerConnectorConfig.DATABASE_CONFIG_PREFIX + field, value) + ); + + return builder.with(SqlServerConnectorConfig.LOGICAL_NAME, "server1") + .with(SqlServerConnectorConfig.DATABASE_HISTORY, FileDatabaseHistory.class) + .with(FileDatabaseHistory.FILE_PATH, DB_HISTORY_PATH); + } + + public static void createTestDatabase() { + // NOTE: you cannot enable CDC for the "master" db (the default one) so + // all tests must use a separate database... + try (SqlServerConnection connection = adminConnection()) { + connection.connect(); + try { + connection.execute("USE testDB"); + connection.disableDbCdc("testDB"); + } + catch (SQLException e) { + } + connection.execute("USE master"); + String sql = "IF EXISTS(select 1 from sys.databases where name='testDB') DROP DATABASE testDB\n" + + "CREATE DATABASE testDB\n"; + connection.execute(sql); + connection.execute("USE testDB"); + // NOTE: you cannot enable CDC on master + connection.enableDbCdc("testDB"); + } + catch (SQLException e) { + throw new IllegalStateException("Error while initating test database", e); + } + } + + public static void dropTestDatabase() { + try (SqlServerConnection connection = adminConnection()) { + connection.connect(); + try { + connection.execute("USE testDB"); + connection.disableDbCdc("testDB"); + } + catch (SQLException e) { + } + connection.execute("USE master"); + String sql = "IF EXISTS(select 1 from sys.databases where name='testDB') DROP DATABASE testDB"; + connection.execute(sql); + } + catch (SQLException e) { + throw new IllegalStateException("Error while initating test database", e); + } + } + + public static SqlServerConnection adminConnection() { + return new SqlServerConnection(TestHelper.adminJdbcConfig(), new SqlServerConnectionFactory()); + } + + public static SqlServerConnection testConnection() { + return new SqlServerConnection(TestHelper.defaultJdbcConfig(), new SqlServerConnectionFactory()); + } + +} diff --git a/pom.xml b/pom.xml index 037591837..277bffc72 100644 --- a/pom.xml +++ b/pom.xml @@ -4,7 +4,7 @@ io.debezium debezium-parent - 0.8.0.Final + 0.9.0-SNAPSHOT 4.0.0