From 0835a8efbaf34eb21babf9303de7007e6c9a7b7d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Grzegorz=20Ko=C5=82akowski?= Date: Thu, 24 Jan 2019 20:26:34 +0100 Subject: [PATCH] DBZ-947 Rename snapshot.locking.mode to snapshot.isolation.mode --- .../sqlserver/SqlServerConnectorConfig.java | 35 ++++++++++--------- .../SqlServerSnapshotChangeEventSource.java | 12 +++---- .../connector/sqlserver/SnapshotIT.java | 14 ++++---- 3 files changed, 31 insertions(+), 30 deletions(-) diff --git a/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerConnectorConfig.java b/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerConnectorConfig.java index 524857579..d59569c65 100644 --- a/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerConnectorConfig.java +++ b/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerConnectorConfig.java @@ -109,9 +109,9 @@ public static SnapshotMode parse(String value, String defaultValue) { } /** - * The set of predefined Snapshot Locking Mode options. + * The set of predefined Snapshot Isolation Mode options. */ - public static enum SnapshotLockingMode implements EnumeratedValue { + public static enum SnapshotIsolationMode implements EnumeratedValue { /** * This mode will block all reads and writes for the entire duration of the snapshot. @@ -136,7 +136,7 @@ public static enum SnapshotLockingMode implements EnumeratedValue { private final String value; - private SnapshotLockingMode(String value) { + private SnapshotIsolationMode(String value) { this.value = value; } @@ -151,10 +151,10 @@ public String getValue() { * @param value the configuration property value; may not be null * @return the matching option, or null if no match is found */ - public static SnapshotLockingMode parse(String value) { + public static SnapshotIsolationMode parse(String value) { if (value == null) return null; value = value.trim(); - for (SnapshotLockingMode option : SnapshotLockingMode.values()) { + for (SnapshotIsolationMode option : SnapshotIsolationMode.values()) { if (option.getValue().equalsIgnoreCase(value)) return option; } return null; @@ -167,8 +167,8 @@ public static SnapshotLockingMode parse(String value) { * @param defaultValue the default value; may be null * @return the matching option, or null if no match is found and the non-null default is invalid */ - public static SnapshotLockingMode parse(String value, String defaultValue) { - SnapshotLockingMode mode = parse(value); + public static SnapshotIsolationMode parse(String value, String defaultValue) { + SnapshotIsolationMode mode = parse(value); if (mode == null && defaultValue != null) mode = parse(defaultValue); return mode; } @@ -205,15 +205,16 @@ public static SnapshotLockingMode parse(String value, String defaultValue) { + "'initial' (the default) to specify the connector should run a snapshot only when no offsets are available for the logical server name; " + "'initial_schema_only' to specify the connector should run a snapshot of the schema when no offsets are available for the logical server name. "); - public static final Field SNAPSHOT_LOCKING_MODE = Field.create("snapshot.locking.mode") - .withDisplayName("Snapshot locking mode") - .withEnum(SnapshotLockingMode.class, SnapshotLockingMode.REPEATABLE_READ) + public static final Field SNAPSHOT_ISOLATION_MODE = Field.create("snapshot.isolation.mode") + .withDisplayName("Snapshot isolation mode") + .withEnum(SnapshotIsolationMode.class, SnapshotIsolationMode.REPEATABLE_READ) .withWidth(Width.SHORT) .withImportance(Importance.LOW) - .withDescription("Controls how long the connector locks the monitored tables for snapshot execution. The default is '" + SnapshotLockingMode.REPEATABLE_READ.getValue() + "', " + .withDescription("Controls which transaction isolation level is used and how long the connector locks the monitored tables. " + + "The default is '" + SnapshotIsolationMode.REPEATABLE_READ.getValue() + "', which means that repeatable read isolation level is used. In addition, exclusive locks are taken only during schema snapshot. " + "which means that the connector hold locks for all monitored tables only during schema snapshot." - + "Using a value of '" + SnapshotLockingMode.EXCLUSIVE.getValue() + "' ensures that the connector holds the exclusive lock (and thus prevents any reads and updates) for all monitored tables. " - + "When '" + SnapshotLockingMode.SNAPSHOT.getValue() + "' is specified, connector runs the initial snapshot in SNAPSHOT isolation level, which guarantees snapshot consistency. In addition, neither table nor row-level locks are held."); + + "Using a value of '" + SnapshotIsolationMode.EXCLUSIVE.getValue() + "' ensures that the connector holds the exclusive lock (and thus prevents any reads and updates) for all monitored tables during the entire snapshot duration. " + + "When '" + SnapshotIsolationMode.SNAPSHOT.getValue() + "' is specified, connector runs the initial snapshot in SNAPSHOT isolation level, which guarantees snapshot consistency. In addition, neither table nor row-level locks are held."); /** * The set of {@link Field}s defined as part of this configuration. @@ -255,7 +256,7 @@ public static ConfigDef configDef() { private final String databaseName; private final SnapshotMode snapshotMode; - private final SnapshotLockingMode snapshotLockingMode; + private final SnapshotIsolationMode snapshotIsolationMode; private final Predicate columnFilter; public SqlServerConnectorConfig(Configuration config) { @@ -263,7 +264,7 @@ public SqlServerConnectorConfig(Configuration config) { this.databaseName = config.getString(DATABASE_NAME); this.snapshotMode = SnapshotMode.parse(config.getString(SNAPSHOT_MODE), SNAPSHOT_MODE.defaultValueAsString()); - this.snapshotLockingMode = SnapshotLockingMode.parse(config.getString(SNAPSHOT_LOCKING_MODE), SNAPSHOT_LOCKING_MODE.defaultValueAsString()); + this.snapshotIsolationMode = SnapshotIsolationMode.parse(config.getString(SNAPSHOT_ISOLATION_MODE), SNAPSHOT_ISOLATION_MODE.defaultValueAsString()); this.columnFilter = Predicates.excludes(config.getString(RelationalDatabaseConnectorConfig.COLUMN_BLACKLIST), columnId -> String.format("%s.%s.%s", columnId.schema(), columnId.table(), columnId.columnName())); } @@ -272,8 +273,8 @@ public String getDatabaseName() { return databaseName; } - public SnapshotLockingMode getSnapshotLockingMode() { - return this.snapshotLockingMode; + public SnapshotIsolationMode getSnapshotIsolationMode() { + return this.snapshotIsolationMode; } public SnapshotMode getSnapshotMode() { diff --git a/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerSnapshotChangeEventSource.java b/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerSnapshotChangeEventSource.java index 2ae23a402..fc72aef9e 100644 --- a/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerSnapshotChangeEventSource.java +++ b/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerSnapshotChangeEventSource.java @@ -17,7 +17,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import io.debezium.connector.sqlserver.SqlServerConnectorConfig.SnapshotLockingMode; +import io.debezium.connector.sqlserver.SqlServerConnectorConfig.SnapshotIsolationMode; import io.debezium.pipeline.EventDispatcher; import io.debezium.pipeline.source.spi.SnapshotProgressListener; import io.debezium.pipeline.spi.ChangeRecordEmitter; @@ -79,7 +79,7 @@ protected SnapshotContext prepare(ChangeEventSourceContext context) throws Excep @Override protected void connectionCreated(SnapshotContext snapshotContext) throws Exception { - if (connectorConfig.getSnapshotLockingMode() == SnapshotLockingMode.SNAPSHOT) { + if (connectorConfig.getSnapshotIsolationMode() == SnapshotIsolationMode.SNAPSHOT) { // Terminate any transaction in progress so we can change the isolation level jdbcConnection.connection().rollback(); // With one exception, you can switch from one isolation level to another at any time during a transaction. @@ -96,12 +96,12 @@ protected Set getAllTableIds(SnapshotContext ctx) throws Exception { @Override protected void lockTablesForSchemaSnapshot(ChangeEventSourceContext sourceContext, SnapshotContext snapshotContext) throws SQLException, InterruptedException { - if (connectorConfig.getSnapshotLockingMode() == SnapshotLockingMode.SNAPSHOT) { + if (connectorConfig.getSnapshotIsolationMode() == SnapshotIsolationMode.SNAPSHOT) { // Snapshot transaction isolation level has already been set. LOGGER.info("Schema locking was disabled in connector configuration"); } - else if (connectorConfig.getSnapshotLockingMode() == SnapshotLockingMode.EXCLUSIVE - || connectorConfig.getSnapshotLockingMode() == SnapshotLockingMode.REPEATABLE_READ) { + else if (connectorConfig.getSnapshotIsolationMode() == SnapshotIsolationMode.EXCLUSIVE + || connectorConfig.getSnapshotIsolationMode() == SnapshotIsolationMode.REPEATABLE_READ) { jdbcConnection.connection().setTransactionIsolation(Connection.TRANSACTION_REPEATABLE_READ); ((SqlServerSnapshotContext)snapshotContext).preSchemaSnapshotSavepoint = jdbcConnection.connection().setSavepoint("dbz_schema_snapshot"); @@ -128,7 +128,7 @@ else if (connectorConfig.getSnapshotLockingMode() == SnapshotLockingMode.EXCLUSI protected void releaseSchemaSnapshotLocks(SnapshotContext snapshotContext) throws SQLException { // Exclusive mode: locks should be kept until the end of transaction. // snapshot mode: no locks have been acquired. - if (connectorConfig.getSnapshotLockingMode() == SnapshotLockingMode.REPEATABLE_READ) { + if (connectorConfig.getSnapshotIsolationMode() == SnapshotIsolationMode.REPEATABLE_READ) { jdbcConnection.connection().rollback(((SqlServerSnapshotContext)snapshotContext).preSchemaSnapshotSavepoint); } } diff --git a/debezium-connector-sqlserver/src/test/java/io/debezium/connector/sqlserver/SnapshotIT.java b/debezium-connector-sqlserver/src/test/java/io/debezium/connector/sqlserver/SnapshotIT.java index 965007070..5342f37f7 100644 --- a/debezium-connector-sqlserver/src/test/java/io/debezium/connector/sqlserver/SnapshotIT.java +++ b/debezium-connector-sqlserver/src/test/java/io/debezium/connector/sqlserver/SnapshotIT.java @@ -5,7 +5,7 @@ */ package io.debezium.connector.sqlserver; -import static io.debezium.connector.sqlserver.SqlServerConnectorConfig.SNAPSHOT_LOCKING_MODE; +import static io.debezium.connector.sqlserver.SqlServerConnectorConfig.SNAPSHOT_ISOLATION_MODE; import static org.fest.assertions.Assertions.assertThat; import static org.junit.Assert.assertNull; @@ -27,7 +27,7 @@ import org.junit.Test; import io.debezium.config.Configuration; -import io.debezium.connector.sqlserver.SqlServerConnectorConfig.SnapshotLockingMode; +import io.debezium.connector.sqlserver.SqlServerConnectorConfig.SnapshotIsolationMode; import io.debezium.connector.sqlserver.SqlServerConnectorConfig.SnapshotMode; import io.debezium.connector.sqlserver.util.TestHelper; import io.debezium.data.SchemaAndValueField; @@ -81,22 +81,22 @@ public void after() throws SQLException { @Test public void takeSnapshotInExclusiveMode() throws Exception { - takeSnapshot(SnapshotLockingMode.EXCLUSIVE); + takeSnapshot(SnapshotIsolationMode.EXCLUSIVE); } @Test public void takeSnapshotInSnapshotMode() throws Exception { - takeSnapshot(SnapshotLockingMode.SNAPSHOT); + takeSnapshot(SnapshotIsolationMode.SNAPSHOT); } @Test public void takeSnapshotInRepeatableReadMode() throws Exception { - takeSnapshot(SnapshotLockingMode.REPEATABLE_READ); + takeSnapshot(SnapshotIsolationMode.REPEATABLE_READ); } - private void takeSnapshot(SnapshotLockingMode lockingMode) throws Exception { + private void takeSnapshot(SnapshotIsolationMode lockingMode) throws Exception { final Configuration config = TestHelper.defaultConfig() - .with(SNAPSHOT_LOCKING_MODE.name(), lockingMode.getValue()) + .with(SNAPSHOT_ISOLATION_MODE.name(), lockingMode.getValue()) .build(); start(SqlServerConnector.class, config);