DBZ-947 Fix semantics of snapshot locking modes

Exclusive mode: this mode takes exclusive locks on all monitored tables
during initial load (as documented).

None mode has been renamed to repeatable read since this name better
reflects its characteristics. None suggests that no locks are acquired.
No table locks are acquired indeed, however, repeatable read takes
shared row locks until the end of the transaction, which effectively
prevents other transactions from updating tables.
Table locks are hold only during schema snapshot.
This commit is contained in:
Grzegorz Kołakowski 2019-01-24 20:14:06 +01:00 committed by Gunnar Morling
parent 3bd97d3bd7
commit b132f2b704
3 changed files with 23 additions and 19 deletions

View File

@ -127,10 +127,12 @@ public static enum SnapshotLockingMode implements EnumeratedValue {
SNAPSHOT("snapshot"), SNAPSHOT("snapshot"),
/** /**
* This mode will avoid using ANY table locks during the snapshot process. This mode can only be used with SnapShotMode * This mode uses REPEATABLE READ isolation level. This mode will avoid taking any table
* set to schema_only or schema_only_recovery. * locks during the snapshot process, except schema snapshot phase where exclusive table
* locks are acquired for a short period. Since phantom reads can occur, it does not fully
* guarantee consistency.
*/ */
NONE("none"); REPEATABLE_READ("repeatable_read");
private final String value; private final String value;
@ -205,12 +207,13 @@ public static SnapshotLockingMode parse(String value, String defaultValue) {
public static final Field SNAPSHOT_LOCKING_MODE = Field.create("snapshot.locking.mode") public static final Field SNAPSHOT_LOCKING_MODE = Field.create("snapshot.locking.mode")
.withDisplayName("Snapshot locking mode") .withDisplayName("Snapshot locking mode")
.withEnum(SnapshotLockingMode.class, SnapshotLockingMode.NONE) .withEnum(SnapshotLockingMode.class, SnapshotLockingMode.REPEATABLE_READ)
.withWidth(Width.SHORT) .withWidth(Width.SHORT)
.withImportance(Importance.LOW) .withImportance(Importance.LOW)
.withDescription("Controls how long the connector locks the montiored tables for snapshot execution. The default is '" + SnapshotLockingMode.NONE.getValue() + "', " .withDescription("Controls how long the connector locks the monitored tables for snapshot execution. The default is '" + SnapshotLockingMode.REPEATABLE_READ.getValue() + "', "
+ "which means that the connector does not hold any locks for all monitored tables." + "which means that the connector hold locks for all monitored tables only during schema snapshot."
+ "Using a value of '" + SnapshotLockingMode.EXCLUSIVE.getValue() + "' ensures that the connector holds the exlusive lock (and thus prevents any reads and updates) for all monitored tables."); + "Using a value of '" + SnapshotLockingMode.EXCLUSIVE.getValue() + "' ensures that the connector holds the exclusive lock (and thus prevents any reads and updates) for all monitored tables. "
+ "When '" + SnapshotLockingMode.SNAPSHOT.getValue() + "' is specified, connector runs the initial snapshot in SNAPSHOT isolation level, which guarantees snapshot consistency. In addition, neither table nor row-level locks are held.");
/** /**
* The set of {@link Field}s defined as part of this configuration. * The set of {@link Field}s defined as part of this configuration.

View File

@ -96,16 +96,16 @@ protected Set<TableId> getAllTableIds(SnapshotContext ctx) throws Exception {
@Override @Override
protected void lockTablesForSchemaSnapshot(ChangeEventSourceContext sourceContext, SnapshotContext snapshotContext) throws SQLException, InterruptedException { protected void lockTablesForSchemaSnapshot(ChangeEventSourceContext sourceContext, SnapshotContext snapshotContext) throws SQLException, InterruptedException {
if (connectorConfig.getSnapshotLockingMode() == SnapshotLockingMode.NONE) { if (connectorConfig.getSnapshotLockingMode() == SnapshotLockingMode.SNAPSHOT) {
jdbcConnection.connection().setTransactionIsolation(Connection.TRANSACTION_REPEATABLE_READ); // Snapshot transaction isolation level has already been set.
((SqlServerSnapshotContext)snapshotContext).preSchemaSnapshotSavepoint = jdbcConnection.connection().setSavepoint("dbz_schema_snapshot");
LOGGER.info("Schema locking was disabled in connector configuration"); LOGGER.info("Schema locking was disabled in connector configuration");
} }
else if (connectorConfig.getSnapshotLockingMode() == SnapshotLockingMode.EXCLUSIVE) { else if (connectorConfig.getSnapshotLockingMode() == SnapshotLockingMode.EXCLUSIVE
LOGGER.info("Executing schema locking"); || connectorConfig.getSnapshotLockingMode() == SnapshotLockingMode.REPEATABLE_READ) {
jdbcConnection.connection().setTransactionIsolation(Connection.TRANSACTION_REPEATABLE_READ);
((SqlServerSnapshotContext)snapshotContext).preSchemaSnapshotSavepoint = jdbcConnection.connection().setSavepoint("dbz_schema_snapshot"); ((SqlServerSnapshotContext)snapshotContext).preSchemaSnapshotSavepoint = jdbcConnection.connection().setSavepoint("dbz_schema_snapshot");
LOGGER.info("Executing schema locking");
try (Statement statement = jdbcConnection.connection().createStatement(ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY)) { try (Statement statement = jdbcConnection.connection().createStatement(ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY)) {
for (TableId tableId : snapshotContext.capturedTables) { for (TableId tableId : snapshotContext.capturedTables) {
if (!sourceContext.isRunning()) { if (!sourceContext.isRunning()) {
@ -119,9 +119,6 @@ else if (connectorConfig.getSnapshotLockingMode() == SnapshotLockingMode.EXCLUSI
} }
} }
} }
else if (connectorConfig.getSnapshotLockingMode() == SnapshotLockingMode.SNAPSHOT) {
((SqlServerSnapshotContext)snapshotContext).preSchemaSnapshotSavepoint = jdbcConnection.connection().setSavepoint("dbz_schema_snapshot");
}
else { else {
throw new IllegalStateException("Unknown locking mode specified."); throw new IllegalStateException("Unknown locking mode specified.");
} }
@ -129,7 +126,11 @@ else if (connectorConfig.getSnapshotLockingMode() == SnapshotLockingMode.SNAPSHO
@Override @Override
protected void releaseSchemaSnapshotLocks(SnapshotContext snapshotContext) throws SQLException { protected void releaseSchemaSnapshotLocks(SnapshotContext snapshotContext) throws SQLException {
jdbcConnection.connection().rollback(((SqlServerSnapshotContext)snapshotContext).preSchemaSnapshotSavepoint); // Exclusive mode: locks should be kept until the end of transaction.
// snapshot mode: no locks have been acquired.
if (connectorConfig.getSnapshotLockingMode() == SnapshotLockingMode.REPEATABLE_READ) {
jdbcConnection.connection().rollback(((SqlServerSnapshotContext)snapshotContext).preSchemaSnapshotSavepoint);
}
} }
@Override @Override

View File

@ -90,8 +90,8 @@ public void takeSnapshotInSnapshotMode() throws Exception {
} }
@Test @Test
public void takeSnapshotInNoneMode() throws Exception { public void takeSnapshotInRepeatableReadMode() throws Exception {
takeSnapshot(SnapshotLockingMode.NONE); takeSnapshot(SnapshotLockingMode.REPEATABLE_READ);
} }
private void takeSnapshot(SnapshotLockingMode lockingMode) throws Exception { private void takeSnapshot(SnapshotLockingMode lockingMode) throws Exception {