From da02ca2a234564b649c4484b0f767e464cf66813 Mon Sep 17 00:00:00 2001 From: Chris Cranford Date: Wed, 17 Mar 2021 05:12:38 -0400 Subject: [PATCH] DBZ-2994 Suggested changes --- .../oracle/OracleConnectorConfig.java | 5 +++-- .../connector/oracle/OracleOffsetContext.java | 13 +++++++----- .../OracleSnapshotChangeEventSource.java | 2 +- .../oracle/OracleSourceInfoStructMaker.java | 9 ++++++--- .../debezium/connector/oracle/SourceInfo.java | 20 ++++++------------- .../LogMinerQueryResultProcessor.java | 12 +++++------ .../LogMinerStreamingChangeEventSource.java | 4 ++-- .../oracle/logminer/TransactionalBuffer.java | 4 ++-- .../XstreamStreamingChangeEventSource.java | 6 +++--- .../oracle/OracleOffsetContextTest.java | 8 ++++---- 10 files changed, 41 insertions(+), 42 deletions(-) diff --git a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/OracleConnectorConfig.java b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/OracleConnectorConfig.java index c2c9e8229..56635513b 100644 --- a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/OracleConnectorConfig.java +++ b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/OracleConnectorConfig.java @@ -472,11 +472,12 @@ protected boolean isPositionAtOrBefore(Document recorded, Document desired) { private Scn resolveScn(Document document) { // prioritize reading scn as string and if not found, fallback to long data types - if (document.getString(SourceInfo.SCN_KEY) == null) { + final String scn = document.getString(SourceInfo.SCN_KEY); + if (scn == null) { Long scnValue = document.getLong(SourceInfo.SCN_KEY); Scn.valueOf(scnValue == null ? 0 : scnValue); } - return Scn.valueOf(document.getString(SourceInfo.SCN_KEY)); + return Scn.valueOf(scn); } }; } diff --git a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/OracleOffsetContext.java b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/OracleOffsetContext.java index 6d8ed38e6..3f0eddf42 100644 --- a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/OracleOffsetContext.java +++ b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/OracleOffsetContext.java @@ -120,7 +120,8 @@ public static Builder create() { if (sourceInfo.isSnapshot()) { Map offset = new HashMap<>(); - offset.put(SourceInfo.SCN_KEY, sourceInfo.getScn()); + final Scn scn = sourceInfo.getScn(); + offset.put(SourceInfo.SCN_KEY, scn != null ? scn.toString() : scn); offset.put(SourceInfo.SNAPSHOT_KEY, true); offset.put(SNAPSHOT_COMPLETED_KEY, snapshotCompleted); @@ -132,8 +133,10 @@ public static Builder create() { offset.put(SourceInfo.LCR_POSITION_KEY, sourceInfo.getLcrPosition().toString()); } else { - offset.put(SourceInfo.SCN_KEY, sourceInfo.getScn()); - offset.put(SourceInfo.COMMIT_SCN_KEY, sourceInfo.getCommitScn()); + final Scn scn = sourceInfo.getScn(); + final Scn commitScn = sourceInfo.getCommitScn(); + offset.put(SourceInfo.SCN_KEY, scn != null ? scn.toString() : null); + offset.put(SourceInfo.COMMIT_SCN_KEY, commitScn != null ? commitScn.toString() : null); } return transactionContext.store(offset); } @@ -157,11 +160,11 @@ public void setCommitScn(Scn commitScn) { sourceInfo.setCommitScn(commitScn); } - public String getScn() { + public Scn getScn() { return sourceInfo.getScn(); } - public String getCommitScn() { + public Scn getCommitScn() { return sourceInfo.getCommitScn(); } diff --git a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/OracleSnapshotChangeEventSource.java b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/OracleSnapshotChangeEventSource.java index dac02abf4..cf36dcdc3 100644 --- a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/OracleSnapshotChangeEventSource.java +++ b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/OracleSnapshotChangeEventSource.java @@ -280,7 +280,7 @@ protected SchemaChangeEvent getCreateTableEvent(RelationalSnapshotContext snapsh @Override protected Optional getSnapshotSelect(RelationalSnapshotContext snapshotContext, TableId tableId) { final OracleOffsetContext offset = (OracleOffsetContext) snapshotContext.offset; - final String snapshotOffset = offset.getScn(); + final String snapshotOffset = offset.getScn().toString(); assert snapshotOffset != null; return Optional.of("SELECT * FROM " + quote(tableId) + " AS OF SCN " + snapshotOffset); } diff --git a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/OracleSourceInfoStructMaker.java b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/OracleSourceInfoStructMaker.java index fca6e4da9..b1cf400d0 100644 --- a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/OracleSourceInfoStructMaker.java +++ b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/OracleSourceInfoStructMaker.java @@ -35,17 +35,20 @@ public Schema schema() { @Override public Struct struct(SourceInfo sourceInfo) { + final String scn = sourceInfo.getScn() == null ? null : sourceInfo.getScn().toString(); + final String commitScn = sourceInfo.getCommitScn() == null ? null : sourceInfo.getCommitScn().toString(); + final Struct ret = super.commonStruct(sourceInfo) .put(SourceInfo.SCHEMA_NAME_KEY, sourceInfo.getTableId().schema()) .put(SourceInfo.TABLE_NAME_KEY, sourceInfo.getTableId().table()) .put(SourceInfo.TXID_KEY, sourceInfo.getTransactionId()) - .put(SourceInfo.SCN_KEY, sourceInfo.getScn()); + .put(SourceInfo.SCN_KEY, scn); if (sourceInfo.getLcrPosition() != null) { ret.put(SourceInfo.LCR_POSITION_KEY, sourceInfo.getLcrPosition().toString()); } - if (sourceInfo.getCommitScn() != null) { - ret.put(SourceInfo.COMMIT_SCN_KEY, sourceInfo.getCommitScn()); + if (commitScn != null) { + ret.put(SourceInfo.COMMIT_SCN_KEY, commitScn); } return ret; } diff --git a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/SourceInfo.java b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/SourceInfo.java index 6619ca32c..cbfb23eae 100644 --- a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/SourceInfo.java +++ b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/SourceInfo.java @@ -21,8 +21,8 @@ public class SourceInfo extends BaseSourceInfo { public static final String LCR_POSITION_KEY = "lcr_position"; public static final String SNAPSHOT_KEY = "snapshot"; - private String scn; - private String commitScn; + private Scn scn; + private Scn commitScn; private LcrPosition lcrPosition; private String transactionId; private Instant sourceTime; @@ -32,28 +32,20 @@ protected SourceInfo(OracleConnectorConfig connectorConfig) { super(connectorConfig); } - public String getScn() { + public Scn getScn() { return scn; } - public String getCommitScn() { + public Scn getCommitScn() { return commitScn; } public void setScn(Scn scn) { - if (scn == null) { - this.scn = null; - return; - } - this.scn = scn.toString(); + this.scn = scn; } public void setCommitScn(Scn commitScn) { - if (commitScn == null) { - this.commitScn = null; - return; - } - this.commitScn = commitScn.toString(); + this.commitScn = commitScn; } public LcrPosition getLcrPosition() { diff --git a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/LogMinerQueryResultProcessor.java b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/LogMinerQueryResultProcessor.java index 9c97bc411..294636003 100644 --- a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/LogMinerQueryResultProcessor.java +++ b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/LogMinerQueryResultProcessor.java @@ -238,9 +238,9 @@ int processResult(ResultSet resultSet) { metrics.setLastDurationOfBatchProcessing(totalTime); warnStuckScn(); - currentOffsetScn = Scn.valueOf(offsetContext.getScn()); + currentOffsetScn = offsetContext.getScn(); if (offsetContext.getCommitScn() != null) { - currentOffsetCommitScn = Scn.valueOf(offsetContext.getCommitScn()); + currentOffsetCommitScn = offsetContext.getCommitScn(); } } @@ -261,16 +261,16 @@ int processResult(ResultSet resultSet) { */ private void warnStuckScn() { if (offsetContext != null && offsetContext.getCommitScn() != null) { - final Scn scn = Scn.valueOf(offsetContext.getScn()); - final Scn commitScn = Scn.valueOf(offsetContext.getCommitScn()); - if (currentOffsetScn.equals(scn) && !currentOffsetCommitScn.equals(offsetContext.getCommitScn())) { + final Scn scn = offsetContext.getScn(); + final Scn commitScn = offsetContext.getCommitScn(); + if (currentOffsetScn.equals(scn) && !currentOffsetCommitScn.equals(commitScn)) { stuckScnCounter++; // logWarn only once if (stuckScnCounter == 25) { LogMinerHelper.logWarn(transactionalBufferMetrics, "Offset SCN {} is not changing. It indicates long transaction(s). " + "Offset commit SCN: {}", - currentOffsetScn, offsetContext.getCommitScn()); + currentOffsetScn, commitScn); transactionalBufferMetrics.incrementScnFreezeCounter(); } } diff --git a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/LogMinerStreamingChangeEventSource.java b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/LogMinerStreamingChangeEventSource.java index 3bd5b997c..de25931f5 100644 --- a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/LogMinerStreamingChangeEventSource.java +++ b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/LogMinerStreamingChangeEventSource.java @@ -119,7 +119,7 @@ public void execute(ChangeEventSourceContext context) { LOGGER.trace("Current time {} ms, database difference {} ms", System.currentTimeMillis(), databaseTimeMs); transactionalBuffer.setDatabaseTimeDifference(databaseTimeMs); - startScn = Scn.valueOf(offsetContext.getScn()); + startScn = offsetContext.getScn(); createFlushTable(jdbcConnection); if (!isContinuousMining && startScn.compareTo(getFirstOnlineLogScn(jdbcConnection, archiveLogRetention)) < 0) { @@ -229,7 +229,7 @@ private void unregisterLogMinerMetrics() { private void abandonOldTransactionsIfExist(OracleConnection connection, TransactionalBuffer transactionalBuffer) { Duration transactionRetention = connectorConfig.getLogMiningTransactionRetention(); if (!Duration.ZERO.equals(transactionRetention)) { - final Scn offsetScn = Scn.valueOf(offsetContext.getScn()); + final Scn offsetScn = offsetContext.getScn(); Optional lastScnToAbandonTransactions = getLastScnToAbandon(connection, offsetScn, transactionRetention); lastScnToAbandonTransactions.ifPresent(thresholdScn -> { transactionalBuffer.abandonLongTransactions(thresholdScn, offsetContext); diff --git a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/TransactionalBuffer.java b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/TransactionalBuffer.java index 278395c7a..559b21ba6 100644 --- a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/TransactionalBuffer.java +++ b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/TransactionalBuffer.java @@ -139,7 +139,7 @@ boolean commit(String transactionId, Scn scn, OracleOffsetContext offsetContext, // On the restarting connector, we start from SCN in the offset. There is possibility to commit a transaction(s) which were already committed. // Currently we cannot use ">=", because we may lose normal commit which may happen at the same time. TODO use audit table to prevent duplications - if ((offsetContext.getCommitScn() != null && Scn.valueOf(offsetContext.getCommitScn()).compareTo(scn) > 0) || lastCommittedScn.compareTo(scn) > 0) { + if ((offsetContext.getCommitScn() != null && offsetContext.getCommitScn().compareTo(scn) > 0) || lastCommittedScn.compareTo(scn) > 0) { LogMinerHelper.logWarn(metrics, "Transaction {} was already processed, ignore. Committed SCN in offset is {}, commit SCN of the transaction is {}, last committed SCN is {}", transactionId, offsetContext.getCommitScn(), scn, lastCommittedScn); @@ -183,7 +183,7 @@ private void commit(ChangeEventSource.ChangeEventSourceContext context, OracleOf metrics.setActiveTransactions(transactions.size()); metrics.incrementCommittedDmlCounter(commitCallbacks.size()); metrics.setCommittedScn(scn); - metrics.setOffsetScn(Scn.valueOf(offsetContext.getScn())); + metrics.setOffsetScn(offsetContext.getScn()); metrics.setLastCommitDuration(Duration.between(start, Instant.now()).toMillis()); } } diff --git a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/xstream/XstreamStreamingChangeEventSource.java b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/xstream/XstreamStreamingChangeEventSource.java index c877a2221..88398b3b0 100644 --- a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/xstream/XstreamStreamingChangeEventSource.java +++ b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/xstream/XstreamStreamingChangeEventSource.java @@ -118,9 +118,9 @@ public void commitOffset(Map offset) { } } - private byte[] convertScnToPosition(String scn) { + private byte[] convertScnToPosition(Scn scn) { try { - return XStreamUtility.convertSCNToPosition(new NUMBER(scn, 0), this.posVersion); + return XStreamUtility.convertSCNToPosition(new NUMBER(scn.toString(), 0), this.posVersion); } catch (SQLException | StreamsException e) { throw new RuntimeException(e); @@ -142,7 +142,7 @@ XStreamOut getXsOut() { } private void sendPublishedPosition(final LcrPosition lcrPosition, final Scn scn) { - lcrMessage.set(new PositionAndScn(lcrPosition, (scn != null) ? convertScnToPosition(scn.toString()) : null)); + lcrMessage.set(new PositionAndScn(lcrPosition, (scn != null) ? convertScnToPosition(scn) : null)); } PositionAndScn receivePublishedPosition() { diff --git a/debezium-connector-oracle/src/test/java/io/debezium/connector/oracle/OracleOffsetContextTest.java b/debezium-connector-oracle/src/test/java/io/debezium/connector/oracle/OracleOffsetContextTest.java index de32da421..ca19a43ac 100644 --- a/debezium-connector-oracle/src/test/java/io/debezium/connector/oracle/OracleOffsetContextTest.java +++ b/debezium-connector-oracle/src/test/java/io/debezium/connector/oracle/OracleOffsetContextTest.java @@ -40,8 +40,8 @@ public void shouldreadScnAndCommitScnAsLongValues() throws Exception { offsetValues.put(SourceInfo.COMMIT_SCN_KEY, 23456L); final OracleOffsetContext offsetContext = (OracleOffsetContext) offsetLoader.load(offsetValues); - assertThat(offsetContext.getScn()).isEqualTo("12345"); - assertThat(offsetContext.getCommitScn()).isEqualTo("23456"); + assertThat(offsetContext.getScn()).isEqualTo(Scn.valueOf("12345")); + assertThat(offsetContext.getCommitScn()).isEqualTo(Scn.valueOf("23456")); } @Test @@ -52,8 +52,8 @@ public void shouldReadScnAndCommitScnAsStringValues() throws Exception { offsetValues.put(SourceInfo.COMMIT_SCN_KEY, "23456"); final OracleOffsetContext offsetContext = (OracleOffsetContext) offsetLoader.load(offsetValues); - assertThat(offsetContext.getScn()).isEqualTo("12345"); - assertThat(offsetContext.getCommitScn()).isEqualTo("23456"); + assertThat(offsetContext.getScn()).isEqualTo(Scn.valueOf("12345")); + assertThat(offsetContext.getCommitScn()).isEqualTo(Scn.valueOf("23456")); } @Test