DBZ-2518 Implement Scn as a domain type
This commit is contained in:
parent
bb24606188
commit
5a7ac0cfe1
@ -5,7 +5,6 @@
|
||||
*/
|
||||
package io.debezium.connector.oracle.logminer;
|
||||
|
||||
import java.math.BigDecimal;
|
||||
import java.sql.Timestamp;
|
||||
|
||||
import io.debezium.common.annotation.Incubating;
|
||||
@ -37,7 +36,7 @@ public interface HistoryRecorder extends AutoCloseable {
|
||||
* @param csf the continuation sequence flag
|
||||
* @param redoSql the redo SQL that performed the operation
|
||||
*/
|
||||
void record(BigDecimal scn, String tableName, String segOwner, int operationCode, Timestamp changeTime,
|
||||
void record(Scn scn, String tableName, String segOwner, int operationCode, Timestamp changeTime,
|
||||
String transactionId, int csf, String redoSql);
|
||||
|
||||
/**
|
||||
|
@ -5,7 +5,6 @@
|
||||
*/
|
||||
package io.debezium.connector.oracle.logminer;
|
||||
|
||||
import java.math.BigDecimal;
|
||||
import java.sql.ResultSet;
|
||||
import java.sql.SQLException;
|
||||
import java.sql.Timestamp;
|
||||
@ -91,7 +90,7 @@ int processResult(ResultSet resultSet) {
|
||||
return 0;
|
||||
}
|
||||
|
||||
BigDecimal scn = RowMapper.getScn(transactionalBufferMetrics, resultSet);
|
||||
Scn scn = RowMapper.getScn(transactionalBufferMetrics, resultSet);
|
||||
String tableName = RowMapper.getTableName(transactionalBufferMetrics, resultSet);
|
||||
String segOwner = RowMapper.getSegOwner(transactionalBufferMetrics, resultSet);
|
||||
int operationCode = RowMapper.getOperationCode(transactionalBufferMetrics, resultSet);
|
||||
|
@ -22,7 +22,6 @@
|
||||
import static io.debezium.connector.oracle.logminer.LogMinerHelper.setRedoLogFilesForMining;
|
||||
import static io.debezium.connector.oracle.logminer.LogMinerHelper.startLogMining;
|
||||
|
||||
import java.math.BigDecimal;
|
||||
import java.sql.Connection;
|
||||
import java.sql.PreparedStatement;
|
||||
import java.sql.ResultSet;
|
||||
@ -267,7 +266,7 @@ private void abandonOldTransactionsIfExist(Connection connection) {
|
||||
// TODO computing the largest scn in the buffer is a left-over from previous incarnations, remove it.
|
||||
// TODO We don't need to keep largestScn in the buffer at all. clean it
|
||||
private void updateStartScn() {
|
||||
long nextStartScn = transactionalBuffer.getLargestScn().equals(BigDecimal.ZERO) ? endScn : transactionalBuffer.getLargestScn().longValue();
|
||||
long nextStartScn = transactionalBuffer.getLargestScn().equals(Scn.ZERO) ? endScn : transactionalBuffer.getLargestScn().longValue();
|
||||
if (nextStartScn <= startScn) {
|
||||
// When system is idle, largest SCN may stay unchanged, move it forward then
|
||||
transactionalBuffer.resetLargestScn(endScn);
|
||||
|
@ -5,7 +5,6 @@
|
||||
*/
|
||||
package io.debezium.connector.oracle.logminer;
|
||||
|
||||
import java.math.BigDecimal;
|
||||
import java.sql.Timestamp;
|
||||
|
||||
import io.debezium.jdbc.JdbcConfiguration;
|
||||
@ -21,7 +20,7 @@ public void prepare(LogMinerMetrics metrics, JdbcConfiguration jdbcConfiguration
|
||||
}
|
||||
|
||||
@Override
|
||||
public void record(BigDecimal scn, String tableName, String segOwner, int operationCode, Timestamp changeTime,
|
||||
public void record(Scn scn, String tableName, String segOwner, int operationCode, Timestamp changeTime,
|
||||
String transactionId, int csf, String redoSql) {
|
||||
}
|
||||
|
||||
|
@ -5,7 +5,6 @@
|
||||
*/
|
||||
package io.debezium.connector.oracle.logminer;
|
||||
|
||||
import java.math.BigDecimal;
|
||||
import java.sql.ResultSet;
|
||||
import java.sql.SQLException;
|
||||
import java.sql.Timestamp;
|
||||
@ -113,13 +112,13 @@ public static Timestamp getChangeTime(TransactionalBufferMetrics metrics, Result
|
||||
}
|
||||
}
|
||||
|
||||
public static BigDecimal getScn(TransactionalBufferMetrics metrics, ResultSet rs) {
|
||||
public static Scn getScn(TransactionalBufferMetrics metrics, ResultSet rs) {
|
||||
try {
|
||||
return rs.getBigDecimal(SCN);
|
||||
return new Scn(rs.getBigDecimal(SCN));
|
||||
}
|
||||
catch (SQLException e) {
|
||||
logError(metrics, e, "SCN");
|
||||
return new BigDecimal(-1);
|
||||
return Scn.INVALID;
|
||||
}
|
||||
}
|
||||
|
||||
@ -150,7 +149,7 @@ public static String getTransactionId(TransactionalBufferMetrics metrics, Result
|
||||
* @return the redo SQL
|
||||
*/
|
||||
public static String getSqlRedo(TransactionalBufferMetrics metrics, ResultSet rs, boolean isDml,
|
||||
HistoryRecorder historyRecorder, BigDecimal scn, String tableName,
|
||||
HistoryRecorder historyRecorder, Scn scn, String tableName,
|
||||
String segOwner, int operationCode, Timestamp changeTime, String txId) {
|
||||
int lobLimitCounter = 9; // todo : decide on approach ( XStream chunk option) and Lob limit
|
||||
StringBuilder result = new StringBuilder(4000);
|
||||
|
@ -0,0 +1,67 @@
|
||||
/*
|
||||
* Copyright Debezium Authors.
|
||||
*
|
||||
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
|
||||
*/
|
||||
package io.debezium.connector.oracle.logminer;
|
||||
|
||||
import java.math.BigDecimal;
|
||||
import java.util.Objects;
|
||||
|
||||
/**
|
||||
* Oracle System Change Number implementation
|
||||
*
|
||||
* @author Chris Cranford
|
||||
*/
|
||||
public class Scn implements Comparable<Scn> {
|
||||
|
||||
public static final Scn INVALID = new Scn(new BigDecimal(-1));
|
||||
public static final Scn ZERO = new Scn(BigDecimal.ZERO);
|
||||
public static final Scn ONE = new Scn(BigDecimal.ONE);
|
||||
|
||||
private BigDecimal scn;
|
||||
|
||||
public Scn(BigDecimal scn) {
|
||||
assert scn.scale() == 0;
|
||||
this.scn = scn;
|
||||
}
|
||||
|
||||
public static Scn fromLong(Long value) {
|
||||
return new Scn(new BigDecimal(value));
|
||||
}
|
||||
|
||||
public long longValue() {
|
||||
return scn.longValue();
|
||||
}
|
||||
|
||||
public Scn add(Scn value) {
|
||||
return new Scn(scn.add(value.scn));
|
||||
}
|
||||
|
||||
@Override
|
||||
public int compareTo(Scn o) {
|
||||
return scn.compareTo(o.scn);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean equals(Object o) {
|
||||
if (this == o) {
|
||||
return true;
|
||||
}
|
||||
if (o == null || getClass() != o.getClass()) {
|
||||
return false;
|
||||
}
|
||||
Scn scn1 = (Scn) o;
|
||||
return Objects.equals(scn, scn1.scn);
|
||||
}
|
||||
|
||||
@Override
|
||||
public int hashCode() {
|
||||
return Objects.hash(scn);
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return scn != null ? scn.toString() : "null";
|
||||
}
|
||||
}
|
@ -5,7 +5,6 @@
|
||||
*/
|
||||
package io.debezium.connector.oracle.logminer;
|
||||
|
||||
import java.math.BigDecimal;
|
||||
import java.sql.Timestamp;
|
||||
import java.time.Duration;
|
||||
import java.time.Instant;
|
||||
@ -57,8 +56,8 @@ public final class TransactionalBuffer {
|
||||
|
||||
// It holds the latest captured SCN.
|
||||
// This number tracks starting point for the next mining cycle.
|
||||
private BigDecimal largestScn;
|
||||
private BigDecimal lastCommittedScn;
|
||||
private Scn largestScn;
|
||||
private Scn lastCommittedScn;
|
||||
|
||||
/**
|
||||
* Constructor to create a new instance.
|
||||
@ -80,8 +79,8 @@ public final class TransactionalBuffer {
|
||||
this.taskCounter = new AtomicInteger();
|
||||
this.errorHandler = errorHandler;
|
||||
this.metrics = metrics;
|
||||
largestScn = BigDecimal.ZERO;
|
||||
lastCommittedScn = BigDecimal.ZERO;
|
||||
this.largestScn = Scn.ZERO;
|
||||
this.lastCommittedScn = Scn.ZERO;
|
||||
this.abandonedTransactionIds = new HashSet<>();
|
||||
this.rolledBackTransactionIds = new HashSet<>();
|
||||
}
|
||||
@ -89,7 +88,7 @@ public final class TransactionalBuffer {
|
||||
/**
|
||||
* @return largest last SCN in the buffer among all transactions
|
||||
*/
|
||||
BigDecimal getLargestScn() {
|
||||
Scn getLargestScn() {
|
||||
return largestScn;
|
||||
}
|
||||
|
||||
@ -105,10 +104,10 @@ Set<String> getRolledBackTransactionIds() {
|
||||
*/
|
||||
void resetLargestScn(Long value) {
|
||||
if (value != null) {
|
||||
largestScn = new BigDecimal(value);
|
||||
largestScn = Scn.fromLong(value);
|
||||
}
|
||||
else {
|
||||
largestScn = BigDecimal.ZERO;
|
||||
largestScn = Scn.ZERO;
|
||||
}
|
||||
}
|
||||
|
||||
@ -120,7 +119,7 @@ void resetLargestScn(Long value) {
|
||||
* @param changeTime time of DML parsing completion
|
||||
* @param callback callback to execute when transaction commits
|
||||
*/
|
||||
void registerCommitCallback(String transactionId, BigDecimal scn, Instant changeTime, CommitCallback callback) {
|
||||
void registerCommitCallback(String transactionId, Scn scn, Instant changeTime, CommitCallback callback) {
|
||||
if (abandonedTransactionIds.contains(transactionId)) {
|
||||
LogMinerHelper.logWarn(metrics, "Captured DML for abandoned transaction {}, ignored", transactionId);
|
||||
return;
|
||||
@ -159,7 +158,7 @@ void registerCommitCallback(String transactionId, BigDecimal scn, Instant change
|
||||
* @param debugMessage message
|
||||
* @return true if committed transaction is in the buffer, was not processed yet and processed now
|
||||
*/
|
||||
boolean commit(String transactionId, BigDecimal scn, OracleOffsetContext offsetContext, Timestamp timestamp,
|
||||
boolean commit(String transactionId, Scn scn, OracleOffsetContext offsetContext, Timestamp timestamp,
|
||||
ChangeEventSource.ChangeEventSourceContext context, String debugMessage) {
|
||||
|
||||
Transaction transaction = transactions.get(transactionId);
|
||||
@ -171,7 +170,7 @@ boolean commit(String transactionId, BigDecimal scn, OracleOffsetContext offsetC
|
||||
|
||||
calculateLargestScn();
|
||||
transaction = transactions.remove(transactionId);
|
||||
BigDecimal smallestScn = calculateSmallestScn();
|
||||
Scn smallestScn = calculateSmallestScn();
|
||||
|
||||
taskCounter.incrementAndGet();
|
||||
abandonedTransactionIds.remove(transactionId);
|
||||
@ -198,7 +197,7 @@ boolean commit(String transactionId, BigDecimal scn, OracleOffsetContext offsetC
|
||||
callback.execute(timestamp, smallestScn, scn, --counter);
|
||||
}
|
||||
|
||||
lastCommittedScn = new BigDecimal(scn.longValue());
|
||||
lastCommittedScn = Scn.fromLong(scn.longValue());
|
||||
}
|
||||
catch (InterruptedException e) {
|
||||
LogMinerHelper.logError(metrics, "Thread interrupted during running", e);
|
||||
@ -262,8 +261,8 @@ boolean rollback(String transactionId, String debugMessage) {
|
||||
* @param thresholdScn the smallest SVN of any transaction to keep in the buffer. All others will be removed.
|
||||
*/
|
||||
void abandonLongTransactions(Long thresholdScn) {
|
||||
BigDecimal threshold = new BigDecimal(thresholdScn);
|
||||
BigDecimal smallestScn = calculateSmallestScn();
|
||||
Scn threshold = Scn.fromLong(thresholdScn);
|
||||
Scn smallestScn = calculateSmallestScn();
|
||||
if (smallestScn == null) {
|
||||
// no transactions in the buffer
|
||||
return;
|
||||
@ -291,23 +290,23 @@ boolean isTransactionRegistered(String txId) {
|
||||
return transactions.get(txId) != null;
|
||||
}
|
||||
|
||||
private BigDecimal calculateSmallestScn() {
|
||||
BigDecimal scn = transactions.isEmpty() ? null
|
||||
private Scn calculateSmallestScn() {
|
||||
Scn scn = transactions.isEmpty() ? null
|
||||
: transactions.values()
|
||||
.stream()
|
||||
.map(transaction -> transaction.firstScn)
|
||||
.min(BigDecimal::compareTo)
|
||||
.min(Scn::compareTo)
|
||||
.orElseThrow(() -> new DataException("Cannot calculate smallest SCN"));
|
||||
metrics.setOldestScn(scn == null ? -1 : scn.longValue());
|
||||
return scn;
|
||||
}
|
||||
|
||||
private void calculateLargestScn() {
|
||||
largestScn = transactions.isEmpty() ? BigDecimal.ZERO
|
||||
largestScn = transactions.isEmpty() ? Scn.ZERO
|
||||
: transactions.values()
|
||||
.stream()
|
||||
.map(transaction -> transaction.lastScn)
|
||||
.max(BigDecimal::compareTo)
|
||||
.max(Scn::compareTo)
|
||||
.orElseThrow(() -> new DataException("Cannot calculate largest SCN"));
|
||||
}
|
||||
|
||||
@ -356,17 +355,17 @@ public interface CommitCallback {
|
||||
* @param commitScn commit SCN
|
||||
* @param callbackNumber number of the callback in the transaction
|
||||
*/
|
||||
void execute(Timestamp timestamp, BigDecimal smallestScn, BigDecimal commitScn, int callbackNumber) throws InterruptedException;
|
||||
void execute(Timestamp timestamp, Scn smallestScn, Scn commitScn, int callbackNumber) throws InterruptedException;
|
||||
}
|
||||
|
||||
@NotThreadSafe
|
||||
private static final class Transaction {
|
||||
|
||||
private final BigDecimal firstScn;
|
||||
private BigDecimal lastScn;
|
||||
private final Scn firstScn;
|
||||
private Scn lastScn;
|
||||
private final List<CommitCallback> commitCallbacks;
|
||||
|
||||
private Transaction(BigDecimal firstScn) {
|
||||
private Transaction(Scn firstScn) {
|
||||
this.firstScn = firstScn;
|
||||
this.commitCallbacks = new ArrayList<>();
|
||||
this.lastScn = firstScn;
|
||||
|
@ -5,10 +5,10 @@
|
||||
*/
|
||||
package io.debezium.connector.oracle.logminer.valueholder;
|
||||
|
||||
import java.math.BigDecimal;
|
||||
import java.sql.Timestamp;
|
||||
import java.util.List;
|
||||
|
||||
import io.debezium.connector.oracle.logminer.Scn;
|
||||
import io.debezium.data.Envelope;
|
||||
|
||||
public interface LogMinerDmlEntry {
|
||||
@ -38,7 +38,7 @@ public interface LogMinerDmlEntry {
|
||||
* The actual SCN will be assigned after commit
|
||||
* @return it's value
|
||||
*/
|
||||
BigDecimal getScn();
|
||||
Scn getScn();
|
||||
|
||||
/**
|
||||
* @return transaction ID
|
||||
@ -64,7 +64,7 @@ public interface LogMinerDmlEntry {
|
||||
* sets scn obtained from a Log Miner entry
|
||||
* @param scn it's value
|
||||
*/
|
||||
void setScn(BigDecimal scn);
|
||||
void setScn(Scn scn);
|
||||
|
||||
/**
|
||||
* Sets table name
|
||||
|
@ -5,11 +5,11 @@
|
||||
*/
|
||||
package io.debezium.connector.oracle.logminer.valueholder;
|
||||
|
||||
import java.math.BigDecimal;
|
||||
import java.sql.Timestamp;
|
||||
import java.util.List;
|
||||
import java.util.Objects;
|
||||
|
||||
import io.debezium.connector.oracle.logminer.Scn;
|
||||
import io.debezium.data.Envelope;
|
||||
|
||||
/**
|
||||
@ -25,7 +25,7 @@ public class LogMinerDmlEntryImpl implements LogMinerDmlEntry {
|
||||
private String objectName;
|
||||
private Timestamp sourceTime;
|
||||
private String transactionId;
|
||||
private BigDecimal scn;
|
||||
private Scn scn;
|
||||
|
||||
public LogMinerDmlEntryImpl(Envelope.Operation commandType, List<LogMinerColumnValue> newLmColumnValues, List<LogMinerColumnValue> oldLmColumnValues) {
|
||||
this.commandType = commandType;
|
||||
@ -89,12 +89,12 @@ public void setTransactionId(String id) {
|
||||
}
|
||||
|
||||
@Override
|
||||
public BigDecimal getScn() {
|
||||
public Scn getScn() {
|
||||
return scn;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setScn(BigDecimal scn) {
|
||||
public void setScn(Scn scn) {
|
||||
this.scn = scn;
|
||||
}
|
||||
|
||||
|
@ -94,12 +94,12 @@ public void testSeqOwner() throws SQLException {
|
||||
@Test
|
||||
public void testGetScn() throws SQLException {
|
||||
Mockito.when(rs.getBigDecimal(1)).thenReturn(new BigDecimal(1));
|
||||
BigDecimal scn = RowMapper.getScn(metrics, rs);
|
||||
assertThat(scn.equals(new BigDecimal(1))).isTrue();
|
||||
Scn scn = RowMapper.getScn(metrics, rs);
|
||||
assertThat(scn).isEqualTo(Scn.fromLong(1L));
|
||||
verify(rs).getBigDecimal(1);
|
||||
Mockito.when(rs.getBigDecimal(1)).thenThrow(SQLException.class);
|
||||
scn = RowMapper.getScn(metrics, rs);
|
||||
assertThat(scn.equals(new BigDecimal(-1))).isTrue();
|
||||
assertThat(scn).isEqualTo(Scn.INVALID);
|
||||
verify(rs, times(2)).getBigDecimal(1);
|
||||
}
|
||||
|
||||
@ -119,14 +119,14 @@ public void testGetTransactionId() throws SQLException {
|
||||
public void testSqlRedo() throws SQLException {
|
||||
Mockito.when(rs.getInt(6)).thenReturn(0);
|
||||
Mockito.when(rs.getString(2)).thenReturn("short_sql");
|
||||
String sql = RowMapper.getSqlRedo(metrics, rs, false, null, BigDecimal.ONE, "", "", 1, null, "");
|
||||
String sql = RowMapper.getSqlRedo(metrics, rs, false, null, Scn.ONE, "", "", 1, null, "");
|
||||
assertThat(sql.equals("short_sql")).isTrue();
|
||||
verify(rs).getInt(6);
|
||||
verify(rs).getString(2);
|
||||
|
||||
Mockito.when(rs.getInt(6)).thenReturn(1).thenReturn(0);
|
||||
Mockito.when(rs.getString(2)).thenReturn("long").thenReturn("_sql");
|
||||
sql = RowMapper.getSqlRedo(metrics, rs, false, null, BigDecimal.ONE, "", "", 1, null, "");
|
||||
sql = RowMapper.getSqlRedo(metrics, rs, false, null, Scn.ONE, "", "", 1, null, "");
|
||||
assertThat(sql.equals("long_sql")).isTrue();
|
||||
verify(rs, times(3)).getInt(6);
|
||||
verify(rs, times(3)).getString(2);
|
||||
@ -136,21 +136,21 @@ public void testSqlRedo() throws SQLException {
|
||||
Arrays.fill(chars, 'a');
|
||||
Mockito.when(rs.getString(2)).thenReturn(new String(chars));
|
||||
Mockito.when(rs.getInt(6)).thenReturn(1);
|
||||
sql = RowMapper.getSqlRedo(metrics, rs, false, null, BigDecimal.ONE, "", "", 1, null, "");
|
||||
sql = RowMapper.getSqlRedo(metrics, rs, false, null, Scn.ONE, "", "", 1, null, "");
|
||||
assertThat(sql.length()).isEqualTo(40_000);
|
||||
verify(rs, times(13)).getInt(6);
|
||||
verify(rs, times(13)).getString(2);
|
||||
|
||||
Mockito.when(rs.getInt(6)).thenReturn(0);
|
||||
Mockito.when(rs.getString(2)).thenReturn(null);
|
||||
sql = RowMapper.getSqlRedo(metrics, rs, false, null, BigDecimal.ONE, "", "", 1, null, "");
|
||||
sql = RowMapper.getSqlRedo(metrics, rs, false, null, Scn.ONE, "", "", 1, null, "");
|
||||
assertThat(sql).isNull();
|
||||
verify(rs, times(13)).getInt(6);
|
||||
verify(rs, times(14)).getString(2);
|
||||
|
||||
Mockito.when(rs.getInt(6)).thenReturn(0);
|
||||
Mockito.when(rs.getString(2)).thenThrow(SQLException.class);
|
||||
sql = RowMapper.getSqlRedo(metrics, rs, false, null, BigDecimal.ONE, "", "", 1, null, "");
|
||||
sql = RowMapper.getSqlRedo(metrics, rs, false, null, Scn.ONE, "", "", 1, null, "");
|
||||
assertThat(sql.equals("")).isTrue();
|
||||
verify(rs, times(13)).getInt(6);
|
||||
verify(rs, times(15)).getString(2);
|
||||
|
@ -13,7 +13,6 @@
|
||||
import static org.fest.assertions.Assertions.assertThat;
|
||||
import static org.mockito.Mockito.mock;
|
||||
|
||||
import java.math.BigDecimal;
|
||||
import java.sql.Timestamp;
|
||||
import java.time.Duration;
|
||||
import java.time.Instant;
|
||||
@ -54,9 +53,9 @@ public class TransactionalBufferTest {
|
||||
private static final String SQL_ONE = "update table";
|
||||
private static final String SQL_TWO = "insert into table";
|
||||
private static final String MESSAGE = "OK";
|
||||
private static final BigDecimal SCN = BigDecimal.ONE;
|
||||
private static final BigDecimal OTHER_SCN = BigDecimal.TEN;
|
||||
private static final BigDecimal LARGEST_SCN = BigDecimal.valueOf(100L);
|
||||
private static final Scn SCN = Scn.ONE;
|
||||
private static final Scn OTHER_SCN = Scn.fromLong(10L);
|
||||
private static final Scn LARGEST_SCN = Scn.fromLong(100L);
|
||||
private static final Timestamp TIMESTAMP = new Timestamp(System.currentTimeMillis());
|
||||
private static final Configuration config = new Configuration() {
|
||||
@Override
|
||||
@ -113,7 +112,7 @@ public void testIsNotEmptyWhenTransactionIsRegistered() {
|
||||
public void testIsNotEmptyWhenTransactionIsCommitting() {
|
||||
transactionalBuffer.registerCommitCallback(TRANSACTION_ID, SCN, Instant.now(), (timestamp, smallestScn, commitScn, counter) -> Thread.sleep(1000));
|
||||
offsetContext = new OracleOffsetContext(connectorConfig, SCN.longValue(), SCN.longValue(), (LcrPosition) null, false, true, new TransactionContext());
|
||||
transactionalBuffer.commit(TRANSACTION_ID, SCN.add(BigDecimal.ONE), offsetContext, TIMESTAMP, () -> true, MESSAGE);
|
||||
transactionalBuffer.commit(TRANSACTION_ID, SCN.add(Scn.ONE), offsetContext, TIMESTAMP, () -> true, MESSAGE);
|
||||
assertThat(transactionalBuffer.isEmpty()).isEqualTo(false);
|
||||
}
|
||||
|
||||
@ -122,7 +121,7 @@ public void testIsEmptyWhenTransactionIsCommitted() throws InterruptedException
|
||||
CountDownLatch commitLatch = new CountDownLatch(1);
|
||||
transactionalBuffer.registerCommitCallback(TRANSACTION_ID, SCN, Instant.now(), (timestamp, smallestScn, commitScn, counter) -> commitLatch.countDown());
|
||||
offsetContext = new OracleOffsetContext(connectorConfig, SCN.longValue(), SCN.longValue(), (LcrPosition) null, false, true, new TransactionContext());
|
||||
transactionalBuffer.commit(TRANSACTION_ID, SCN.add(BigDecimal.ONE), offsetContext, TIMESTAMP, () -> true, MESSAGE);
|
||||
transactionalBuffer.commit(TRANSACTION_ID, SCN.add(Scn.ONE), offsetContext, TIMESTAMP, () -> true, MESSAGE);
|
||||
commitLatch.await();
|
||||
Thread.sleep(1000);
|
||||
assertThat(transactionalBuffer.isEmpty()).isEqualTo(true);
|
||||
@ -166,14 +165,14 @@ public void testNonEmptySecondTransactionIsRolledBack() {
|
||||
@Test
|
||||
public void testCalculateScnWhenTransactionIsCommitted() throws InterruptedException {
|
||||
CountDownLatch commitLatch = new CountDownLatch(1);
|
||||
AtomicReference<BigDecimal> smallestScnContainer = new AtomicReference<>();
|
||||
AtomicReference<Scn> smallestScnContainer = new AtomicReference<>();
|
||||
transactionalBuffer.registerCommitCallback(TRANSACTION_ID, SCN, Instant.now(), (timestamp, smallestScn, commitScn, counter) -> {
|
||||
smallestScnContainer.set(smallestScn);
|
||||
commitLatch.countDown();
|
||||
});
|
||||
assertThat(transactionalBuffer.getLargestScn()).isEqualTo(SCN); // before commit
|
||||
offsetContext = new OracleOffsetContext(connectorConfig, SCN.longValue(), SCN.longValue(), null, false, true, new TransactionContext());
|
||||
transactionalBuffer.commit(TRANSACTION_ID, SCN.add(BigDecimal.ONE), offsetContext, TIMESTAMP, () -> true, MESSAGE);
|
||||
transactionalBuffer.commit(TRANSACTION_ID, SCN.add(Scn.ONE), offsetContext, TIMESTAMP, () -> true, MESSAGE);
|
||||
commitLatch.await();
|
||||
assertThat(transactionalBuffer.getLargestScn()).isEqualTo(SCN); // after commit
|
||||
|
||||
@ -185,7 +184,7 @@ public void testCalculateScnWhenTransactionIsCommitted() throws InterruptedExcep
|
||||
@Test
|
||||
public void testCalculateScnWhenFirstTransactionIsCommitted() throws InterruptedException {
|
||||
CountDownLatch commitLatch = new CountDownLatch(1);
|
||||
AtomicReference<BigDecimal> smallestScnContainer = new AtomicReference<>();
|
||||
AtomicReference<Scn> smallestScnContainer = new AtomicReference<>();
|
||||
transactionalBuffer.registerCommitCallback(TRANSACTION_ID, SCN, Instant.now(), (timestamp, smallestScn, commitScn, counter) -> {
|
||||
smallestScnContainer.set(smallestScn);
|
||||
commitLatch.countDown();
|
||||
@ -194,7 +193,7 @@ public void testCalculateScnWhenFirstTransactionIsCommitted() throws Interrupted
|
||||
});
|
||||
assertThat(transactionalBuffer.getLargestScn()).isEqualTo(OTHER_SCN); // before commit
|
||||
offsetContext = new OracleOffsetContext(connectorConfig, SCN.longValue(), SCN.longValue(), null, false, true, new TransactionContext());
|
||||
transactionalBuffer.commit(TRANSACTION_ID, SCN.add(BigDecimal.ONE), offsetContext, TIMESTAMP, () -> true, MESSAGE);
|
||||
transactionalBuffer.commit(TRANSACTION_ID, SCN.add(Scn.ONE), offsetContext, TIMESTAMP, () -> true, MESSAGE);
|
||||
commitLatch.await();
|
||||
// after commit, it stays the same because OTHER_TRANSACTION_ID is not committed yet
|
||||
assertThat(transactionalBuffer.getLargestScn()).isEqualTo(OTHER_SCN);
|
||||
@ -208,14 +207,14 @@ public void testCalculateScnWhenSecondTransactionIsCommitted() throws Interrupte
|
||||
transactionalBuffer.registerCommitCallback(TRANSACTION_ID, SCN, Instant.now(), (timestamp, smallestScn, commitScn, counter) -> {
|
||||
});
|
||||
CountDownLatch commitLatch = new CountDownLatch(1);
|
||||
AtomicReference<BigDecimal> smallestScnContainer = new AtomicReference<>();
|
||||
AtomicReference<Scn> smallestScnContainer = new AtomicReference<>();
|
||||
transactionalBuffer.registerCommitCallback(OTHER_TRANSACTION_ID, OTHER_SCN, Instant.now(), (timestamp, smallestScn, commitScn, counter) -> {
|
||||
smallestScnContainer.set(smallestScn);
|
||||
commitLatch.countDown();
|
||||
});
|
||||
assertThat(transactionalBuffer.getLargestScn()).isEqualTo(OTHER_SCN); // before commit
|
||||
offsetContext = new OracleOffsetContext(connectorConfig, OTHER_SCN.longValue(), OTHER_SCN.longValue(), null, false, true, new TransactionContext());
|
||||
transactionalBuffer.commit(OTHER_TRANSACTION_ID, OTHER_SCN.add(BigDecimal.ONE), offsetContext, TIMESTAMP, () -> true, MESSAGE);
|
||||
transactionalBuffer.commit(OTHER_TRANSACTION_ID, OTHER_SCN.add(Scn.ONE), offsetContext, TIMESTAMP, () -> true, MESSAGE);
|
||||
commitLatch.await();
|
||||
assertThat(smallestScnContainer.get()).isEqualTo(SCN);
|
||||
// after committing OTHER_TRANSACTION_ID
|
||||
@ -235,7 +234,7 @@ public void testResetLargestScn() {
|
||||
assertThat(transactionalBuffer.getLargestScn()).isEqualTo(OTHER_SCN); // after commit
|
||||
|
||||
transactionalBuffer.resetLargestScn(null);
|
||||
assertThat(transactionalBuffer.getLargestScn()).isEqualTo(BigDecimal.ZERO);
|
||||
assertThat(transactionalBuffer.getLargestScn()).isEqualTo(Scn.ZERO);
|
||||
transactionalBuffer.resetLargestScn(OTHER_SCN.longValue());
|
||||
assertThat(transactionalBuffer.getLargestScn()).isEqualTo(OTHER_SCN);
|
||||
}
|
||||
@ -246,7 +245,7 @@ public void testAbandoningOneTransaction() {
|
||||
});
|
||||
transactionalBuffer.abandonLongTransactions(SCN.longValue());
|
||||
assertThat(transactionalBuffer.isEmpty()).isEqualTo(true);
|
||||
assertThat(transactionalBuffer.getLargestScn()).isEqualTo(BigDecimal.ZERO);
|
||||
assertThat(transactionalBuffer.getLargestScn()).isEqualTo(Scn.ZERO);
|
||||
}
|
||||
|
||||
@Test
|
||||
@ -309,6 +308,6 @@ public void testCommitQueueOverflowProcessedOnCaller() throws InterruptedExcepti
|
||||
private void commitTransaction(TransactionalBuffer.CommitCallback commitCallback) {
|
||||
transactionalBuffer.registerCommitCallback(TRANSACTION_ID, SCN, Instant.now(), commitCallback);
|
||||
offsetContext = new OracleOffsetContext(connectorConfig, SCN.longValue(), SCN.longValue(), null, false, true, new TransactionContext());
|
||||
transactionalBuffer.commit(TRANSACTION_ID, SCN.add(BigDecimal.ONE), offsetContext, TIMESTAMP, () -> true, MESSAGE);
|
||||
transactionalBuffer.commit(TRANSACTION_ID, SCN.add(Scn.ONE), offsetContext, TIMESTAMP, () -> true, MESSAGE);
|
||||
}
|
||||
}
|
||||
|
@ -71,7 +71,7 @@ public void testValueHolders() throws Exception {
|
||||
dmlEntryExpected.setTransactionId("transaction_id");
|
||||
dmlEntryExpected.setObjectName(TABLE_NAME);
|
||||
dmlEntryExpected.setObjectOwner(SCHEMA_NAME);
|
||||
dmlEntryExpected.setScn(BigDecimal.ONE);
|
||||
dmlEntryExpected.setScn(Scn.ONE);
|
||||
dmlEntryExpected.setSourceTime(new Timestamp(1000));
|
||||
|
||||
String createStatement = IoUtil.read(IoUtil.getResourceAsStream("ddl/create_small_table.sql", null, getClass(), null, null));
|
||||
@ -82,7 +82,7 @@ public void testValueHolders() throws Exception {
|
||||
|
||||
assertThat(dmlEntryParsed.equals(dmlEntryExpected)).isTrue();
|
||||
assertThat(dmlEntryExpected.getCommandType() == Envelope.Operation.CREATE).isTrue();
|
||||
assertThat(dmlEntryExpected.getScn().equals(BigDecimal.ONE)).isTrue();
|
||||
assertThat(dmlEntryExpected.getScn().equals(Scn.ONE)).isTrue();
|
||||
assertThat(dmlEntryExpected.getSourceTime().equals(new Timestamp(1000))).isTrue();
|
||||
assertThat(dmlEntryExpected.getTransactionId().equals("transaction_id")).isTrue();
|
||||
assertThat(dmlEntryExpected.getObjectOwner().equals(SCHEMA_NAME)).isTrue();
|
||||
|
Loading…
Reference in New Issue
Block a user