DBZ-7302 Implement SnapshotLock and SnapshotQuery for Oracle connector

This commit is contained in:
mfvitale 2024-02-05 16:54:46 +01:00 committed by Jiri Pechanec
parent 9fe60a698d
commit 6392a1b816
9 changed files with 178 additions and 9 deletions

View File

@ -1037,7 +1037,12 @@ public enum SnapshotLockingMode implements EnumeratedValue {
* This mode will avoid using ANY table locks during the snapshot process.
* This mode should be used carefully only when no schema changes are to occur.
*/
NONE("none");
NONE("none"),
/**
* Inject a custom mode, which allows for more control over snapshot locking.
*/
CUSTOM("custom");
private final String value;

View File

@ -22,6 +22,7 @@
import io.debezium.connector.base.ChangeEventQueue;
import io.debezium.connector.common.BaseSourceTask;
import io.debezium.connector.oracle.StreamingAdapter.TableNameCaseSensitivity;
import io.debezium.connector.oracle.snapshot.OracleSnapshotLockProvider;
import io.debezium.document.DocumentReader;
import io.debezium.jdbc.DefaultMainConnectionProvidingConnectionFactory;
import io.debezium.jdbc.JdbcConfiguration;
@ -36,6 +37,7 @@
import io.debezium.relational.TableId;
import io.debezium.schema.SchemaFactory;
import io.debezium.schema.SchemaNameAdjuster;
import io.debezium.service.spi.ServiceRegistry;
import io.debezium.snapshot.SnapshotterService;
import io.debezium.spi.topic.TopicNamingStrategy;
import io.debezium.util.Clock;
@ -206,6 +208,17 @@ public void doStop() {
}
}
@Override
protected void registerServiceProviders(ServiceRegistry serviceRegistry) {
super.registerServiceProviders(serviceRegistry);
serviceRegistry.registerServiceProvider(new OracleSnapshotLockProvider());
/*
* serviceRegistry.registerServiceProvider(new OracleSnapshotQueryProvider());
* serviceRegistry.registerServiceProvider(new OracleSnapshotterServiceProvider());
*/
}
@Override
protected Iterable<Field> getAllConfigurationFields() {
return OracleConnectorConfig.ALL_FIELDS;

View File

@ -133,8 +133,11 @@ protected void lockTablesForSchemaSnapshot(ChangeEventSourceContext sourceContex
throw new InterruptedException("Interrupted while locking table " + tableId);
}
LOGGER.debug("Locking table {}", tableId);
statement.execute("LOCK TABLE " + quote(tableId) + " IN ROW SHARE MODE");
Optional<String> lockingStatement = snapshotterService.getSnapshotLock().tableLockingStatement(null, Set.of(quote(tableId)));
if (lockingStatement.isPresent()) {
LOGGER.debug("Locking table {}", tableId);
statement.execute(lockingStatement.get());
}
}
}
}
@ -265,12 +268,8 @@ protected Instant getSnapshotSourceTimestamp(JdbcConnection jdbcConnection, Orac
@Override
protected Optional<String> getSnapshotSelect(RelationalSnapshotContext<OraclePartition, OracleOffsetContext> snapshotContext,
TableId tableId, List<String> columns) {
final OracleOffsetContext offset = snapshotContext.offset;
final String snapshotOffset = offset.getScn().toString();
String snapshotSelectColumns = columns.stream()
.collect(Collectors.joining(", "));
assert snapshotOffset != null;
return Optional.of(String.format("SELECT %s FROM %s AS OF SCN %s", snapshotSelectColumns, quote(tableId), snapshotOffset));
return snapshotterService.getSnapshotQuery().snapshotQuery(quote(tableId), columns);
}
@Override

View File

@ -0,0 +1,30 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.connector.oracle.snapshot;
import io.debezium.bean.StandardBeanNames;
import io.debezium.bean.spi.BeanRegistry;
import io.debezium.connector.oracle.OracleConnectorConfig;
import io.debezium.service.spi.ServiceProvider;
import io.debezium.snapshot.SnapshotLockProvider;
import io.debezium.snapshot.spi.SnapshotLock;
/**
* An implementation of the {@link ServiceProvider} contract for the {@link SnapshotLock}.
*
* @author Mario Fiore Vitale
*/
public class OracleSnapshotLockProvider extends SnapshotLockProvider {
@Override
public String snapshotLockingMode(BeanRegistry beanRegistry) {
OracleConnectorConfig oracleConnectorConfig = beanRegistry.lookupByName(StandardBeanNames.CONNECTOR_CONFIG, OracleConnectorConfig.class);
return oracleConnectorConfig.getSnapshotLockingMode().getValue();
}
}

View File

@ -0,0 +1,33 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.connector.oracle.snapshot.lock;
import java.time.Duration;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import io.debezium.connector.oracle.OracleConnectorConfig;
import io.debezium.snapshot.spi.SnapshotLock;
public class NoSnapshotLock implements SnapshotLock {
@Override
public String name() {
return OracleConnectorConfig.SnapshotLockingMode.NONE.getValue();
}
@Override
public void configure(Map<String, ?> properties) {
}
@Override
public Optional<String> tableLockingStatement(Duration lockTimeout, Set<String> tableIds) {
return Optional.empty();
}
}

View File

@ -0,0 +1,34 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.connector.oracle.snapshot.lock;
import java.time.Duration;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import io.debezium.connector.oracle.OracleConnectorConfig;
import io.debezium.snapshot.spi.SnapshotLock;
public class SharedSnapshotLock implements SnapshotLock {
@Override
public String name() {
return OracleConnectorConfig.SnapshotLockingMode.SHARED.getValue();
}
@Override
public void configure(Map<String, ?> properties) {
}
@Override
public Optional<String> tableLockingStatement(Duration lockTimeout, Set<String> tableIds) {
String tableId = tableIds.iterator().next(); // For Oracle ww expect just one table at time.
return Optional.of("LOCK TABLE " + tableId + " IN ROW SHARE MODE");
}
}

View File

@ -0,0 +1,52 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.connector.oracle.snapshot.query;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import io.debezium.bean.StandardBeanNames;
import io.debezium.bean.spi.BeanRegistry;
import io.debezium.bean.spi.BeanRegistryAware;
import io.debezium.config.CommonConnectorConfig;
import io.debezium.connector.oracle.OracleOffsetContext;
import io.debezium.connector.oracle.OraclePartition;
import io.debezium.pipeline.spi.Offsets;
import io.debezium.snapshot.spi.SnapshotQuery;
public class SelectAllSnapshotQuery implements SnapshotQuery, BeanRegistryAware {
private BeanRegistry beanRegistry;
@Override
public String name() {
return CommonConnectorConfig.SnapshotQueryMode.SELECT_ALL.getValue();
}
@Override
public void configure(Map<String, ?> properties) {
}
@Override
public void injectBeanRegistry(BeanRegistry beanRegistry) {
this.beanRegistry = beanRegistry;
}
@Override
public Optional<String> snapshotQuery(String tableId, List<String> snapshotSelectColumns) {
final Offsets<OraclePartition, OracleOffsetContext> mySqloffsets = beanRegistry.lookupByName(StandardBeanNames.OFFSETS, Offsets.class);
final OracleOffsetContext offset = mySqloffsets.getTheOnlyOffset();
final String snapshotOffset = offset.getScn().toString();
String columns = String.join(", ", snapshotSelectColumns);
assert snapshotOffset != null;
return Optional.of(String.format("SELECT %s FROM %s AS OF SCN %s", columns, tableId, snapshotOffset));
}
}

View File

@ -0,0 +1,2 @@
io.debezium.connector.oracle.snapshot.lock.NoSnapshotLock
io.debezium.connector.oracle.snapshot.lock.SharedSnapshotLock

View File

@ -0,0 +1 @@
io.debezium.connector.oracle.snapshot.query.SelectAllSnapshotQuery