From a9d59710a735a89448e0af79369aea991d408d5f Mon Sep 17 00:00:00 2001 From: Chris Cranford Date: Mon, 15 Mar 2021 18:49:16 -0400 Subject: [PATCH] DBZ-2994 Use string-representation of SCN in offsets & source info --- .../oracle/OracleConnectorConfig.java | 17 ++-- .../connector/oracle/OracleOffsetContext.java | 65 ++++++++++---- .../OracleSnapshotChangeEventSource.java | 6 +- .../oracle/OracleSourceInfoStructMaker.java | 8 +- .../debezium/connector/oracle/SourceInfo.java | 14 +-- .../oracle/logminer/LogMinerHelper.java | 2 +- .../LogMinerQueryResultProcessor.java | 16 ++-- .../LogMinerStreamingChangeEventSource.java | 5 +- .../connector/oracle/logminer/SqlUtils.java | 4 +- .../oracle/logminer/TransactionalBuffer.java | 6 +- .../connector/oracle/xstream/LcrPosition.java | 7 +- .../XstreamStreamingChangeEventSource.java | 11 +-- .../oracle/OracleOffsetContextTest.java | 86 +++++++++++++++++++ .../connector/oracle/SourceInfoTest.java | 2 + .../oracle/logminer/SqlUtilsTest.java | 2 +- .../logminer/TransactionalBufferTest.java | 16 ++-- 16 files changed, 202 insertions(+), 65 deletions(-) create mode 100644 debezium-connector-oracle/src/test/java/io/debezium/connector/oracle/OracleOffsetContextTest.java diff --git a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/OracleConnectorConfig.java b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/OracleConnectorConfig.java index 248a5af28..dd7eb029f 100644 --- a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/OracleConnectorConfig.java +++ b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/OracleConnectorConfig.java @@ -29,6 +29,7 @@ import io.debezium.connector.SourceInfoStructMaker; import io.debezium.connector.oracle.logminer.HistoryRecorder; import io.debezium.connector.oracle.logminer.NeverHistoryRecorder; +import io.debezium.connector.oracle.logminer.Scn; import io.debezium.connector.oracle.logminer.SqlUtils; import io.debezium.connector.oracle.xstream.LcrPosition; import io.debezium.connector.oracle.xstream.OracleVersion; @@ -451,24 +452,28 @@ protected HistoryRecordComparator getHistoryRecordComparator() { return new HistoryRecordComparator() { @Override protected boolean isPositionAtOrBefore(Document recorded, Document desired) { - Long recordedScn; - Long desiredScn; + Scn recordedScn; + Scn desiredScn; if (getAdapter() == OracleConnectorConfig.ConnectorAdapter.XSTREAM) { final LcrPosition recordedPosition = LcrPosition.valueOf(recorded.getString(SourceInfo.LCR_POSITION_KEY)); final LcrPosition desiredPosition = LcrPosition.valueOf(desired.getString(SourceInfo.LCR_POSITION_KEY)); - recordedScn = recordedPosition != null ? recordedPosition.getScn() : recorded.getLong(SourceInfo.SCN_KEY); - desiredScn = desiredPosition != null ? desiredPosition.getScn() : desired.getLong(SourceInfo.SCN_KEY); + recordedScn = recordedPosition != null ? recordedPosition.getScn() : getScnFromString(recorded.getString(SourceInfo.SCN2_KEY)); + desiredScn = desiredPosition != null ? desiredPosition.getScn() : getScnFromString(desired.getString(SourceInfo.SCN2_KEY)); return (recordedPosition != null && desiredPosition != null) ? recordedPosition.compareTo(desiredPosition) < 1 : recordedScn.compareTo(desiredScn) < 1; } else { - recordedScn = recorded.getLong(SourceInfo.SCN_KEY); - desiredScn = desired.getLong(SourceInfo.SCN_KEY); + recordedScn = getScnFromString(recorded.getString(SourceInfo.SCN_KEY)); + desiredScn = getScnFromString(desired.getString(SourceInfo.SCN_KEY)); return recordedScn.compareTo(desiredScn) < 1; } } + + private Scn getScnFromString(String value) { + return value == null ? Scn.ZERO : Scn.valueOf(value); + } }; } diff --git a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/OracleOffsetContext.java b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/OracleOffsetContext.java index 64fcc53b8..3f4e4de73 100644 --- a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/OracleOffsetContext.java +++ b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/OracleOffsetContext.java @@ -14,6 +14,7 @@ import org.apache.kafka.connect.data.Struct; import io.debezium.connector.SnapshotRecord; +import io.debezium.connector.oracle.logminer.Scn; import io.debezium.connector.oracle.xstream.LcrPosition; import io.debezium.pipeline.spi.OffsetContext; import io.debezium.pipeline.txmetadata.TransactionContext; @@ -36,13 +37,13 @@ public class OracleOffsetContext implements OffsetContext { */ private boolean snapshotCompleted; - public OracleOffsetContext(OracleConnectorConfig connectorConfig, long scn, Long commitScn, LcrPosition lcrPosition, + public OracleOffsetContext(OracleConnectorConfig connectorConfig, String scn, String commitScn, LcrPosition lcrPosition, boolean snapshot, boolean snapshotCompleted, TransactionContext transactionContext) { this(connectorConfig, scn, lcrPosition, snapshot, snapshotCompleted, transactionContext); sourceInfo.setCommitScn(commitScn); } - private OracleOffsetContext(OracleConnectorConfig connectorConfig, long scn, LcrPosition lcrPosition, + private OracleOffsetContext(OracleConnectorConfig connectorConfig, String scn, LcrPosition lcrPosition, boolean snapshot, boolean snapshotCompleted, TransactionContext transactionContext) { partition = Collections.singletonMap(SERVER_PARTITION_KEY, connectorConfig.getLogicalName()); @@ -65,7 +66,7 @@ private OracleOffsetContext(OracleConnectorConfig connectorConfig, long scn, Lcr public static class Builder { private OracleConnectorConfig connectorConfig; - private long scn; + private String scn; private LcrPosition lcrPosition; private boolean snapshot; private boolean snapshotCompleted; @@ -76,11 +77,16 @@ public Builder logicalName(OracleConnectorConfig connectorConfig) { return this; } - public Builder scn(long scn) { + public Builder scn(String scn) { this.scn = scn; return this; } + public Builder scn(Scn scn) { + scn(scn != null ? scn.toString() : null); + return this; + } + public Builder lcrPosition(LcrPosition lcrPosition) { this.lcrPosition = lcrPosition; return this; @@ -120,7 +126,7 @@ public static Builder create() { if (sourceInfo.isSnapshot()) { Map offset = new HashMap<>(); - offset.put(SourceInfo.SCN_KEY, sourceInfo.getScn()); + offset.put(SourceInfo.SCN2_KEY, sourceInfo.getScn()); offset.put(SourceInfo.SNAPSHOT_KEY, true); offset.put(SNAPSHOT_COMPLETED_KEY, snapshotCompleted); @@ -132,8 +138,8 @@ public static Builder create() { offset.put(SourceInfo.LCR_POSITION_KEY, sourceInfo.getLcrPosition().toString()); } else { - offset.put(SourceInfo.SCN_KEY, sourceInfo.getScn()); - offset.put(SourceInfo.COMMIT_SCN_KEY, sourceInfo.getCommitScn()); + offset.put(SourceInfo.SCN2_KEY, sourceInfo.getScn()); + offset.put(SourceInfo.COMMIT2_SCN_KEY, sourceInfo.getCommitScn()); } return transactionContext.store(offset); } @@ -149,19 +155,19 @@ public Struct getSourceInfo() { return sourceInfo.struct(); } - public void setScn(long scn) { - sourceInfo.setScn(scn); + public void setScn(Scn scn) { + sourceInfo.setScn(scn.toString()); } - public void setCommitScn(Long commitScn) { - sourceInfo.setCommitScn(commitScn); + public void setCommitScn(Scn commitScn) { + sourceInfo.setCommitScn(commitScn.toString()); } - public long getScn() { + public String getScn() { return sourceInfo.getScn(); } - public Long getCommitScn() { + public String getCommitScn() { return sourceInfo.getCommitScn(); } @@ -256,18 +262,43 @@ public Loader(OracleConnectorConfig connectorConfig, OracleConnectorConfig.Conne public OffsetContext load(Map offset) { boolean snapshot = Boolean.TRUE.equals(offset.get(SourceInfo.SNAPSHOT_KEY)); boolean snapshotCompleted = Boolean.TRUE.equals(offset.get(SNAPSHOT_COMPLETED_KEY)); - Long scn; + String scn; if (adapter == OracleConnectorConfig.ConnectorAdapter.LOG_MINER) { - scn = (Long) offset.get(SourceInfo.SCN_KEY); - Long commitScn = (Long) offset.get(SourceInfo.COMMIT_SCN_KEY); + scn = getScnFromOffset(offset, null); + String commitScn = getCommitScnFromOffset(offset); return new OracleOffsetContext(connectorConfig, scn, commitScn, null, snapshot, snapshotCompleted, TransactionContext.load(offset)); } else { LcrPosition lcrPosition = LcrPosition.valueOf((String) offset.get(SourceInfo.LCR_POSITION_KEY)); - scn = lcrPosition != null ? lcrPosition.getScn() : (Long) offset.get(SourceInfo.SCN_KEY); + scn = getScnFromOffset(offset, lcrPosition); return new OracleOffsetContext(connectorConfig, scn, lcrPosition, snapshot, snapshotCompleted, TransactionContext.load(offset)); } } + + private String getScnFromOffset(Map offset, LcrPosition lcrPosition) { + if (lcrPosition != null) { + return lcrPosition.getScn().toString(); + } + // Prioritize string-based SCN key over the numeric-based SCN key + if (offset.containsKey(SourceInfo.SCN2_KEY)) { + return (String) offset.get(SourceInfo.SCN2_KEY); + } + else { + Long scnAsLong = (Long) offset.get(SourceInfo.SCN_KEY); + return scnAsLong != null ? scnAsLong.toString() : null; + } + } + + private String getCommitScnFromOffset(Map offset) { + // Prioritize string-based commit SCN key over the numeric-based commit SCN key + if (offset.containsKey(SourceInfo.COMMIT2_SCN_KEY)) { + return (String) offset.get(SourceInfo.COMMIT2_SCN_KEY); + } + else { + Long commitScnAsLong = (Long) offset.get(SourceInfo.COMMIT_SCN_KEY); + return commitScnAsLong != null ? commitScnAsLong.toString() : null; + } + } } } diff --git a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/OracleSnapshotChangeEventSource.java b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/OracleSnapshotChangeEventSource.java index f93afafea..b92cdcdcb 100644 --- a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/OracleSnapshotChangeEventSource.java +++ b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/OracleSnapshotChangeEventSource.java @@ -123,7 +123,7 @@ protected void determineSnapshotOffset(RelationalSnapshotContext ctx) throws Exc ctx.offset = OracleOffsetContext.create() .logicalName(connectorConfig) - .scn(currentScn.longValue()) + .scn(currentScn) .transactionContext(new TransactionContext()) .build(); } @@ -280,7 +280,9 @@ protected SchemaChangeEvent getCreateTableEvent(RelationalSnapshotContext snapsh @Override protected Optional getSnapshotSelect(RelationalSnapshotContext snapshotContext, TableId tableId) { - long snapshotOffset = (Long) snapshotContext.offset.getOffset().get("scn"); + final OracleOffsetContext offset = (OracleOffsetContext) snapshotContext.offset; + final String snapshotOffset = offset.getScn(); + assert snapshotOffset != null; return Optional.of("SELECT * FROM " + quote(tableId) + " AS OF SCN " + snapshotOffset); } diff --git a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/OracleSourceInfoStructMaker.java b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/OracleSourceInfoStructMaker.java index 57aaff66a..742f518f8 100644 --- a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/OracleSourceInfoStructMaker.java +++ b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/OracleSourceInfoStructMaker.java @@ -25,6 +25,8 @@ public OracleSourceInfoStructMaker(String connector, String version, CommonConne .field(SourceInfo.SCN_KEY, Schema.OPTIONAL_INT64_SCHEMA) .field(SourceInfo.COMMIT_SCN_KEY, Schema.OPTIONAL_INT64_SCHEMA) .field(SourceInfo.LCR_POSITION_KEY, Schema.OPTIONAL_STRING_SCHEMA) + .field(SourceInfo.SCN2_KEY, Schema.OPTIONAL_STRING_SCHEMA) + .field(SourceInfo.COMMIT2_SCN_KEY, Schema.OPTIONAL_STRING_SCHEMA) .build(); } @@ -39,13 +41,15 @@ public Struct struct(SourceInfo sourceInfo) { .put(SourceInfo.SCHEMA_NAME_KEY, sourceInfo.getTableId().schema()) .put(SourceInfo.TABLE_NAME_KEY, sourceInfo.getTableId().table()) .put(SourceInfo.TXID_KEY, sourceInfo.getTransactionId()) - .put(SourceInfo.SCN_KEY, sourceInfo.getScn()); + .put(SourceInfo.SCN_KEY, sourceInfo.getScn() != null ? Long.parseLong(sourceInfo.getScn()) : 0L) + .put(SourceInfo.SCN2_KEY, sourceInfo.getScn()); if (sourceInfo.getLcrPosition() != null) { ret.put(SourceInfo.LCR_POSITION_KEY, sourceInfo.getLcrPosition().toString()); } if (sourceInfo.getCommitScn() != null) { - ret.put(SourceInfo.COMMIT_SCN_KEY, sourceInfo.getCommitScn()); + ret.put(SourceInfo.COMMIT_SCN_KEY, sourceInfo.getCommitScn() != null ? Long.parseLong(sourceInfo.getCommitScn()) : 0L); + ret.put(SourceInfo.COMMIT2_SCN_KEY, sourceInfo.getCommitScn()); } return ret; } diff --git a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/SourceInfo.java b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/SourceInfo.java index 789d43881..7dc139d1f 100644 --- a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/SourceInfo.java +++ b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/SourceInfo.java @@ -17,12 +17,14 @@ public class SourceInfo extends BaseSourceInfo { public static final String TXID_KEY = "txId"; public static final String SCN_KEY = "scn"; + public static final String SCN2_KEY = "scn2"; public static final String COMMIT_SCN_KEY = "commit_scn"; + public static final String COMMIT2_SCN_KEY = "commit2_scn"; public static final String LCR_POSITION_KEY = "lcr_position"; public static final String SNAPSHOT_KEY = "snapshot"; - private long scn; - private Long commitScn; + private String scn; + private String commitScn; private LcrPosition lcrPosition; private String transactionId; private Instant sourceTime; @@ -32,19 +34,19 @@ protected SourceInfo(OracleConnectorConfig connectorConfig) { super(connectorConfig); } - public long getScn() { + public String getScn() { return scn; } - public Long getCommitScn() { + public String getCommitScn() { return commitScn; } - public void setScn(long scn) { + public void setScn(String scn) { this.scn = scn; } - public void setCommitScn(Long commitScn) { + public void setCommitScn(String commitScn) { this.commitScn = commitScn; } diff --git a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/LogMinerHelper.java b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/LogMinerHelper.java index 056f5522a..fc231cc5f 100644 --- a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/LogMinerHelper.java +++ b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/LogMinerHelper.java @@ -518,7 +518,7 @@ public static void setRedoLogFilesForMining(OracleConnection connection, Scn las * @param transactionRetention duration to tolerate long running transactions * @return optional SCN as a watermark for abandonment */ - public static Optional getLastScnToAbandon(OracleConnection connection, Long offsetScn, Duration transactionRetention) { + public static Optional getLastScnToAbandon(OracleConnection connection, Scn offsetScn, Duration transactionRetention) { try { String query = SqlUtils.diffInDaysQuery(offsetScn); Float diffInDays = (Float) getSingleResult(connection, query, DATATYPE.FLOAT); diff --git a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/LogMinerQueryResultProcessor.java b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/LogMinerQueryResultProcessor.java index 674b0b2b9..27b3bbee5 100644 --- a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/LogMinerQueryResultProcessor.java +++ b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/LogMinerQueryResultProcessor.java @@ -53,8 +53,8 @@ class LogMinerQueryResultProcessor { private final OracleConnectorConfig connectorConfig; private final Clock clock; private final Logger LOGGER = LoggerFactory.getLogger(LogMinerQueryResultProcessor.class); - private long currentOffsetScn = 0; - private long currentOffsetCommitScn = 0; + private Scn currentOffsetScn = Scn.ZERO; + private Scn currentOffsetCommitScn = Scn.ZERO; private long stuckScnCounter = 0; private HistoryRecorder historyRecorder; @@ -208,14 +208,14 @@ int processResult(ResultSet resultSet) { transactionalBuffer.registerCommitCallback(txId, scn, changeTime.toInstant(), (timestamp, smallestScn, commitScn, counter) -> { // update SCN in offset context only if processed SCN less than SCN among other transactions if (smallestScn == null || scn.compareTo(smallestScn) < 0) { - offsetContext.setScn(scn.longValue()); + offsetContext.setScn(scn); transactionalBufferMetrics.setOldestScn(scn); } offsetContext.setTransactionId(txId); offsetContext.setSourceTime(timestamp.toInstant()); offsetContext.setTableId(tableId); if (counter == 0) { - offsetContext.setCommitScn(commitScn.longValue()); + offsetContext.setCommitScn(commitScn); } Table table = schema.tableFor(tableId); LOGGER.trace("Processing DML event {} scn {}", dmlEntry.toString(), scn); @@ -237,9 +237,9 @@ int processResult(ResultSet resultSet) { metrics.setLastDurationOfBatchProcessing(totalTime); warnStuckScn(); - currentOffsetScn = offsetContext.getScn(); + currentOffsetScn = Scn.valueOf(offsetContext.getScn()); if (offsetContext.getCommitScn() != null) { - currentOffsetCommitScn = offsetContext.getCommitScn(); + currentOffsetCommitScn = Scn.valueOf(offsetContext.getCommitScn()); } } @@ -260,7 +260,9 @@ int processResult(ResultSet resultSet) { */ private void warnStuckScn() { if (offsetContext != null && offsetContext.getCommitScn() != null) { - if (currentOffsetScn == offsetContext.getScn() && currentOffsetCommitScn != offsetContext.getCommitScn()) { + final Scn scn = Scn.valueOf(offsetContext.getScn()); + final Scn commitScn = Scn.valueOf(offsetContext.getCommitScn()); + if (currentOffsetScn.equals(scn) && !currentOffsetCommitScn.equals(offsetContext.getCommitScn())) { stuckScnCounter++; // logWarn only once if (stuckScnCounter == 25) { diff --git a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/LogMinerStreamingChangeEventSource.java b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/LogMinerStreamingChangeEventSource.java index 5c7f469e4..49495b33d 100644 --- a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/LogMinerStreamingChangeEventSource.java +++ b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/LogMinerStreamingChangeEventSource.java @@ -185,7 +185,7 @@ public void execute(ChangeEventSourceContext context) { if (transactionalBuffer.isEmpty()) { LOGGER.debug("Transactional buffer empty, updating offset's SCN {}", startScn); - offsetContext.setScn(startScn.longValue()); + offsetContext.setScn(startScn); } } @@ -228,7 +228,8 @@ private void unregisterLogMinerMetrics() { private void abandonOldTransactionsIfExist(OracleConnection connection, TransactionalBuffer transactionalBuffer) { Duration transactionRetention = connectorConfig.getLogMiningTransactionRetention(); if (!Duration.ZERO.equals(transactionRetention)) { - Optional lastScnToAbandonTransactions = getLastScnToAbandon(connection, offsetContext.getScn(), transactionRetention); + final Scn offsetScn = Scn.valueOf(offsetContext.getScn()); + Optional lastScnToAbandonTransactions = getLastScnToAbandon(connection, offsetScn, transactionRetention); lastScnToAbandonTransactions.ifPresent(thresholdScn -> { transactionalBuffer.abandonLongTransactions(thresholdScn, offsetContext); offsetContext.setScn(thresholdScn); diff --git a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/SqlUtils.java b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/SqlUtils.java index a88f6933c..9a9d88b36 100644 --- a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/SqlUtils.java +++ b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/SqlUtils.java @@ -382,11 +382,11 @@ static String bulkHistoryInsertStmt(String currentHistoryTableName) { /** * This method return query which converts given SCN in days and deduct from the current day */ - public static String diffInDaysQuery(Long scn) { + public static String diffInDaysQuery(Scn scn) { if (scn == null) { return null; } - return "select sysdate - CAST(scn_to_timestamp(" + scn + ") as date) from dual"; + return "select sysdate - CAST(scn_to_timestamp(" + scn.toString() + ") as date) from dual"; } public static boolean connectionProblem(Throwable e) { diff --git a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/TransactionalBuffer.java b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/TransactionalBuffer.java index 746a90938..85ee577d2 100644 --- a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/TransactionalBuffer.java +++ b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/TransactionalBuffer.java @@ -138,7 +138,7 @@ boolean commit(String transactionId, Scn scn, OracleOffsetContext offsetContext, // On the restarting connector, we start from SCN in the offset. There is possibility to commit a transaction(s) which were already committed. // Currently we cannot use ">=", because we may lose normal commit which may happen at the same time. TODO use audit table to prevent duplications - if ((offsetContext.getCommitScn() != null && offsetContext.getCommitScn() > scn.longValue()) || lastCommittedScn.longValue() > scn.longValue()) { + if ((offsetContext.getCommitScn() != null && Scn.valueOf(offsetContext.getCommitScn()).compareTo(scn) > 0) || lastCommittedScn.compareTo(scn) > 0) { LogMinerHelper.logWarn(metrics, "Transaction {} was already processed, ignore. Committed SCN in offset is {}, commit SCN of the transaction is {}, last committed SCN is {}", transactionId, offsetContext.getCommitScn(), scn, lastCommittedScn); @@ -224,9 +224,9 @@ boolean rollback(String transactionId, String debugMessage) { * @param thresholdScn the smallest SVN of any transaction to keep in the buffer. All others will be removed. * @param offsetContext the offset context */ - void abandonLongTransactions(Long thresholdScn, OracleOffsetContext offsetContext) { + void abandonLongTransactions(Scn thresholdScn, OracleOffsetContext offsetContext) { LogMinerHelper.logWarn(metrics, "All transactions with first SCN <= {} will be abandoned, offset: {}", thresholdScn, offsetContext.getScn()); - Scn threshold = Scn.valueOf(thresholdScn); + Scn threshold = Scn.valueOf(thresholdScn.toString()); Scn smallestScn = calculateSmallestScn(); if (smallestScn == null) { // no transactions in the buffer diff --git a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/xstream/LcrPosition.java b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/xstream/LcrPosition.java index 4a73d3c1f..56f0c232e 100644 --- a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/xstream/LcrPosition.java +++ b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/xstream/LcrPosition.java @@ -11,6 +11,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import io.debezium.connector.oracle.logminer.Scn; import io.debezium.util.HexConverter; import io.debezium.util.Strings; @@ -28,13 +29,13 @@ public class LcrPosition implements Comparable { private final byte[] rawPosition; private final String stringFromat; - private final long scn; + private final Scn scn; public LcrPosition(byte[] rawPosition) { this.rawPosition = rawPosition; this.stringFromat = HexConverter.convertToHexString(rawPosition); try { - scn = XStreamUtility.getSCNFromPosition(rawPosition).longValue(); + scn = new Scn(XStreamUtility.getSCNFromPosition(rawPosition).bigIntegerValue()); } catch (SQLException | StreamsException e) { throw new RuntimeException(e); @@ -53,7 +54,7 @@ public byte[] getRawPosition() { return rawPosition; } - public long getScn() { + public Scn getScn() { return scn; } diff --git a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/xstream/XstreamStreamingChangeEventSource.java b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/xstream/XstreamStreamingChangeEventSource.java index 64067c04d..df9913afc 100644 --- a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/xstream/XstreamStreamingChangeEventSource.java +++ b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/xstream/XstreamStreamingChangeEventSource.java @@ -5,6 +5,7 @@ */ package io.debezium.connector.oracle.xstream; +import java.sql.SQLException; import java.util.Map; import java.util.concurrent.atomic.AtomicReference; @@ -109,18 +110,18 @@ public void commitOffset(Map offset) { if (xsOut != null) { LOGGER.debug("Sending message to request recording of offsets to Oracle"); final LcrPosition lcrPosition = LcrPosition.valueOf((String) offset.get(SourceInfo.LCR_POSITION_KEY)); - final Long scn = (Long) offset.get(SourceInfo.SCN_KEY); + final String scn = (String) offset.get(SourceInfo.SCN2_KEY); // We can safely overwrite the message even if it was not processed. The watermarked will be set to the highest // (last) delivered value in a single step instead of incrementally sendPublishedPosition(lcrPosition, scn); } } - private byte[] convertScnToPosition(long scn) { + private byte[] convertScnToPosition(String scn) { try { - return XStreamUtility.convertSCNToPosition(new NUMBER(scn), this.posVersion); + return XStreamUtility.convertSCNToPosition(new NUMBER(scn, 0), this.posVersion); } - catch (StreamsException e) { + catch (SQLException | StreamsException e) { throw new RuntimeException(e); } } @@ -139,7 +140,7 @@ XStreamOut getXsOut() { return xsOut; } - private void sendPublishedPosition(final LcrPosition lcrPosition, final Long scn) { + private void sendPublishedPosition(final LcrPosition lcrPosition, final String scn) { lcrMessage.set(new PositionAndScn(lcrPosition, (scn != null) ? convertScnToPosition(scn) : null)); } diff --git a/debezium-connector-oracle/src/test/java/io/debezium/connector/oracle/OracleOffsetContextTest.java b/debezium-connector-oracle/src/test/java/io/debezium/connector/oracle/OracleOffsetContextTest.java new file mode 100644 index 000000000..617a6177f --- /dev/null +++ b/debezium-connector-oracle/src/test/java/io/debezium/connector/oracle/OracleOffsetContextTest.java @@ -0,0 +1,86 @@ +/* + * Copyright Debezium Authors. + * + * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 + */ +package io.debezium.connector.oracle; + +import static org.fest.assertions.Assertions.assertThat; + +import java.util.HashMap; +import java.util.Map; + +import org.junit.Before; +import org.junit.Test; + +import io.debezium.connector.oracle.util.TestHelper; +import io.debezium.doc.FixFor; + +/** + * Unit test that validates the behavior of the {@link OracleOffsetContext} and its friends. + * + * @author Chris Cranford + */ +public class OracleOffsetContextTest { + + private OracleConnectorConfig connectorConfig; + private OracleOffsetContext.Loader offsetLoader; + + @Before + public void beforeEach() throws Exception { + this.connectorConfig = new OracleConnectorConfig(TestHelper.defaultConfig().build()); + this.offsetLoader = new OracleOffsetContext.Loader(connectorConfig, TestHelper.adapter()); + } + + @Test + @FixFor("DBZ-2994") + public void shouldReadLegacyScnAndCommitScnFieldsWhenNewNotProvided() throws Exception { + final Map offsetValues = new HashMap<>(); + offsetValues.put(SourceInfo.SCN_KEY, 12345L); + offsetValues.put(SourceInfo.COMMIT_SCN_KEY, 23456L); + + final OracleOffsetContext offsetContext = (OracleOffsetContext) offsetLoader.load(offsetValues); + assertThat(offsetContext.getScn()).isEqualTo("12345"); + assertThat(offsetContext.getCommitScn()).isEqualTo("23456"); + } + + @Test + @FixFor("DBZ-2994") + public void shouldReadNewScnAndCommitScnFieldsWhenLegacyNotProvided() throws Exception { + final Map offsetValues = new HashMap<>(); + offsetValues.put(SourceInfo.SCN2_KEY, "12345"); + offsetValues.put(SourceInfo.COMMIT2_SCN_KEY, "23456"); + + final OracleOffsetContext offsetContext = (OracleOffsetContext) offsetLoader.load(offsetValues); + assertThat(offsetContext.getScn()).isEqualTo("12345"); + assertThat(offsetContext.getCommitScn()).isEqualTo("23456"); + } + + @Test + @FixFor("DBZ-2994") + public void shouldPrioritizeNewScnAndCommitScnFields() throws Exception { + final Map offsetValues = new HashMap<>(); + offsetValues.put(SourceInfo.SCN_KEY, 12345L); + offsetValues.put(SourceInfo.COMMIT_SCN_KEY, 23456L); + offsetValues.put(SourceInfo.SCN2_KEY, "345678"); + offsetValues.put(SourceInfo.COMMIT2_SCN_KEY, "456789"); + + final OracleOffsetContext offsetContext = (OracleOffsetContext) offsetLoader.load(offsetValues); + assertThat(offsetContext.getScn()).isEqualTo("345678"); + assertThat(offsetContext.getCommitScn()).isEqualTo("456789"); + } + + @Test + @FixFor("DBZ-2994") + public void shouldHandleNullScnAndCommitScnValues() throws Exception { + final Map offsetValues = new HashMap<>(); + offsetValues.put(SourceInfo.SCN_KEY, null); + offsetValues.put(SourceInfo.COMMIT_SCN_KEY, null); + offsetValues.put(SourceInfo.SCN2_KEY, null); + offsetValues.put(SourceInfo.COMMIT2_SCN_KEY, null); + + final OracleOffsetContext offsetContext = (OracleOffsetContext) offsetLoader.load(offsetValues); + assertThat(offsetContext.getScn()).isNull(); + assertThat(offsetContext.getCommitScn()).isNull(); + } +} diff --git a/debezium-connector-oracle/src/test/java/io/debezium/connector/oracle/SourceInfoTest.java b/debezium-connector-oracle/src/test/java/io/debezium/connector/oracle/SourceInfoTest.java index 79d3449c6..d813dc29b 100644 --- a/debezium-connector-oracle/src/test/java/io/debezium/connector/oracle/SourceInfoTest.java +++ b/debezium-connector-oracle/src/test/java/io/debezium/connector/oracle/SourceInfoTest.java @@ -61,6 +61,8 @@ public void schemaIsCorrect() { .field("scn", Schema.OPTIONAL_INT64_SCHEMA) .field("commit_scn", Schema.OPTIONAL_INT64_SCHEMA) .field("lcr_position", Schema.OPTIONAL_STRING_SCHEMA) + .field("scn2", Schema.OPTIONAL_STRING_SCHEMA) + .field("commit2_scn", Schema.OPTIONAL_STRING_SCHEMA) .build(); VerifyRecord.assertConnectSchemasAreEqual(null, source.schema(), schema); diff --git a/debezium-connector-oracle/src/test/java/io/debezium/connector/oracle/logminer/SqlUtilsTest.java b/debezium-connector-oracle/src/test/java/io/debezium/connector/oracle/logminer/SqlUtilsTest.java index 14b92d27c..e583268d2 100644 --- a/debezium-connector-oracle/src/test/java/io/debezium/connector/oracle/logminer/SqlUtilsTest.java +++ b/debezium-connector-oracle/src/test/java/io/debezium/connector/oracle/logminer/SqlUtilsTest.java @@ -202,7 +202,7 @@ public void testStatements() { expected = "TRUNCATE TABLE table_name"; assertThat(result).isEqualTo(expected); - result = SqlUtils.diffInDaysQuery(123L); + result = SqlUtils.diffInDaysQuery(Scn.valueOf(123L)); expected = "select sysdate - CAST(scn_to_timestamp(123) as date) from dual"; assertThat(expected.equals(result)).isTrue(); result = SqlUtils.diffInDaysQuery(null); diff --git a/debezium-connector-oracle/src/test/java/io/debezium/connector/oracle/logminer/TransactionalBufferTest.java b/debezium-connector-oracle/src/test/java/io/debezium/connector/oracle/logminer/TransactionalBufferTest.java index 41e80cc6e..34e63895b 100644 --- a/debezium-connector-oracle/src/test/java/io/debezium/connector/oracle/logminer/TransactionalBufferTest.java +++ b/debezium-connector-oracle/src/test/java/io/debezium/connector/oracle/logminer/TransactionalBufferTest.java @@ -122,7 +122,7 @@ public void testIsNotEmptyWhenTransactionIsRegistered() { public void testIsEmptyWhenTransactionIsCommitted() throws InterruptedException { CountDownLatch commitLatch = new CountDownLatch(1); transactionalBuffer.registerCommitCallback(TRANSACTION_ID, SCN, Instant.now(), (timestamp, smallestScn, commitScn, counter) -> commitLatch.countDown()); - offsetContext = new OracleOffsetContext(connectorConfig, SCN.longValue(), SCN.longValue(), (LcrPosition) null, false, true, new TransactionContext()); + offsetContext = new OracleOffsetContext(connectorConfig, SCN.toString(), SCN.toString(), (LcrPosition) null, false, true, new TransactionContext()); transactionalBuffer.commit(TRANSACTION_ID, SCN.add(SCN_ONE), offsetContext, TIMESTAMP, () -> true, MESSAGE, dispatcher); commitLatch.await(); Thread.sleep(1000); @@ -169,7 +169,7 @@ public void testCalculateScnWhenTransactionIsCommitted() throws InterruptedExcep smallestScnContainer.set(smallestScn); commitLatch.countDown(); }); - offsetContext = new OracleOffsetContext(connectorConfig, SCN.longValue(), SCN.longValue(), null, false, true, new TransactionContext()); + offsetContext = new OracleOffsetContext(connectorConfig, SCN.toString(), SCN.toString(), null, false, true, new TransactionContext()); transactionalBuffer.commit(TRANSACTION_ID, SCN.add(SCN_ONE), offsetContext, TIMESTAMP, () -> true, MESSAGE, dispatcher); commitLatch.await(); @@ -188,7 +188,7 @@ public void testCalculateScnWhenFirstTransactionIsCommitted() throws Interrupted }); transactionalBuffer.registerCommitCallback(OTHER_TRANSACTION_ID, OTHER_SCN, Instant.now(), (timestamp, smallestScn, commitScn, counter) -> { }); - offsetContext = new OracleOffsetContext(connectorConfig, SCN.longValue(), SCN.longValue(), null, false, true, new TransactionContext()); + offsetContext = new OracleOffsetContext(connectorConfig, SCN.toString(), SCN.toString(), null, false, true, new TransactionContext()); transactionalBuffer.commit(TRANSACTION_ID, SCN.add(SCN_ONE), offsetContext, TIMESTAMP, () -> true, MESSAGE, dispatcher); commitLatch.await(); // after commit, it stays the same because OTHER_TRANSACTION_ID is not committed yet @@ -207,7 +207,7 @@ public void testCalculateScnWhenSecondTransactionIsCommitted() throws Interrupte smallestScnContainer.set(smallestScn); commitLatch.countDown(); }); - offsetContext = new OracleOffsetContext(connectorConfig, OTHER_SCN.longValue(), OTHER_SCN.longValue(), null, false, true, new TransactionContext()); + offsetContext = new OracleOffsetContext(connectorConfig, OTHER_SCN.toString(), OTHER_SCN.toString(), null, false, true, new TransactionContext()); transactionalBuffer.commit(OTHER_TRANSACTION_ID, OTHER_SCN.add(SCN_ONE), offsetContext, TIMESTAMP, () -> true, MESSAGE, dispatcher); commitLatch.await(); assertThat(smallestScnContainer.get()).isEqualTo(SCN); @@ -219,8 +219,8 @@ public void testCalculateScnWhenSecondTransactionIsCommitted() throws Interrupte public void testAbandoningOneTransaction() { transactionalBuffer.registerCommitCallback(TRANSACTION_ID, SCN, Instant.now(), (timestamp, smallestScn, commitScn, counter) -> { }); - offsetContext = new OracleOffsetContext(connectorConfig, SCN.longValue(), SCN.longValue(), (LcrPosition) null, false, true, new TransactionContext()); - transactionalBuffer.abandonLongTransactions(SCN.longValue(), offsetContext); + offsetContext = new OracleOffsetContext(connectorConfig, SCN.toString(), SCN.toString(), (LcrPosition) null, false, true, new TransactionContext()); + transactionalBuffer.abandonLongTransactions(SCN, offsetContext); assertThat(transactionalBuffer.isEmpty()).isEqualTo(true); } @@ -230,7 +230,7 @@ public void testAbandoningTransactionHavingAnotherOne() { }); transactionalBuffer.registerCommitCallback(OTHER_TRANSACTION_ID, OTHER_SCN, Instant.now(), (timestamp, smallestScn, commitScn, counter) -> { }); - transactionalBuffer.abandonLongTransactions(SCN.longValue(), offsetContext); + transactionalBuffer.abandonLongTransactions(SCN, offsetContext); assertThat(transactionalBuffer.isEmpty()).isEqualTo(false); } @@ -248,7 +248,7 @@ public void testTransactionDump() { private void commitTransaction(TransactionalBuffer.CommitCallback commitCallback) { transactionalBuffer.registerCommitCallback(TRANSACTION_ID, SCN, Instant.now(), commitCallback); - offsetContext = new OracleOffsetContext(connectorConfig, SCN.longValue(), SCN.longValue(), null, false, true, new TransactionContext()); + offsetContext = new OracleOffsetContext(connectorConfig, SCN.toString(), SCN.toString(), null, false, true, new TransactionContext()); transactionalBuffer.commit(TRANSACTION_ID, SCN.add(SCN_ONE), offsetContext, TIMESTAMP, () -> true, MESSAGE, dispatcher); } }