DBZ-2994 Use string-representation of SCN in offsets & source info
This commit is contained in:
parent
769f8dc4f4
commit
a9d59710a7
@ -29,6 +29,7 @@
|
||||
import io.debezium.connector.SourceInfoStructMaker;
|
||||
import io.debezium.connector.oracle.logminer.HistoryRecorder;
|
||||
import io.debezium.connector.oracle.logminer.NeverHistoryRecorder;
|
||||
import io.debezium.connector.oracle.logminer.Scn;
|
||||
import io.debezium.connector.oracle.logminer.SqlUtils;
|
||||
import io.debezium.connector.oracle.xstream.LcrPosition;
|
||||
import io.debezium.connector.oracle.xstream.OracleVersion;
|
||||
@ -451,24 +452,28 @@ protected HistoryRecordComparator getHistoryRecordComparator() {
|
||||
return new HistoryRecordComparator() {
|
||||
@Override
|
||||
protected boolean isPositionAtOrBefore(Document recorded, Document desired) {
|
||||
Long recordedScn;
|
||||
Long desiredScn;
|
||||
Scn recordedScn;
|
||||
Scn desiredScn;
|
||||
if (getAdapter() == OracleConnectorConfig.ConnectorAdapter.XSTREAM) {
|
||||
final LcrPosition recordedPosition = LcrPosition.valueOf(recorded.getString(SourceInfo.LCR_POSITION_KEY));
|
||||
final LcrPosition desiredPosition = LcrPosition.valueOf(desired.getString(SourceInfo.LCR_POSITION_KEY));
|
||||
recordedScn = recordedPosition != null ? recordedPosition.getScn() : recorded.getLong(SourceInfo.SCN_KEY);
|
||||
desiredScn = desiredPosition != null ? desiredPosition.getScn() : desired.getLong(SourceInfo.SCN_KEY);
|
||||
recordedScn = recordedPosition != null ? recordedPosition.getScn() : getScnFromString(recorded.getString(SourceInfo.SCN2_KEY));
|
||||
desiredScn = desiredPosition != null ? desiredPosition.getScn() : getScnFromString(desired.getString(SourceInfo.SCN2_KEY));
|
||||
return (recordedPosition != null && desiredPosition != null)
|
||||
? recordedPosition.compareTo(desiredPosition) < 1
|
||||
: recordedScn.compareTo(desiredScn) < 1;
|
||||
}
|
||||
else {
|
||||
recordedScn = recorded.getLong(SourceInfo.SCN_KEY);
|
||||
desiredScn = desired.getLong(SourceInfo.SCN_KEY);
|
||||
recordedScn = getScnFromString(recorded.getString(SourceInfo.SCN_KEY));
|
||||
desiredScn = getScnFromString(desired.getString(SourceInfo.SCN_KEY));
|
||||
return recordedScn.compareTo(desiredScn) < 1;
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
private Scn getScnFromString(String value) {
|
||||
return value == null ? Scn.ZERO : Scn.valueOf(value);
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
|
@ -14,6 +14,7 @@
|
||||
import org.apache.kafka.connect.data.Struct;
|
||||
|
||||
import io.debezium.connector.SnapshotRecord;
|
||||
import io.debezium.connector.oracle.logminer.Scn;
|
||||
import io.debezium.connector.oracle.xstream.LcrPosition;
|
||||
import io.debezium.pipeline.spi.OffsetContext;
|
||||
import io.debezium.pipeline.txmetadata.TransactionContext;
|
||||
@ -36,13 +37,13 @@ public class OracleOffsetContext implements OffsetContext {
|
||||
*/
|
||||
private boolean snapshotCompleted;
|
||||
|
||||
public OracleOffsetContext(OracleConnectorConfig connectorConfig, long scn, Long commitScn, LcrPosition lcrPosition,
|
||||
public OracleOffsetContext(OracleConnectorConfig connectorConfig, String scn, String commitScn, LcrPosition lcrPosition,
|
||||
boolean snapshot, boolean snapshotCompleted, TransactionContext transactionContext) {
|
||||
this(connectorConfig, scn, lcrPosition, snapshot, snapshotCompleted, transactionContext);
|
||||
sourceInfo.setCommitScn(commitScn);
|
||||
}
|
||||
|
||||
private OracleOffsetContext(OracleConnectorConfig connectorConfig, long scn, LcrPosition lcrPosition,
|
||||
private OracleOffsetContext(OracleConnectorConfig connectorConfig, String scn, LcrPosition lcrPosition,
|
||||
boolean snapshot, boolean snapshotCompleted, TransactionContext transactionContext) {
|
||||
partition = Collections.singletonMap(SERVER_PARTITION_KEY, connectorConfig.getLogicalName());
|
||||
|
||||
@ -65,7 +66,7 @@ private OracleOffsetContext(OracleConnectorConfig connectorConfig, long scn, Lcr
|
||||
public static class Builder {
|
||||
|
||||
private OracleConnectorConfig connectorConfig;
|
||||
private long scn;
|
||||
private String scn;
|
||||
private LcrPosition lcrPosition;
|
||||
private boolean snapshot;
|
||||
private boolean snapshotCompleted;
|
||||
@ -76,11 +77,16 @@ public Builder logicalName(OracleConnectorConfig connectorConfig) {
|
||||
return this;
|
||||
}
|
||||
|
||||
public Builder scn(long scn) {
|
||||
public Builder scn(String scn) {
|
||||
this.scn = scn;
|
||||
return this;
|
||||
}
|
||||
|
||||
public Builder scn(Scn scn) {
|
||||
scn(scn != null ? scn.toString() : null);
|
||||
return this;
|
||||
}
|
||||
|
||||
public Builder lcrPosition(LcrPosition lcrPosition) {
|
||||
this.lcrPosition = lcrPosition;
|
||||
return this;
|
||||
@ -120,7 +126,7 @@ public static Builder create() {
|
||||
if (sourceInfo.isSnapshot()) {
|
||||
Map<String, Object> offset = new HashMap<>();
|
||||
|
||||
offset.put(SourceInfo.SCN_KEY, sourceInfo.getScn());
|
||||
offset.put(SourceInfo.SCN2_KEY, sourceInfo.getScn());
|
||||
offset.put(SourceInfo.SNAPSHOT_KEY, true);
|
||||
offset.put(SNAPSHOT_COMPLETED_KEY, snapshotCompleted);
|
||||
|
||||
@ -132,8 +138,8 @@ public static Builder create() {
|
||||
offset.put(SourceInfo.LCR_POSITION_KEY, sourceInfo.getLcrPosition().toString());
|
||||
}
|
||||
else {
|
||||
offset.put(SourceInfo.SCN_KEY, sourceInfo.getScn());
|
||||
offset.put(SourceInfo.COMMIT_SCN_KEY, sourceInfo.getCommitScn());
|
||||
offset.put(SourceInfo.SCN2_KEY, sourceInfo.getScn());
|
||||
offset.put(SourceInfo.COMMIT2_SCN_KEY, sourceInfo.getCommitScn());
|
||||
}
|
||||
return transactionContext.store(offset);
|
||||
}
|
||||
@ -149,19 +155,19 @@ public Struct getSourceInfo() {
|
||||
return sourceInfo.struct();
|
||||
}
|
||||
|
||||
public void setScn(long scn) {
|
||||
sourceInfo.setScn(scn);
|
||||
public void setScn(Scn scn) {
|
||||
sourceInfo.setScn(scn.toString());
|
||||
}
|
||||
|
||||
public void setCommitScn(Long commitScn) {
|
||||
sourceInfo.setCommitScn(commitScn);
|
||||
public void setCommitScn(Scn commitScn) {
|
||||
sourceInfo.setCommitScn(commitScn.toString());
|
||||
}
|
||||
|
||||
public long getScn() {
|
||||
public String getScn() {
|
||||
return sourceInfo.getScn();
|
||||
}
|
||||
|
||||
public Long getCommitScn() {
|
||||
public String getCommitScn() {
|
||||
return sourceInfo.getCommitScn();
|
||||
}
|
||||
|
||||
@ -256,18 +262,43 @@ public Loader(OracleConnectorConfig connectorConfig, OracleConnectorConfig.Conne
|
||||
public OffsetContext load(Map<String, ?> offset) {
|
||||
boolean snapshot = Boolean.TRUE.equals(offset.get(SourceInfo.SNAPSHOT_KEY));
|
||||
boolean snapshotCompleted = Boolean.TRUE.equals(offset.get(SNAPSHOT_COMPLETED_KEY));
|
||||
Long scn;
|
||||
String scn;
|
||||
if (adapter == OracleConnectorConfig.ConnectorAdapter.LOG_MINER) {
|
||||
scn = (Long) offset.get(SourceInfo.SCN_KEY);
|
||||
Long commitScn = (Long) offset.get(SourceInfo.COMMIT_SCN_KEY);
|
||||
scn = getScnFromOffset(offset, null);
|
||||
String commitScn = getCommitScnFromOffset(offset);
|
||||
return new OracleOffsetContext(connectorConfig, scn, commitScn, null, snapshot, snapshotCompleted, TransactionContext.load(offset));
|
||||
}
|
||||
else {
|
||||
LcrPosition lcrPosition = LcrPosition.valueOf((String) offset.get(SourceInfo.LCR_POSITION_KEY));
|
||||
scn = lcrPosition != null ? lcrPosition.getScn() : (Long) offset.get(SourceInfo.SCN_KEY);
|
||||
scn = getScnFromOffset(offset, lcrPosition);
|
||||
return new OracleOffsetContext(connectorConfig, scn, lcrPosition, snapshot, snapshotCompleted, TransactionContext.load(offset));
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
private String getScnFromOffset(Map<String, ?> offset, LcrPosition lcrPosition) {
|
||||
if (lcrPosition != null) {
|
||||
return lcrPosition.getScn().toString();
|
||||
}
|
||||
// Prioritize string-based SCN key over the numeric-based SCN key
|
||||
if (offset.containsKey(SourceInfo.SCN2_KEY)) {
|
||||
return (String) offset.get(SourceInfo.SCN2_KEY);
|
||||
}
|
||||
else {
|
||||
Long scnAsLong = (Long) offset.get(SourceInfo.SCN_KEY);
|
||||
return scnAsLong != null ? scnAsLong.toString() : null;
|
||||
}
|
||||
}
|
||||
|
||||
private String getCommitScnFromOffset(Map<String, ?> offset) {
|
||||
// Prioritize string-based commit SCN key over the numeric-based commit SCN key
|
||||
if (offset.containsKey(SourceInfo.COMMIT2_SCN_KEY)) {
|
||||
return (String) offset.get(SourceInfo.COMMIT2_SCN_KEY);
|
||||
}
|
||||
else {
|
||||
Long commitScnAsLong = (Long) offset.get(SourceInfo.COMMIT_SCN_KEY);
|
||||
return commitScnAsLong != null ? commitScnAsLong.toString() : null;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -123,7 +123,7 @@ protected void determineSnapshotOffset(RelationalSnapshotContext ctx) throws Exc
|
||||
|
||||
ctx.offset = OracleOffsetContext.create()
|
||||
.logicalName(connectorConfig)
|
||||
.scn(currentScn.longValue())
|
||||
.scn(currentScn)
|
||||
.transactionContext(new TransactionContext())
|
||||
.build();
|
||||
}
|
||||
@ -280,7 +280,9 @@ protected SchemaChangeEvent getCreateTableEvent(RelationalSnapshotContext snapsh
|
||||
|
||||
@Override
|
||||
protected Optional<String> getSnapshotSelect(RelationalSnapshotContext snapshotContext, TableId tableId) {
|
||||
long snapshotOffset = (Long) snapshotContext.offset.getOffset().get("scn");
|
||||
final OracleOffsetContext offset = (OracleOffsetContext) snapshotContext.offset;
|
||||
final String snapshotOffset = offset.getScn();
|
||||
assert snapshotOffset != null;
|
||||
return Optional.of("SELECT * FROM " + quote(tableId) + " AS OF SCN " + snapshotOffset);
|
||||
}
|
||||
|
||||
|
@ -25,6 +25,8 @@ public OracleSourceInfoStructMaker(String connector, String version, CommonConne
|
||||
.field(SourceInfo.SCN_KEY, Schema.OPTIONAL_INT64_SCHEMA)
|
||||
.field(SourceInfo.COMMIT_SCN_KEY, Schema.OPTIONAL_INT64_SCHEMA)
|
||||
.field(SourceInfo.LCR_POSITION_KEY, Schema.OPTIONAL_STRING_SCHEMA)
|
||||
.field(SourceInfo.SCN2_KEY, Schema.OPTIONAL_STRING_SCHEMA)
|
||||
.field(SourceInfo.COMMIT2_SCN_KEY, Schema.OPTIONAL_STRING_SCHEMA)
|
||||
.build();
|
||||
}
|
||||
|
||||
@ -39,13 +41,15 @@ public Struct struct(SourceInfo sourceInfo) {
|
||||
.put(SourceInfo.SCHEMA_NAME_KEY, sourceInfo.getTableId().schema())
|
||||
.put(SourceInfo.TABLE_NAME_KEY, sourceInfo.getTableId().table())
|
||||
.put(SourceInfo.TXID_KEY, sourceInfo.getTransactionId())
|
||||
.put(SourceInfo.SCN_KEY, sourceInfo.getScn());
|
||||
.put(SourceInfo.SCN_KEY, sourceInfo.getScn() != null ? Long.parseLong(sourceInfo.getScn()) : 0L)
|
||||
.put(SourceInfo.SCN2_KEY, sourceInfo.getScn());
|
||||
|
||||
if (sourceInfo.getLcrPosition() != null) {
|
||||
ret.put(SourceInfo.LCR_POSITION_KEY, sourceInfo.getLcrPosition().toString());
|
||||
}
|
||||
if (sourceInfo.getCommitScn() != null) {
|
||||
ret.put(SourceInfo.COMMIT_SCN_KEY, sourceInfo.getCommitScn());
|
||||
ret.put(SourceInfo.COMMIT_SCN_KEY, sourceInfo.getCommitScn() != null ? Long.parseLong(sourceInfo.getCommitScn()) : 0L);
|
||||
ret.put(SourceInfo.COMMIT2_SCN_KEY, sourceInfo.getCommitScn());
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
@ -17,12 +17,14 @@ public class SourceInfo extends BaseSourceInfo {
|
||||
|
||||
public static final String TXID_KEY = "txId";
|
||||
public static final String SCN_KEY = "scn";
|
||||
public static final String SCN2_KEY = "scn2";
|
||||
public static final String COMMIT_SCN_KEY = "commit_scn";
|
||||
public static final String COMMIT2_SCN_KEY = "commit2_scn";
|
||||
public static final String LCR_POSITION_KEY = "lcr_position";
|
||||
public static final String SNAPSHOT_KEY = "snapshot";
|
||||
|
||||
private long scn;
|
||||
private Long commitScn;
|
||||
private String scn;
|
||||
private String commitScn;
|
||||
private LcrPosition lcrPosition;
|
||||
private String transactionId;
|
||||
private Instant sourceTime;
|
||||
@ -32,19 +34,19 @@ protected SourceInfo(OracleConnectorConfig connectorConfig) {
|
||||
super(connectorConfig);
|
||||
}
|
||||
|
||||
public long getScn() {
|
||||
public String getScn() {
|
||||
return scn;
|
||||
}
|
||||
|
||||
public Long getCommitScn() {
|
||||
public String getCommitScn() {
|
||||
return commitScn;
|
||||
}
|
||||
|
||||
public void setScn(long scn) {
|
||||
public void setScn(String scn) {
|
||||
this.scn = scn;
|
||||
}
|
||||
|
||||
public void setCommitScn(Long commitScn) {
|
||||
public void setCommitScn(String commitScn) {
|
||||
this.commitScn = commitScn;
|
||||
}
|
||||
|
||||
|
@ -518,7 +518,7 @@ public static void setRedoLogFilesForMining(OracleConnection connection, Scn las
|
||||
* @param transactionRetention duration to tolerate long running transactions
|
||||
* @return optional SCN as a watermark for abandonment
|
||||
*/
|
||||
public static Optional<Long> getLastScnToAbandon(OracleConnection connection, Long offsetScn, Duration transactionRetention) {
|
||||
public static Optional<Scn> getLastScnToAbandon(OracleConnection connection, Scn offsetScn, Duration transactionRetention) {
|
||||
try {
|
||||
String query = SqlUtils.diffInDaysQuery(offsetScn);
|
||||
Float diffInDays = (Float) getSingleResult(connection, query, DATATYPE.FLOAT);
|
||||
|
@ -53,8 +53,8 @@ class LogMinerQueryResultProcessor {
|
||||
private final OracleConnectorConfig connectorConfig;
|
||||
private final Clock clock;
|
||||
private final Logger LOGGER = LoggerFactory.getLogger(LogMinerQueryResultProcessor.class);
|
||||
private long currentOffsetScn = 0;
|
||||
private long currentOffsetCommitScn = 0;
|
||||
private Scn currentOffsetScn = Scn.ZERO;
|
||||
private Scn currentOffsetCommitScn = Scn.ZERO;
|
||||
private long stuckScnCounter = 0;
|
||||
private HistoryRecorder historyRecorder;
|
||||
|
||||
@ -208,14 +208,14 @@ int processResult(ResultSet resultSet) {
|
||||
transactionalBuffer.registerCommitCallback(txId, scn, changeTime.toInstant(), (timestamp, smallestScn, commitScn, counter) -> {
|
||||
// update SCN in offset context only if processed SCN less than SCN among other transactions
|
||||
if (smallestScn == null || scn.compareTo(smallestScn) < 0) {
|
||||
offsetContext.setScn(scn.longValue());
|
||||
offsetContext.setScn(scn);
|
||||
transactionalBufferMetrics.setOldestScn(scn);
|
||||
}
|
||||
offsetContext.setTransactionId(txId);
|
||||
offsetContext.setSourceTime(timestamp.toInstant());
|
||||
offsetContext.setTableId(tableId);
|
||||
if (counter == 0) {
|
||||
offsetContext.setCommitScn(commitScn.longValue());
|
||||
offsetContext.setCommitScn(commitScn);
|
||||
}
|
||||
Table table = schema.tableFor(tableId);
|
||||
LOGGER.trace("Processing DML event {} scn {}", dmlEntry.toString(), scn);
|
||||
@ -237,9 +237,9 @@ int processResult(ResultSet resultSet) {
|
||||
metrics.setLastDurationOfBatchProcessing(totalTime);
|
||||
|
||||
warnStuckScn();
|
||||
currentOffsetScn = offsetContext.getScn();
|
||||
currentOffsetScn = Scn.valueOf(offsetContext.getScn());
|
||||
if (offsetContext.getCommitScn() != null) {
|
||||
currentOffsetCommitScn = offsetContext.getCommitScn();
|
||||
currentOffsetCommitScn = Scn.valueOf(offsetContext.getCommitScn());
|
||||
}
|
||||
}
|
||||
|
||||
@ -260,7 +260,9 @@ int processResult(ResultSet resultSet) {
|
||||
*/
|
||||
private void warnStuckScn() {
|
||||
if (offsetContext != null && offsetContext.getCommitScn() != null) {
|
||||
if (currentOffsetScn == offsetContext.getScn() && currentOffsetCommitScn != offsetContext.getCommitScn()) {
|
||||
final Scn scn = Scn.valueOf(offsetContext.getScn());
|
||||
final Scn commitScn = Scn.valueOf(offsetContext.getCommitScn());
|
||||
if (currentOffsetScn.equals(scn) && !currentOffsetCommitScn.equals(offsetContext.getCommitScn())) {
|
||||
stuckScnCounter++;
|
||||
// logWarn only once
|
||||
if (stuckScnCounter == 25) {
|
||||
|
@ -185,7 +185,7 @@ public void execute(ChangeEventSourceContext context) {
|
||||
|
||||
if (transactionalBuffer.isEmpty()) {
|
||||
LOGGER.debug("Transactional buffer empty, updating offset's SCN {}", startScn);
|
||||
offsetContext.setScn(startScn.longValue());
|
||||
offsetContext.setScn(startScn);
|
||||
}
|
||||
}
|
||||
|
||||
@ -228,7 +228,8 @@ private void unregisterLogMinerMetrics() {
|
||||
private void abandonOldTransactionsIfExist(OracleConnection connection, TransactionalBuffer transactionalBuffer) {
|
||||
Duration transactionRetention = connectorConfig.getLogMiningTransactionRetention();
|
||||
if (!Duration.ZERO.equals(transactionRetention)) {
|
||||
Optional<Long> lastScnToAbandonTransactions = getLastScnToAbandon(connection, offsetContext.getScn(), transactionRetention);
|
||||
final Scn offsetScn = Scn.valueOf(offsetContext.getScn());
|
||||
Optional<Scn> lastScnToAbandonTransactions = getLastScnToAbandon(connection, offsetScn, transactionRetention);
|
||||
lastScnToAbandonTransactions.ifPresent(thresholdScn -> {
|
||||
transactionalBuffer.abandonLongTransactions(thresholdScn, offsetContext);
|
||||
offsetContext.setScn(thresholdScn);
|
||||
|
@ -382,11 +382,11 @@ static String bulkHistoryInsertStmt(String currentHistoryTableName) {
|
||||
/**
|
||||
* This method return query which converts given SCN in days and deduct from the current day
|
||||
*/
|
||||
public static String diffInDaysQuery(Long scn) {
|
||||
public static String diffInDaysQuery(Scn scn) {
|
||||
if (scn == null) {
|
||||
return null;
|
||||
}
|
||||
return "select sysdate - CAST(scn_to_timestamp(" + scn + ") as date) from dual";
|
||||
return "select sysdate - CAST(scn_to_timestamp(" + scn.toString() + ") as date) from dual";
|
||||
}
|
||||
|
||||
public static boolean connectionProblem(Throwable e) {
|
||||
|
@ -138,7 +138,7 @@ boolean commit(String transactionId, Scn scn, OracleOffsetContext offsetContext,
|
||||
|
||||
// On the restarting connector, we start from SCN in the offset. There is possibility to commit a transaction(s) which were already committed.
|
||||
// Currently we cannot use ">=", because we may lose normal commit which may happen at the same time. TODO use audit table to prevent duplications
|
||||
if ((offsetContext.getCommitScn() != null && offsetContext.getCommitScn() > scn.longValue()) || lastCommittedScn.longValue() > scn.longValue()) {
|
||||
if ((offsetContext.getCommitScn() != null && Scn.valueOf(offsetContext.getCommitScn()).compareTo(scn) > 0) || lastCommittedScn.compareTo(scn) > 0) {
|
||||
LogMinerHelper.logWarn(metrics,
|
||||
"Transaction {} was already processed, ignore. Committed SCN in offset is {}, commit SCN of the transaction is {}, last committed SCN is {}",
|
||||
transactionId, offsetContext.getCommitScn(), scn, lastCommittedScn);
|
||||
@ -224,9 +224,9 @@ boolean rollback(String transactionId, String debugMessage) {
|
||||
* @param thresholdScn the smallest SVN of any transaction to keep in the buffer. All others will be removed.
|
||||
* @param offsetContext the offset context
|
||||
*/
|
||||
void abandonLongTransactions(Long thresholdScn, OracleOffsetContext offsetContext) {
|
||||
void abandonLongTransactions(Scn thresholdScn, OracleOffsetContext offsetContext) {
|
||||
LogMinerHelper.logWarn(metrics, "All transactions with first SCN <= {} will be abandoned, offset: {}", thresholdScn, offsetContext.getScn());
|
||||
Scn threshold = Scn.valueOf(thresholdScn);
|
||||
Scn threshold = Scn.valueOf(thresholdScn.toString());
|
||||
Scn smallestScn = calculateSmallestScn();
|
||||
if (smallestScn == null) {
|
||||
// no transactions in the buffer
|
||||
|
@ -11,6 +11,7 @@
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import io.debezium.connector.oracle.logminer.Scn;
|
||||
import io.debezium.util.HexConverter;
|
||||
import io.debezium.util.Strings;
|
||||
|
||||
@ -28,13 +29,13 @@ public class LcrPosition implements Comparable<LcrPosition> {
|
||||
|
||||
private final byte[] rawPosition;
|
||||
private final String stringFromat;
|
||||
private final long scn;
|
||||
private final Scn scn;
|
||||
|
||||
public LcrPosition(byte[] rawPosition) {
|
||||
this.rawPosition = rawPosition;
|
||||
this.stringFromat = HexConverter.convertToHexString(rawPosition);
|
||||
try {
|
||||
scn = XStreamUtility.getSCNFromPosition(rawPosition).longValue();
|
||||
scn = new Scn(XStreamUtility.getSCNFromPosition(rawPosition).bigIntegerValue());
|
||||
}
|
||||
catch (SQLException | StreamsException e) {
|
||||
throw new RuntimeException(e);
|
||||
@ -53,7 +54,7 @@ public byte[] getRawPosition() {
|
||||
return rawPosition;
|
||||
}
|
||||
|
||||
public long getScn() {
|
||||
public Scn getScn() {
|
||||
return scn;
|
||||
}
|
||||
|
||||
|
@ -5,6 +5,7 @@
|
||||
*/
|
||||
package io.debezium.connector.oracle.xstream;
|
||||
|
||||
import java.sql.SQLException;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
|
||||
@ -109,18 +110,18 @@ public void commitOffset(Map<String, ?> offset) {
|
||||
if (xsOut != null) {
|
||||
LOGGER.debug("Sending message to request recording of offsets to Oracle");
|
||||
final LcrPosition lcrPosition = LcrPosition.valueOf((String) offset.get(SourceInfo.LCR_POSITION_KEY));
|
||||
final Long scn = (Long) offset.get(SourceInfo.SCN_KEY);
|
||||
final String scn = (String) offset.get(SourceInfo.SCN2_KEY);
|
||||
// We can safely overwrite the message even if it was not processed. The watermarked will be set to the highest
|
||||
// (last) delivered value in a single step instead of incrementally
|
||||
sendPublishedPosition(lcrPosition, scn);
|
||||
}
|
||||
}
|
||||
|
||||
private byte[] convertScnToPosition(long scn) {
|
||||
private byte[] convertScnToPosition(String scn) {
|
||||
try {
|
||||
return XStreamUtility.convertSCNToPosition(new NUMBER(scn), this.posVersion);
|
||||
return XStreamUtility.convertSCNToPosition(new NUMBER(scn, 0), this.posVersion);
|
||||
}
|
||||
catch (StreamsException e) {
|
||||
catch (SQLException | StreamsException e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
}
|
||||
@ -139,7 +140,7 @@ XStreamOut getXsOut() {
|
||||
return xsOut;
|
||||
}
|
||||
|
||||
private void sendPublishedPosition(final LcrPosition lcrPosition, final Long scn) {
|
||||
private void sendPublishedPosition(final LcrPosition lcrPosition, final String scn) {
|
||||
lcrMessage.set(new PositionAndScn(lcrPosition, (scn != null) ? convertScnToPosition(scn) : null));
|
||||
}
|
||||
|
||||
|
@ -0,0 +1,86 @@
|
||||
/*
|
||||
* Copyright Debezium Authors.
|
||||
*
|
||||
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
|
||||
*/
|
||||
package io.debezium.connector.oracle;
|
||||
|
||||
import static org.fest.assertions.Assertions.assertThat;
|
||||
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
|
||||
import io.debezium.connector.oracle.util.TestHelper;
|
||||
import io.debezium.doc.FixFor;
|
||||
|
||||
/**
|
||||
* Unit test that validates the behavior of the {@link OracleOffsetContext} and its friends.
|
||||
*
|
||||
* @author Chris Cranford
|
||||
*/
|
||||
public class OracleOffsetContextTest {
|
||||
|
||||
private OracleConnectorConfig connectorConfig;
|
||||
private OracleOffsetContext.Loader offsetLoader;
|
||||
|
||||
@Before
|
||||
public void beforeEach() throws Exception {
|
||||
this.connectorConfig = new OracleConnectorConfig(TestHelper.defaultConfig().build());
|
||||
this.offsetLoader = new OracleOffsetContext.Loader(connectorConfig, TestHelper.adapter());
|
||||
}
|
||||
|
||||
@Test
|
||||
@FixFor("DBZ-2994")
|
||||
public void shouldReadLegacyScnAndCommitScnFieldsWhenNewNotProvided() throws Exception {
|
||||
final Map<String, Object> offsetValues = new HashMap<>();
|
||||
offsetValues.put(SourceInfo.SCN_KEY, 12345L);
|
||||
offsetValues.put(SourceInfo.COMMIT_SCN_KEY, 23456L);
|
||||
|
||||
final OracleOffsetContext offsetContext = (OracleOffsetContext) offsetLoader.load(offsetValues);
|
||||
assertThat(offsetContext.getScn()).isEqualTo("12345");
|
||||
assertThat(offsetContext.getCommitScn()).isEqualTo("23456");
|
||||
}
|
||||
|
||||
@Test
|
||||
@FixFor("DBZ-2994")
|
||||
public void shouldReadNewScnAndCommitScnFieldsWhenLegacyNotProvided() throws Exception {
|
||||
final Map<String, Object> offsetValues = new HashMap<>();
|
||||
offsetValues.put(SourceInfo.SCN2_KEY, "12345");
|
||||
offsetValues.put(SourceInfo.COMMIT2_SCN_KEY, "23456");
|
||||
|
||||
final OracleOffsetContext offsetContext = (OracleOffsetContext) offsetLoader.load(offsetValues);
|
||||
assertThat(offsetContext.getScn()).isEqualTo("12345");
|
||||
assertThat(offsetContext.getCommitScn()).isEqualTo("23456");
|
||||
}
|
||||
|
||||
@Test
|
||||
@FixFor("DBZ-2994")
|
||||
public void shouldPrioritizeNewScnAndCommitScnFields() throws Exception {
|
||||
final Map<String, Object> offsetValues = new HashMap<>();
|
||||
offsetValues.put(SourceInfo.SCN_KEY, 12345L);
|
||||
offsetValues.put(SourceInfo.COMMIT_SCN_KEY, 23456L);
|
||||
offsetValues.put(SourceInfo.SCN2_KEY, "345678");
|
||||
offsetValues.put(SourceInfo.COMMIT2_SCN_KEY, "456789");
|
||||
|
||||
final OracleOffsetContext offsetContext = (OracleOffsetContext) offsetLoader.load(offsetValues);
|
||||
assertThat(offsetContext.getScn()).isEqualTo("345678");
|
||||
assertThat(offsetContext.getCommitScn()).isEqualTo("456789");
|
||||
}
|
||||
|
||||
@Test
|
||||
@FixFor("DBZ-2994")
|
||||
public void shouldHandleNullScnAndCommitScnValues() throws Exception {
|
||||
final Map<String, Object> offsetValues = new HashMap<>();
|
||||
offsetValues.put(SourceInfo.SCN_KEY, null);
|
||||
offsetValues.put(SourceInfo.COMMIT_SCN_KEY, null);
|
||||
offsetValues.put(SourceInfo.SCN2_KEY, null);
|
||||
offsetValues.put(SourceInfo.COMMIT2_SCN_KEY, null);
|
||||
|
||||
final OracleOffsetContext offsetContext = (OracleOffsetContext) offsetLoader.load(offsetValues);
|
||||
assertThat(offsetContext.getScn()).isNull();
|
||||
assertThat(offsetContext.getCommitScn()).isNull();
|
||||
}
|
||||
}
|
@ -61,6 +61,8 @@ public void schemaIsCorrect() {
|
||||
.field("scn", Schema.OPTIONAL_INT64_SCHEMA)
|
||||
.field("commit_scn", Schema.OPTIONAL_INT64_SCHEMA)
|
||||
.field("lcr_position", Schema.OPTIONAL_STRING_SCHEMA)
|
||||
.field("scn2", Schema.OPTIONAL_STRING_SCHEMA)
|
||||
.field("commit2_scn", Schema.OPTIONAL_STRING_SCHEMA)
|
||||
.build();
|
||||
|
||||
VerifyRecord.assertConnectSchemasAreEqual(null, source.schema(), schema);
|
||||
|
@ -202,7 +202,7 @@ public void testStatements() {
|
||||
expected = "TRUNCATE TABLE table_name";
|
||||
assertThat(result).isEqualTo(expected);
|
||||
|
||||
result = SqlUtils.diffInDaysQuery(123L);
|
||||
result = SqlUtils.diffInDaysQuery(Scn.valueOf(123L));
|
||||
expected = "select sysdate - CAST(scn_to_timestamp(123) as date) from dual";
|
||||
assertThat(expected.equals(result)).isTrue();
|
||||
result = SqlUtils.diffInDaysQuery(null);
|
||||
|
@ -122,7 +122,7 @@ public void testIsNotEmptyWhenTransactionIsRegistered() {
|
||||
public void testIsEmptyWhenTransactionIsCommitted() throws InterruptedException {
|
||||
CountDownLatch commitLatch = new CountDownLatch(1);
|
||||
transactionalBuffer.registerCommitCallback(TRANSACTION_ID, SCN, Instant.now(), (timestamp, smallestScn, commitScn, counter) -> commitLatch.countDown());
|
||||
offsetContext = new OracleOffsetContext(connectorConfig, SCN.longValue(), SCN.longValue(), (LcrPosition) null, false, true, new TransactionContext());
|
||||
offsetContext = new OracleOffsetContext(connectorConfig, SCN.toString(), SCN.toString(), (LcrPosition) null, false, true, new TransactionContext());
|
||||
transactionalBuffer.commit(TRANSACTION_ID, SCN.add(SCN_ONE), offsetContext, TIMESTAMP, () -> true, MESSAGE, dispatcher);
|
||||
commitLatch.await();
|
||||
Thread.sleep(1000);
|
||||
@ -169,7 +169,7 @@ public void testCalculateScnWhenTransactionIsCommitted() throws InterruptedExcep
|
||||
smallestScnContainer.set(smallestScn);
|
||||
commitLatch.countDown();
|
||||
});
|
||||
offsetContext = new OracleOffsetContext(connectorConfig, SCN.longValue(), SCN.longValue(), null, false, true, new TransactionContext());
|
||||
offsetContext = new OracleOffsetContext(connectorConfig, SCN.toString(), SCN.toString(), null, false, true, new TransactionContext());
|
||||
transactionalBuffer.commit(TRANSACTION_ID, SCN.add(SCN_ONE), offsetContext, TIMESTAMP, () -> true, MESSAGE, dispatcher);
|
||||
commitLatch.await();
|
||||
|
||||
@ -188,7 +188,7 @@ public void testCalculateScnWhenFirstTransactionIsCommitted() throws Interrupted
|
||||
});
|
||||
transactionalBuffer.registerCommitCallback(OTHER_TRANSACTION_ID, OTHER_SCN, Instant.now(), (timestamp, smallestScn, commitScn, counter) -> {
|
||||
});
|
||||
offsetContext = new OracleOffsetContext(connectorConfig, SCN.longValue(), SCN.longValue(), null, false, true, new TransactionContext());
|
||||
offsetContext = new OracleOffsetContext(connectorConfig, SCN.toString(), SCN.toString(), null, false, true, new TransactionContext());
|
||||
transactionalBuffer.commit(TRANSACTION_ID, SCN.add(SCN_ONE), offsetContext, TIMESTAMP, () -> true, MESSAGE, dispatcher);
|
||||
commitLatch.await();
|
||||
// after commit, it stays the same because OTHER_TRANSACTION_ID is not committed yet
|
||||
@ -207,7 +207,7 @@ public void testCalculateScnWhenSecondTransactionIsCommitted() throws Interrupte
|
||||
smallestScnContainer.set(smallestScn);
|
||||
commitLatch.countDown();
|
||||
});
|
||||
offsetContext = new OracleOffsetContext(connectorConfig, OTHER_SCN.longValue(), OTHER_SCN.longValue(), null, false, true, new TransactionContext());
|
||||
offsetContext = new OracleOffsetContext(connectorConfig, OTHER_SCN.toString(), OTHER_SCN.toString(), null, false, true, new TransactionContext());
|
||||
transactionalBuffer.commit(OTHER_TRANSACTION_ID, OTHER_SCN.add(SCN_ONE), offsetContext, TIMESTAMP, () -> true, MESSAGE, dispatcher);
|
||||
commitLatch.await();
|
||||
assertThat(smallestScnContainer.get()).isEqualTo(SCN);
|
||||
@ -219,8 +219,8 @@ public void testCalculateScnWhenSecondTransactionIsCommitted() throws Interrupte
|
||||
public void testAbandoningOneTransaction() {
|
||||
transactionalBuffer.registerCommitCallback(TRANSACTION_ID, SCN, Instant.now(), (timestamp, smallestScn, commitScn, counter) -> {
|
||||
});
|
||||
offsetContext = new OracleOffsetContext(connectorConfig, SCN.longValue(), SCN.longValue(), (LcrPosition) null, false, true, new TransactionContext());
|
||||
transactionalBuffer.abandonLongTransactions(SCN.longValue(), offsetContext);
|
||||
offsetContext = new OracleOffsetContext(connectorConfig, SCN.toString(), SCN.toString(), (LcrPosition) null, false, true, new TransactionContext());
|
||||
transactionalBuffer.abandonLongTransactions(SCN, offsetContext);
|
||||
assertThat(transactionalBuffer.isEmpty()).isEqualTo(true);
|
||||
}
|
||||
|
||||
@ -230,7 +230,7 @@ public void testAbandoningTransactionHavingAnotherOne() {
|
||||
});
|
||||
transactionalBuffer.registerCommitCallback(OTHER_TRANSACTION_ID, OTHER_SCN, Instant.now(), (timestamp, smallestScn, commitScn, counter) -> {
|
||||
});
|
||||
transactionalBuffer.abandonLongTransactions(SCN.longValue(), offsetContext);
|
||||
transactionalBuffer.abandonLongTransactions(SCN, offsetContext);
|
||||
assertThat(transactionalBuffer.isEmpty()).isEqualTo(false);
|
||||
}
|
||||
|
||||
@ -248,7 +248,7 @@ public void testTransactionDump() {
|
||||
|
||||
private void commitTransaction(TransactionalBuffer.CommitCallback commitCallback) {
|
||||
transactionalBuffer.registerCommitCallback(TRANSACTION_ID, SCN, Instant.now(), commitCallback);
|
||||
offsetContext = new OracleOffsetContext(connectorConfig, SCN.longValue(), SCN.longValue(), null, false, true, new TransactionContext());
|
||||
offsetContext = new OracleOffsetContext(connectorConfig, SCN.toString(), SCN.toString(), null, false, true, new TransactionContext());
|
||||
transactionalBuffer.commit(TRANSACTION_ID, SCN.add(SCN_ONE), offsetContext, TIMESTAMP, () -> true, MESSAGE, dispatcher);
|
||||
}
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user