DBZ-947 Rename snapshot.locking.mode to snapshot.isolation.mode
This commit is contained in:
parent
b132f2b704
commit
0835a8efba
@ -109,9 +109,9 @@ public static SnapshotMode parse(String value, String defaultValue) {
|
||||
}
|
||||
|
||||
/**
|
||||
* The set of predefined Snapshot Locking Mode options.
|
||||
* The set of predefined Snapshot Isolation Mode options.
|
||||
*/
|
||||
public static enum SnapshotLockingMode implements EnumeratedValue {
|
||||
public static enum SnapshotIsolationMode implements EnumeratedValue {
|
||||
|
||||
/**
|
||||
* This mode will block all reads and writes for the entire duration of the snapshot.
|
||||
@ -136,7 +136,7 @@ public static enum SnapshotLockingMode implements EnumeratedValue {
|
||||
|
||||
private final String value;
|
||||
|
||||
private SnapshotLockingMode(String value) {
|
||||
private SnapshotIsolationMode(String value) {
|
||||
this.value = value;
|
||||
}
|
||||
|
||||
@ -151,10 +151,10 @@ public String getValue() {
|
||||
* @param value the configuration property value; may not be null
|
||||
* @return the matching option, or null if no match is found
|
||||
*/
|
||||
public static SnapshotLockingMode parse(String value) {
|
||||
public static SnapshotIsolationMode parse(String value) {
|
||||
if (value == null) return null;
|
||||
value = value.trim();
|
||||
for (SnapshotLockingMode option : SnapshotLockingMode.values()) {
|
||||
for (SnapshotIsolationMode option : SnapshotIsolationMode.values()) {
|
||||
if (option.getValue().equalsIgnoreCase(value)) return option;
|
||||
}
|
||||
return null;
|
||||
@ -167,8 +167,8 @@ public static SnapshotLockingMode parse(String value) {
|
||||
* @param defaultValue the default value; may be null
|
||||
* @return the matching option, or null if no match is found and the non-null default is invalid
|
||||
*/
|
||||
public static SnapshotLockingMode parse(String value, String defaultValue) {
|
||||
SnapshotLockingMode mode = parse(value);
|
||||
public static SnapshotIsolationMode parse(String value, String defaultValue) {
|
||||
SnapshotIsolationMode mode = parse(value);
|
||||
if (mode == null && defaultValue != null) mode = parse(defaultValue);
|
||||
return mode;
|
||||
}
|
||||
@ -205,15 +205,16 @@ public static SnapshotLockingMode parse(String value, String defaultValue) {
|
||||
+ "'initial' (the default) to specify the connector should run a snapshot only when no offsets are available for the logical server name; "
|
||||
+ "'initial_schema_only' to specify the connector should run a snapshot of the schema when no offsets are available for the logical server name. ");
|
||||
|
||||
public static final Field SNAPSHOT_LOCKING_MODE = Field.create("snapshot.locking.mode")
|
||||
.withDisplayName("Snapshot locking mode")
|
||||
.withEnum(SnapshotLockingMode.class, SnapshotLockingMode.REPEATABLE_READ)
|
||||
public static final Field SNAPSHOT_ISOLATION_MODE = Field.create("snapshot.isolation.mode")
|
||||
.withDisplayName("Snapshot isolation mode")
|
||||
.withEnum(SnapshotIsolationMode.class, SnapshotIsolationMode.REPEATABLE_READ)
|
||||
.withWidth(Width.SHORT)
|
||||
.withImportance(Importance.LOW)
|
||||
.withDescription("Controls how long the connector locks the monitored tables for snapshot execution. The default is '" + SnapshotLockingMode.REPEATABLE_READ.getValue() + "', "
|
||||
.withDescription("Controls which transaction isolation level is used and how long the connector locks the monitored tables. "
|
||||
+ "The default is '" + SnapshotIsolationMode.REPEATABLE_READ.getValue() + "', which means that repeatable read isolation level is used. In addition, exclusive locks are taken only during schema snapshot. "
|
||||
+ "which means that the connector hold locks for all monitored tables only during schema snapshot."
|
||||
+ "Using a value of '" + SnapshotLockingMode.EXCLUSIVE.getValue() + "' ensures that the connector holds the exclusive lock (and thus prevents any reads and updates) for all monitored tables. "
|
||||
+ "When '" + SnapshotLockingMode.SNAPSHOT.getValue() + "' is specified, connector runs the initial snapshot in SNAPSHOT isolation level, which guarantees snapshot consistency. In addition, neither table nor row-level locks are held.");
|
||||
+ "Using a value of '" + SnapshotIsolationMode.EXCLUSIVE.getValue() + "' ensures that the connector holds the exclusive lock (and thus prevents any reads and updates) for all monitored tables during the entire snapshot duration. "
|
||||
+ "When '" + SnapshotIsolationMode.SNAPSHOT.getValue() + "' is specified, connector runs the initial snapshot in SNAPSHOT isolation level, which guarantees snapshot consistency. In addition, neither table nor row-level locks are held.");
|
||||
|
||||
/**
|
||||
* The set of {@link Field}s defined as part of this configuration.
|
||||
@ -255,7 +256,7 @@ public static ConfigDef configDef() {
|
||||
|
||||
private final String databaseName;
|
||||
private final SnapshotMode snapshotMode;
|
||||
private final SnapshotLockingMode snapshotLockingMode;
|
||||
private final SnapshotIsolationMode snapshotIsolationMode;
|
||||
private final Predicate<ColumnId> columnFilter;
|
||||
|
||||
public SqlServerConnectorConfig(Configuration config) {
|
||||
@ -263,7 +264,7 @@ public SqlServerConnectorConfig(Configuration config) {
|
||||
|
||||
this.databaseName = config.getString(DATABASE_NAME);
|
||||
this.snapshotMode = SnapshotMode.parse(config.getString(SNAPSHOT_MODE), SNAPSHOT_MODE.defaultValueAsString());
|
||||
this.snapshotLockingMode = SnapshotLockingMode.parse(config.getString(SNAPSHOT_LOCKING_MODE), SNAPSHOT_LOCKING_MODE.defaultValueAsString());
|
||||
this.snapshotIsolationMode = SnapshotIsolationMode.parse(config.getString(SNAPSHOT_ISOLATION_MODE), SNAPSHOT_ISOLATION_MODE.defaultValueAsString());
|
||||
this.columnFilter = Predicates.excludes(config.getString(RelationalDatabaseConnectorConfig.COLUMN_BLACKLIST),
|
||||
columnId -> String.format("%s.%s.%s", columnId.schema(), columnId.table(), columnId.columnName()));
|
||||
}
|
||||
@ -272,8 +273,8 @@ public String getDatabaseName() {
|
||||
return databaseName;
|
||||
}
|
||||
|
||||
public SnapshotLockingMode getSnapshotLockingMode() {
|
||||
return this.snapshotLockingMode;
|
||||
public SnapshotIsolationMode getSnapshotIsolationMode() {
|
||||
return this.snapshotIsolationMode;
|
||||
}
|
||||
|
||||
public SnapshotMode getSnapshotMode() {
|
||||
|
@ -17,7 +17,7 @@
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import io.debezium.connector.sqlserver.SqlServerConnectorConfig.SnapshotLockingMode;
|
||||
import io.debezium.connector.sqlserver.SqlServerConnectorConfig.SnapshotIsolationMode;
|
||||
import io.debezium.pipeline.EventDispatcher;
|
||||
import io.debezium.pipeline.source.spi.SnapshotProgressListener;
|
||||
import io.debezium.pipeline.spi.ChangeRecordEmitter;
|
||||
@ -79,7 +79,7 @@ protected SnapshotContext prepare(ChangeEventSourceContext context) throws Excep
|
||||
|
||||
@Override
|
||||
protected void connectionCreated(SnapshotContext snapshotContext) throws Exception {
|
||||
if (connectorConfig.getSnapshotLockingMode() == SnapshotLockingMode.SNAPSHOT) {
|
||||
if (connectorConfig.getSnapshotIsolationMode() == SnapshotIsolationMode.SNAPSHOT) {
|
||||
// Terminate any transaction in progress so we can change the isolation level
|
||||
jdbcConnection.connection().rollback();
|
||||
// With one exception, you can switch from one isolation level to another at any time during a transaction.
|
||||
@ -96,12 +96,12 @@ protected Set<TableId> getAllTableIds(SnapshotContext ctx) throws Exception {
|
||||
|
||||
@Override
|
||||
protected void lockTablesForSchemaSnapshot(ChangeEventSourceContext sourceContext, SnapshotContext snapshotContext) throws SQLException, InterruptedException {
|
||||
if (connectorConfig.getSnapshotLockingMode() == SnapshotLockingMode.SNAPSHOT) {
|
||||
if (connectorConfig.getSnapshotIsolationMode() == SnapshotIsolationMode.SNAPSHOT) {
|
||||
// Snapshot transaction isolation level has already been set.
|
||||
LOGGER.info("Schema locking was disabled in connector configuration");
|
||||
}
|
||||
else if (connectorConfig.getSnapshotLockingMode() == SnapshotLockingMode.EXCLUSIVE
|
||||
|| connectorConfig.getSnapshotLockingMode() == SnapshotLockingMode.REPEATABLE_READ) {
|
||||
else if (connectorConfig.getSnapshotIsolationMode() == SnapshotIsolationMode.EXCLUSIVE
|
||||
|| connectorConfig.getSnapshotIsolationMode() == SnapshotIsolationMode.REPEATABLE_READ) {
|
||||
jdbcConnection.connection().setTransactionIsolation(Connection.TRANSACTION_REPEATABLE_READ);
|
||||
((SqlServerSnapshotContext)snapshotContext).preSchemaSnapshotSavepoint = jdbcConnection.connection().setSavepoint("dbz_schema_snapshot");
|
||||
|
||||
@ -128,7 +128,7 @@ else if (connectorConfig.getSnapshotLockingMode() == SnapshotLockingMode.EXCLUSI
|
||||
protected void releaseSchemaSnapshotLocks(SnapshotContext snapshotContext) throws SQLException {
|
||||
// Exclusive mode: locks should be kept until the end of transaction.
|
||||
// snapshot mode: no locks have been acquired.
|
||||
if (connectorConfig.getSnapshotLockingMode() == SnapshotLockingMode.REPEATABLE_READ) {
|
||||
if (connectorConfig.getSnapshotIsolationMode() == SnapshotIsolationMode.REPEATABLE_READ) {
|
||||
jdbcConnection.connection().rollback(((SqlServerSnapshotContext)snapshotContext).preSchemaSnapshotSavepoint);
|
||||
}
|
||||
}
|
||||
|
@ -5,7 +5,7 @@
|
||||
*/
|
||||
package io.debezium.connector.sqlserver;
|
||||
|
||||
import static io.debezium.connector.sqlserver.SqlServerConnectorConfig.SNAPSHOT_LOCKING_MODE;
|
||||
import static io.debezium.connector.sqlserver.SqlServerConnectorConfig.SNAPSHOT_ISOLATION_MODE;
|
||||
import static org.fest.assertions.Assertions.assertThat;
|
||||
import static org.junit.Assert.assertNull;
|
||||
|
||||
@ -27,7 +27,7 @@
|
||||
import org.junit.Test;
|
||||
|
||||
import io.debezium.config.Configuration;
|
||||
import io.debezium.connector.sqlserver.SqlServerConnectorConfig.SnapshotLockingMode;
|
||||
import io.debezium.connector.sqlserver.SqlServerConnectorConfig.SnapshotIsolationMode;
|
||||
import io.debezium.connector.sqlserver.SqlServerConnectorConfig.SnapshotMode;
|
||||
import io.debezium.connector.sqlserver.util.TestHelper;
|
||||
import io.debezium.data.SchemaAndValueField;
|
||||
@ -81,22 +81,22 @@ public void after() throws SQLException {
|
||||
|
||||
@Test
|
||||
public void takeSnapshotInExclusiveMode() throws Exception {
|
||||
takeSnapshot(SnapshotLockingMode.EXCLUSIVE);
|
||||
takeSnapshot(SnapshotIsolationMode.EXCLUSIVE);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void takeSnapshotInSnapshotMode() throws Exception {
|
||||
takeSnapshot(SnapshotLockingMode.SNAPSHOT);
|
||||
takeSnapshot(SnapshotIsolationMode.SNAPSHOT);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void takeSnapshotInRepeatableReadMode() throws Exception {
|
||||
takeSnapshot(SnapshotLockingMode.REPEATABLE_READ);
|
||||
takeSnapshot(SnapshotIsolationMode.REPEATABLE_READ);
|
||||
}
|
||||
|
||||
private void takeSnapshot(SnapshotLockingMode lockingMode) throws Exception {
|
||||
private void takeSnapshot(SnapshotIsolationMode lockingMode) throws Exception {
|
||||
final Configuration config = TestHelper.defaultConfig()
|
||||
.with(SNAPSHOT_LOCKING_MODE.name(), lockingMode.getValue())
|
||||
.with(SNAPSHOT_ISOLATION_MODE.name(), lockingMode.getValue())
|
||||
.build();
|
||||
|
||||
start(SqlServerConnector.class, config);
|
||||
|
Loading…
Reference in New Issue
Block a user