DBZ-40 Added Javadoc and logging

This commit is contained in:
Jiri Pechanec 2018-07-20 06:40:48 +02:00
parent c95c64fcc0
commit 26f1e0e046
11 changed files with 97 additions and 9 deletions

View File

@ -10,7 +10,8 @@
import io.debezium.util.Strings; import io.debezium.util.Strings;
/** /**
* A logical representation of SQL Server LSN (log sequence number) position. * A logical representation of SQL Server LSN (log sequence number) position. When LSN is not available
 * it is replaced with the {@link Lsn#NULL} constant.
* *
* @author Jiri Pechanec * @author Jiri Pechanec
* *
@ -29,10 +30,16 @@ private Lsn(byte[] binary) {
this.binary = binary; this.binary = binary;
} }
/**
* @return binary representation of the stored LSN
*/
public byte[] getBinary() { public byte[] getBinary() {
return binary; return binary;
} }
/**
 * @return true if this is a real LSN or false if it is {@code NULL}
*/
public boolean isAvailable() { public boolean isAvailable() {
return binary != null; return binary != null;
} }
@ -49,6 +56,9 @@ private int[] getUnsignedBinary() {
return unsignedBinary; return unsignedBinary;
} }
/**
* @return textual representation of the stored LSN
*/
public String toString() { public String toString() {
if (string != null) { if (string != null) {
return string; return string;
@ -72,10 +82,18 @@ public String toString() {
return string; return string;
} }
/**
* @param lsnString - textual representation of Lsn
* @return LSN converted from its textual representation
*/
public static Lsn valueOf(String lsnString) { public static Lsn valueOf(String lsnString) {
return (lsnString == null || NULL_STRING.equals(lsnString)) ? NULL : new Lsn(Strings.hexStringToByteArray(lsnString.replace(":", ""))); return (lsnString == null || NULL_STRING.equals(lsnString)) ? NULL : new Lsn(Strings.hexStringToByteArray(lsnString.replace(":", "")));
} }
/**
* @param lsnBinary - binary representation of Lsn
* @return LSN converted from its binary representation
*/
public static Lsn valueOf(byte[] lsnBinary) { public static Lsn valueOf(byte[] lsnBinary) {
return (lsnBinary == null ) ? NULL : new Lsn(lsnBinary); return (lsnBinary == null ) ? NULL : new Lsn(lsnBinary);
} }
@ -102,6 +120,9 @@ public boolean equals(Object obj) {
return true; return true;
} }
/**
* Enables ordering of LSNs. The {@code NULL} LSN is always the smallest one.
*/
@Override @Override
public int compareTo(Lsn o) { public int compareTo(Lsn o) {
if (this == o) { if (this == o) {

View File

@ -13,7 +13,7 @@
/** /**
* Emits change data based on a single row read via JDBC. * Emits change data based on a single row read via JDBC.
* *
* @author Gunnar Morling * @author Jiri Pechanec
*/ */
public class SnapshotChangeRecordEmitter extends RelationalChangeRecordEmitter { public class SnapshotChangeRecordEmitter extends RelationalChangeRecordEmitter {

View File

@ -13,8 +13,8 @@
import io.debezium.connector.AbstractSourceInfo; import io.debezium.connector.AbstractSourceInfo;
/** /**
 * Coordinates from the database log to restart streaming from. Maps to {@code source} field in envelope and  * Coordinates from the database log to establish a relation between the streamed change and the source log position.
* to connector offsets. * Maps to {@code source} field in {@code Envelope}.
* *
* @author Jiri Pechanec * @author Jiri Pechanec
* *
@ -47,6 +47,9 @@ protected SourceInfo(String serverName) {
this.serverName = serverName; this.serverName = serverName;
} }
/**
* @param lsn - LSN of the change in the database log
*/
public void setChangeLsn(Lsn lsn) { public void setChangeLsn(Lsn lsn) {
changeLsn = lsn; changeLsn = lsn;
} }
@ -59,10 +62,16 @@ public Lsn getCommitLsn() {
return commitLsn; return commitLsn;
} }
/**
 * @param commitLsn - LSN of the {@code COMMIT} of the transaction to which the change belongs
*/
public void setCommitLsn(Lsn commitLsn) { public void setCommitLsn(Lsn commitLsn) {
this.commitLsn = commitLsn; this.commitLsn = commitLsn;
} }
/**
* @param instant a time at which the transaction commit was executed
*/
public void setSourceTime(Instant instant) { public void setSourceTime(Instant instant) {
sourceTime = instant; sourceTime = instant;
} }
@ -71,6 +80,9 @@ public boolean isSnapshot() {
return snapshot; return snapshot;
} }
/**
 * @param snapshot - true if the source of the event is the snapshot phase, not the database log
*/
public void setSnapshot(boolean snapshot) { public void setSnapshot(boolean snapshot) {
this.snapshot = snapshot; this.snapshot = snapshot;
} }
@ -80,6 +92,9 @@ protected Schema schema() {
return SCHEMA; return SCHEMA;
} }
/**
* @return the coordinates encoded as a {@code Struct}
*/
@Override @Override
public Struct struct() { public Struct struct() {
final Struct ret = super.struct() final Struct ret = super.struct()

View File

@ -12,7 +12,7 @@
import io.debezium.util.Clock; import io.debezium.util.Clock;
/** /**
* Emits change data based on a a single CDC data row. * Emits change data based on a single (or two in case of updates) CDC data row(s).
* *
* @author Jiri Pechanec * @author Jiri Pechanec
*/ */

View File

@ -124,6 +124,15 @@ public Lsn getMaxLsn() throws SQLException {
}); });
} }
/**
* Provides all changes recorded by the SQL Server CDC capture process for a given table.
*
* @param tableId - the requested table changes
* @param fromLsn - closed lower bound of interval of changes to be provided
* @param toLsn - closed upper bound of interval of changes to be provided
* @param consumer - the change processor
* @throws SQLException
*/
public void getChangesForTable(TableId tableId, Lsn fromLsn, Lsn toLsn, ResultSetConsumer consumer) throws SQLException { public void getChangesForTable(TableId tableId, Lsn fromLsn, Lsn toLsn, ResultSetConsumer consumer) throws SQLException {
final String cdcNameForTable = cdcNameForTable(tableId); final String cdcNameForTable = cdcNameForTable(tableId);
final String query = "SELECT * FROM cdc.fn_cdc_get_all_changes_" + cdcNameForTable + "(ISNULL(?,sys.fn_cdc_get_min_lsn('" + cdcNameForTable + "')), ?, N'all update old')"; final String query = "SELECT * FROM cdc.fn_cdc_get_all_changes_" + cdcNameForTable + "(ISNULL(?,sys.fn_cdc_get_min_lsn('" + cdcNameForTable + "')), ?, N'all update old')";
@ -133,6 +142,15 @@ public void getChangesForTable(TableId tableId, Lsn fromLsn, Lsn toLsn, ResultSe
}, consumer); }, consumer);
} }
/**
 * Provides all changes recorded by the SQL Server CDC capture process for a set of tables.
*
* @param tableIds - the requested tables to obtain changes for
* @param fromLsn - closed lower bound of interval of changes to be provided
* @param toLsn - closed upper bound of interval of changes to be provided
* @param consumer - the change processor
* @throws SQLException
*/
public void getChangesForTables(TableId[] tableIds, Lsn fromLsn, Lsn toLsn, MultiResultSetConsumer consumer) throws SQLException { public void getChangesForTables(TableId[] tableIds, Lsn fromLsn, Lsn toLsn, MultiResultSetConsumer consumer) throws SQLException {
final String[] queries = new String[tableIds.length]; final String[] queries = new String[tableIds.length];
@ -149,6 +167,13 @@ public void getChangesForTables(TableId[] tableIds, Lsn fromLsn, Lsn toLsn, Mult
}, consumer); }, consumer);
} }
/**
* Obtain the next available position in the database log.
*
* @param lsn - LSN of the current position
* @return LSN of the next position in the database
* @throws SQLException
*/
public Lsn incrementLsn(Lsn lsn) throws SQLException { public Lsn incrementLsn(Lsn lsn) throws SQLException {
final String LSN_INCREMENT_ERROR = "Increment LSN query must return exactly one value"; final String LSN_INCREMENT_ERROR = "Increment LSN query must return exactly one value";
final String query = "SELECT sys.fn_cdc_increment_lsn(?)"; final String query = "SELECT sys.fn_cdc_increment_lsn(?)";
@ -166,6 +191,13 @@ public Lsn incrementLsn(Lsn lsn) throws SQLException {
}); });
} }
/**
* Map a commit LSN to a point in time when the commit happened.
*
* @param lsn - LSN of the commit
* @return time when the commit was recorded into the database log
* @throws SQLException
*/
public Instant timestampOfLsn(Lsn lsn) throws SQLException { public Instant timestampOfLsn(Lsn lsn) throws SQLException {
final String LSN_TIMESTAMP_ERROR = "LSN to timestamp query must return exactly one value"; final String LSN_TIMESTAMP_ERROR = "LSN to timestamp query must return exactly one value";
final String query = "SELECT sys.fn_cdc_map_lsn_to_time(?)"; final String query = "SELECT sys.fn_cdc_map_lsn_to_time(?)";
@ -189,6 +221,12 @@ public Instant timestampOfLsn(Lsn lsn) throws SQLException {
}); });
} }
/**
* Creates an exclusive lock for a given table.
*
* @param tableId to be locked
* @throws SQLException
*/
public void lockTable(TableId tableId) throws SQLException { public void lockTable(TableId tableId) throws SQLException {
final String lockTableStmt = LOCK_TABLE.replace(STATEMENTS_PLACEHOLDER, tableId.table()); final String lockTableStmt = LOCK_TABLE.replace(STATEMENTS_PLACEHOLDER, tableId.table());
execute(lockTableStmt); execute(lockTableStmt);

View File

@ -51,7 +51,7 @@ public SqlServerDatabaseSchema(SqlServerConnectorConfig connectorConfig, SchemaN
public void applySchemaChange(SchemaChangeEvent schemaChange) { public void applySchemaChange(SchemaChangeEvent schemaChange) {
LOGGER.debug("Applying schema change event {}", schemaChange); LOGGER.debug("Applying schema change event {}", schemaChange);
// just a single table per DDL event for Oracle // just a single table per DDL event for SQL Server
Table table = schemaChange.getTables().iterator().next(); Table table = schemaChange.getTables().iterator().next();
buildAndRegisterSchema(table); buildAndRegisterSchema(table);
tables().overwriteTable(table); tables().overwriteTable(table);

View File

@ -12,7 +12,7 @@
import io.debezium.relational.TableId; import io.debezium.relational.TableId;
/** /**
* {@link SchemaChangeEventEmitter} implementation based on SqlServer. * {@link SchemaChangeEventEmitter} implementation based on SQL Server.
* *
* @author Jiri Pechanec * @author Jiri Pechanec
*/ */

View File

@ -47,10 +47,18 @@ protected SnapshottingTask getSnapshottingTask(OffsetContext previousOffset) {
// found a previous offset and the earlier snapshot has completed // found a previous offset and the earlier snapshot has completed
if (previousOffset != null && !previousOffset.isSnapshotRunning()) { if (previousOffset != null && !previousOffset.isSnapshotRunning()) {
LOGGER.info("A previous offset indicating completed snapshot has been found. Neither schema nor data will be snapshotted.");
snapshotSchema = false; snapshotSchema = false;
snapshotData = false; snapshotData = false;
} }
else { else {
LOGGER.info("No previous offset has been found");
if (connectorConfig.getSnapshotMode().includeData()) {
LOGGER.info("According to the connector configuration both schema and data will be snapshotted");
}
else {
LOGGER.info("According to the connector configuration only schema will be snapshotted");
}
snapshotData = connectorConfig.getSnapshotMode().includeData(); snapshotData = connectorConfig.getSnapshotMode().includeData();
} }
@ -70,8 +78,10 @@ protected Set<TableId> getAllTableIds(SnapshotContext ctx) throws Exception {
@Override @Override
protected void lockTablesForSchemaSnapshot(ChangeEventSourceContext sourceContext, SnapshotContext snapshotContext) throws SQLException, InterruptedException { protected void lockTablesForSchemaSnapshot(ChangeEventSourceContext sourceContext, SnapshotContext snapshotContext) throws SQLException, InterruptedException {
if (connectorConfig.getSnapshotLockingMode() == SnapshotLockingMode.NONE) { if (connectorConfig.getSnapshotLockingMode() == SnapshotLockingMode.NONE) {
LOGGER.info("Schema locking was disabled in connector configuration");
return; return;
} }
LOGGER.info("Executing schema locking");
((SqlServerSnapshotContext)snapshotContext).preSchemaSnapshotSavepoint = jdbcConnection.connection().setSavepoint("dbz_schema_snapshot"); ((SqlServerSnapshotContext)snapshotContext).preSchemaSnapshotSavepoint = jdbcConnection.connection().setSavepoint("dbz_schema_snapshot");
@ -112,6 +122,7 @@ protected void readTableStructure(ChangeEventSourceContext sourceContext, Snapsh
throw new InterruptedException("Interrupted while reading structure of schema " + schema); throw new InterruptedException("Interrupted while reading structure of schema " + schema);
} }
LOGGER.info("Reading structure of schema '{}'", snapshotContext.catalogName);
jdbcConnection.readSchema( jdbcConnection.readSchema(
snapshotContext.tables, snapshotContext.tables,
snapshotContext.catalogName, snapshotContext.catalogName,

View File

@ -66,11 +66,13 @@ public void execute(ChangeEventSourceContext context) throws InterruptedExceptio
// Probably cannot happen but it is better to guard against such // Probably cannot happen but it is better to guard against such
// situation // situation
if (!currentMaxLsn.isAvailable()) { if (!currentMaxLsn.isAvailable()) {
LOGGER.debug("No maximum LSN recorded in the database");
metronome.pause(); metronome.pause();
continue; continue;
} }
// There is no change in the database // There is no change in the database
if (currentMaxLsn.equals(lastProcessedLsn)) { if (currentMaxLsn.equals(lastProcessedLsn)) {
LOGGER.debug("No change in the database");
metronome.pause(); metronome.pause();
continue; continue;
} }
@ -104,6 +106,7 @@ public void execute(ChangeEventSourceContext context) throws InterruptedExceptio
break; break;
} }
LOGGER.trace("Processing change {}", tableSmallestLsn);
final TableId tableId = tableSmallestLsn.getTableId(); final TableId tableId = tableSmallestLsn.getTableId();
final Lsn commitLsn = tableSmallestLsn.getCommitLsn(); final Lsn commitLsn = tableSmallestLsn.getCommitLsn();
final Lsn rowLsn = tableSmallestLsn.getRowLsn(); final Lsn rowLsn = tableSmallestLsn.getRowLsn();

View File

@ -32,7 +32,7 @@
import io.debezium.util.Testing; import io.debezium.util.Testing;
/** /**
* Integration test to verify different Oracle datatypes. * Integration test to verify different SQL Server datatypes.
* *
* @author Jiri Pechanec * @author Jiri Pechanec
*/ */

View File

@ -16,7 +16,7 @@
import io.debezium.util.Testing; import io.debezium.util.Testing;
/** /**
* Integration test to verify different Oracle datatypes. * Integration test to verify different SQL Server datatypes.
* The types are discovered during snapshotting phase. * The types are discovered during snapshotting phase.
* *
* @author Jiri Pechanec * @author Jiri Pechanec