DBZ-2994 Suggested changes

This commit is contained in:
Chris Cranford 2021-03-17 05:12:38 -04:00 committed by Gunnar Morling
parent f3412c02da
commit da02ca2a23
10 changed files with 41 additions and 42 deletions

View File

@ -472,11 +472,12 @@ protected boolean isPositionAtOrBefore(Document recorded, Document desired) {
private Scn resolveScn(Document document) { private Scn resolveScn(Document document) {
// prioritize reading scn as string and if not found, fallback to long data types // prioritize reading scn as string and if not found, fallback to long data types
if (document.getString(SourceInfo.SCN_KEY) == null) { final String scn = document.getString(SourceInfo.SCN_KEY);
if (scn == null) {
Long scnValue = document.getLong(SourceInfo.SCN_KEY); Long scnValue = document.getLong(SourceInfo.SCN_KEY);
Scn.valueOf(scnValue == null ? 0 : scnValue); Scn.valueOf(scnValue == null ? 0 : scnValue);
} }
return Scn.valueOf(document.getString(SourceInfo.SCN_KEY)); return Scn.valueOf(scn);
} }
}; };
} }

View File

@ -120,7 +120,8 @@ public static Builder create() {
if (sourceInfo.isSnapshot()) { if (sourceInfo.isSnapshot()) {
Map<String, Object> offset = new HashMap<>(); Map<String, Object> offset = new HashMap<>();
offset.put(SourceInfo.SCN_KEY, sourceInfo.getScn()); final Scn scn = sourceInfo.getScn();
offset.put(SourceInfo.SCN_KEY, scn != null ? scn.toString() : scn);
offset.put(SourceInfo.SNAPSHOT_KEY, true); offset.put(SourceInfo.SNAPSHOT_KEY, true);
offset.put(SNAPSHOT_COMPLETED_KEY, snapshotCompleted); offset.put(SNAPSHOT_COMPLETED_KEY, snapshotCompleted);
@ -132,8 +133,10 @@ public static Builder create() {
offset.put(SourceInfo.LCR_POSITION_KEY, sourceInfo.getLcrPosition().toString()); offset.put(SourceInfo.LCR_POSITION_KEY, sourceInfo.getLcrPosition().toString());
} }
else { else {
offset.put(SourceInfo.SCN_KEY, sourceInfo.getScn()); final Scn scn = sourceInfo.getScn();
offset.put(SourceInfo.COMMIT_SCN_KEY, sourceInfo.getCommitScn()); final Scn commitScn = sourceInfo.getCommitScn();
offset.put(SourceInfo.SCN_KEY, scn != null ? scn.toString() : null);
offset.put(SourceInfo.COMMIT_SCN_KEY, commitScn != null ? commitScn.toString() : null);
} }
return transactionContext.store(offset); return transactionContext.store(offset);
} }
@ -157,11 +160,11 @@ public void setCommitScn(Scn commitScn) {
sourceInfo.setCommitScn(commitScn); sourceInfo.setCommitScn(commitScn);
} }
public String getScn() { public Scn getScn() {
return sourceInfo.getScn(); return sourceInfo.getScn();
} }
public String getCommitScn() { public Scn getCommitScn() {
return sourceInfo.getCommitScn(); return sourceInfo.getCommitScn();
} }

View File

@ -280,7 +280,7 @@ protected SchemaChangeEvent getCreateTableEvent(RelationalSnapshotContext snapsh
@Override @Override
protected Optional<String> getSnapshotSelect(RelationalSnapshotContext snapshotContext, TableId tableId) { protected Optional<String> getSnapshotSelect(RelationalSnapshotContext snapshotContext, TableId tableId) {
final OracleOffsetContext offset = (OracleOffsetContext) snapshotContext.offset; final OracleOffsetContext offset = (OracleOffsetContext) snapshotContext.offset;
final String snapshotOffset = offset.getScn(); final String snapshotOffset = offset.getScn().toString();
assert snapshotOffset != null; assert snapshotOffset != null;
return Optional.of("SELECT * FROM " + quote(tableId) + " AS OF SCN " + snapshotOffset); return Optional.of("SELECT * FROM " + quote(tableId) + " AS OF SCN " + snapshotOffset);
} }

View File

@ -35,17 +35,20 @@ public Schema schema() {
@Override @Override
public Struct struct(SourceInfo sourceInfo) { public Struct struct(SourceInfo sourceInfo) {
final String scn = sourceInfo.getScn() == null ? null : sourceInfo.getScn().toString();
final String commitScn = sourceInfo.getCommitScn() == null ? null : sourceInfo.getCommitScn().toString();
final Struct ret = super.commonStruct(sourceInfo) final Struct ret = super.commonStruct(sourceInfo)
.put(SourceInfo.SCHEMA_NAME_KEY, sourceInfo.getTableId().schema()) .put(SourceInfo.SCHEMA_NAME_KEY, sourceInfo.getTableId().schema())
.put(SourceInfo.TABLE_NAME_KEY, sourceInfo.getTableId().table()) .put(SourceInfo.TABLE_NAME_KEY, sourceInfo.getTableId().table())
.put(SourceInfo.TXID_KEY, sourceInfo.getTransactionId()) .put(SourceInfo.TXID_KEY, sourceInfo.getTransactionId())
.put(SourceInfo.SCN_KEY, sourceInfo.getScn()); .put(SourceInfo.SCN_KEY, scn);
if (sourceInfo.getLcrPosition() != null) { if (sourceInfo.getLcrPosition() != null) {
ret.put(SourceInfo.LCR_POSITION_KEY, sourceInfo.getLcrPosition().toString()); ret.put(SourceInfo.LCR_POSITION_KEY, sourceInfo.getLcrPosition().toString());
} }
if (sourceInfo.getCommitScn() != null) { if (commitScn != null) {
ret.put(SourceInfo.COMMIT_SCN_KEY, sourceInfo.getCommitScn()); ret.put(SourceInfo.COMMIT_SCN_KEY, commitScn);
} }
return ret; return ret;
} }

View File

@ -21,8 +21,8 @@ public class SourceInfo extends BaseSourceInfo {
public static final String LCR_POSITION_KEY = "lcr_position"; public static final String LCR_POSITION_KEY = "lcr_position";
public static final String SNAPSHOT_KEY = "snapshot"; public static final String SNAPSHOT_KEY = "snapshot";
private String scn; private Scn scn;
private String commitScn; private Scn commitScn;
private LcrPosition lcrPosition; private LcrPosition lcrPosition;
private String transactionId; private String transactionId;
private Instant sourceTime; private Instant sourceTime;
@ -32,28 +32,20 @@ protected SourceInfo(OracleConnectorConfig connectorConfig) {
super(connectorConfig); super(connectorConfig);
} }
public String getScn() { public Scn getScn() {
return scn; return scn;
} }
public String getCommitScn() { public Scn getCommitScn() {
return commitScn; return commitScn;
} }
public void setScn(Scn scn) { public void setScn(Scn scn) {
if (scn == null) { this.scn = scn;
this.scn = null;
return;
}
this.scn = scn.toString();
} }
public void setCommitScn(Scn commitScn) { public void setCommitScn(Scn commitScn) {
if (commitScn == null) { this.commitScn = commitScn;
this.commitScn = null;
return;
}
this.commitScn = commitScn.toString();
} }
public LcrPosition getLcrPosition() { public LcrPosition getLcrPosition() {

View File

@ -238,9 +238,9 @@ int processResult(ResultSet resultSet) {
metrics.setLastDurationOfBatchProcessing(totalTime); metrics.setLastDurationOfBatchProcessing(totalTime);
warnStuckScn(); warnStuckScn();
currentOffsetScn = Scn.valueOf(offsetContext.getScn()); currentOffsetScn = offsetContext.getScn();
if (offsetContext.getCommitScn() != null) { if (offsetContext.getCommitScn() != null) {
currentOffsetCommitScn = Scn.valueOf(offsetContext.getCommitScn()); currentOffsetCommitScn = offsetContext.getCommitScn();
} }
} }
@ -261,16 +261,16 @@ int processResult(ResultSet resultSet) {
*/ */
private void warnStuckScn() { private void warnStuckScn() {
if (offsetContext != null && offsetContext.getCommitScn() != null) { if (offsetContext != null && offsetContext.getCommitScn() != null) {
final Scn scn = Scn.valueOf(offsetContext.getScn()); final Scn scn = offsetContext.getScn();
final Scn commitScn = Scn.valueOf(offsetContext.getCommitScn()); final Scn commitScn = offsetContext.getCommitScn();
if (currentOffsetScn.equals(scn) && !currentOffsetCommitScn.equals(offsetContext.getCommitScn())) { if (currentOffsetScn.equals(scn) && !currentOffsetCommitScn.equals(commitScn)) {
stuckScnCounter++; stuckScnCounter++;
// logWarn only once // logWarn only once
if (stuckScnCounter == 25) { if (stuckScnCounter == 25) {
LogMinerHelper.logWarn(transactionalBufferMetrics, LogMinerHelper.logWarn(transactionalBufferMetrics,
"Offset SCN {} is not changing. It indicates long transaction(s). " + "Offset SCN {} is not changing. It indicates long transaction(s). " +
"Offset commit SCN: {}", "Offset commit SCN: {}",
currentOffsetScn, offsetContext.getCommitScn()); currentOffsetScn, commitScn);
transactionalBufferMetrics.incrementScnFreezeCounter(); transactionalBufferMetrics.incrementScnFreezeCounter();
} }
} }

View File

@ -119,7 +119,7 @@ public void execute(ChangeEventSourceContext context) {
LOGGER.trace("Current time {} ms, database difference {} ms", System.currentTimeMillis(), databaseTimeMs); LOGGER.trace("Current time {} ms, database difference {} ms", System.currentTimeMillis(), databaseTimeMs);
transactionalBuffer.setDatabaseTimeDifference(databaseTimeMs); transactionalBuffer.setDatabaseTimeDifference(databaseTimeMs);
startScn = Scn.valueOf(offsetContext.getScn()); startScn = offsetContext.getScn();
createFlushTable(jdbcConnection); createFlushTable(jdbcConnection);
if (!isContinuousMining && startScn.compareTo(getFirstOnlineLogScn(jdbcConnection, archiveLogRetention)) < 0) { if (!isContinuousMining && startScn.compareTo(getFirstOnlineLogScn(jdbcConnection, archiveLogRetention)) < 0) {
@ -229,7 +229,7 @@ private void unregisterLogMinerMetrics() {
private void abandonOldTransactionsIfExist(OracleConnection connection, TransactionalBuffer transactionalBuffer) { private void abandonOldTransactionsIfExist(OracleConnection connection, TransactionalBuffer transactionalBuffer) {
Duration transactionRetention = connectorConfig.getLogMiningTransactionRetention(); Duration transactionRetention = connectorConfig.getLogMiningTransactionRetention();
if (!Duration.ZERO.equals(transactionRetention)) { if (!Duration.ZERO.equals(transactionRetention)) {
final Scn offsetScn = Scn.valueOf(offsetContext.getScn()); final Scn offsetScn = offsetContext.getScn();
Optional<Scn> lastScnToAbandonTransactions = getLastScnToAbandon(connection, offsetScn, transactionRetention); Optional<Scn> lastScnToAbandonTransactions = getLastScnToAbandon(connection, offsetScn, transactionRetention);
lastScnToAbandonTransactions.ifPresent(thresholdScn -> { lastScnToAbandonTransactions.ifPresent(thresholdScn -> {
transactionalBuffer.abandonLongTransactions(thresholdScn, offsetContext); transactionalBuffer.abandonLongTransactions(thresholdScn, offsetContext);

View File

@ -139,7 +139,7 @@ boolean commit(String transactionId, Scn scn, OracleOffsetContext offsetContext,
// On the restarting connector, we start from SCN in the offset. There is possibility to commit a transaction(s) which were already committed. // On the restarting connector, we start from SCN in the offset. There is possibility to commit a transaction(s) which were already committed.
// Currently we cannot use ">=", because we may lose normal commit which may happen at the same time. TODO use audit table to prevent duplications // Currently we cannot use ">=", because we may lose normal commit which may happen at the same time. TODO use audit table to prevent duplications
if ((offsetContext.getCommitScn() != null && Scn.valueOf(offsetContext.getCommitScn()).compareTo(scn) > 0) || lastCommittedScn.compareTo(scn) > 0) { if ((offsetContext.getCommitScn() != null && offsetContext.getCommitScn().compareTo(scn) > 0) || lastCommittedScn.compareTo(scn) > 0) {
LogMinerHelper.logWarn(metrics, LogMinerHelper.logWarn(metrics,
"Transaction {} was already processed, ignore. Committed SCN in offset is {}, commit SCN of the transaction is {}, last committed SCN is {}", "Transaction {} was already processed, ignore. Committed SCN in offset is {}, commit SCN of the transaction is {}, last committed SCN is {}",
transactionId, offsetContext.getCommitScn(), scn, lastCommittedScn); transactionId, offsetContext.getCommitScn(), scn, lastCommittedScn);
@ -183,7 +183,7 @@ private void commit(ChangeEventSource.ChangeEventSourceContext context, OracleOf
metrics.setActiveTransactions(transactions.size()); metrics.setActiveTransactions(transactions.size());
metrics.incrementCommittedDmlCounter(commitCallbacks.size()); metrics.incrementCommittedDmlCounter(commitCallbacks.size());
metrics.setCommittedScn(scn); metrics.setCommittedScn(scn);
metrics.setOffsetScn(Scn.valueOf(offsetContext.getScn())); metrics.setOffsetScn(offsetContext.getScn());
metrics.setLastCommitDuration(Duration.between(start, Instant.now()).toMillis()); metrics.setLastCommitDuration(Duration.between(start, Instant.now()).toMillis());
} }
} }

View File

@ -118,9 +118,9 @@ public void commitOffset(Map<String, ?> offset) {
} }
} }
private byte[] convertScnToPosition(String scn) { private byte[] convertScnToPosition(Scn scn) {
try { try {
return XStreamUtility.convertSCNToPosition(new NUMBER(scn, 0), this.posVersion); return XStreamUtility.convertSCNToPosition(new NUMBER(scn.toString(), 0), this.posVersion);
} }
catch (SQLException | StreamsException e) { catch (SQLException | StreamsException e) {
throw new RuntimeException(e); throw new RuntimeException(e);
@ -142,7 +142,7 @@ XStreamOut getXsOut() {
} }
private void sendPublishedPosition(final LcrPosition lcrPosition, final Scn scn) { private void sendPublishedPosition(final LcrPosition lcrPosition, final Scn scn) {
lcrMessage.set(new PositionAndScn(lcrPosition, (scn != null) ? convertScnToPosition(scn.toString()) : null)); lcrMessage.set(new PositionAndScn(lcrPosition, (scn != null) ? convertScnToPosition(scn) : null));
} }
PositionAndScn receivePublishedPosition() { PositionAndScn receivePublishedPosition() {

View File

@ -40,8 +40,8 @@ public void shouldreadScnAndCommitScnAsLongValues() throws Exception {
offsetValues.put(SourceInfo.COMMIT_SCN_KEY, 23456L); offsetValues.put(SourceInfo.COMMIT_SCN_KEY, 23456L);
final OracleOffsetContext offsetContext = (OracleOffsetContext) offsetLoader.load(offsetValues); final OracleOffsetContext offsetContext = (OracleOffsetContext) offsetLoader.load(offsetValues);
assertThat(offsetContext.getScn()).isEqualTo("12345"); assertThat(offsetContext.getScn()).isEqualTo(Scn.valueOf("12345"));
assertThat(offsetContext.getCommitScn()).isEqualTo("23456"); assertThat(offsetContext.getCommitScn()).isEqualTo(Scn.valueOf("23456"));
} }
@Test @Test
@ -52,8 +52,8 @@ public void shouldReadScnAndCommitScnAsStringValues() throws Exception {
offsetValues.put(SourceInfo.COMMIT_SCN_KEY, "23456"); offsetValues.put(SourceInfo.COMMIT_SCN_KEY, "23456");
final OracleOffsetContext offsetContext = (OracleOffsetContext) offsetLoader.load(offsetValues); final OracleOffsetContext offsetContext = (OracleOffsetContext) offsetLoader.load(offsetValues);
assertThat(offsetContext.getScn()).isEqualTo("12345"); assertThat(offsetContext.getScn()).isEqualTo(Scn.valueOf("12345"));
assertThat(offsetContext.getCommitScn()).isEqualTo("23456"); assertThat(offsetContext.getCommitScn()).isEqualTo(Scn.valueOf("23456"));
} }
@Test @Test