diff --git a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/CommitScn.java b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/CommitScn.java new file mode 100644 index 000000000..7c355d907 --- /dev/null +++ b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/CommitScn.java @@ -0,0 +1,333 @@ +/* + * Copyright Debezium Authors. + * + * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 + */ +package io.debezium.connector.oracle; + +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; +import java.util.TreeMap; +import java.util.stream.Collectors; + +import org.apache.kafka.connect.data.Schema; +import org.apache.kafka.connect.data.SchemaBuilder; +import org.apache.kafka.connect.data.Struct; + +import io.debezium.DebeziumException; +import io.debezium.connector.oracle.logminer.events.LogMinerEventRow; + +/** + * Represents either a single or a collection of commit {@link Scn} positions that collectively + * represents the high-watermark point for streaming changes. + * + * In a standalone Oracle environment, a commit {@link Scn} would normally represent a single position or + * system change number in the logs as there is only ever a single redo thread. However, in an Oracle RAC + * environment where each node maintains its own redo, there are multiple redo threads which maintain + * their own "commit" point in the logs that may differ. + * + * This class is meant to encapsulate the Oracle RAC environment by exposing a "commit scn" as a single + * representation that spans all nodes within the cluster as one logical unit, much like what we expect + * when integrating with a standalone Oracle database. + * + * @author Chris Cranford + */ +public class CommitScn implements Comparable { + + public static final String ROLLBACK_SEGMENT_ID_KEY = "rs_id"; + public static final String SQL_SEQUENCE_NUMBER_KEY = "ssn"; + public static final String REDO_THREAD_KEY = "redo_thread"; + + // Explicitly use TreeMap to guarantee output render order + private final Map redoThreadCommitScns = new TreeMap<>(); + + private CommitScn(Set commitScns) { + for (RedoThreadCommitScn commitScn : commitScns) { + redoThreadCommitScns.put(commitScn.getThread(), commitScn); + } + } + + /** + * Examines all redo threads and returns the maximum committed scn. + * + * @return the maximum recorded commit across all redo threads + */ + public Scn getMaxCommittedScn() { + return redoThreadCommitScns.values().stream().map(RedoThreadCommitScn::getCommitScn).max(Scn::compareTo).orElse(Scn.NULL); + } + + /** + * Get the commit scns associated with all redo threads. + * + * @return a map by redo thread with each commit system change number. + */ + public Map getCommitScnForAllRedoThreads() { + final Map result = new HashMap<>(); + for (Map.Entry entry : redoThreadCommitScns.entrySet()) { + result.put(entry.getKey(), entry.getValue().getCommitScn()); + } + return Collections.unmodifiableMap(result); + } + + /** + * Get the commit scn associated with a specific redo thread. + * + * @param thread the redo thread + * @return the commit scn associated with redo thread + */ + public Scn getCommitScnForRedoThread(int thread) { + final RedoThreadCommitScn commitScn = redoThreadCommitScns.get(thread); + return commitScn != null ? commitScn.getCommitScn() : Scn.NULL; + } + + /** + * Checks whether the transaction associated with the commit event has been handled. + * + * @param row the transaction commit event, should never be {@code null} + * @return true if the commit has been handled, false if it has not + */ + public boolean hasCommitAlreadyBeenHandled(LogMinerEventRow row) { + final RedoThreadCommitScn commitScn = redoThreadCommitScns.get(row.getThread()); + if (commitScn != null) { + return commitScn.getCommitScn().compareTo(row.getScn()) >= 0; + } + return false; + } + + /** + * Records the specified commit in the commit scn + * + * @param row the commit event, should never be {@code null} + */ + public void recordCommit(LogMinerEventRow row) { + final RedoThreadCommitScn redoCommitScn = redoThreadCommitScns.get(row.getThread()); + if (redoCommitScn != null) { + redoCommitScn.setCommitScn(row.getScn()); + redoCommitScn.setRsId(row.getRsId()); + redoCommitScn.setSsn(row.getSsn()); + } + else { + redoThreadCommitScns.put(row.getThread(), new RedoThreadCommitScn(row)); + } + } + + /** + * Set the commit scn across all redo threads. + * + * @param commitScn the commit scn to be set, should not be {@code null} + */ + public void setCommitScnOnAllThreads(Scn commitScn) { + for (RedoThreadCommitScn redoCommitScn : redoThreadCommitScns.values()) { + redoCommitScn.setCommitScn(commitScn); + } + } + + @Override + public int compareTo(Scn scn) { + if (redoThreadCommitScns.isEmpty()) { + return Scn.NULL.compareTo(scn); + } + + int result = 1; + for (RedoThreadCommitScn commitScn : redoThreadCommitScns.values()) { + int check = commitScn.getCommitScn().compareTo(scn); + if (check < result) { + result = check; + } + } + return result; + } + + /** + * Store the contents of the CommitScn in the connector offsets. + * + * @param offset the offsets, should not be {@code null} + * @return the adjusted offsets + */ + public Map store(Map offset) { + offset.put(SourceInfo.COMMIT_SCN_KEY, toCommaSeparatedValue()); + return offset; + } + + /** + * Store the contents of the CommitScn in the source info struct. + * + * @param sourceInfo the connector's source info data + * @param sourceInfoStruct the source info struct + * @return the adjusted source info struct + */ + public Struct store(SourceInfo sourceInfo, Struct sourceInfoStruct) { + if (sourceInfo.getRedoThread() != null) { + final RedoThreadCommitScn redoThreadCommitScn = redoThreadCommitScns.get(sourceInfo.getRedoThread()); + if (redoThreadCommitScn != null) { + if (redoThreadCommitScn.getCommitScn() != null && !redoThreadCommitScn.getCommitScn().isNull()) { + sourceInfoStruct.put(SourceInfo.COMMIT_SCN_KEY, redoThreadCommitScn.getCommitScn().toString()); + } + + if (redoThreadCommitScn.getRsId() != null) { + sourceInfoStruct.put(ROLLBACK_SEGMENT_ID_KEY, redoThreadCommitScn.getRsId()); + } + + sourceInfoStruct.put(SQL_SEQUENCE_NUMBER_KEY, redoThreadCommitScn.getSsn()); + sourceInfoStruct.put(REDO_THREAD_KEY, redoThreadCommitScn.getThread()); + } + } + return sourceInfoStruct; + } + + @Override + public String toString() { + return "CommitScn [redoThreadCommitScns=" + redoThreadCommitScns + "]"; + } + + /** + * Parses a string-based representation of commit scn entries as a CommitScn instance. + * + * @param value the commit scn entries, comma-separated + * @return the commit scn instance, never null + */ + public static CommitScn valueOf(String value) { + final Set scns = new HashSet<>(); + if (value != null) { + final String[] parts = value.split(","); + for (int i = 0; i < parts.length; ++i) { + final String part = parts[i]; + scns.add(RedoThreadCommitScn.valueOf(part)); + } + } + return new CommitScn(scns); + } + + /** + * Parses a long-based representation of commit scn entries as a CommitScn instance. + * + * @param value the commit scn long value, should never be {@code null} + * @return the commit scn instance, never null + */ + public static CommitScn valueOf(Long value) { + final Set scns = new HashSet<>(); + if (value != null) { + scns.add(new RedoThreadCommitScn(1, Scn.valueOf(value), null, 0)); + } + return new CommitScn(scns); + } + + /** + * Load the CommitScn values from the offsets. + * + * @param offset the connector offsets, should not be {@code null} + * @return the commit scn instance, never {@code null} + */ + public static CommitScn load(Map offset) { + Object value = offset.get(SourceInfo.COMMIT_SCN_KEY); + if (value instanceof String) { + return CommitScn.valueOf((String) value); + } + else if (value != null) { + // This might be a legacy offset being read when the values were Long data types. + // In this case, we can assume that the redo thread is 1 and explicitly create a + // redo thread entry for it. + return CommitScn.valueOf((Long) value); + } + // return a commit scn instance with no redo thread data. + return new CommitScn(Collections.emptySet()); + } + + public static SchemaBuilder schemaBuilder(SchemaBuilder schemaBuilder) { + return schemaBuilder.field(ROLLBACK_SEGMENT_ID_KEY, Schema.OPTIONAL_STRING_SCHEMA) + .field(SQL_SEQUENCE_NUMBER_KEY, Schema.OPTIONAL_INT32_SCHEMA) + .field(REDO_THREAD_KEY, Schema.OPTIONAL_INT32_SCHEMA); + } + + /** + * Returns the commit scn as a comma-separated list of string values. + */ + private String toCommaSeparatedValue() { + if (!redoThreadCommitScns.isEmpty()) { + return redoThreadCommitScns.values().stream() + .map(RedoThreadCommitScn::getFormattedString) + .collect(Collectors.joining(",")); + } + return null; + } + + /** + * Represents a commit {@link Scn} for a specific redo thread. + */ + public static class RedoThreadCommitScn { + + private final int thread; + private Scn commitScn; + private String rsId; + private int ssn; + + public RedoThreadCommitScn(int thread) { + this(thread, Scn.NULL, null, 0); + } + + public RedoThreadCommitScn(LogMinerEventRow row) { + this(row.getThread(), row.getScn(), row.getRsId(), row.getSsn()); + } + + public RedoThreadCommitScn(int thread, Scn commitScn, String rsId, int ssn) { + this.thread = thread; + this.commitScn = commitScn; + this.rsId = rsId; + this.ssn = ssn; + } + + public int getThread() { + return thread; + } + + public Scn getCommitScn() { + return commitScn; + } + + public void setCommitScn(Scn commitScn) { + this.commitScn = commitScn; + } + + public String getRsId() { + return rsId; + } + + public void setRsId(String rsId) { + this.rsId = rsId; + } + + public int getSsn() { + return ssn; + } + + public void setSsn(int ssn) { + this.ssn = ssn; + } + + public String getFormattedString() { + return commitScn.toString() + ":" + (rsId != null ? rsId : "") + ":" + ssn + ":" + thread; + } + + public static RedoThreadCommitScn valueOf(String value) { + final String[] parts = value.split(":"); + if (parts.length == 1) { + // Reading a legacy commit_scn entry that has only the SCN bit + // Create the redo thread entry with thread 1. + // There is only ever a single redo thread commit entry in this use case. + return new RedoThreadCommitScn(1, Scn.valueOf(parts[0]), null, 0); + } + else if (parts.length == 4) { + // The new redo-thread based commit scn entry + final Scn scn = Scn.valueOf(parts[0]); + final String rsId = parts[1]; + final int ssn = Integer.parseInt(parts[2]); + final int thread = Integer.parseInt(parts[3]); + return new RedoThreadCommitScn(thread, scn, rsId, ssn); + } + throw new DebeziumException("An unexpected redo thread commit scn entry: '" + value + "'"); + } + } +} diff --git a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/OracleOffsetContext.java b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/OracleOffsetContext.java index 1d896949c..df804fc61 100644 --- a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/OracleOffsetContext.java +++ b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/OracleOffsetContext.java @@ -52,7 +52,7 @@ public class OracleOffsetContext extends CommonOffsetContext { */ private boolean snapshotCompleted; - public OracleOffsetContext(OracleConnectorConfig connectorConfig, Scn scn, Scn commitScn, String lcrPosition, + public OracleOffsetContext(OracleConnectorConfig connectorConfig, Scn scn, CommitScn commitScn, String lcrPosition, Scn snapshotScn, Map snapshotPendingTransactions, boolean snapshot, boolean snapshotCompleted, TransactionContext transactionContext, IncrementalSnapshotContext incrementalSnapshotContext) { @@ -70,6 +70,7 @@ public OracleOffsetContext(OracleConnectorConfig connectorConfig, Scn scn, Strin // During streaming this value will be updated by the current event handler. sourceInfo.setEventScn(scn); sourceInfo.setLcrPosition(lcrPosition); + sourceInfo.setCommitScn(CommitScn.valueOf((String) null)); sourceInfoSchema = sourceInfo.schema(); // Snapshot SCN is a new field and may be null in cases where the offsets are being read from @@ -184,9 +185,8 @@ public static Builder create() { } else { final Scn scn = sourceInfo.getScn(); - final Scn commitScn = sourceInfo.getCommitScn(); offset.put(SourceInfo.SCN_KEY, scn != null ? scn.toString() : null); - offset.put(SourceInfo.COMMIT_SCN_KEY, commitScn != null ? commitScn.toString() : null); + sourceInfo.getCommitScn().store(offset); } if (snapshotPendingTransactions != null && !snapshotPendingTransactions.isEmpty()) { String encoded = snapshotPendingTransactions.entrySet().stream() @@ -209,10 +209,6 @@ public void setScn(Scn scn) { sourceInfo.setScn(scn); } - public void setCommitScn(Scn commitScn) { - sourceInfo.setCommitScn(commitScn); - } - public void setEventScn(Scn eventScn) { sourceInfo.setEventScn(eventScn); } @@ -221,7 +217,7 @@ public Scn getScn() { return sourceInfo.getScn(); } - public Scn getCommitScn() { + public CommitScn getCommitScn() { return sourceInfo.getCommitScn(); } @@ -261,6 +257,14 @@ public void setTableId(TableId tableId) { sourceInfo.tableEvent(tableId); } + public Integer getRedoThread() { + return sourceInfo.getRedoThread(); + } + + public void setRedoThread(Integer redoThread) { + sourceInfo.setRedoThread(redoThread); + } + @Override public boolean isSnapshotRunning() { return sourceInfo.isSnapshot() && !snapshotCompleted; @@ -286,7 +290,7 @@ public String toString() { sb.append(", snapshot_completed=").append(snapshotCompleted); } - sb.append(", commit_scn=").append(sourceInfo.getCommitScn()); + sb.append(", commit_scn=").append(sourceInfo.getCommitScn().toString()); sb.append("]"); diff --git a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/OracleSourceInfoStructMaker.java b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/OracleSourceInfoStructMaker.java index cccacd690..ec6960fa9 100644 --- a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/OracleSourceInfoStructMaker.java +++ b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/OracleSourceInfoStructMaker.java @@ -6,6 +6,7 @@ package io.debezium.connector.oracle; import org.apache.kafka.connect.data.Schema; +import org.apache.kafka.connect.data.SchemaBuilder; import org.apache.kafka.connect.data.Struct; import io.debezium.config.CommonConnectorConfig; @@ -17,15 +18,16 @@ public class OracleSourceInfoStructMaker extends AbstractSourceInfoStructMaker tableIds; + private Integer redoThread; protected SourceInfo(OracleConnectorConfig connectorConfig) { super(connectorConfig); @@ -41,7 +42,7 @@ public Scn getScn() { return scn; } - public Scn getCommitScn() { + public CommitScn getCommitScn() { return commitScn; } @@ -53,7 +54,7 @@ public void setScn(Scn scn) { this.scn = scn; } - public void setCommitScn(Scn commitScn) { + public void setCommitScn(CommitScn commitScn) { this.commitScn = commitScn; } @@ -110,6 +111,14 @@ public void tableEvent(TableId tableId) { this.tableIds = Collections.singleton(tableId); } + public Integer getRedoThread() { + return redoThread; + } + + public void setRedoThread(Integer redoThread) { + this.redoThread = redoThread; + } + @Override protected Instant timestamp() { return sourceTime; diff --git a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/LogMinerOracleOffsetContextLoader.java b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/LogMinerOracleOffsetContextLoader.java index cb2a9d148..aa70536da 100644 --- a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/LogMinerOracleOffsetContextLoader.java +++ b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/LogMinerOracleOffsetContextLoader.java @@ -7,6 +7,7 @@ import java.util.Map; +import io.debezium.connector.oracle.CommitScn; import io.debezium.connector.oracle.OracleConnectorConfig; import io.debezium.connector.oracle.OracleOffsetContext; import io.debezium.connector.oracle.Scn; @@ -32,7 +33,7 @@ public OracleOffsetContext load(Map offset) { boolean snapshotCompleted = Boolean.TRUE.equals(offset.get(OracleOffsetContext.SNAPSHOT_COMPLETED_KEY)); Scn scn = OracleOffsetContext.getScnFromOffsetMapByKey(offset, SourceInfo.SCN_KEY); - Scn commitScn = OracleOffsetContext.getScnFromOffsetMapByKey(offset, SourceInfo.COMMIT_SCN_KEY); + CommitScn commitScn = CommitScn.load(offset); Map snapshotPendingTransactions = OracleOffsetContext.loadSnapshotPendingTransactions(offset); Scn snapshotScn = OracleOffsetContext.loadSnapshotScn(offset); return new OracleOffsetContext(connectorConfig, scn, commitScn, null, snapshotScn, snapshotPendingTransactions, snapshot, snapshotCompleted, diff --git a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/LogMinerQueryBuilder.java b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/LogMinerQueryBuilder.java index bf3471134..88a84f2a2 100644 --- a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/LogMinerQueryBuilder.java +++ b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/LogMinerQueryBuilder.java @@ -54,7 +54,7 @@ public class LogMinerQueryBuilder { public static String build(OracleConnectorConfig connectorConfig, OracleDatabaseSchema schema) { final StringBuilder query = new StringBuilder(1024); query.append("SELECT SCN, SQL_REDO, OPERATION_CODE, TIMESTAMP, XID, CSF, TABLE_NAME, SEG_OWNER, OPERATION, "); - query.append("USERNAME, ROW_ID, ROLLBACK, RS_ID, STATUS, INFO "); + query.append("USERNAME, ROW_ID, ROLLBACK, RS_ID, STATUS, INFO, SSN, THREAD# "); query.append("FROM ").append(LOGMNR_CONTENTS_VIEW).append(" "); // These bind parameters will be bound when the query is executed by the caller. diff --git a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/LogMinerStreamingChangeEventSource.java b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/LogMinerStreamingChangeEventSource.java index f275efa96..2dc598956 100644 --- a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/LogMinerStreamingChangeEventSource.java +++ b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/LogMinerStreamingChangeEventSource.java @@ -269,11 +269,10 @@ else if (scn.compareTo(minScn) < 0) { // Make sure the commit SCN is at least the snapshot SCN - 1. // This ensures we'll never emit events for transactions that were complete before the snapshot was // taken. - Scn originalCommitScn = offsetContext.getCommitScn(); - if (originalCommitScn == null || originalCommitScn.compareTo(snapshotScn) < 0) { + if (offsetContext.getCommitScn().compareTo(snapshotScn) < 0) { LOGGER.info("Setting commit SCN to {} (snapshot SCN - 1) to ensure we don't double-emit events from pre-snapshot transactions.", snapshotScn.subtract(Scn.ONE)); - offsetContext.setCommitScn(snapshotScn.subtract(Scn.ONE)); + offsetContext.getCommitScn().setCommitScnOnAllThreads(snapshotScn.subtract(Scn.ONE)); } // set start SCN to minScn diff --git a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/events/LogMinerEventRow.java b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/events/LogMinerEventRow.java index 077cd3ade..67fac7043 100644 --- a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/events/LogMinerEventRow.java +++ b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/events/LogMinerEventRow.java @@ -47,6 +47,8 @@ public class LogMinerEventRow { private static final int RS_ID = 13; private static final int STATUS = 14; private static final int INFO = 15; + private static final int SSN = 16; + private static final int THREAD = 17; private Scn scn; private TableId tableId; @@ -63,6 +65,8 @@ public class LogMinerEventRow { private String redoSql; private int status; private String info; + private int ssn; + private int thread; public Scn getScn() { return scn; @@ -124,6 +128,14 @@ public String getInfo() { return info; } + public int getSsn() { + return ssn; + } + + public int getThread() { + return thread; + } + /** * Returns a {@link LogMinerEventRow} instance based on the current row of the JDBC {@link ResultSet}. * @@ -167,6 +179,8 @@ private void initializeFromResultSet(ResultSet resultSet, String catalogName, bo this.redoSql = getSqlRedo(resultSet); this.status = resultSet.getInt(STATUS); this.info = resultSet.getString(INFO); + this.ssn = resultSet.getInt(SSN); + this.thread = resultSet.getInt(THREAD); if (this.tableName != null) { this.tableId = new TableId(catalogName, tablespaceName, tableName); } @@ -234,6 +248,7 @@ public String toString() { ", rowId='" + rowId + '\'' + ", rollbackFlag=" + rollbackFlag + ", rsId=" + rsId + + ", ssn=" + ssn + ", redoSql='" + redoSql + '\'' + '}'; } diff --git a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/processor/AbstractLogMinerEventProcessor.java b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/processor/AbstractLogMinerEventProcessor.java index 9b1478905..ea15c3734 100644 --- a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/processor/AbstractLogMinerEventProcessor.java +++ b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/processor/AbstractLogMinerEventProcessor.java @@ -11,6 +11,7 @@ import java.time.Duration; import java.time.Instant; import java.time.ZoneOffset; +import java.util.HashMap; import java.util.Iterator; import java.util.Map; import java.util.function.Supplier; @@ -74,9 +75,7 @@ public abstract class AbstractLogMinerEventProcessor currentOffsetCommitScns = new HashMap<>(); private Scn lastProcessedScn = Scn.NULL; private boolean sequenceUnavailable = false; @@ -192,11 +191,11 @@ public Scn process(Scn startScn, Scn endScn) throws SQLException, InterruptedExc metrics.setLastCapturedDmlCount(counters.dmlCount); if (counters.dmlCount > 0 || counters.commitCount > 0 || counters.rollbackCount > 0) { - warnPotentiallyStuckScn(currentOffsetScn, currentOffsetCommitScn); + warnPotentiallyStuckScn(currentOffsetScn, currentOffsetCommitScns); currentOffsetScn = offsetContext.getScn(); if (offsetContext.getCommitScn() != null) { - currentOffsetCommitScn = offsetContext.getCommitScn(); + currentOffsetCommitScns = offsetContext.getCommitScn().getCommitScnForAllRedoThreads(); } } @@ -207,7 +206,7 @@ public Scn process(Scn startScn, Scn endScn) throws SQLException, InterruptedExc metrics.getMillisecondToSleepBetweenMiningQuery()); metrics.addProcessedRows(counters.rows); - return calculateNewStartScn(endScn, maxCommittedScn); + return calculateNewStartScn(endScn, offsetContext.getCommitScn().getMaxCommittedScn()); } } } @@ -346,7 +345,8 @@ protected void handleCommit(OraclePartition partition, LogMinerEventRow row) thr metrics.setOldestScn(smallestScn.isNull() ? Scn.valueOf(-1) : smallestScn); final Scn commitScn = row.getScn(); - if (isTransactionAlreadyProcessed(commitScn, offsetContext.getCommitScn())) { + if (offsetContext.getCommitScn().hasCommitAlreadyBeenHandled(row)) { + final Scn lastCommittedScn = offsetContext.getCommitScn().getCommitScnForRedoThread(row.getThread()); LOGGER.debug("Transaction {} has already been processed. " + "Offset Commit SCN {}, Transaction Commit SCN {}, Last Seen Commit SCN {}.", transactionId, offsetContext.getCommitScn(), commitScn, lastCommittedScn); @@ -379,9 +379,10 @@ public void accept(LogMinerEvent event, long eventsProcessed) throws Interrupted offsetContext.setTransactionId(transactionId); offsetContext.setSourceTime(event.getChangeTime().minusSeconds(databaseOffset.getTotalSeconds())); offsetContext.setTableId(event.getTableId()); + offsetContext.setRedoThread(row.getThread()); if (eventsProcessed == numEvents) { // reached the last event update the commit scn in the offsets - offsetContext.setCommitScn(commitScn); + offsetContext.getCommitScn().recordCommit(row); } final DmlEvent dmlEvent = (DmlEvent) event; @@ -432,7 +433,6 @@ public void accept(LogMinerEvent event, long eventsProcessed) throws Interrupted } } - lastCommittedScn = Scn.valueOf(commitScn.longValue()); offsetContext.setEventScn(commitScn); if (getTransactionEventCount(transaction) > 0 && !skipExcludedUserName) { dispatcher.dispatchTransactionCommittedEvent(partition, offsetContext, transaction.getChangeTime()); @@ -442,9 +442,6 @@ public void accept(LogMinerEvent event, long eventsProcessed) throws Interrupted } metrics.calculateLagMetrics(row.getChangeTime()); - if (lastCommittedScn.compareTo(maxCommittedScn) > 0) { - maxCommittedScn = lastCommittedScn; - } finalizeTransactionCommit(transactionId, commitScn); removeTransactionAndEventsFromCache(transaction); @@ -457,18 +454,6 @@ public void accept(LogMinerEvent event, long eventsProcessed) throws Interrupted metrics.setLastCommitDuration(Duration.between(start, Instant.now())); } - /** - * Checks whether the transaction commit system change number has already been processed. - * - * @param commitScn the transaction's commit system change number, should not be {@code null} - * @param offsetCommitScn the current offsets commit system change number, should not be {@code null} - * @return true if the transaction has been seen based on the offsets, false otherwise - */ - protected boolean isTransactionAlreadyProcessed(Scn commitScn, Scn offsetCommitScn) { - return (offsetCommitScn != null && offsetCommitScn.compareTo(commitScn) >= 0) - || lastCommittedScn.compareTo(commitScn) > 0; - } - /** * Gets a transaction instance from the transaction cache while also removing its cache entry. * @@ -559,9 +544,10 @@ protected void handleSchemaChange(LogMinerEventRow row) throws InterruptedExcept return; } - final Scn commitScn = offsetContext.getCommitScn(); - if (commitScn != null && commitScn.compareTo(row.getScn()) >= 0) { - LOGGER.trace("DDL: SQL '{}' skipped with {} (SCN) <= {} (commit SCN)", row.getRedoSql(), row.getScn(), commitScn); + if (offsetContext.getCommitScn().hasCommitAlreadyBeenHandled(row)) { + final Scn commitScn = offsetContext.getCommitScn().getCommitScnForRedoThread(row.getThread()); + LOGGER.trace("DDL: SQL '{}' skipped with {} (SCN) <= {} (commit SCN for redo thread {})", + row.getRedoSql(), row.getScn(), commitScn, row.getThread()); return; } @@ -590,10 +576,11 @@ else if (activeTransactions == 1) { } // Should always advance the commit SCN point with schema changes - LOGGER.debug("Schema change advanced offset commit SCN to {}", row.getScn()); - offsetContext.setCommitScn(row.getScn()); + LOGGER.debug("Schema change advanced offset commit SCN to {} for thread {}", row.getScn(), row.getThread()); + offsetContext.getCommitScn().recordCommit(row); offsetContext.setEventScn(row.getScn()); + offsetContext.setRedoThread(row.getThread()); dispatcher.dispatchSchemaChangeEvent(partition, tableId, new OracleSchemaChangeEventEmitter( @@ -785,19 +772,19 @@ protected void handleDataEvent(LogMinerEventRow row) throws SQLException, Interr * while the offset's {@code commit_scn} is changing between sessions. * * @param previousOffsetScn the previous offset system change number - * @param previousOffsetCommitScn the previous offset commit system change number + * @param previousOffsetCommitScns the previous offset commit system change number */ - protected void warnPotentiallyStuckScn(Scn previousOffsetScn, Scn previousOffsetCommitScn) { + protected void warnPotentiallyStuckScn(Scn previousOffsetScn, Map previousOffsetCommitScns) { if (offsetContext != null && offsetContext.getCommitScn() != null) { final Scn scn = offsetContext.getScn(); - final Scn commitScn = offsetContext.getCommitScn(); - if (previousOffsetScn.equals(scn) && !previousOffsetCommitScn.equals(commitScn)) { + final Map commitScns = offsetContext.getCommitScn().getCommitScnForAllRedoThreads(); + if (previousOffsetScn.equals(scn) && !previousOffsetCommitScns.equals(commitScns)) { counters.stuckCount++; if (counters.stuckCount == 25) { LOGGER.warn("Offset SCN {} has not changed in 25 mining session iterations. " + - "This indicates long running transaction(s) are active. Commit SCN {}.", + "This indicates long running transaction(s) are active. Commit SCNs {}.", previousOffsetScn, - previousOffsetCommitScn); + previousOffsetCommitScns); metrics.incrementScnFreezeCount(); } } diff --git a/debezium-connector-oracle/src/test/java/io/debezium/connector/oracle/CommitScnTest.java b/debezium-connector-oracle/src/test/java/io/debezium/connector/oracle/CommitScnTest.java new file mode 100644 index 000000000..1de7aa4de --- /dev/null +++ b/debezium-connector-oracle/src/test/java/io/debezium/connector/oracle/CommitScnTest.java @@ -0,0 +1,162 @@ +/* + * Copyright Debezium Authors. + * + * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 + */ +package io.debezium.connector.oracle; + +import static org.fest.assertions.Assertions.assertThat; + +import java.util.HashMap; +import java.util.Map; + +import org.junit.Test; + +import io.debezium.doc.FixFor; + +/** + * Test the basic functionality of the {@link CommitScn} model. + * + * @author Chris Cranford + */ +public class CommitScnTest { + + @Test + @FixFor("DBZ-5245") + public void shouldParseCommitScnThatIsNull() throws Exception { + // Test null with String-based valueOf + CommitScn commitScn = CommitScn.valueOf((String) null); + assertThat(commitScn).isNotNull(); + assertThat(commitScn.getCommitScnForAllRedoThreads()).isEmpty(); + assertThat(commitScn.getMaxCommittedScn()).isEqualTo(Scn.NULL); + assertThat(encodedCommitScn(commitScn)).isNull(); + + // Test null with Long-based valueOf + commitScn = CommitScn.valueOf((Long) null); + assertThat(commitScn).isNotNull(); + assertThat(commitScn.getCommitScnForAllRedoThreads()).isEmpty(); + assertThat(commitScn.getMaxCommittedScn()).isEqualTo(Scn.NULL); + assertThat(encodedCommitScn(commitScn)).isNull(); + } + + @Test + @FixFor("DBZ-5245") + public void shouldParseCommitScnThatIsNumeric() throws Exception { + CommitScn commitScn = CommitScn.valueOf(12345L); + assertThat(commitScn).isNotNull(); + assertThat(commitScn.getMaxCommittedScn()).isEqualTo(Scn.valueOf(12345L)); + assertThat(commitScn.getCommitScnForAllRedoThreads()).hasSize(1); + assertThat(commitScn.getCommitScnForAllRedoThreads().keySet()).containsOnly(1); + assertThat(commitScn.getCommitScnForAllRedoThreads().get(1)).isEqualTo(Scn.valueOf(12345L)); + assertThat(commitScn.getCommitScnForRedoThread(1)).isEqualTo(Scn.valueOf(12345L)); + assertThat(commitScn.getMaxCommittedScn()).isEqualTo(Scn.valueOf(12345L)); + assertThat(encodedCommitScn(commitScn)).isEqualTo("12345::0:1"); + } + + @Test + @FixFor("DBZ-5245") + public void shouldParseCommitScnThatIsString() throws Exception { + // Test parsing with only SCN value in the string + CommitScn commitScn = CommitScn.valueOf("12345"); + assertThat(commitScn).isNotNull(); + assertThat(commitScn.getMaxCommittedScn()).isEqualTo(Scn.valueOf(12345L)); + assertThat(commitScn.getCommitScnForAllRedoThreads()).hasSize(1); + assertThat(commitScn.getCommitScnForAllRedoThreads().keySet()).containsOnly(1); + assertThat(commitScn.getCommitScnForAllRedoThreads().get(1)).isEqualTo(Scn.valueOf(12345L)); + assertThat(commitScn.getCommitScnForRedoThread(1)).isEqualTo(Scn.valueOf(12345L)); + assertThat(commitScn.getMaxCommittedScn()).isEqualTo(Scn.valueOf(12345L)); + assertThat(encodedCommitScn(commitScn)).isEqualTo("12345::0:1"); + + // Test parsing with new multi-part SCN, single value + commitScn = CommitScn.valueOf("12345:00241f.00093ff0.0010:0:1"); + assertThat(commitScn).isNotNull(); + assertThat(commitScn.getMaxCommittedScn()).isEqualTo(Scn.valueOf(12345L)); + assertThat(commitScn.getCommitScnForAllRedoThreads()).hasSize(1); + assertThat(commitScn.getCommitScnForAllRedoThreads().keySet()).containsOnly(1); + assertThat(commitScn.getCommitScnForAllRedoThreads().get(1)).isEqualTo(Scn.valueOf(12345L)); + assertThat(commitScn.getCommitScnForRedoThread(1)).isEqualTo(Scn.valueOf(12345L)); + assertThat(commitScn.getMaxCommittedScn()).isEqualTo(Scn.valueOf(12345L)); + assertThat(encodedCommitScn(commitScn)).isEqualTo("12345:00241f.00093ff0.0010:0:1"); + + // Test parsing with new multi-part SCN, multi value + commitScn = CommitScn.valueOf("12345:00241f.00093ff0.0010:0:1,678901:1253ef.123457ee0.abcd:0:2"); + assertThat(commitScn).isNotNull(); + assertThat(commitScn.getMaxCommittedScn()).isEqualTo(Scn.valueOf(678901L)); + assertThat(commitScn.getCommitScnForAllRedoThreads()).hasSize(2); + assertThat(commitScn.getCommitScnForAllRedoThreads().keySet()).containsOnly(1, 2); + assertThat(commitScn.getCommitScnForAllRedoThreads().get(1)).isEqualTo(Scn.valueOf(12345L)); + assertThat(commitScn.getCommitScnForAllRedoThreads().get(2)).isEqualTo(Scn.valueOf(678901L)); + assertThat(commitScn.getCommitScnForRedoThread(1)).isEqualTo(Scn.valueOf(12345L)); + assertThat(commitScn.getCommitScnForRedoThread(2)).isEqualTo(Scn.valueOf(678901L)); + assertThat(commitScn.getMaxCommittedScn()).isEqualTo(Scn.valueOf(678901L)); + assertThat(encodedCommitScn(commitScn)).isEqualTo("12345:00241f.00093ff0.0010:0:1,678901:1253ef.123457ee0.abcd:0:2"); + } + + @Test + @FixFor("DBZ-5245") + public void shouldSetCommitScnAcrossAllRedoThreads() throws Exception { + // Test no redo thread data + CommitScn commitScn = CommitScn.valueOf((String) null); + commitScn.setCommitScnOnAllThreads(Scn.valueOf(23456L)); + assertThat(commitScn.getMaxCommittedScn()).isEqualTo(Scn.NULL); + assertThat(commitScn.getCommitScnForAllRedoThreads()).isEmpty(); + + // Test with a single commit scn + commitScn = CommitScn.valueOf("12345"); + commitScn.setCommitScnOnAllThreads(Scn.valueOf(23456L)); + assertThat(commitScn.getMaxCommittedScn()).isEqualTo(Scn.valueOf(23456L)); + assertThat(commitScn.getCommitScnForAllRedoThreads()).hasSize(1); + assertThat(commitScn.getCommitScnForRedoThread(1)).isEqualTo(Scn.valueOf(23456L)); + + // Test with a single redo record + commitScn = CommitScn.valueOf("12345:00241f.00093ff0.0010:0:1"); + commitScn.setCommitScnOnAllThreads(Scn.valueOf(23456L)); + assertThat(commitScn.getMaxCommittedScn()).isEqualTo(Scn.valueOf(23456L)); + assertThat(commitScn.getCommitScnForAllRedoThreads()).hasSize(1); + assertThat(commitScn.getCommitScnForRedoThread(1)).isEqualTo(Scn.valueOf(23456L)); + + // Test with a multi redo record + commitScn = CommitScn.valueOf("12345:00241f.00093ff0.0010:0:1,678901:1253ef.123457ee0.abcd:0:2"); + commitScn.setCommitScnOnAllThreads(Scn.valueOf(23456L)); + assertThat(commitScn.getMaxCommittedScn()).isEqualTo(Scn.valueOf(23456L)); + assertThat(commitScn.getCommitScnForAllRedoThreads()).hasSize(2); + assertThat(commitScn.getCommitScnForRedoThread(1)).isEqualTo(Scn.valueOf(23456L)); + assertThat(commitScn.getCommitScnForRedoThread(2)).isEqualTo(Scn.valueOf(23456L)); + } + + @Test + @FixFor("DBZ-5245") + public void shouldBeComparableWithScn() throws Exception { + // Test no redo thread data + CommitScn commitScn = CommitScn.valueOf((String) null); + assertThat(commitScn.compareTo(Scn.NULL)).isEqualTo(0); // equal + assertThat(commitScn.compareTo(Scn.valueOf(12345L))).isEqualTo(-1); // less-than + + // Test with a single commit scn + commitScn = CommitScn.valueOf("12345"); + assertThat(commitScn.compareTo(Scn.NULL)).isEqualTo(1); // greater than null + assertThat(commitScn.compareTo(Scn.valueOf(123L))).isEqualTo(1); // greater than 123 + assertThat(commitScn.compareTo(Scn.valueOf(12345L))).isEqualTo(0); // equal + assertThat(commitScn.compareTo(Scn.valueOf(23456L))).isEqualTo(-1); // less-than + + // Test with a single redo record + commitScn = CommitScn.valueOf("12345:00241f.00093ff0.0010:0:1"); + assertThat(commitScn.compareTo(Scn.NULL)).isEqualTo(1); // greater than null + assertThat(commitScn.compareTo(Scn.valueOf(123L))).isEqualTo(1); // greater than 123 + assertThat(commitScn.compareTo(Scn.valueOf(12345L))).isEqualTo(0); // equal + assertThat(commitScn.compareTo(Scn.valueOf(23456L))).isEqualTo(-1); // less-than + + // Test with a multi redo record + commitScn = CommitScn.valueOf("12345:00241f.00093ff0.0010:0:1,345678:1253ef.123457ee0.abcd:0:2"); + assertThat(commitScn.compareTo(Scn.NULL)).isEqualTo(1); // both greater than null + assertThat(commitScn.compareTo(Scn.valueOf(123L))).isEqualTo(1); // both greater than 123 + assertThat(commitScn.compareTo(Scn.valueOf(12345L))).isEqualTo(0); // thread 1 is equal to value + assertThat(commitScn.compareTo(Scn.valueOf(23456L))).isEqualTo(-1); // thread 1 is less than value + assertThat(commitScn.compareTo(Scn.valueOf(456789L))).isEqualTo(-1); // both less than 456789 + } + + private static String encodedCommitScn(CommitScn value) { + final Map offsets = value.store(new HashMap<>()); + return (String) offsets.get(SourceInfo.COMMIT_SCN_KEY); + } +} diff --git a/debezium-connector-oracle/src/test/java/io/debezium/connector/oracle/OracleClobDataTypeIT.java b/debezium-connector-oracle/src/test/java/io/debezium/connector/oracle/OracleClobDataTypeIT.java index d42d0b18f..0035366bb 100644 --- a/debezium-connector-oracle/src/test/java/io/debezium/connector/oracle/OracleClobDataTypeIT.java +++ b/debezium-connector-oracle/src/test/java/io/debezium/connector/oracle/OracleClobDataTypeIT.java @@ -2012,12 +2012,12 @@ public void shouldUpdateCommitScnOnLobTransaction() throws Exception { assertThat(after.get("DATA")).isNotNull(); Struct source = ((Struct) tableRecords.get(0).value()).getStruct("source"); - assertThat(source.get("commit_scn")).isNotNull(); - assertThat(source.get("scn")).isNotNull(); + assertThat(source.get(SourceInfo.SCN_KEY)).isNotNull(); + assertThat(source.get(SourceInfo.COMMIT_SCN_KEY)).isNotNull(); - final long commitScn = Scn.valueOf(source.getString("commit_scn")).longValue(); - final long scn = Scn.valueOf(source.getString("scn")).longValue(); - assertThat(commitScn).isGreaterThanOrEqualTo(scn); + final String commitScn = source.getString(SourceInfo.COMMIT_SCN_KEY); + final String scn = source.getString(SourceInfo.SCN_KEY); + assertThat(Scn.valueOf(commitScn).longValue()).isGreaterThanOrEqualTo(Scn.valueOf(scn).longValue()); } finally { TestHelper.dropTable(connection, "dbz5266"); @@ -2054,12 +2054,12 @@ public void shouldUpdateCommitScnOnNonLobTransactionWithLobEnabled() throws Exce assertThat(after.get("DATA")).isNotNull(); Struct source = ((Struct) tableRecords.get(0).value()).getStruct("source"); - assertThat(source.get("commit_scn")).isNotNull(); - assertThat(source.get("scn")).isNotNull(); + assertThat(source.get(SourceInfo.SCN_KEY)).isNotNull(); + assertThat(source.get(SourceInfo.COMMIT_SCN_KEY)).isNotNull(); - final long commitScn = Scn.valueOf(source.getString("commit_scn")).longValue(); - final long scn = Scn.valueOf(source.getString("scn")).longValue(); - assertThat(commitScn).isGreaterThanOrEqualTo(scn); + final String commitScn = source.getString(SourceInfo.COMMIT_SCN_KEY); + final String scn = source.getString(SourceInfo.SCN_KEY); + assertThat(Scn.valueOf(commitScn).longValue()).isGreaterThanOrEqualTo(Scn.valueOf(scn).longValue()); } finally { TestHelper.dropTable(connection, "dbz5266"); @@ -2096,12 +2096,12 @@ public void shouldUpdateCommitScnOnNonLobTransactionWithLobDisabled() throws Exc assertThat(after.get("DATA")).isNotNull(); Struct source = ((Struct) tableRecords.get(0).value()).getStruct("source"); - assertThat(source.get("commit_scn")).isNotNull(); - assertThat(source.get("scn")).isNotNull(); + assertThat(source.get(SourceInfo.SCN_KEY)).isNotNull(); + assertThat(source.get(SourceInfo.COMMIT_SCN_KEY)).isNotNull(); - final long commitScn = Scn.valueOf(source.getString("commit_scn")).longValue(); - final long scn = Scn.valueOf(source.getString("scn")).longValue(); - assertThat(commitScn).isGreaterThanOrEqualTo(scn); + final String commitScn = source.getString(SourceInfo.COMMIT_SCN_KEY); + final String scn = source.getString(SourceInfo.SCN_KEY); + assertThat(Scn.valueOf(commitScn).longValue()).isGreaterThanOrEqualTo(Scn.valueOf(scn).longValue()); } finally { TestHelper.dropTable(connection, "dbz5266"); diff --git a/debezium-connector-oracle/src/test/java/io/debezium/connector/oracle/OracleDatabaseHistoryTest.java b/debezium-connector-oracle/src/test/java/io/debezium/connector/oracle/OracleDatabaseHistoryTest.java index 81cd13ad4..8262e5080 100644 --- a/debezium-connector-oracle/src/test/java/io/debezium/connector/oracle/OracleDatabaseHistoryTest.java +++ b/debezium-connector-oracle/src/test/java/io/debezium/connector/oracle/OracleDatabaseHistoryTest.java @@ -76,7 +76,7 @@ protected Offsets getOffsets() { .with(RelationalDatabaseConnectorConfig.SERVER_NAME, TestHelper.SERVER_NAME) .build(); final OracleOffsetContext position = new OracleOffsetContext(new OracleConnectorConfig(config), Scn.valueOf(999), - Scn.valueOf(999), null, Scn.valueOf(999), Collections.emptyMap(), false, true, new TransactionContext(), + CommitScn.valueOf(999L), null, Scn.valueOf(999), Collections.emptyMap(), false, true, new TransactionContext(), new SignalBasedIncrementalSnapshotContext<>()); return Offsets.of(source, position); diff --git a/debezium-connector-oracle/src/test/java/io/debezium/connector/oracle/OracleOffsetContextTest.java b/debezium-connector-oracle/src/test/java/io/debezium/connector/oracle/OracleOffsetContextTest.java index 46440d160..35596c564 100644 --- a/debezium-connector-oracle/src/test/java/io/debezium/connector/oracle/OracleOffsetContextTest.java +++ b/debezium-connector-oracle/src/test/java/io/debezium/connector/oracle/OracleOffsetContextTest.java @@ -42,8 +42,8 @@ public void beforeEach() throws Exception { } @Test - @FixFor("DBZ-2994") - public void shouldreadScnAndCommitScnAsLongValues() throws Exception { + @FixFor({ "DBZ-2994", "DBZ-5245" }) + public void shouldReadScnAndCommitScnAsLongValues() throws Exception { final Map offsetValues = new HashMap<>(); offsetValues.put(SourceInfo.SCN_KEY, 12345L); offsetValues.put(SourceInfo.COMMIT_SCN_KEY, 23456L); @@ -51,12 +51,12 @@ public void shouldreadScnAndCommitScnAsLongValues() throws Exception { final OracleOffsetContext offsetContext = (OracleOffsetContext) offsetLoader.load(offsetValues); assertThat(offsetContext.getScn()).isEqualTo(Scn.valueOf("12345")); if (TestHelper.adapter().equals(OracleConnectorConfig.ConnectorAdapter.LOG_MINER)) { - assertThat(offsetContext.getCommitScn()).isEqualTo(Scn.valueOf("23456")); + assertThat(offsetContext.getCommitScn().getMaxCommittedScn()).isEqualTo(Scn.valueOf("23456")); } } @Test - @FixFor("DBZ-2994") + @FixFor({ "DBZ-2994", "DBZ-5245" }) public void shouldReadScnAndCommitScnAsStringValues() throws Exception { final Map offsetValues = new HashMap<>(); offsetValues.put(SourceInfo.SCN_KEY, "12345"); @@ -65,12 +65,12 @@ public void shouldReadScnAndCommitScnAsStringValues() throws Exception { final OracleOffsetContext offsetContext = (OracleOffsetContext) offsetLoader.load(offsetValues); assertThat(offsetContext.getScn()).isEqualTo(Scn.valueOf("12345")); if (TestHelper.adapter().equals(OracleConnectorConfig.ConnectorAdapter.LOG_MINER)) { - assertThat(offsetContext.getCommitScn()).isEqualTo(Scn.valueOf("23456")); + assertThat(offsetContext.getCommitScn().getMaxCommittedScn()).isEqualTo(Scn.valueOf("23456")); } } @Test - @FixFor("DBZ-2994") + @FixFor({ "DBZ-2994", "DBZ-5245" }) public void shouldHandleNullScnAndCommitScnValues() throws Exception { final Map offsetValues = new HashMap<>(); offsetValues.put(SourceInfo.SCN_KEY, null); @@ -78,11 +78,11 @@ public void shouldHandleNullScnAndCommitScnValues() throws Exception { final OracleOffsetContext offsetContext = (OracleOffsetContext) offsetLoader.load(offsetValues); assertThat(offsetContext.getScn()).isNull(); - assertThat(offsetContext.getCommitScn()).isNull(); + assertThat(offsetContext.getCommitScn().getMaxCommittedScn()).isEqualTo(Scn.NULL); } @Test - @FixFor("DBZ-4937") + @FixFor({ "DBZ-4937", "DBZ-5245" }) public void shouldCorrectlySerializeOffsetsWithSnapshotBasedKeysFromOlderOffsets() throws Exception { // Offsets from Debezium 1.8 final Map offsetValues = new HashMap<>(); @@ -95,7 +95,7 @@ public void shouldCorrectlySerializeOffsetsWithSnapshotBasedKeysFromOlderOffsets // Write values out as Debezium 1.9 Map writeValues = offsetContext.getOffset(); assertThat(writeValues.get(SourceInfo.SCN_KEY)).isEqualTo("745688898023"); - assertThat(writeValues.get(SourceInfo.COMMIT_SCN_KEY)).isEqualTo("745688898024"); + assertThat(writeValues.get(SourceInfo.COMMIT_SCN_KEY)).isEqualTo("745688898024::0:1"); assertThat(writeValues.get(OracleOffsetContext.SNAPSHOT_PENDING_TRANSACTIONS_KEY)).isNull(); assertThat(writeValues.get(OracleOffsetContext.SNAPSHOT_SCN_KEY)).isNull(); @@ -105,7 +105,7 @@ public void shouldCorrectlySerializeOffsetsWithSnapshotBasedKeysFromOlderOffsets // Write values out as Debezium 1.9 writeValues = offsetContext.getOffset(); assertThat(writeValues.get(SourceInfo.SCN_KEY)).isEqualTo("745688898023"); - assertThat(writeValues.get(SourceInfo.COMMIT_SCN_KEY)).isEqualTo("745688898024"); + assertThat(writeValues.get(SourceInfo.COMMIT_SCN_KEY)).isEqualTo("745688898024::0:1"); assertThat(writeValues.get(OracleOffsetContext.SNAPSHOT_PENDING_TRANSACTIONS_KEY)).isNull(); assertThat(writeValues.get(OracleOffsetContext.SNAPSHOT_SCN_KEY)).isNull(); } diff --git a/debezium-connector-oracle/src/test/java/io/debezium/connector/oracle/SourceInfoTest.java b/debezium-connector-oracle/src/test/java/io/debezium/connector/oracle/SourceInfoTest.java index 176d1121e..0c599b56e 100644 --- a/debezium-connector-oracle/src/test/java/io/debezium/connector/oracle/SourceInfoTest.java +++ b/debezium-connector-oracle/src/test/java/io/debezium/connector/oracle/SourceInfoTest.java @@ -62,6 +62,9 @@ public void schemaIsCorrect() { .field("scn", Schema.OPTIONAL_STRING_SCHEMA) .field("commit_scn", Schema.OPTIONAL_STRING_SCHEMA) .field("lcr_position", Schema.OPTIONAL_STRING_SCHEMA) + .field("rs_id", Schema.OPTIONAL_STRING_SCHEMA) + .field("ssn", Schema.OPTIONAL_INT32_SCHEMA) + .field("redo_thread", Schema.OPTIONAL_INT32_SCHEMA) .build(); VerifyRecord.assertConnectSchemasAreEqual(null, source.schema(), schema); diff --git a/debezium-connector-oracle/src/test/java/io/debezium/connector/oracle/logminer/LogMinerQueryBuilderTest.java b/debezium-connector-oracle/src/test/java/io/debezium/connector/oracle/logminer/LogMinerQueryBuilderTest.java index 4eacfcc76..7324bce85 100644 --- a/debezium-connector-oracle/src/test/java/io/debezium/connector/oracle/logminer/LogMinerQueryBuilderTest.java +++ b/debezium-connector-oracle/src/test/java/io/debezium/connector/oracle/logminer/LogMinerQueryBuilderTest.java @@ -61,7 +61,7 @@ public class LogMinerQueryBuilderTest { * {@code database.history.store.only.captured.tables.ddl} is {@code false}. */ private static final String LOG_MINER_CONTENT_QUERY_TEMPLATE1 = "SELECT SCN, SQL_REDO, OPERATION_CODE, TIMESTAMP, " + - "XID, CSF, TABLE_NAME, SEG_OWNER, OPERATION, USERNAME, ROW_ID, ROLLBACK, RS_ID, STATUS, INFO " + + "XID, CSF, TABLE_NAME, SEG_OWNER, OPERATION, USERNAME, ROW_ID, ROLLBACK, RS_ID, STATUS, INFO, SSN, THREAD# " + "FROM V$LOGMNR_CONTENTS WHERE SCN > ? AND SCN <= ? " + "${systemTablePredicate}" + "AND ((" + @@ -82,7 +82,7 @@ public class LogMinerQueryBuilderTest { * {@code database.history.store.only.captured.tables.ddl} is {@code true}. */ private static final String LOG_MINER_CONTENT_QUERY_TEMPLATE2 = "SELECT SCN, SQL_REDO, OPERATION_CODE, TIMESTAMP, " + - "XID, CSF, TABLE_NAME, SEG_OWNER, OPERATION, USERNAME, ROW_ID, ROLLBACK, RS_ID, STATUS, INFO " + + "XID, CSF, TABLE_NAME, SEG_OWNER, OPERATION, USERNAME, ROW_ID, ROLLBACK, RS_ID, STATUS, INFO, SSN, THREAD# " + "FROM V$LOGMNR_CONTENTS WHERE SCN > ? AND SCN <= ? " + "${systemTablePredicate}" + "AND ((" + diff --git a/debezium-connector-oracle/src/test/java/io/debezium/connector/oracle/logminer/processor/AbstractProcessorUnitTest.java b/debezium-connector-oracle/src/test/java/io/debezium/connector/oracle/logminer/processor/AbstractProcessorUnitTest.java index 9131613d1..c90121c1e 100644 --- a/debezium-connector-oracle/src/test/java/io/debezium/connector/oracle/logminer/processor/AbstractProcessorUnitTest.java +++ b/debezium-connector-oracle/src/test/java/io/debezium/connector/oracle/logminer/processor/AbstractProcessorUnitTest.java @@ -27,6 +27,7 @@ import io.debezium.config.Configuration; import io.debezium.connector.base.ChangeEventQueue; +import io.debezium.connector.oracle.CommitScn; import io.debezium.connector.oracle.OracleConnection; import io.debezium.connector.oracle.OracleConnectorConfig; import io.debezium.connector.oracle.OracleDatabaseSchema; @@ -83,6 +84,8 @@ public void before() throws Exception { this.dispatcher = (EventDispatcher) Mockito.mock(EventDispatcher.class); this.partition = Mockito.mock(OraclePartition.class); this.offsetContext = Mockito.mock(OracleOffsetContext.class); + final CommitScn commitScn = CommitScn.valueOf((String) null); + Mockito.when(this.offsetContext.getCommitScn()).thenReturn(commitScn); this.connection = createOracleConnection(); this.schema = createOracleDatabaseSchema(); this.metrics = createMetrics(schema); diff --git a/documentation/modules/ROOT/pages/connectors/oracle.adoc b/documentation/modules/ROOT/pages/connectors/oracle.adoc index 34c421450..01ab87d19 100644 --- a/documentation/modules/ROOT/pages/connectors/oracle.adoc +++ b/documentation/modules/ROOT/pages/connectors/oracle.adoc @@ -252,8 +252,11 @@ The message contains a logical representation of the table schema. "table": "CUSTOMERS", "txId" : null, "scn" : "1513734", - "commit_scn": "1513734", - "lcr_position" : null + "commit_scn": "1513754", + "lcr_position" : null, + "rs_id": "001234.00012345.0124", + "ssn": 1, + "redo_thread": 1 }, "ts_ms": 1588252618953, // <1> "databaseName": "ORCLPDB1", // <2> @@ -901,6 +904,21 @@ The following example shows the value of a _create_ event value from the `custom "optional": true, "field": "commit_scn" }, + { + "type": "string", + "optional": true, + "field": "rs_id" + }, + { + "type": "int32", + "optional": true, + "field": "ssn" + }, + { + "type": "int32", + "optional": true, + "field": "redo_thread" + }, { "type": "boolean", "optional": true, @@ -940,6 +958,9 @@ The following example shows the value of a _create_ event value from the `custom "txId": "6.28.807", "scn": "2122185", "commit_scn": "2122185", + "rs_id": "001234.00012345.0124", + "ssn": 1, + "redo_thread": 1, "snapshot": false }, "op": "c", @@ -1000,6 +1021,9 @@ The following example shows an _update_ change event that the connector captures "txId": "6.9.809", "scn": "2125544", "commit_scn": "2125544", + "rs_id": "001234.00012345.0124", + "ssn": 1, + "redo_thread": 1, "snapshot": false }, "op": "u", @@ -1058,6 +1082,9 @@ The `schema` portion of the _delete_ event is identical to the `schema` portion "txId": "6.28.807", "scn": "2122184", "commit_scn": "2122184", + "rs_id": "001234.00012345.0124", + "ssn": 1, + "redo_thread": 1, "snapshot": false }, "op": "d", @@ -1113,7 +1140,10 @@ The message key is `null` in this case, the message value looks like this: "txId": "02000a0037030000", "scn": "13234397", "commit_scn": "13271102", - "lcr_position": null + "lcr_position": null, + "rs_id": "001234.00012345.0124", + "ssn": 1, + "redo_thread": 1 }, "op": "t", // <2> "ts_ms": 1638974558961, // <3>