DBZ-5085 Enforce read-consistency determining snapshot offsets

This commit is contained in:
Chris Cranford 2022-05-04 16:12:49 -04:00 committed by Jiri Pechanec
parent b61d99d0c6
commit ec940e8220
11 changed files with 596 additions and 124 deletions

View File

@ -5,7 +5,17 @@
*/
package io.debezium.connector.oracle;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.Optional;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import io.debezium.document.Document;
import io.debezium.relational.RelationalSnapshotChangeEventSource.RelationalSnapshotContext;
import io.debezium.relational.TableId;
/**
* Abstract implementation of the {@link StreamingAdapter} for which all streaming adapters are derived.
@ -14,6 +24,8 @@
*/
public abstract class AbstractStreamingAdapter implements StreamingAdapter {
private static final Logger LOGGER = LoggerFactory.getLogger(AbstractStreamingAdapter.class);
protected final OracleConnectorConfig connectorConfig;
public AbstractStreamingAdapter(OracleConnectorConfig connectorConfig) {
@ -28,4 +40,79 @@ protected Scn resolveScn(Document document) {
}
return Scn.valueOf(scn);
}
/**
* Checks whether the two specified system change numbers have the same timestamp.
*
* @param scn1 first scn number, may be {@code null}
* @param scn2 second scn number, may be {@code null}
* @param connection the database connection, must not be {@code null}
* @return true if the two system change numbers have the same timestamp; false otherwise
* @throws SQLException if a database error occurred
*/
protected boolean areSameTimestamp(Scn scn1, Scn scn2, OracleConnection connection) throws SQLException {
if (scn1 == null) {
return false;
}
if (scn2 == null) {
return false;
}
final String query = "SELECT 1 FROM DUAL WHERE SCN_TO_TIMESTAMP(" + scn1 + ")=SCN_TO_TIMESTAMP(" + scn2 + ")";
try (Statement s = connection.connection().createStatement(); ResultSet rs = s.executeQuery(query)) {
return rs.next();
}
}
/**
* Returns the SCN of the latest DDL change to the captured tables.
* The result will be empty if there is no table to capture as per the configuration.
*
* @param ctx the snapshot contest, must not be {@code null}
* @param connection the database connection, must not be {@code null}
* @return the latest table DDL system change number, never {@code null} but may be empty.
* @throws SQLException if a database error occurred
*/
protected Optional<Scn> getLatestTableDdlScn(RelationalSnapshotContext<OraclePartition, OracleOffsetContext> ctx, OracleConnection connection)
throws SQLException {
if (ctx.capturedTables.isEmpty()) {
return Optional.empty();
}
StringBuilder lastDdlScnQuery = new StringBuilder("SELECT TIMESTAMP_TO_SCN(MAX(last_ddl_time))")
.append(" FROM all_objects")
.append(" WHERE");
for (TableId table : ctx.capturedTables) {
lastDdlScnQuery.append(" (owner = '" + table.schema() + "' AND object_name = '" + table.table() + "') OR");
}
String query = lastDdlScnQuery.substring(0, lastDdlScnQuery.length() - 3).toString();
try (Statement statement = connection.connection().createStatement();
ResultSet rs = statement.executeQuery(query)) {
if (!rs.next()) {
throw new IllegalStateException("Couldn't get latest table DDL SCN");
}
// Guard against LAST_DDL_TIME with value of 0.
// This case should be treated as if we were unable to determine a value for LAST_DDL_TIME.
// This forces later calculations to be based upon the current SCN.
String latestDdlTime = rs.getString(1);
if ("0".equals(latestDdlTime)) {
return Optional.empty();
}
return Optional.of(Scn.valueOf(latestDdlTime));
}
catch (SQLException e) {
if (e.getErrorCode() == 8180) {
// DBZ-1446 In this use case we actually do not want to propagate the exception but
// rather return an empty optional value allowing the current SCN to take prior.
LOGGER.info("No latest table SCN could be resolved, defaulting to current SCN");
return Optional.empty();
}
throw e;
}
}
}

View File

@ -63,10 +63,15 @@ public class OracleConnection extends JdbcConnection {
private static final String QUOTED_CHARACTER = "\"";
public OracleConnection(JdbcConfiguration config, Supplier<ClassLoader> classLoaderSupplier) {
super(config, resolveConnectionFactory(config), classLoaderSupplier, QUOTED_CHARACTER, QUOTED_CHARACTER);
this(config, classLoaderSupplier, true);
}
public OracleConnection(JdbcConfiguration config, Supplier<ClassLoader> classLoaderSupplier, boolean showVersion) {
super(config, resolveConnectionFactory(config), classLoaderSupplier, QUOTED_CHARACTER, QUOTED_CHARACTER);
this.databaseVersion = resolveOracleDatabaseVersion();
LOGGER.info("Database Version: {}", databaseVersion.getBanner());
if (showVersion) {
LOGGER.info("Database Version: {}", databaseVersion.getBanner());
}
}
public void setSessionToPdb(String pdbName) {

View File

@ -470,6 +470,15 @@ public class OracleConnectorConfig extends HistorizedRelationalDatabaseConnector
.withDescription(
"The maximum number of milliseconds that a LogMiner session lives for before being restarted. Defaults to 0 (indefinite until a log switch occurs)");
public static final Field LOG_MINING_QUERY_LOGS_FOR_SNAPSHOT_OFFSET = Field.createInternal("log.mining.query.logs.for.snapshot.offset")
.withDisplayName("Specifies whether to query transaction logs for snapshot offset position")
.withType(Type.BOOLEAN)
.withWidth(Width.SHORT)
.withImportance(Importance.LOW)
.withDefault(true)
.withDescription(
"When set to true, the transaction logs will be inspected upon a new connector to resolve in-progress transactions. Setting to false disabled this behavior.");
private static final ConfigDefinition CONFIG_DEFINITION = HistorizedRelationalDatabaseConnectorConfig.CONFIG_DEFINITION.edit()
.name("Oracle")
.excluding(
@ -528,7 +537,8 @@ public class OracleConnectorConfig extends HistorizedRelationalDatabaseConnector
LOG_MINING_LOG_QUERY_MAX_RETRIES,
LOG_MINING_LOG_BACKOFF_INITIAL_DELAY_MS,
LOG_MINING_LOG_BACKOFF_MAX_DELAY_MS,
LOG_MINING_SESSION_MAX_MS)
LOG_MINING_SESSION_MAX_MS,
LOG_MINING_QUERY_LOGS_FOR_SNAPSHOT_OFFSET)
.create();
/**
@ -585,6 +595,7 @@ public static ConfigDef configDef() {
private final Duration logMiningInitialDelay;
private final Duration logMiningMaxDelay;
private final Duration logMiningMaximumSession;
private final boolean logMiningQueryLogsForSnapshotOffset;
public OracleConnectorConfig(Configuration config) {
super(OracleConnector.class, config, config.getString(SERVER_NAME), new SystemTablesPredicate(config),
@ -632,6 +643,7 @@ public OracleConnectorConfig(Configuration config) {
this.logMiningInitialDelay = Duration.ofMillis(config.getLong(LOG_MINING_LOG_BACKOFF_INITIAL_DELAY_MS));
this.logMiningMaxDelay = Duration.ofMillis(config.getLong(LOG_MINING_LOG_BACKOFF_MAX_DELAY_MS));
this.logMiningMaximumSession = Duration.ofMillis(config.getLong(LOG_MINING_SESSION_MAX_MS));
this.logMiningQueryLogsForSnapshotOffset = config.getBoolean(LOG_MINING_QUERY_LOGS_FOR_SNAPSHOT_OFFSET);
}
private static String toUpperCase(String property) {
@ -1411,6 +1423,13 @@ public Optional<Duration> getLogMiningMaximumSession() {
return logMiningMaximumSession.toMillis() == 0L ? Optional.empty() : Optional.of(logMiningMaximumSession);
}
/**
* @return whether the transaction logs should be inspected for the snapshot offset
*/
public boolean isLogMiningQueryLogsForSnapshotOffset() {
return logMiningQueryLogsForSnapshotOffset;
}
@Override
public String getConnectorName() {
return Module.name();

View File

@ -144,7 +144,7 @@ public Builder snapshotScn(Scn scn) {
return this;
}
OracleOffsetContext build() {
public OracleOffsetContext build() {
return new OracleOffsetContext(connectorConfig, scn, lcrPosition, snapshotScn, snapshotPendingTransactions, snapshot, snapshotCompleted, transactionContext,
incrementalSnapshotContext);
}

View File

@ -5,13 +5,10 @@
*/
package io.debezium.connector.oracle;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Savepoint;
import java.sql.Statement;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
@ -20,17 +17,14 @@
import org.slf4j.LoggerFactory;
import io.debezium.pipeline.EventDispatcher;
import io.debezium.pipeline.source.snapshot.incremental.SignalBasedIncrementalSnapshotContext;
import io.debezium.pipeline.source.spi.SnapshotProgressListener;
import io.debezium.pipeline.source.spi.StreamingChangeEventSource;
import io.debezium.pipeline.txmetadata.TransactionContext;
import io.debezium.relational.RelationalSnapshotChangeEventSource;
import io.debezium.relational.Table;
import io.debezium.relational.TableId;
import io.debezium.schema.SchemaChangeEvent;
import io.debezium.schema.SchemaChangeEvent.SchemaChangeEventType;
import io.debezium.util.Clock;
import io.debezium.util.HexConverter;
/**
* A {@link StreamingChangeEventSource} for Oracle.
@ -142,117 +136,7 @@ protected void determineSnapshotOffset(RelationalSnapshotContext<OraclePartition
return;
}
Optional<Scn> latestTableDdlScn = getLatestTableDdlScn(ctx);
Scn currentScn;
// we must use an SCN for taking the snapshot that represents a later timestamp than the latest DDL change than
// any of the captured tables; this will not be a problem in practice, but during testing it may happen that the
// SCN of "now" represents the same timestamp as a newly created table that should be captured; in that case
// we'd get a ORA-01466 when running the flashback query for doing the snapshot
do {
currentScn = jdbcConnection.getCurrentScn();
} while (areSameTimestamp(latestTableDdlScn.orElse(null), currentScn));
// Record the starting SCNs for all currently in-progress transactions.
// We should mine from the oldest still-reachable start SCN, but only for those transactions.
// For everything else, we mine only from the snapshot SCN forward.
Map<String, Scn> snapshotPendingTransactions = getSnapshotPendingTransactions(currentScn);
ctx.offset = OracleOffsetContext.create()
.logicalName(connectorConfig)
.scn(currentScn)
.snapshotScn(currentScn)
.snapshotPendingTransactions(snapshotPendingTransactions)
.transactionContext(new TransactionContext())
.incrementalSnapshotContext(new SignalBasedIncrementalSnapshotContext<>())
.build();
}
/**
* Whether the two SCNs represent the same timestamp or not (resolution is only 3 seconds).
*/
private boolean areSameTimestamp(Scn scn1, Scn scn2) throws SQLException {
if (scn1 == null) {
return false;
}
try (Statement statement = jdbcConnection.connection().createStatement();
ResultSet rs = statement.executeQuery("SELECT 1 FROM DUAL WHERE SCN_TO_TIMESTAMP(" + scn1 + ") = SCN_TO_TIMESTAMP(" + scn2 + ")")) {
return rs.next();
}
}
/**
* Returns the SCN of the latest DDL change to the captured tables. The result will be empty if there's no table to
* capture as per the configuration.
*/
private Optional<Scn> getLatestTableDdlScn(RelationalSnapshotContext<OraclePartition, OracleOffsetContext> ctx)
throws SQLException {
if (ctx.capturedTables.isEmpty()) {
return Optional.empty();
}
StringBuilder lastDdlScnQuery = new StringBuilder("SELECT TIMESTAMP_TO_SCN(MAX(last_ddl_time))")
.append(" FROM all_objects")
.append(" WHERE");
for (TableId table : ctx.capturedTables) {
lastDdlScnQuery.append(" (owner = '" + table.schema() + "' AND object_name = '" + table.table() + "') OR");
}
String query = lastDdlScnQuery.substring(0, lastDdlScnQuery.length() - 3).toString();
try (Statement statement = jdbcConnection.connection().createStatement();
ResultSet rs = statement.executeQuery(query)) {
if (!rs.next()) {
throw new IllegalStateException("Couldn't get latest table DDL SCN");
}
// Guard against LAST_DDL_TIME with value of 0.
// This case should be treated as if we were unable to determine a value for LAST_DDL_TIME.
// This forces later calculations to be based upon the current SCN.
String latestDdlTime = rs.getString(1);
if ("0".equals(latestDdlTime)) {
return Optional.empty();
}
return Optional.of(Scn.valueOf(latestDdlTime));
}
catch (SQLException e) {
if (e.getErrorCode() == 8180) {
// DBZ-1446 In this use case we actually do not want to propagate the exception but
// rather return an empty optional value allowing the current SCN to take prior.
LOGGER.info("No latest table SCN could be resolved, defaulting to current SCN");
return Optional.empty();
}
throw e;
}
}
/**
* Returns a map of transaction id to start SCN for all ongoing transactions before snapshotSCN.
*/
private Map<String, Scn> getSnapshotPendingTransactions(final Scn snapshotSCN) throws SQLException {
StringBuilder txQuery = new StringBuilder("SELECT XID, START_SCN FROM V$TRANSACTION WHERE START_SCN < ");
txQuery.append(snapshotSCN.toString());
Map<String, Scn> transactions = new HashMap<>();
try (Statement statement = jdbcConnection.connection().createStatement();
ResultSet rs = statement.executeQuery(txQuery.toString())) {
while (rs.next()) {
byte[] txid = rs.getBytes(1);
String scn = rs.getString(2);
transactions.put(HexConverter.convertToHexString(txid), Scn.valueOf(scn));
}
}
catch (SQLException e) {
LOGGER.warn("Could not query the V$TRANSACTION view: {}", e);
throw e;
}
return transactions;
ctx.offset = connectorConfig.getAdapter().determineSnapshotOffset(ctx, connectorConfig, jdbcConnection);
}
@Override

View File

@ -5,11 +5,14 @@
*/
package io.debezium.connector.oracle;
import java.sql.SQLException;
import io.debezium.config.Configuration;
import io.debezium.pipeline.ErrorHandler;
import io.debezium.pipeline.EventDispatcher;
import io.debezium.pipeline.source.spi.StreamingChangeEventSource;
import io.debezium.pipeline.spi.OffsetContext;
import io.debezium.relational.RelationalSnapshotChangeEventSource.RelationalSnapshotContext;
import io.debezium.relational.TableId;
import io.debezium.relational.history.HistoryRecordComparator;
import io.debezium.util.Clock;
@ -70,4 +73,17 @@ StreamingChangeEventSource<OraclePartition, OracleOffsetContext> getSource(Oracl
default TableNameCaseSensitivity getTableNameCaseSensitivity(OracleConnection connection) {
return TableNameCaseSensitivity.SENSITIVE;
}
/**
* Returns the offset context based on the snapshot state.
*
* @param ctx the relational snapshot context, should never be {@code null}
* @param connectorConfig the connector configuration, should never be {@code null}
* @param connection the database connection, should never be {@code null}
* @return the offset context, never {@code null}
* @throws SQLException if a database error occurred
*/
OracleOffsetContext determineSnapshotOffset(RelationalSnapshotContext<OraclePartition, OracleOffsetContext> ctx,
OracleConnectorConfig connectorConfig, OracleConnection connection)
throws SQLException;
}

View File

@ -88,6 +88,10 @@ public Type getType() {
return type;
}
public boolean isScnInLogFileRange(Scn scn) {
return getFirstScn().compareTo(scn) <= 0 && (getNextScn().compareTo(scn) > 0 || getNextScn().equals(Scn.MAX));
}
@Override
public int hashCode() {
return Objects.hash(sequence);

View File

@ -5,6 +5,22 @@
*/
package io.debezium.connector.oracle.logminer;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.time.Duration;
import java.util.Comparator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import io.debezium.DebeziumException;
import io.debezium.config.Configuration;
import io.debezium.connector.oracle.AbstractStreamingAdapter;
import io.debezium.connector.oracle.OracleConnection;
@ -14,21 +30,29 @@
import io.debezium.connector.oracle.OraclePartition;
import io.debezium.connector.oracle.OracleStreamingChangeEventSourceMetrics;
import io.debezium.connector.oracle.OracleTaskContext;
import io.debezium.connector.oracle.Scn;
import io.debezium.document.Document;
import io.debezium.pipeline.ErrorHandler;
import io.debezium.pipeline.EventDispatcher;
import io.debezium.pipeline.source.snapshot.incremental.SignalBasedIncrementalSnapshotContext;
import io.debezium.pipeline.source.spi.StreamingChangeEventSource;
import io.debezium.pipeline.spi.OffsetContext;
import io.debezium.pipeline.txmetadata.TransactionContext;
import io.debezium.relational.RelationalSnapshotChangeEventSource.RelationalSnapshotContext;
import io.debezium.relational.TableId;
import io.debezium.relational.history.HistoryRecordComparator;
import io.debezium.util.Clock;
import io.debezium.util.HexConverter;
import io.debezium.util.Strings;
/**
* @author Chris Cranford
*/
public class LogMinerAdapter extends AbstractStreamingAdapter {
private static final String TYPE = "logminer";
private static final Logger LOGGER = LoggerFactory.getLogger(LogMinerAdapter.class);
public static final String TYPE = "logminer";
public LogMinerAdapter(OracleConnectorConfig connectorConfig) {
super(connectorConfig);
@ -74,4 +98,257 @@ public StreamingChangeEventSource<OraclePartition, OracleOffsetContext> getSourc
streamingMetrics);
}
@Override
public OracleOffsetContext determineSnapshotOffset(RelationalSnapshotContext<OraclePartition, OracleOffsetContext> ctx,
OracleConnectorConfig connectorConfig,
OracleConnection connection)
throws SQLException {
final Scn latestTableDdlScn = getLatestTableDdlScn(ctx, connection).orElse(null);
final Map<String, Scn> pendingTransactions = new LinkedHashMap<>();
final Optional<Scn> currentScn = getPendingTransactions(latestTableDdlScn, connection, pendingTransactions);
if (currentScn.isEmpty()) {
throw new DebeziumException("Failed to resolve current SCN");
}
// The supplied connection already has an in-progress transaction with a save point that will
// prevent us from switching from the PDB to the root CDB when PDB configuration is enabled.
// To resolve the correct in-progress transactions and starting SCN, a separate connection is
// required.
if (!Strings.isNullOrBlank(connectorConfig.getPdbName())) {
try (OracleConnection conn = new OracleConnection(connection.config(), () -> getClass().getClassLoader(), false)) {
conn.setAutoCommit(false);
// The next stage cannot be run within the PDB, reset the connection to the CDB.
conn.resetSessionToCdb();
return determineSnapshotOffset(connectorConfig, conn, currentScn.get(), pendingTransactions);
}
}
else {
return determineSnapshotOffset(connectorConfig, connection, currentScn.get(), pendingTransactions);
}
}
private Optional<Scn> getPendingTransactions(Scn latestTableDdlScn, OracleConnection connection,
Map<String, Scn> transactions)
throws SQLException {
final String query = "SELECT d.CURRENT_SCN, t.XID, t.START_SCN "
+ "FROM V$DATABASE d "
+ "LEFT OUTER JOIN V$TRANSACTION t "
+ "ON t.START_SCN < d.CURRENT_SCN ";
Scn currentScn = null;
do {
// Clear iterative state
currentScn = null;
transactions.clear();
try (Statement s = connection.connection().createStatement(); ResultSet rs = s.executeQuery(query)) {
while (rs.next()) {
if (currentScn == null) {
// Only need to set this once per iteration
currentScn = Scn.valueOf(rs.getString(1));
}
final String pendingTxStartScn = rs.getString(3);
if (!Strings.isNullOrEmpty(pendingTxStartScn)) {
// There is a pending transaction, capture state
transactions.put(HexConverter.convertToHexString(rs.getBytes(2)), Scn.valueOf(pendingTxStartScn));
}
}
}
catch (SQLException e) {
LOGGER.warn("Could not query the V$TRANSACTION view: {}", e.getMessage(), e);
throw e;
}
} while (areSameTimestamp(latestTableDdlScn, currentScn, connection));
return Optional.ofNullable(currentScn);
}
private OracleOffsetContext determineSnapshotOffset(OracleConnectorConfig connectorConfig,
OracleConnection connection,
Scn currentScn,
Map<String, Scn> pendingTransactions)
throws SQLException {
if (!connectorConfig.isLogMiningQueryLogsForSnapshotOffset()) {
LOGGER.info("\tSkipping transaction logs for resolving snapshot offset, only using V$TRANSACTION.");
}
else {
LOGGER.info("\tConsulting V$TRANSACTION and transaction logs for resolving snapshot offset.");
final Scn oldestScn = getOldestScnAvailableInLogs(connectorConfig, connection);
final List<LogFile> logFiles = getOrderedLogsFromScn(connectorConfig, oldestScn, connection);
// Simple sanity check
// While this should never be the case, this is to guard against NPE or other errors that could
// result from below if there is some race condition/corner case not considered
if (logFiles.isEmpty()) {
throw new DebeziumException("Failed to get log files from Oracle");
}
// Locate the index in the log files where we should begin
// This is the log where the current SCN exists
int logIndex = 0;
for (int i = 0; i < logFiles.size(); ++i) {
if (logFiles.get(i).isScnInLogFileRange(currentScn)) {
logIndex = i;
break;
}
}
// Starting from the log position (logIndex), we begin mining from it going backward.
// Each iteration will include the prior log along with the logs up to the logIndex to locate the start pos
for (int pos = logIndex; pos >= 0; pos--) {
try {
addLogsToSession(logFiles, pos, logIndex, connection);
startSession(connection);
final Optional<String> transactionId = getTransactionIdForScn(currentScn, connection);
if (transactionId.isEmpty()) {
throw new DebeziumException("Failed to get transaction id for current SCN " + currentScn);
}
if (pendingTransactions.containsKey(transactionId.get())) {
// The transaction was already captured in the pending transactions list.
// There is nothing special to do here, it is safe to end the session
LOGGER.info("\tCurrent SCN transaction '{}' was found in V$TRANSACTION", transactionId.get());
break;
}
// The current SCN transaction is not in the pending transaction list.
// We need to attempt to fully mine the transaction to see how to handle it.
Scn startScn = getTransactionStartScn(transactionId.get(), currentScn, connection);
if (startScn.isNull() && pos == 0) {
LOGGER.warn("Failed to find start SCN for transaction '{}', transaction will not be included.",
transactionId.get());
}
else {
pendingTransactions.put(transactionId.get(), startScn);
break;
}
}
finally {
stopSession(connection);
}
}
}
if (!pendingTransactions.isEmpty()) {
for (Map.Entry<String, Scn> entry : pendingTransactions.entrySet()) {
LOGGER.info("\tFound in-progress transaction {}, starting at SCN {}", entry.getKey(), entry.getValue());
}
}
else {
LOGGER.info("\tFound no in-progress transactions.");
}
return OracleOffsetContext.create()
.logicalName(connectorConfig)
.scn(currentScn)
.snapshotScn(currentScn)
.snapshotPendingTransactions(pendingTransactions)
.transactionContext(new TransactionContext())
.incrementalSnapshotContext(new SignalBasedIncrementalSnapshotContext<>())
.build();
}
private void addLogsToSession(List<LogFile> logs, int from, int to, OracleConnection connection) throws SQLException {
for (int i = from; i <= to; ++i) {
final LogFile logFile = logs.get(i);
LOGGER.debug("\tAdding log: {}", logFile.getFileName());
connection.executeWithoutCommitting(SqlUtils.addLogFileStatement("DBMS_LOGMNR.ADDFILE", logFile.getFileName()));
}
}
private void startSession(OracleConnection connection) throws SQLException {
// We explicitly use the ONLINE data dictionary mode here.
// Since we are only concerned about non-SQL columns, it is safe to always use this mode
final String query = "BEGIN sys.dbms_logmnr.start_logmnr("
+ "OPTIONS => DBMS_LOGMNR.DICT_FROM_ONLINE_CATALOG + DBMS_LOGMNR.NO_ROWID_IN_STMT);"
+ "END;";
LOGGER.debug("\tStarting mining session");
connection.executeWithoutCommitting(query);
}
private void stopSession(OracleConnection connection) throws SQLException {
// stop the current mining session
try {
LOGGER.debug("\tStopping mining session");
connection.executeWithoutCommitting("BEGIN SYS.DBMS_LOGMNR.END_LOGMNR(); END;");
}
catch (SQLException e) {
if (e.getMessage().toUpperCase().contains("ORA-01307")) {
LOGGER.debug("LogMiner mining session is already closed.");
}
else {
throw e;
}
}
}
private Scn getOldestScnAvailableInLogs(OracleConnectorConfig config, OracleConnection connection) throws SQLException {
final Duration archiveLogRetention = config.getLogMiningArchiveLogRetention();
final String archiveLogDestinationName = config.getLogMiningArchiveDestinationName();
return connection.queryAndMap(SqlUtils.oldestFirstChangeQuery(archiveLogRetention, archiveLogDestinationName),
rs -> {
if (rs.next()) {
final String value = rs.getString(1);
if (!Strings.isNullOrEmpty(value)) {
return Scn.valueOf(value);
}
}
return Scn.NULL;
});
}
private List<LogFile> getOrderedLogsFromScn(OracleConnectorConfig config, Scn sinceScn, OracleConnection connection) throws SQLException {
return LogMinerHelper.getLogFilesForOffsetScn(connection, sinceScn, config.getLogMiningArchiveLogRetention(),
config.isArchiveLogOnlyMode(), config.getLogMiningArchiveDestinationName())
.stream()
.sorted(Comparator.comparing(LogFile::getSequence))
.collect(Collectors.toList());
}
private Optional<String> getTransactionIdForScn(Scn scn, OracleConnection connection) throws SQLException {
LOGGER.debug("\tGet transaction id for SCN {}", scn);
final AtomicReference<String> transactionId = new AtomicReference<>();
connection.call("SELECT XID FROM V$LOGMNR_CONTENTS WHERE SCN = ?",
s -> s.setLong(1, scn.longValue()),
rs -> {
if (rs.next()) {
transactionId.set(HexConverter.convertToHexString(rs.getBytes("XID")));
}
});
return Optional.ofNullable(transactionId.get());
}
private Scn getTransactionStartScn(String transactionId, Scn currentScn, OracleConnection connection) throws SQLException {
LOGGER.debug("\tGet start SCN for transaction '{}'", transactionId);
// We perform this operation a maximum of 5 times before we fail.
final AtomicReference<Scn> startScn = new AtomicReference<>(Scn.NULL);
for (int attempt = 1; attempt <= 5; ++attempt) {
connection.call("SELECT SCN, START_SCN, OPERATION FROM V$LOGMNR_CONTENTS WHERE XID=HEXTORAW(UPPER(?))",
s -> s.setString(1, transactionId),
rs -> {
while (rs.next()) {
if (!Strings.isNullOrEmpty(rs.getString("START_SCN"))) {
final Scn value = Scn.valueOf(rs.getString("START_SCN"));
if (currentScn.compareTo(value) == 0) {
startScn.set(value.subtract(Scn.ONE));
LOGGER.debug("\tCurrent SCN {} starts a transaction, using value-1.", value);
break;
}
startScn.set(Scn.valueOf(rs.getString("START_SCN")));
LOGGER.debug("\tCurrent SCN transaction starts at SCN {}", value);
break;
}
}
});
if (!startScn.get().isNull()) {
break;
}
}
return startScn.get();
}
}

View File

@ -5,6 +5,13 @@
*/
package io.debezium.connector.oracle.xstream;
import java.sql.SQLException;
import java.util.Collections;
import java.util.Optional;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import io.debezium.config.Configuration;
import io.debezium.connector.oracle.AbstractStreamingAdapter;
import io.debezium.connector.oracle.OracleConnection;
@ -19,8 +26,11 @@
import io.debezium.document.Document;
import io.debezium.pipeline.ErrorHandler;
import io.debezium.pipeline.EventDispatcher;
import io.debezium.pipeline.source.snapshot.incremental.SignalBasedIncrementalSnapshotContext;
import io.debezium.pipeline.source.spi.StreamingChangeEventSource;
import io.debezium.pipeline.spi.OffsetContext;
import io.debezium.pipeline.txmetadata.TransactionContext;
import io.debezium.relational.RelationalSnapshotChangeEventSource.RelationalSnapshotContext;
import io.debezium.relational.TableId;
import io.debezium.relational.history.HistoryRecordComparator;
import io.debezium.util.Clock;
@ -32,7 +42,9 @@
*/
public class XStreamAdapter extends AbstractStreamingAdapter {
private static final String TYPE = "xstream";
private static final Logger LOGGER = LoggerFactory.getLogger(XStreamAdapter.class);
public static final String TYPE = "xstream";
public XStreamAdapter(OracleConnectorConfig connectorConfig) {
super(connectorConfig);
@ -92,4 +104,33 @@ public TableNameCaseSensitivity getTableNameCaseSensitivity(OracleConnection con
}
return super.getTableNameCaseSensitivity(connection);
}
@Override
public OracleOffsetContext determineSnapshotOffset(RelationalSnapshotContext<OraclePartition, OracleOffsetContext> ctx,
OracleConnectorConfig connectorConfig,
OracleConnection connection)
throws SQLException {
final Optional<Scn> latestTableDdlScn = getLatestTableDdlScn(ctx, connection);
// we must use an SCN for taking the snapshot that represents a later timestamp than the latest DDL change than
// any of the captured tables; this will not be a problem in practice, but during testing it may happen that the
// SCN of "now" represents the same timestamp as a newly created table that should be captured; in that case
// we'd get a ORA-01466 when running the flashback query for doing the snapshot
Scn currentScn = null;
do {
currentScn = connection.getCurrentScn();
} while (areSameTimestamp(latestTableDdlScn.orElse(null), currentScn, connection));
LOGGER.info("\tCurrent SCN resolved as {}", currentScn);
return OracleOffsetContext.create()
.logicalName(connectorConfig)
.scn(currentScn)
.snapshotScn(currentScn)
.snapshotPendingTransactions(Collections.emptyMap())
.transactionContext(new TransactionContext())
.incrementalSnapshotContext(new SignalBasedIncrementalSnapshotContext<>())
.build();
}
}

View File

@ -3275,6 +3275,144 @@ public void shouldCaptureChangesForTransactionsAcrossSnapshotBoundaryWithoutReem
}
}
@Test
@FixFor("DBZ-5085")
@SkipWhenAdapterNameIsNot(value = SkipWhenAdapterNameIsNot.AdapterName.LOGMINER, reason = "Only applies to LogMiner")
public void shouldSnapshotAndStreamAllRecordsThatSpanAcrossSnapshotStreamingBoundarySmallTrxs() throws Exception {
TestHelper.dropTable(connection, "dbz5085");
try {
setConsumeTimeout(10, TimeUnit.SECONDS);
connection.execute("CREATE TABLE dbz5085 (id numeric(9,0) primary key, data varchar2(50))");
TestHelper.streamTable(connection, "dbz5085");
Configuration config = TestHelper.defaultConfig()
.with(OracleConnectorConfig.TABLE_INCLUDE_LIST, "DEBEZIUM\\.DBZ5085")
.with(OracleConnectorConfig.LOG_MINING_QUERY_LOGS_FOR_SNAPSHOT_OFFSET, true)
.build();
start(OracleConnector.class, config);
assertConnectorIsRunning();
final int expected = 50;
// Insert records into the table while the connector starts, part of the records should be
// captured during snapshot and streaming. We just need to guarantee that we get all records.
LOGGER.info("Inserting {} records", expected);
for (int i = 0; i < expected; ++i) {
if (i % 2 == 0) {
connection.execute("INSERT INTO dbz5085 (id,data) values (" + i + ", 'Test-" + i + "')");
}
else {
connection.executeWithoutCommitting("INSERT INTO dbz5085 (id,data) values (" + i + ", 'Test-" + i + "')");
}
// simulate longer lived transactions
Thread.sleep(100L);
}
connection.commit();
// wait until we get to streaming phase
waitForStreamingRunning(TestHelper.CONNECTOR_NAME, TestHelper.SERVER_NAME);
SourceRecords sourceRecords = consumeRecordsByTopic(expected);
// verify that we got expected numbers of records
List<SourceRecord> records = sourceRecords.recordsForTopic("server1.DEBEZIUM.DBZ5085");
assertThat(records).hasSize(expected);
boolean snapshotFound = false;
boolean streamingFound = false;
for (int i = 0; i < expected; ++i) {
final SourceRecord record = records.get(i);
final Struct value = (Struct) record.value();
if (value.getString("op").equals(Envelope.Operation.READ.code())) {
snapshotFound = true;
VerifyRecord.isValidRead(record, "ID", i);
}
else {
streamingFound = true;
VerifyRecord.isValidInsert(record, "ID", i);
}
}
// Verify that we got records from both phases
assertThat(snapshotFound).isTrue();
assertThat(streamingFound).isTrue();
}
finally {
TestHelper.dropTable(connection, "dbz5085");
}
}
@Test
@FixFor("DBZ-5085")
@SkipWhenAdapterNameIsNot(value = SkipWhenAdapterNameIsNot.AdapterName.LOGMINER, reason = "Only applies to LogMiner")
public void shouldSnapshotAndStreamAllRecordsThatSpanAcrossSnapshotStreamingBoundaryLargeTrxs() throws Exception {
TestHelper.dropTable(connection, "dbz5085");
try {
setConsumeTimeout(10, TimeUnit.SECONDS);
connection.execute("CREATE TABLE dbz5085 (id numeric(9,0) primary key, data varchar2(50))");
TestHelper.streamTable(connection, "dbz5085");
Configuration config = TestHelper.defaultConfig()
.with(OracleConnectorConfig.TABLE_INCLUDE_LIST, "DEBEZIUM\\.DBZ5085")
.with(OracleConnectorConfig.LOG_MINING_QUERY_LOGS_FOR_SNAPSHOT_OFFSET, true)
.build();
start(OracleConnector.class, config);
assertConnectorIsRunning();
final int expected = 50;
// Insert records into the table while the connector starts, part of the records should be
// captured during snapshot and streaming. We just need to guarantee that we get all records.
LOGGER.info("Inserting {} records", expected);
for (int i = 0; i < expected; ++i) {
if (i % 10 == 0) {
connection.execute("INSERT INTO dbz5085 (id,data) values (" + i + ", 'Test-" + i + "')");
}
else {
connection.executeWithoutCommitting("INSERT INTO dbz5085 (id,data) values (" + i + ", 'Test-" + i + "')");
}
// simulate longer lived transactions
Thread.sleep(100L);
}
connection.commit();
// wait until we get to streaming phase
waitForStreamingRunning(TestHelper.CONNECTOR_NAME, TestHelper.SERVER_NAME);
SourceRecords sourceRecords = consumeRecordsByTopic(expected);
// verify that we got expected numbers of records
List<SourceRecord> records = sourceRecords.recordsForTopic("server1.DEBEZIUM.DBZ5085");
assertThat(records).hasSize(expected);
boolean snapshotFound = false;
boolean streamingFound = false;
for (int i = 0; i < expected; ++i) {
final SourceRecord record = records.get(i);
final Struct value = (Struct) record.value();
if (value.getString("op").equals(Envelope.Operation.READ.code())) {
snapshotFound = true;
VerifyRecord.isValidRead(record, "ID", i);
}
else {
streamingFound = true;
VerifyRecord.isValidInsert(record, "ID", i);
}
}
// Verify that we got records from both phases
assertThat(snapshotFound).isTrue();
assertThat(streamingFound).isTrue();
}
finally {
TestHelper.dropTable(connection, "dbz5085");
}
}
@Test
@FixFor("DBZ-4842")
public void shouldRestartAfterCapturedTableIsDroppedWhileConnectorDown() throws Exception {

View File

@ -162,7 +162,8 @@ public static Configuration.Builder defaultConfig() {
return builder.with(OracleConnectorConfig.SERVER_NAME, SERVER_NAME)
.with(OracleConnectorConfig.DATABASE_HISTORY, FileDatabaseHistory.class)
.with(FileDatabaseHistory.FILE_PATH, DB_HISTORY_PATH)
.with(OracleConnectorConfig.INCLUDE_SCHEMA_CHANGES, false);
.with(OracleConnectorConfig.INCLUDE_SCHEMA_CHANGES, false)
.with(OracleConnectorConfig.LOG_MINING_QUERY_LOGS_FOR_SNAPSHOT_OFFSET, false);
}
/**