DBZ-5245 Track commit scn per redo thread

This commit is contained in:
Chris Cranford 2022-06-28 17:04:33 -04:00 committed by Jiri Pechanec
parent 406e060df4
commit 30d95358b8
17 changed files with 638 additions and 88 deletions

View File

@ -0,0 +1,333 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.connector.oracle;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.stream.Collectors;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;
import io.debezium.DebeziumException;
import io.debezium.connector.oracle.logminer.events.LogMinerEventRow;
/**
* Represents either a single or a collection of commit {@link Scn} positions that collectively
* represents the high-watermark point for streaming changes.
*
* In a standalone Oracle environment, a commit {@link Scn} would normally represent a single position or
* system change number in the logs as there is only ever a single redo thread. However, in an Oracle RAC
* environment where each node maintains its own redo, there are multiple redo threads which maintain
* their own "commit" point in the logs that may differ.
*
* This class is meant to encapsulate the Oracle RAC environment by exposing a "commit scn" as a single
* representation that spans all nodes within the cluster as one logical unit, much like what we expect
* when integrating with a standalone Oracle database.
*
* @author Chris Cranford
*/
public class CommitScn implements Comparable<Scn> {
public static final String ROLLBACK_SEGMENT_ID_KEY = "rs_id";
public static final String SQL_SEQUENCE_NUMBER_KEY = "ssn";
public static final String REDO_THREAD_KEY = "redo_thread";
// Explicitly use TreeMap to guarantee output render order
private final Map<Integer, RedoThreadCommitScn> redoThreadCommitScns = new TreeMap<>();
private CommitScn(Set<RedoThreadCommitScn> commitScns) {
for (RedoThreadCommitScn commitScn : commitScns) {
redoThreadCommitScns.put(commitScn.getThread(), commitScn);
}
}
/**
* Examines all redo threads and returns the maximum committed scn.
*
* @return the maximum recorded commit across all redo threads
*/
public Scn getMaxCommittedScn() {
return redoThreadCommitScns.values().stream().map(RedoThreadCommitScn::getCommitScn).max(Scn::compareTo).orElse(Scn.NULL);
}
/**
* Get the commit scns associated with all redo threads.
*
* @return a map by redo thread with each commit system change number.
*/
public Map<Integer, Scn> getCommitScnForAllRedoThreads() {
final Map<Integer, Scn> result = new HashMap<>();
for (Map.Entry<Integer, RedoThreadCommitScn> entry : redoThreadCommitScns.entrySet()) {
result.put(entry.getKey(), entry.getValue().getCommitScn());
}
return Collections.unmodifiableMap(result);
}
/**
* Get the commit scn associated with a specific redo thread.
*
* @param thread the redo thread
* @return the commit scn associated with redo thread
*/
public Scn getCommitScnForRedoThread(int thread) {
final RedoThreadCommitScn commitScn = redoThreadCommitScns.get(thread);
return commitScn != null ? commitScn.getCommitScn() : Scn.NULL;
}
/**
* Checks whether the transaction associated with the commit event has been handled.
*
* @param row the transaction commit event, should never be {@code null}
* @return true if the commit has been handled, false if it has not
*/
public boolean hasCommitAlreadyBeenHandled(LogMinerEventRow row) {
final RedoThreadCommitScn commitScn = redoThreadCommitScns.get(row.getThread());
if (commitScn != null) {
return commitScn.getCommitScn().compareTo(row.getScn()) >= 0;
}
return false;
}
/**
* Records the specified commit in the commit scn
*
* @param row the commit event, should never be {@code null}
*/
public void recordCommit(LogMinerEventRow row) {
final RedoThreadCommitScn redoCommitScn = redoThreadCommitScns.get(row.getThread());
if (redoCommitScn != null) {
redoCommitScn.setCommitScn(row.getScn());
redoCommitScn.setRsId(row.getRsId());
redoCommitScn.setSsn(row.getSsn());
}
else {
redoThreadCommitScns.put(row.getThread(), new RedoThreadCommitScn(row));
}
}
/**
* Set the commit scn across all redo threads.
*
* @param commitScn the commit scn to be set, should not be {@code null}
*/
public void setCommitScnOnAllThreads(Scn commitScn) {
for (RedoThreadCommitScn redoCommitScn : redoThreadCommitScns.values()) {
redoCommitScn.setCommitScn(commitScn);
}
}
@Override
public int compareTo(Scn scn) {
if (redoThreadCommitScns.isEmpty()) {
return Scn.NULL.compareTo(scn);
}
int result = 1;
for (RedoThreadCommitScn commitScn : redoThreadCommitScns.values()) {
int check = commitScn.getCommitScn().compareTo(scn);
if (check < result) {
result = check;
}
}
return result;
}
/**
* Store the contents of the CommitScn in the connector offsets.
*
* @param offset the offsets, should not be {@code null}
* @return the adjusted offsets
*/
public Map<String, Object> store(Map<String, Object> offset) {
offset.put(SourceInfo.COMMIT_SCN_KEY, toCommaSeparatedValue());
return offset;
}
/**
* Store the contents of the CommitScn in the source info struct.
*
* @param sourceInfo the connector's source info data
* @param sourceInfoStruct the source info struct
* @return the adjusted source info struct
*/
public Struct store(SourceInfo sourceInfo, Struct sourceInfoStruct) {
if (sourceInfo.getRedoThread() != null) {
final RedoThreadCommitScn redoThreadCommitScn = redoThreadCommitScns.get(sourceInfo.getRedoThread());
if (redoThreadCommitScn != null) {
if (redoThreadCommitScn.getCommitScn() != null && !redoThreadCommitScn.getCommitScn().isNull()) {
sourceInfoStruct.put(SourceInfo.COMMIT_SCN_KEY, redoThreadCommitScn.getCommitScn().toString());
}
if (redoThreadCommitScn.getRsId() != null) {
sourceInfoStruct.put(ROLLBACK_SEGMENT_ID_KEY, redoThreadCommitScn.getRsId());
}
sourceInfoStruct.put(SQL_SEQUENCE_NUMBER_KEY, redoThreadCommitScn.getSsn());
sourceInfoStruct.put(REDO_THREAD_KEY, redoThreadCommitScn.getThread());
}
}
return sourceInfoStruct;
}
@Override
public String toString() {
return "CommitScn [redoThreadCommitScns=" + redoThreadCommitScns + "]";
}
/**
* Parses a string-based representation of commit scn entries as a CommitScn instance.
*
* @param value the commit scn entries, comma-separated
* @return the commit scn instance, never null
*/
public static CommitScn valueOf(String value) {
final Set<RedoThreadCommitScn> scns = new HashSet<>();
if (value != null) {
final String[] parts = value.split(",");
for (int i = 0; i < parts.length; ++i) {
final String part = parts[i];
scns.add(RedoThreadCommitScn.valueOf(part));
}
}
return new CommitScn(scns);
}
/**
* Parses a long-based representation of commit scn entries as a CommitScn instance.
*
* @param value the commit scn long value, should never be {@code null}
* @return the commit scn instance, never null
*/
public static CommitScn valueOf(Long value) {
final Set<RedoThreadCommitScn> scns = new HashSet<>();
if (value != null) {
scns.add(new RedoThreadCommitScn(1, Scn.valueOf(value), null, 0));
}
return new CommitScn(scns);
}
/**
* Load the CommitScn values from the offsets.
*
* @param offset the connector offsets, should not be {@code null}
* @return the commit scn instance, never {@code null}
*/
public static CommitScn load(Map<String, ?> offset) {
Object value = offset.get(SourceInfo.COMMIT_SCN_KEY);
if (value instanceof String) {
return CommitScn.valueOf((String) value);
}
else if (value != null) {
// This might be a legacy offset being read when the values were Long data types.
// In this case, we can assume that the redo thread is 1 and explicitly create a
// redo thread entry for it.
return CommitScn.valueOf((Long) value);
}
// return a commit scn instance with no redo thread data.
return new CommitScn(Collections.emptySet());
}
public static SchemaBuilder schemaBuilder(SchemaBuilder schemaBuilder) {
return schemaBuilder.field(ROLLBACK_SEGMENT_ID_KEY, Schema.OPTIONAL_STRING_SCHEMA)
.field(SQL_SEQUENCE_NUMBER_KEY, Schema.OPTIONAL_INT32_SCHEMA)
.field(REDO_THREAD_KEY, Schema.OPTIONAL_INT32_SCHEMA);
}
/**
* Returns the commit scn as a comma-separated list of string values.
*/
private String toCommaSeparatedValue() {
if (!redoThreadCommitScns.isEmpty()) {
return redoThreadCommitScns.values().stream()
.map(RedoThreadCommitScn::getFormattedString)
.collect(Collectors.joining(","));
}
return null;
}
/**
* Represents a commit {@link Scn} for a specific redo thread.
*/
public static class RedoThreadCommitScn {
private final int thread;
private Scn commitScn;
private String rsId;
private int ssn;
public RedoThreadCommitScn(int thread) {
this(thread, Scn.NULL, null, 0);
}
public RedoThreadCommitScn(LogMinerEventRow row) {
this(row.getThread(), row.getScn(), row.getRsId(), row.getSsn());
}
public RedoThreadCommitScn(int thread, Scn commitScn, String rsId, int ssn) {
this.thread = thread;
this.commitScn = commitScn;
this.rsId = rsId;
this.ssn = ssn;
}
public int getThread() {
return thread;
}
public Scn getCommitScn() {
return commitScn;
}
public void setCommitScn(Scn commitScn) {
this.commitScn = commitScn;
}
public String getRsId() {
return rsId;
}
public void setRsId(String rsId) {
this.rsId = rsId;
}
public int getSsn() {
return ssn;
}
public void setSsn(int ssn) {
this.ssn = ssn;
}
public String getFormattedString() {
return commitScn.toString() + ":" + (rsId != null ? rsId : "") + ":" + ssn + ":" + thread;
}
public static RedoThreadCommitScn valueOf(String value) {
final String[] parts = value.split(":");
if (parts.length == 1) {
// Reading a legacy commit_scn entry that has only the SCN bit
// Create the redo thread entry with thread 1.
// There is only ever a single redo thread commit entry in this use case.
return new RedoThreadCommitScn(1, Scn.valueOf(parts[0]), null, 0);
}
else if (parts.length == 4) {
// The new redo-thread based commit scn entry
final Scn scn = Scn.valueOf(parts[0]);
final String rsId = parts[1];
final int ssn = Integer.parseInt(parts[2]);
final int thread = Integer.parseInt(parts[3]);
return new RedoThreadCommitScn(thread, scn, rsId, ssn);
}
throw new DebeziumException("An unexpected redo thread commit scn entry: '" + value + "'");
}
}
}

View File

@ -52,7 +52,7 @@ public class OracleOffsetContext extends CommonOffsetContext<SourceInfo> {
*/
private boolean snapshotCompleted;
public OracleOffsetContext(OracleConnectorConfig connectorConfig, Scn scn, Scn commitScn, String lcrPosition,
public OracleOffsetContext(OracleConnectorConfig connectorConfig, Scn scn, CommitScn commitScn, String lcrPosition,
Scn snapshotScn, Map<String, Scn> snapshotPendingTransactions,
boolean snapshot, boolean snapshotCompleted, TransactionContext transactionContext,
IncrementalSnapshotContext<TableId> incrementalSnapshotContext) {
@ -70,6 +70,7 @@ public OracleOffsetContext(OracleConnectorConfig connectorConfig, Scn scn, Strin
// During streaming this value will be updated by the current event handler.
sourceInfo.setEventScn(scn);
sourceInfo.setLcrPosition(lcrPosition);
sourceInfo.setCommitScn(CommitScn.valueOf((String) null));
sourceInfoSchema = sourceInfo.schema();
// Snapshot SCN is a new field and may be null in cases where the offsets are being read from
@ -184,9 +185,8 @@ public static Builder create() {
}
else {
final Scn scn = sourceInfo.getScn();
final Scn commitScn = sourceInfo.getCommitScn();
offset.put(SourceInfo.SCN_KEY, scn != null ? scn.toString() : null);
offset.put(SourceInfo.COMMIT_SCN_KEY, commitScn != null ? commitScn.toString() : null);
sourceInfo.getCommitScn().store(offset);
}
if (snapshotPendingTransactions != null && !snapshotPendingTransactions.isEmpty()) {
String encoded = snapshotPendingTransactions.entrySet().stream()
@ -209,10 +209,6 @@ public void setScn(Scn scn) {
sourceInfo.setScn(scn);
}
public void setCommitScn(Scn commitScn) {
sourceInfo.setCommitScn(commitScn);
}
public void setEventScn(Scn eventScn) {
sourceInfo.setEventScn(eventScn);
}
@ -221,7 +217,7 @@ public Scn getScn() {
return sourceInfo.getScn();
}
public Scn getCommitScn() {
public CommitScn getCommitScn() {
return sourceInfo.getCommitScn();
}
@ -261,6 +257,14 @@ public void setTableId(TableId tableId) {
sourceInfo.tableEvent(tableId);
}
public Integer getRedoThread() {
return sourceInfo.getRedoThread();
}
public void setRedoThread(Integer redoThread) {
sourceInfo.setRedoThread(redoThread);
}
@Override
public boolean isSnapshotRunning() {
return sourceInfo.isSnapshot() && !snapshotCompleted;
@ -286,7 +290,7 @@ public String toString() {
sb.append(", snapshot_completed=").append(snapshotCompleted);
}
sb.append(", commit_scn=").append(sourceInfo.getCommitScn());
sb.append(", commit_scn=").append(sourceInfo.getCommitScn().toString());
sb.append("]");

View File

@ -6,6 +6,7 @@
package io.debezium.connector.oracle;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;
import io.debezium.config.CommonConnectorConfig;
@ -17,15 +18,16 @@ public class OracleSourceInfoStructMaker extends AbstractSourceInfoStructMaker<S
public OracleSourceInfoStructMaker(String connector, String version, CommonConnectorConfig connectorConfig) {
super(connector, version, connectorConfig);
schema = commonSchemaBuilder()
final SchemaBuilder schemaBuilder = commonSchemaBuilder()
.name("io.debezium.connector.oracle.Source")
.field(SourceInfo.SCHEMA_NAME_KEY, Schema.STRING_SCHEMA)
.field(SourceInfo.TABLE_NAME_KEY, Schema.STRING_SCHEMA)
.field(SourceInfo.TXID_KEY, Schema.OPTIONAL_STRING_SCHEMA)
.field(SourceInfo.EVENT_SCN_KEY, Schema.OPTIONAL_STRING_SCHEMA)
.field(SourceInfo.COMMIT_SCN_KEY, Schema.OPTIONAL_STRING_SCHEMA)
.field(SourceInfo.LCR_POSITION_KEY, Schema.OPTIONAL_STRING_SCHEMA)
.build();
.field(SourceInfo.LCR_POSITION_KEY, Schema.OPTIONAL_STRING_SCHEMA);
this.schema = CommitScn.schemaBuilder(schemaBuilder).build();
}
@Override
@ -35,7 +37,6 @@ public Schema schema() {
@Override
public Struct struct(SourceInfo sourceInfo) {
final String commitScn = sourceInfo.getCommitScn() == null ? null : sourceInfo.getCommitScn().toString();
final String eventScn = sourceInfo.getEventScn() == null ? null : sourceInfo.getEventScn().toString();
final Struct ret = super.commonStruct(sourceInfo)
@ -47,9 +48,12 @@ public Struct struct(SourceInfo sourceInfo) {
if (sourceInfo.getLcrPosition() != null) {
ret.put(SourceInfo.LCR_POSITION_KEY, sourceInfo.getLcrPosition());
}
final CommitScn commitScn = sourceInfo.getCommitScn();
if (commitScn != null) {
ret.put(SourceInfo.COMMIT_SCN_KEY, commitScn);
commitScn.store(sourceInfo, ret);
}
return ret;
}
}

View File

@ -26,12 +26,13 @@ public class SourceInfo extends BaseSourceInfo {
public static final String SNAPSHOT_KEY = "snapshot";
private Scn scn;
private Scn commitScn;
private CommitScn commitScn;
private Scn eventScn;
private String lcrPosition;
private String transactionId;
private Instant sourceTime;
private Set<TableId> tableIds;
private Integer redoThread;
protected SourceInfo(OracleConnectorConfig connectorConfig) {
super(connectorConfig);
@ -41,7 +42,7 @@ public Scn getScn() {
return scn;
}
public Scn getCommitScn() {
public CommitScn getCommitScn() {
return commitScn;
}
@ -53,7 +54,7 @@ public void setScn(Scn scn) {
this.scn = scn;
}
public void setCommitScn(Scn commitScn) {
public void setCommitScn(CommitScn commitScn) {
this.commitScn = commitScn;
}
@ -110,6 +111,14 @@ public void tableEvent(TableId tableId) {
this.tableIds = Collections.singleton(tableId);
}
public Integer getRedoThread() {
return redoThread;
}
public void setRedoThread(Integer redoThread) {
this.redoThread = redoThread;
}
@Override
protected Instant timestamp() {
return sourceTime;

View File

@ -7,6 +7,7 @@
import java.util.Map;
import io.debezium.connector.oracle.CommitScn;
import io.debezium.connector.oracle.OracleConnectorConfig;
import io.debezium.connector.oracle.OracleOffsetContext;
import io.debezium.connector.oracle.Scn;
@ -32,7 +33,7 @@ public OracleOffsetContext load(Map<String, ?> offset) {
boolean snapshotCompleted = Boolean.TRUE.equals(offset.get(OracleOffsetContext.SNAPSHOT_COMPLETED_KEY));
Scn scn = OracleOffsetContext.getScnFromOffsetMapByKey(offset, SourceInfo.SCN_KEY);
Scn commitScn = OracleOffsetContext.getScnFromOffsetMapByKey(offset, SourceInfo.COMMIT_SCN_KEY);
CommitScn commitScn = CommitScn.load(offset);
Map<String, Scn> snapshotPendingTransactions = OracleOffsetContext.loadSnapshotPendingTransactions(offset);
Scn snapshotScn = OracleOffsetContext.loadSnapshotScn(offset);
return new OracleOffsetContext(connectorConfig, scn, commitScn, null, snapshotScn, snapshotPendingTransactions, snapshot, snapshotCompleted,

View File

@ -54,7 +54,7 @@ public class LogMinerQueryBuilder {
public static String build(OracleConnectorConfig connectorConfig, OracleDatabaseSchema schema) {
final StringBuilder query = new StringBuilder(1024);
query.append("SELECT SCN, SQL_REDO, OPERATION_CODE, TIMESTAMP, XID, CSF, TABLE_NAME, SEG_OWNER, OPERATION, ");
query.append("USERNAME, ROW_ID, ROLLBACK, RS_ID, STATUS, INFO ");
query.append("USERNAME, ROW_ID, ROLLBACK, RS_ID, STATUS, INFO, SSN, THREAD# ");
query.append("FROM ").append(LOGMNR_CONTENTS_VIEW).append(" ");
// These bind parameters will be bound when the query is executed by the caller.

View File

@ -269,11 +269,10 @@ else if (scn.compareTo(minScn) < 0) {
// Make sure the commit SCN is at least the snapshot SCN - 1.
// This ensures we'll never emit events for transactions that were complete before the snapshot was
// taken.
Scn originalCommitScn = offsetContext.getCommitScn();
if (originalCommitScn == null || originalCommitScn.compareTo(snapshotScn) < 0) {
if (offsetContext.getCommitScn().compareTo(snapshotScn) < 0) {
LOGGER.info("Setting commit SCN to {} (snapshot SCN - 1) to ensure we don't double-emit events from pre-snapshot transactions.",
snapshotScn.subtract(Scn.ONE));
offsetContext.setCommitScn(snapshotScn.subtract(Scn.ONE));
offsetContext.getCommitScn().setCommitScnOnAllThreads(snapshotScn.subtract(Scn.ONE));
}
// set start SCN to minScn

View File

@ -47,6 +47,8 @@ public class LogMinerEventRow {
private static final int RS_ID = 13;
private static final int STATUS = 14;
private static final int INFO = 15;
private static final int SSN = 16;
private static final int THREAD = 17;
private Scn scn;
private TableId tableId;
@ -63,6 +65,8 @@ public class LogMinerEventRow {
private String redoSql;
private int status;
private String info;
private int ssn;
private int thread;
public Scn getScn() {
return scn;
@ -124,6 +128,14 @@ public String getInfo() {
return info;
}
public int getSsn() {
return ssn;
}
public int getThread() {
return thread;
}
/**
* Returns a {@link LogMinerEventRow} instance based on the current row of the JDBC {@link ResultSet}.
*
@ -167,6 +179,8 @@ private void initializeFromResultSet(ResultSet resultSet, String catalogName, bo
this.redoSql = getSqlRedo(resultSet);
this.status = resultSet.getInt(STATUS);
this.info = resultSet.getString(INFO);
this.ssn = resultSet.getInt(SSN);
this.thread = resultSet.getInt(THREAD);
if (this.tableName != null) {
this.tableId = new TableId(catalogName, tablespaceName, tableName);
}
@ -234,6 +248,7 @@ public String toString() {
", rowId='" + rowId + '\'' +
", rollbackFlag=" + rollbackFlag +
", rsId=" + rsId +
", ssn=" + ssn +
", redoSql='" + redoSql + '\'' +
'}';
}

View File

@ -11,6 +11,7 @@
import java.time.Duration;
import java.time.Instant;
import java.time.ZoneOffset;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.function.Supplier;
@ -74,9 +75,7 @@ public abstract class AbstractLogMinerEventProcessor<T extends AbstractTransacti
protected final Counters counters;
private Scn currentOffsetScn = Scn.NULL;
private Scn currentOffsetCommitScn = Scn.NULL;
private Scn lastCommittedScn = Scn.NULL;
private Scn maxCommittedScn = Scn.NULL;
private Map<Integer, Scn> currentOffsetCommitScns = new HashMap<>();
private Scn lastProcessedScn = Scn.NULL;
private boolean sequenceUnavailable = false;
@ -192,11 +191,11 @@ public Scn process(Scn startScn, Scn endScn) throws SQLException, InterruptedExc
metrics.setLastCapturedDmlCount(counters.dmlCount);
if (counters.dmlCount > 0 || counters.commitCount > 0 || counters.rollbackCount > 0) {
warnPotentiallyStuckScn(currentOffsetScn, currentOffsetCommitScn);
warnPotentiallyStuckScn(currentOffsetScn, currentOffsetCommitScns);
currentOffsetScn = offsetContext.getScn();
if (offsetContext.getCommitScn() != null) {
currentOffsetCommitScn = offsetContext.getCommitScn();
currentOffsetCommitScns = offsetContext.getCommitScn().getCommitScnForAllRedoThreads();
}
}
@ -207,7 +206,7 @@ public Scn process(Scn startScn, Scn endScn) throws SQLException, InterruptedExc
metrics.getMillisecondToSleepBetweenMiningQuery());
metrics.addProcessedRows(counters.rows);
return calculateNewStartScn(endScn, maxCommittedScn);
return calculateNewStartScn(endScn, offsetContext.getCommitScn().getMaxCommittedScn());
}
}
}
@ -346,7 +345,8 @@ protected void handleCommit(OraclePartition partition, LogMinerEventRow row) thr
metrics.setOldestScn(smallestScn.isNull() ? Scn.valueOf(-1) : smallestScn);
final Scn commitScn = row.getScn();
if (isTransactionAlreadyProcessed(commitScn, offsetContext.getCommitScn())) {
if (offsetContext.getCommitScn().hasCommitAlreadyBeenHandled(row)) {
final Scn lastCommittedScn = offsetContext.getCommitScn().getCommitScnForRedoThread(row.getThread());
LOGGER.debug("Transaction {} has already been processed. "
+ "Offset Commit SCN {}, Transaction Commit SCN {}, Last Seen Commit SCN {}.",
transactionId, offsetContext.getCommitScn(), commitScn, lastCommittedScn);
@ -379,9 +379,10 @@ public void accept(LogMinerEvent event, long eventsProcessed) throws Interrupted
offsetContext.setTransactionId(transactionId);
offsetContext.setSourceTime(event.getChangeTime().minusSeconds(databaseOffset.getTotalSeconds()));
offsetContext.setTableId(event.getTableId());
offsetContext.setRedoThread(row.getThread());
if (eventsProcessed == numEvents) {
// reached the last event update the commit scn in the offsets
offsetContext.setCommitScn(commitScn);
offsetContext.getCommitScn().recordCommit(row);
}
final DmlEvent dmlEvent = (DmlEvent) event;
@ -432,7 +433,6 @@ public void accept(LogMinerEvent event, long eventsProcessed) throws Interrupted
}
}
lastCommittedScn = Scn.valueOf(commitScn.longValue());
offsetContext.setEventScn(commitScn);
if (getTransactionEventCount(transaction) > 0 && !skipExcludedUserName) {
dispatcher.dispatchTransactionCommittedEvent(partition, offsetContext, transaction.getChangeTime());
@ -442,9 +442,6 @@ public void accept(LogMinerEvent event, long eventsProcessed) throws Interrupted
}
metrics.calculateLagMetrics(row.getChangeTime());
if (lastCommittedScn.compareTo(maxCommittedScn) > 0) {
maxCommittedScn = lastCommittedScn;
}
finalizeTransactionCommit(transactionId, commitScn);
removeTransactionAndEventsFromCache(transaction);
@ -457,18 +454,6 @@ public void accept(LogMinerEvent event, long eventsProcessed) throws Interrupted
metrics.setLastCommitDuration(Duration.between(start, Instant.now()));
}
/**
* Checks whether the transaction commit system change number has already been processed.
*
* @param commitScn the transaction's commit system change number, should not be {@code null}
* @param offsetCommitScn the current offsets commit system change number, should not be {@code null}
* @return true if the transaction has been seen based on the offsets, false otherwise
*/
protected boolean isTransactionAlreadyProcessed(Scn commitScn, Scn offsetCommitScn) {
return (offsetCommitScn != null && offsetCommitScn.compareTo(commitScn) >= 0)
|| lastCommittedScn.compareTo(commitScn) > 0;
}
/**
* Gets a transaction instance from the transaction cache while also removing its cache entry.
*
@ -559,9 +544,10 @@ protected void handleSchemaChange(LogMinerEventRow row) throws InterruptedExcept
return;
}
final Scn commitScn = offsetContext.getCommitScn();
if (commitScn != null && commitScn.compareTo(row.getScn()) >= 0) {
LOGGER.trace("DDL: SQL '{}' skipped with {} (SCN) <= {} (commit SCN)", row.getRedoSql(), row.getScn(), commitScn);
if (offsetContext.getCommitScn().hasCommitAlreadyBeenHandled(row)) {
final Scn commitScn = offsetContext.getCommitScn().getCommitScnForRedoThread(row.getThread());
LOGGER.trace("DDL: SQL '{}' skipped with {} (SCN) <= {} (commit SCN for redo thread {})",
row.getRedoSql(), row.getScn(), commitScn, row.getThread());
return;
}
@ -590,10 +576,11 @@ else if (activeTransactions == 1) {
}
// Should always advance the commit SCN point with schema changes
LOGGER.debug("Schema change advanced offset commit SCN to {}", row.getScn());
offsetContext.setCommitScn(row.getScn());
LOGGER.debug("Schema change advanced offset commit SCN to {} for thread {}", row.getScn(), row.getThread());
offsetContext.getCommitScn().recordCommit(row);
offsetContext.setEventScn(row.getScn());
offsetContext.setRedoThread(row.getThread());
dispatcher.dispatchSchemaChangeEvent(partition,
tableId,
new OracleSchemaChangeEventEmitter(
@ -785,19 +772,19 @@ protected void handleDataEvent(LogMinerEventRow row) throws SQLException, Interr
* while the offset's {@code commit_scn} is changing between sessions.
*
* @param previousOffsetScn the previous offset system change number
* @param previousOffsetCommitScn the previous offset commit system change number
* @param previousOffsetCommitScns the previous offset commit system change number
*/
protected void warnPotentiallyStuckScn(Scn previousOffsetScn, Scn previousOffsetCommitScn) {
protected void warnPotentiallyStuckScn(Scn previousOffsetScn, Map<Integer, Scn> previousOffsetCommitScns) {
if (offsetContext != null && offsetContext.getCommitScn() != null) {
final Scn scn = offsetContext.getScn();
final Scn commitScn = offsetContext.getCommitScn();
if (previousOffsetScn.equals(scn) && !previousOffsetCommitScn.equals(commitScn)) {
final Map<Integer, Scn> commitScns = offsetContext.getCommitScn().getCommitScnForAllRedoThreads();
if (previousOffsetScn.equals(scn) && !previousOffsetCommitScns.equals(commitScns)) {
counters.stuckCount++;
if (counters.stuckCount == 25) {
LOGGER.warn("Offset SCN {} has not changed in 25 mining session iterations. " +
"This indicates long running transaction(s) are active. Commit SCN {}.",
"This indicates long running transaction(s) are active. Commit SCNs {}.",
previousOffsetScn,
previousOffsetCommitScn);
previousOffsetCommitScns);
metrics.incrementScnFreezeCount();
}
}

View File

@ -0,0 +1,162 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.connector.oracle;
import static org.fest.assertions.Assertions.assertThat;
import java.util.HashMap;
import java.util.Map;
import org.junit.Test;
import io.debezium.doc.FixFor;
/**
* Test the basic functionality of the {@link CommitScn} model.
*
* @author Chris Cranford
*/
public class CommitScnTest {
@Test
@FixFor("DBZ-5245")
public void shouldParseCommitScnThatIsNull() throws Exception {
// Test null with String-based valueOf
CommitScn commitScn = CommitScn.valueOf((String) null);
assertThat(commitScn).isNotNull();
assertThat(commitScn.getCommitScnForAllRedoThreads()).isEmpty();
assertThat(commitScn.getMaxCommittedScn()).isEqualTo(Scn.NULL);
assertThat(encodedCommitScn(commitScn)).isNull();
// Test null with Long-based valueOf
commitScn = CommitScn.valueOf((Long) null);
assertThat(commitScn).isNotNull();
assertThat(commitScn.getCommitScnForAllRedoThreads()).isEmpty();
assertThat(commitScn.getMaxCommittedScn()).isEqualTo(Scn.NULL);
assertThat(encodedCommitScn(commitScn)).isNull();
}
@Test
@FixFor("DBZ-5245")
public void shouldParseCommitScnThatIsNumeric() throws Exception {
CommitScn commitScn = CommitScn.valueOf(12345L);
assertThat(commitScn).isNotNull();
assertThat(commitScn.getMaxCommittedScn()).isEqualTo(Scn.valueOf(12345L));
assertThat(commitScn.getCommitScnForAllRedoThreads()).hasSize(1);
assertThat(commitScn.getCommitScnForAllRedoThreads().keySet()).containsOnly(1);
assertThat(commitScn.getCommitScnForAllRedoThreads().get(1)).isEqualTo(Scn.valueOf(12345L));
assertThat(commitScn.getCommitScnForRedoThread(1)).isEqualTo(Scn.valueOf(12345L));
assertThat(commitScn.getMaxCommittedScn()).isEqualTo(Scn.valueOf(12345L));
assertThat(encodedCommitScn(commitScn)).isEqualTo("12345::0:1");
}
@Test
@FixFor("DBZ-5245")
public void shouldParseCommitScnThatIsString() throws Exception {
// Test parsing with only SCN value in the string
CommitScn commitScn = CommitScn.valueOf("12345");
assertThat(commitScn).isNotNull();
assertThat(commitScn.getMaxCommittedScn()).isEqualTo(Scn.valueOf(12345L));
assertThat(commitScn.getCommitScnForAllRedoThreads()).hasSize(1);
assertThat(commitScn.getCommitScnForAllRedoThreads().keySet()).containsOnly(1);
assertThat(commitScn.getCommitScnForAllRedoThreads().get(1)).isEqualTo(Scn.valueOf(12345L));
assertThat(commitScn.getCommitScnForRedoThread(1)).isEqualTo(Scn.valueOf(12345L));
assertThat(commitScn.getMaxCommittedScn()).isEqualTo(Scn.valueOf(12345L));
assertThat(encodedCommitScn(commitScn)).isEqualTo("12345::0:1");
// Test parsing with new multi-part SCN, single value
commitScn = CommitScn.valueOf("12345:00241f.00093ff0.0010:0:1");
assertThat(commitScn).isNotNull();
assertThat(commitScn.getMaxCommittedScn()).isEqualTo(Scn.valueOf(12345L));
assertThat(commitScn.getCommitScnForAllRedoThreads()).hasSize(1);
assertThat(commitScn.getCommitScnForAllRedoThreads().keySet()).containsOnly(1);
assertThat(commitScn.getCommitScnForAllRedoThreads().get(1)).isEqualTo(Scn.valueOf(12345L));
assertThat(commitScn.getCommitScnForRedoThread(1)).isEqualTo(Scn.valueOf(12345L));
assertThat(commitScn.getMaxCommittedScn()).isEqualTo(Scn.valueOf(12345L));
assertThat(encodedCommitScn(commitScn)).isEqualTo("12345:00241f.00093ff0.0010:0:1");
// Test parsing with new multi-part SCN, multi value
commitScn = CommitScn.valueOf("12345:00241f.00093ff0.0010:0:1,678901:1253ef.123457ee0.abcd:0:2");
assertThat(commitScn).isNotNull();
assertThat(commitScn.getMaxCommittedScn()).isEqualTo(Scn.valueOf(678901L));
assertThat(commitScn.getCommitScnForAllRedoThreads()).hasSize(2);
assertThat(commitScn.getCommitScnForAllRedoThreads().keySet()).containsOnly(1, 2);
assertThat(commitScn.getCommitScnForAllRedoThreads().get(1)).isEqualTo(Scn.valueOf(12345L));
assertThat(commitScn.getCommitScnForAllRedoThreads().get(2)).isEqualTo(Scn.valueOf(678901L));
assertThat(commitScn.getCommitScnForRedoThread(1)).isEqualTo(Scn.valueOf(12345L));
assertThat(commitScn.getCommitScnForRedoThread(2)).isEqualTo(Scn.valueOf(678901L));
assertThat(commitScn.getMaxCommittedScn()).isEqualTo(Scn.valueOf(678901L));
assertThat(encodedCommitScn(commitScn)).isEqualTo("12345:00241f.00093ff0.0010:0:1,678901:1253ef.123457ee0.abcd:0:2");
}
@Test
@FixFor("DBZ-5245")
public void shouldSetCommitScnAcrossAllRedoThreads() throws Exception {
// Test no redo thread data
CommitScn commitScn = CommitScn.valueOf((String) null);
commitScn.setCommitScnOnAllThreads(Scn.valueOf(23456L));
assertThat(commitScn.getMaxCommittedScn()).isEqualTo(Scn.NULL);
assertThat(commitScn.getCommitScnForAllRedoThreads()).isEmpty();
// Test with a single commit scn
commitScn = CommitScn.valueOf("12345");
commitScn.setCommitScnOnAllThreads(Scn.valueOf(23456L));
assertThat(commitScn.getMaxCommittedScn()).isEqualTo(Scn.valueOf(23456L));
assertThat(commitScn.getCommitScnForAllRedoThreads()).hasSize(1);
assertThat(commitScn.getCommitScnForRedoThread(1)).isEqualTo(Scn.valueOf(23456L));
// Test with a single redo record
commitScn = CommitScn.valueOf("12345:00241f.00093ff0.0010:0:1");
commitScn.setCommitScnOnAllThreads(Scn.valueOf(23456L));
assertThat(commitScn.getMaxCommittedScn()).isEqualTo(Scn.valueOf(23456L));
assertThat(commitScn.getCommitScnForAllRedoThreads()).hasSize(1);
assertThat(commitScn.getCommitScnForRedoThread(1)).isEqualTo(Scn.valueOf(23456L));
// Test with a multi redo record
commitScn = CommitScn.valueOf("12345:00241f.00093ff0.0010:0:1,678901:1253ef.123457ee0.abcd:0:2");
commitScn.setCommitScnOnAllThreads(Scn.valueOf(23456L));
assertThat(commitScn.getMaxCommittedScn()).isEqualTo(Scn.valueOf(23456L));
assertThat(commitScn.getCommitScnForAllRedoThreads()).hasSize(2);
assertThat(commitScn.getCommitScnForRedoThread(1)).isEqualTo(Scn.valueOf(23456L));
assertThat(commitScn.getCommitScnForRedoThread(2)).isEqualTo(Scn.valueOf(23456L));
}
@Test
@FixFor("DBZ-5245")
public void shouldBeComparableWithScn() throws Exception {
// Test no redo thread data
CommitScn commitScn = CommitScn.valueOf((String) null);
assertThat(commitScn.compareTo(Scn.NULL)).isEqualTo(0); // equal
assertThat(commitScn.compareTo(Scn.valueOf(12345L))).isEqualTo(-1); // less-than
// Test with a single commit scn
commitScn = CommitScn.valueOf("12345");
assertThat(commitScn.compareTo(Scn.NULL)).isEqualTo(1); // greater than null
assertThat(commitScn.compareTo(Scn.valueOf(123L))).isEqualTo(1); // greater than 123
assertThat(commitScn.compareTo(Scn.valueOf(12345L))).isEqualTo(0); // equal
assertThat(commitScn.compareTo(Scn.valueOf(23456L))).isEqualTo(-1); // less-than
// Test with a single redo record
commitScn = CommitScn.valueOf("12345:00241f.00093ff0.0010:0:1");
assertThat(commitScn.compareTo(Scn.NULL)).isEqualTo(1); // greater than null
assertThat(commitScn.compareTo(Scn.valueOf(123L))).isEqualTo(1); // greater than 123
assertThat(commitScn.compareTo(Scn.valueOf(12345L))).isEqualTo(0); // equal
assertThat(commitScn.compareTo(Scn.valueOf(23456L))).isEqualTo(-1); // less-than
// Test with a multi redo record
commitScn = CommitScn.valueOf("12345:00241f.00093ff0.0010:0:1,345678:1253ef.123457ee0.abcd:0:2");
assertThat(commitScn.compareTo(Scn.NULL)).isEqualTo(1); // both greater than null
assertThat(commitScn.compareTo(Scn.valueOf(123L))).isEqualTo(1); // both greater than 123
assertThat(commitScn.compareTo(Scn.valueOf(12345L))).isEqualTo(0); // thread 1 is equal to value
assertThat(commitScn.compareTo(Scn.valueOf(23456L))).isEqualTo(-1); // thread 1 is less than value
assertThat(commitScn.compareTo(Scn.valueOf(456789L))).isEqualTo(-1); // both less than 456789
}
private static String encodedCommitScn(CommitScn value) {
final Map<String, Object> offsets = value.store(new HashMap<>());
return (String) offsets.get(SourceInfo.COMMIT_SCN_KEY);
}
}

View File

@ -2012,12 +2012,12 @@ public void shouldUpdateCommitScnOnLobTransaction() throws Exception {
assertThat(after.get("DATA")).isNotNull();
Struct source = ((Struct) tableRecords.get(0).value()).getStruct("source");
assertThat(source.get("commit_scn")).isNotNull();
assertThat(source.get("scn")).isNotNull();
assertThat(source.get(SourceInfo.SCN_KEY)).isNotNull();
assertThat(source.get(SourceInfo.COMMIT_SCN_KEY)).isNotNull();
final long commitScn = Scn.valueOf(source.getString("commit_scn")).longValue();
final long scn = Scn.valueOf(source.getString("scn")).longValue();
assertThat(commitScn).isGreaterThanOrEqualTo(scn);
final String commitScn = source.getString(SourceInfo.COMMIT_SCN_KEY);
final String scn = source.getString(SourceInfo.SCN_KEY);
assertThat(Scn.valueOf(commitScn).longValue()).isGreaterThanOrEqualTo(Scn.valueOf(scn).longValue());
}
finally {
TestHelper.dropTable(connection, "dbz5266");
@ -2054,12 +2054,12 @@ public void shouldUpdateCommitScnOnNonLobTransactionWithLobEnabled() throws Exce
assertThat(after.get("DATA")).isNotNull();
Struct source = ((Struct) tableRecords.get(0).value()).getStruct("source");
assertThat(source.get("commit_scn")).isNotNull();
assertThat(source.get("scn")).isNotNull();
assertThat(source.get(SourceInfo.SCN_KEY)).isNotNull();
assertThat(source.get(SourceInfo.COMMIT_SCN_KEY)).isNotNull();
final long commitScn = Scn.valueOf(source.getString("commit_scn")).longValue();
final long scn = Scn.valueOf(source.getString("scn")).longValue();
assertThat(commitScn).isGreaterThanOrEqualTo(scn);
final String commitScn = source.getString(SourceInfo.COMMIT_SCN_KEY);
final String scn = source.getString(SourceInfo.SCN_KEY);
assertThat(Scn.valueOf(commitScn).longValue()).isGreaterThanOrEqualTo(Scn.valueOf(scn).longValue());
}
finally {
TestHelper.dropTable(connection, "dbz5266");
@ -2096,12 +2096,12 @@ public void shouldUpdateCommitScnOnNonLobTransactionWithLobDisabled() throws Exc
assertThat(after.get("DATA")).isNotNull();
Struct source = ((Struct) tableRecords.get(0).value()).getStruct("source");
assertThat(source.get("commit_scn")).isNotNull();
assertThat(source.get("scn")).isNotNull();
assertThat(source.get(SourceInfo.SCN_KEY)).isNotNull();
assertThat(source.get(SourceInfo.COMMIT_SCN_KEY)).isNotNull();
final long commitScn = Scn.valueOf(source.getString("commit_scn")).longValue();
final long scn = Scn.valueOf(source.getString("scn")).longValue();
assertThat(commitScn).isGreaterThanOrEqualTo(scn);
final String commitScn = source.getString(SourceInfo.COMMIT_SCN_KEY);
final String scn = source.getString(SourceInfo.SCN_KEY);
assertThat(Scn.valueOf(commitScn).longValue()).isGreaterThanOrEqualTo(Scn.valueOf(scn).longValue());
}
finally {
TestHelper.dropTable(connection, "dbz5266");

View File

@ -76,7 +76,7 @@ protected Offsets<Partition, OffsetContext> getOffsets() {
.with(RelationalDatabaseConnectorConfig.SERVER_NAME, TestHelper.SERVER_NAME)
.build();
final OracleOffsetContext position = new OracleOffsetContext(new OracleConnectorConfig(config), Scn.valueOf(999),
Scn.valueOf(999), null, Scn.valueOf(999), Collections.emptyMap(), false, true, new TransactionContext(),
CommitScn.valueOf(999L), null, Scn.valueOf(999), Collections.emptyMap(), false, true, new TransactionContext(),
new SignalBasedIncrementalSnapshotContext<>());
return Offsets.of(source, position);

View File

@ -42,8 +42,8 @@ public void beforeEach() throws Exception {
}
@Test
@FixFor("DBZ-2994")
public void shouldreadScnAndCommitScnAsLongValues() throws Exception {
@FixFor({ "DBZ-2994", "DBZ-5245" })
public void shouldReadScnAndCommitScnAsLongValues() throws Exception {
final Map<String, Object> offsetValues = new HashMap<>();
offsetValues.put(SourceInfo.SCN_KEY, 12345L);
offsetValues.put(SourceInfo.COMMIT_SCN_KEY, 23456L);
@ -51,12 +51,12 @@ public void shouldreadScnAndCommitScnAsLongValues() throws Exception {
final OracleOffsetContext offsetContext = (OracleOffsetContext) offsetLoader.load(offsetValues);
assertThat(offsetContext.getScn()).isEqualTo(Scn.valueOf("12345"));
if (TestHelper.adapter().equals(OracleConnectorConfig.ConnectorAdapter.LOG_MINER)) {
assertThat(offsetContext.getCommitScn()).isEqualTo(Scn.valueOf("23456"));
assertThat(offsetContext.getCommitScn().getMaxCommittedScn()).isEqualTo(Scn.valueOf("23456"));
}
}
@Test
@FixFor("DBZ-2994")
@FixFor({ "DBZ-2994", "DBZ-5245" })
public void shouldReadScnAndCommitScnAsStringValues() throws Exception {
final Map<String, Object> offsetValues = new HashMap<>();
offsetValues.put(SourceInfo.SCN_KEY, "12345");
@ -65,12 +65,12 @@ public void shouldReadScnAndCommitScnAsStringValues() throws Exception {
final OracleOffsetContext offsetContext = (OracleOffsetContext) offsetLoader.load(offsetValues);
assertThat(offsetContext.getScn()).isEqualTo(Scn.valueOf("12345"));
if (TestHelper.adapter().equals(OracleConnectorConfig.ConnectorAdapter.LOG_MINER)) {
assertThat(offsetContext.getCommitScn()).isEqualTo(Scn.valueOf("23456"));
assertThat(offsetContext.getCommitScn().getMaxCommittedScn()).isEqualTo(Scn.valueOf("23456"));
}
}
@Test
@FixFor("DBZ-2994")
@FixFor({ "DBZ-2994", "DBZ-5245" })
public void shouldHandleNullScnAndCommitScnValues() throws Exception {
final Map<String, Object> offsetValues = new HashMap<>();
offsetValues.put(SourceInfo.SCN_KEY, null);
@ -78,11 +78,11 @@ public void shouldHandleNullScnAndCommitScnValues() throws Exception {
final OracleOffsetContext offsetContext = (OracleOffsetContext) offsetLoader.load(offsetValues);
assertThat(offsetContext.getScn()).isNull();
assertThat(offsetContext.getCommitScn()).isNull();
assertThat(offsetContext.getCommitScn().getMaxCommittedScn()).isEqualTo(Scn.NULL);
}
@Test
@FixFor("DBZ-4937")
@FixFor({ "DBZ-4937", "DBZ-5245" })
public void shouldCorrectlySerializeOffsetsWithSnapshotBasedKeysFromOlderOffsets() throws Exception {
// Offsets from Debezium 1.8
final Map<String, Object> offsetValues = new HashMap<>();
@ -95,7 +95,7 @@ public void shouldCorrectlySerializeOffsetsWithSnapshotBasedKeysFromOlderOffsets
// Write values out as Debezium 1.9
Map<String, ?> writeValues = offsetContext.getOffset();
assertThat(writeValues.get(SourceInfo.SCN_KEY)).isEqualTo("745688898023");
assertThat(writeValues.get(SourceInfo.COMMIT_SCN_KEY)).isEqualTo("745688898024");
assertThat(writeValues.get(SourceInfo.COMMIT_SCN_KEY)).isEqualTo("745688898024::0:1");
assertThat(writeValues.get(OracleOffsetContext.SNAPSHOT_PENDING_TRANSACTIONS_KEY)).isNull();
assertThat(writeValues.get(OracleOffsetContext.SNAPSHOT_SCN_KEY)).isNull();
@ -105,7 +105,7 @@ public void shouldCorrectlySerializeOffsetsWithSnapshotBasedKeysFromOlderOffsets
// Write values out as Debezium 1.9
writeValues = offsetContext.getOffset();
assertThat(writeValues.get(SourceInfo.SCN_KEY)).isEqualTo("745688898023");
assertThat(writeValues.get(SourceInfo.COMMIT_SCN_KEY)).isEqualTo("745688898024");
assertThat(writeValues.get(SourceInfo.COMMIT_SCN_KEY)).isEqualTo("745688898024::0:1");
assertThat(writeValues.get(OracleOffsetContext.SNAPSHOT_PENDING_TRANSACTIONS_KEY)).isNull();
assertThat(writeValues.get(OracleOffsetContext.SNAPSHOT_SCN_KEY)).isNull();
}

View File

@ -62,6 +62,9 @@ public void schemaIsCorrect() {
.field("scn", Schema.OPTIONAL_STRING_SCHEMA)
.field("commit_scn", Schema.OPTIONAL_STRING_SCHEMA)
.field("lcr_position", Schema.OPTIONAL_STRING_SCHEMA)
.field("rs_id", Schema.OPTIONAL_STRING_SCHEMA)
.field("ssn", Schema.OPTIONAL_INT32_SCHEMA)
.field("redo_thread", Schema.OPTIONAL_INT32_SCHEMA)
.build();
VerifyRecord.assertConnectSchemasAreEqual(null, source.schema(), schema);

View File

@ -61,7 +61,7 @@ public class LogMinerQueryBuilderTest {
* {@code database.history.store.only.captured.tables.ddl} is {@code false}.
*/
private static final String LOG_MINER_CONTENT_QUERY_TEMPLATE1 = "SELECT SCN, SQL_REDO, OPERATION_CODE, TIMESTAMP, " +
"XID, CSF, TABLE_NAME, SEG_OWNER, OPERATION, USERNAME, ROW_ID, ROLLBACK, RS_ID, STATUS, INFO " +
"XID, CSF, TABLE_NAME, SEG_OWNER, OPERATION, USERNAME, ROW_ID, ROLLBACK, RS_ID, STATUS, INFO, SSN, THREAD# " +
"FROM V$LOGMNR_CONTENTS WHERE SCN > ? AND SCN <= ? " +
"${systemTablePredicate}" +
"AND ((" +
@ -82,7 +82,7 @@ public class LogMinerQueryBuilderTest {
* {@code database.history.store.only.captured.tables.ddl} is {@code true}.
*/
private static final String LOG_MINER_CONTENT_QUERY_TEMPLATE2 = "SELECT SCN, SQL_REDO, OPERATION_CODE, TIMESTAMP, " +
"XID, CSF, TABLE_NAME, SEG_OWNER, OPERATION, USERNAME, ROW_ID, ROLLBACK, RS_ID, STATUS, INFO " +
"XID, CSF, TABLE_NAME, SEG_OWNER, OPERATION, USERNAME, ROW_ID, ROLLBACK, RS_ID, STATUS, INFO, SSN, THREAD# " +
"FROM V$LOGMNR_CONTENTS WHERE SCN > ? AND SCN <= ? " +
"${systemTablePredicate}" +
"AND ((" +

View File

@ -27,6 +27,7 @@
import io.debezium.config.Configuration;
import io.debezium.connector.base.ChangeEventQueue;
import io.debezium.connector.oracle.CommitScn;
import io.debezium.connector.oracle.OracleConnection;
import io.debezium.connector.oracle.OracleConnectorConfig;
import io.debezium.connector.oracle.OracleDatabaseSchema;
@ -83,6 +84,8 @@ public void before() throws Exception {
this.dispatcher = (EventDispatcher<OraclePartition, TableId>) Mockito.mock(EventDispatcher.class);
this.partition = Mockito.mock(OraclePartition.class);
this.offsetContext = Mockito.mock(OracleOffsetContext.class);
final CommitScn commitScn = CommitScn.valueOf((String) null);
Mockito.when(this.offsetContext.getCommitScn()).thenReturn(commitScn);
this.connection = createOracleConnection();
this.schema = createOracleDatabaseSchema();
this.metrics = createMetrics(schema);

View File

@ -252,8 +252,11 @@ The message contains a logical representation of the table schema.
"table": "CUSTOMERS",
"txId" : null,
"scn" : "1513734",
"commit_scn": "1513734",
"lcr_position" : null
"commit_scn": "1513754",
"lcr_position" : null,
"rs_id": "001234.00012345.0124",
"ssn": 1,
"redo_thread": 1
},
"ts_ms": 1588252618953, // <1>
"databaseName": "ORCLPDB1", // <2>
@ -901,6 +904,21 @@ The following example shows the value of a _create_ event value from the `custom
"optional": true,
"field": "commit_scn"
},
{
"type": "string",
"optional": true,
"field": "rs_id"
},
{
"type": "int32",
"optional": true,
"field": "ssn"
},
{
"type": "int32",
"optional": true,
"field": "redo_thread"
},
{
"type": "boolean",
"optional": true,
@ -940,6 +958,9 @@ The following example shows the value of a _create_ event value from the `custom
"txId": "6.28.807",
"scn": "2122185",
"commit_scn": "2122185",
"rs_id": "001234.00012345.0124",
"ssn": 1,
"redo_thread": 1,
"snapshot": false
},
"op": "c",
@ -1000,6 +1021,9 @@ The following example shows an _update_ change event that the connector captures
"txId": "6.9.809",
"scn": "2125544",
"commit_scn": "2125544",
"rs_id": "001234.00012345.0124",
"ssn": 1,
"redo_thread": 1,
"snapshot": false
},
"op": "u",
@ -1058,6 +1082,9 @@ The `schema` portion of the _delete_ event is identical to the `schema` portion
"txId": "6.28.807",
"scn": "2122184",
"commit_scn": "2122184",
"rs_id": "001234.00012345.0124",
"ssn": 1,
"redo_thread": 1,
"snapshot": false
},
"op": "d",
@ -1113,7 +1140,10 @@ The message key is `null` in this case, the message value looks like this:
"txId": "02000a0037030000",
"scn": "13234397",
"commit_scn": "13271102",
"lcr_position": null
"lcr_position": null,
"rs_id": "001234.00012345.0124",
"ssn": 1,
"redo_thread": 1
},
"op": "t", // <2>
"ts_ms": 1638974558961, // <3>