diff --git a/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/Lsn.java b/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/Lsn.java index d85954a4e..8947d9619 100644 --- a/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/Lsn.java +++ b/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/Lsn.java @@ -10,7 +10,8 @@ import io.debezium.util.Strings; /** - * A logical representation of SQL Server LSN (log sequence number) position. + * A logical representation of SQL Server LSN (log sequence number) position. When LSN is not available + * it is replaced with {@link Lsn.NULL} constant. * * @author Jiri Pechanec * @@ -29,10 +30,16 @@ private Lsn(byte[] binary) { this.binary = binary; } + /** + * @return binary representation of the stored LSN + */ public byte[] getBinary() { return binary; } + /** + * @return true if this is a real LSN or false it it is {@code NULL} + */ public boolean isAvailable() { return binary != null; } @@ -49,6 +56,9 @@ private int[] getUnsignedBinary() { return unsignedBinary; } + /** + * @return textual representation of the stored LSN + */ public String toString() { if (string != null) { return string; @@ -72,10 +82,18 @@ public String toString() { return string; } + /** + * @param lsnString - textual representation of Lsn + * @return LSN converted from its textual representation + */ public static Lsn valueOf(String lsnString) { return (lsnString == null || NULL_STRING.equals(lsnString)) ? NULL : new Lsn(Strings.hexStringToByteArray(lsnString.replace(":", ""))); } + /** + * @param lsnBinary - binary representation of Lsn + * @return LSN converted from its binary representation + */ public static Lsn valueOf(byte[] lsnBinary) { return (lsnBinary == null ) ? NULL : new Lsn(lsnBinary); } @@ -102,6 +120,9 @@ public boolean equals(Object obj) { return true; } + /** + * Enables ordering of LSNs. The {@code NULL} LSN is always the smallest one. + */ @Override public int compareTo(Lsn o) { if (this == o) { diff --git a/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SnapshotChangeRecordEmitter.java b/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SnapshotChangeRecordEmitter.java index 14364733b..1ae3005b3 100644 --- a/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SnapshotChangeRecordEmitter.java +++ b/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SnapshotChangeRecordEmitter.java @@ -13,7 +13,7 @@ /** * Emits change data based on a single row read via JDBC. * - * @author Gunnar Morling + * @author Jiri Pechanec */ public class SnapshotChangeRecordEmitter extends RelationalChangeRecordEmitter { diff --git a/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SourceInfo.java b/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SourceInfo.java index 8fbaaf0b7..59919a09d 100644 --- a/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SourceInfo.java +++ b/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SourceInfo.java @@ -13,8 +13,8 @@ import io.debezium.connector.AbstractSourceInfo; /** - * Coordinates from the database log to restart streaming from. Maps to {@code source} field in envelope and - * to connector offsets. + * Coordinates from the database log to establis relation between the change streamed and the source log position. + * Maps to {@code source} field in {@code Envelope}. * * @author Jiri Pechanec * @@ -47,6 +47,9 @@ protected SourceInfo(String serverName) { this.serverName = serverName; } + /** + * @param lsn - LSN of the change in the database log + */ public void setChangeLsn(Lsn lsn) { changeLsn = lsn; } @@ -59,10 +62,16 @@ public Lsn getCommitLsn() { return commitLsn; } + /** + * @param commitLsn - LSN of the {@code COMMIT} of the transaction whose part the change is + */ public void setCommitLsn(Lsn commitLsn) { this.commitLsn = commitLsn; } + /** + * @param instant a time at which the transaction commit was executed + */ public void setSourceTime(Instant instant) { sourceTime = instant; } @@ -71,6 +80,9 @@ public boolean isSnapshot() { return snapshot; } + /** + * @param snapshot - true if the source of even is snapshot phase, nto the database log + */ public void setSnapshot(boolean snapshot) { this.snapshot = snapshot; } @@ -80,6 +92,9 @@ protected Schema schema() { return SCHEMA; } + /** + * @return the coordinates encoded as a {@code Struct} + */ @Override public Struct struct() { final Struct ret = super.struct() diff --git a/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerChangeRecordEmitter.java b/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerChangeRecordEmitter.java index 644926e1f..ea6b4a287 100644 --- a/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerChangeRecordEmitter.java +++ b/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerChangeRecordEmitter.java @@ -12,7 +12,7 @@ import io.debezium.util.Clock; /** - * Emits change data based on a a single CDC data row. + * Emits change data based on a single (or two in case of updates) CDC data row(s). * * @author Jiri Pechanec */ diff --git a/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerConnection.java b/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerConnection.java index e06a9c164..0ead7e852 100644 --- a/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerConnection.java +++ b/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerConnection.java @@ -124,6 +124,15 @@ public Lsn getMaxLsn() throws SQLException { }); } + /** + * Provides all changes recorded by the SQL Server CDC capture process for a given table. + * + * @param tableId - the requested table changes + * @param fromLsn - closed lower bound of interval of changes to be provided + * @param toLsn - closed upper bound of interval of changes to be provided + * @param consumer - the change processor + * @throws SQLException + */ public void getChangesForTable(TableId tableId, Lsn fromLsn, Lsn toLsn, ResultSetConsumer consumer) throws SQLException { final String cdcNameForTable = cdcNameForTable(tableId); final String query = "SELECT * FROM cdc.fn_cdc_get_all_changes_" + cdcNameForTable + "(ISNULL(?,sys.fn_cdc_get_min_lsn('" + cdcNameForTable + "')), ?, N'all update old')"; @@ -133,6 +142,15 @@ public void getChangesForTable(TableId tableId, Lsn fromLsn, Lsn toLsn, ResultSe }, consumer); } + /** + * Provides all changes recorder by the SQL Server CDC capture process for a set of tables. + * + * @param tableIds - the requested tables to obtain changes for + * @param fromLsn - closed lower bound of interval of changes to be provided + * @param toLsn - closed upper bound of interval of changes to be provided + * @param consumer - the change processor + * @throws SQLException + */ public void getChangesForTables(TableId[] tableIds, Lsn fromLsn, Lsn toLsn, MultiResultSetConsumer consumer) throws SQLException { final String[] queries = new String[tableIds.length]; @@ -149,6 +167,13 @@ public void getChangesForTables(TableId[] tableIds, Lsn fromLsn, Lsn toLsn, Mult }, consumer); } + /** + * Obtain the next available position in the database log. + * + * @param lsn - LSN of the current position + * @return LSN of the next position in the database + * @throws SQLException + */ public Lsn incrementLsn(Lsn lsn) throws SQLException { final String LSN_INCREMENT_ERROR = "Increment LSN query must return exactly one value"; final String query = "SELECT sys.fn_cdc_increment_lsn(?)"; @@ -166,6 +191,13 @@ public Lsn incrementLsn(Lsn lsn) throws SQLException { }); } + /** + * Map a commit LSN to a point in time when the commit happened. + * + * @param lsn - LSN of the commit + * @return time when the commit was recorded into the database log + * @throws SQLException + */ public Instant timestampOfLsn(Lsn lsn) throws SQLException { final String LSN_TIMESTAMP_ERROR = "LSN to timestamp query must return exactly one value"; final String query = "SELECT sys.fn_cdc_map_lsn_to_time(?)"; @@ -189,6 +221,12 @@ public Instant timestampOfLsn(Lsn lsn) throws SQLException { }); } + /** + * Creates an exclusive lock for a given table. + * + * @param tableId to be locked + * @throws SQLException + */ public void lockTable(TableId tableId) throws SQLException { final String lockTableStmt = LOCK_TABLE.replace(STATEMENTS_PLACEHOLDER, tableId.table()); execute(lockTableStmt); diff --git a/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerDatabaseSchema.java b/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerDatabaseSchema.java index 161aa456e..fa1b91c30 100644 --- a/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerDatabaseSchema.java +++ b/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerDatabaseSchema.java @@ -51,7 +51,7 @@ public SqlServerDatabaseSchema(SqlServerConnectorConfig connectorConfig, SchemaN public void applySchemaChange(SchemaChangeEvent schemaChange) { LOGGER.debug("Applying schema change event {}", schemaChange); - // just a single table per DDL event for Oracle + // just a single table per DDL event for SQL Server Table table = schemaChange.getTables().iterator().next(); buildAndRegisterSchema(table); tables().overwriteTable(table); diff --git a/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerSchemaChangeEventEmitter.java b/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerSchemaChangeEventEmitter.java index 0eb0f2a6a..c1fbf3905 100644 --- a/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerSchemaChangeEventEmitter.java +++ b/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerSchemaChangeEventEmitter.java @@ -12,7 +12,7 @@ import io.debezium.relational.TableId; /** - * {@link SchemaChangeEventEmitter} implementation based on SqlServer. + * {@link SchemaChangeEventEmitter} implementation based on SQL Server. * * @author Jiri Pechanec */ diff --git a/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerSnapshotChangeEventSource.java b/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerSnapshotChangeEventSource.java index 89a7671af..1b39ce22e 100644 --- a/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerSnapshotChangeEventSource.java +++ b/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerSnapshotChangeEventSource.java @@ -47,10 +47,18 @@ protected SnapshottingTask getSnapshottingTask(OffsetContext previousOffset) { // found a previous offset and the earlier snapshot has completed if (previousOffset != null && !previousOffset.isSnapshotRunning()) { + LOGGER.info("A previous offset indicating completed snapshot has been found. Neither schema nor data will be snapshotted."); snapshotSchema = false; snapshotData = false; } else { + LOGGER.info("No previous offset has been found"); + if (connectorConfig.getSnapshotMode().includeData()) { + LOGGER.info("Accroding to the connector configuration both schema and data will be snapshotted"); + } + else { + LOGGER.info("Accroding to the connector configuration only schema will be snapshotted"); + } snapshotData = connectorConfig.getSnapshotMode().includeData(); } @@ -70,8 +78,10 @@ protected Set getAllTableIds(SnapshotContext ctx) throws Exception { @Override protected void lockTablesForSchemaSnapshot(ChangeEventSourceContext sourceContext, SnapshotContext snapshotContext) throws SQLException, InterruptedException { if (connectorConfig.getSnapshotLockingMode() == SnapshotLockingMode.NONE) { + LOGGER.info("Schema locking was disabled in connector configuration"); return; } + LOGGER.info("Executing schema locking"); ((SqlServerSnapshotContext)snapshotContext).preSchemaSnapshotSavepoint = jdbcConnection.connection().setSavepoint("dbz_schema_snapshot"); @@ -112,6 +122,7 @@ protected void readTableStructure(ChangeEventSourceContext sourceContext, Snapsh throw new InterruptedException("Interrupted while reading structure of schema " + schema); } + LOGGER.info("Reading sturcture of schema '{}'", snapshotContext.catalogName); jdbcConnection.readSchema( snapshotContext.tables, snapshotContext.catalogName, diff --git a/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerStreamingChangeEventSource.java b/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerStreamingChangeEventSource.java index 3322f30e8..2f235052d 100644 --- a/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerStreamingChangeEventSource.java +++ b/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerStreamingChangeEventSource.java @@ -66,11 +66,13 @@ public void execute(ChangeEventSourceContext context) throws InterruptedExceptio // Probably cannot happen but it is better to guard against such // situation if (!currentMaxLsn.isAvailable()) { + LOGGER.debug("No maximum LSN recorded in the database"); metronome.pause(); continue; } // There is no change in the database if (currentMaxLsn.equals(lastProcessedLsn)) { + LOGGER.debug("No change in the database"); metronome.pause(); continue; } @@ -104,6 +106,7 @@ public void execute(ChangeEventSourceContext context) throws InterruptedExceptio break; } + LOGGER.trace("Processing change {}", tableSmallestLsn); final TableId tableId = tableSmallestLsn.getTableId(); final Lsn commitLsn = tableSmallestLsn.getCommitLsn(); final Lsn rowLsn = tableSmallestLsn.getRowLsn(); diff --git a/debezium-connector-sqlserver/src/test/java/io/debezium/connector/sqlserver/AbstractSqlServerDatatypesTest.java b/debezium-connector-sqlserver/src/test/java/io/debezium/connector/sqlserver/AbstractSqlServerDatatypesTest.java index f7ce7cef3..01f8c471b 100644 --- a/debezium-connector-sqlserver/src/test/java/io/debezium/connector/sqlserver/AbstractSqlServerDatatypesTest.java +++ b/debezium-connector-sqlserver/src/test/java/io/debezium/connector/sqlserver/AbstractSqlServerDatatypesTest.java @@ -32,7 +32,7 @@ import io.debezium.util.Testing; /** - * Integration test to verify different Oracle datatypes. + * Integration test to verify different SQL Server datatypes. * * @author Jiri Pechanec */ diff --git a/debezium-connector-sqlserver/src/test/java/io/debezium/connector/sqlserver/DatatypesFromSnapshotIT.java b/debezium-connector-sqlserver/src/test/java/io/debezium/connector/sqlserver/DatatypesFromSnapshotIT.java index d24bcf097..1a75d06ee 100644 --- a/debezium-connector-sqlserver/src/test/java/io/debezium/connector/sqlserver/DatatypesFromSnapshotIT.java +++ b/debezium-connector-sqlserver/src/test/java/io/debezium/connector/sqlserver/DatatypesFromSnapshotIT.java @@ -16,7 +16,7 @@ import io.debezium.util.Testing; /** - * Integration test to verify different Oracle datatypes. + * Integration test to verify different SQL Server datatypes. * The types are discovered during snapshotting phase. * * @author Jiri Pechanec