DBZ-3090 Commit events in same thread that processes results for LogMiner

Chris Cranford 2021-02-12 13:50:33 -05:00 committed by Gunnar Morling
parent 570e35eaad
commit 4dbd9f3e4c
7 changed files with 171 additions and 262 deletions

@@ -54,7 +54,7 @@ class LogMinerQueryResultProcessor {
LogMinerQueryResultProcessor(ChangeEventSource.ChangeEventSourceContext context, LogMinerMetrics metrics,
TransactionalBuffer transactionalBuffer, SimpleDmlParser dmlParser,
OracleOffsetContext offsetContext, OracleDatabaseSchema schema,
EventDispatcher<TableId> dispatcher, TransactionalBufferMetrics transactionalBufferMetrics,
EventDispatcher<TableId> dispatcher,
String catalogName, Clock clock, HistoryRecorder historyRecorder) {
this.context = context;
this.metrics = metrics;
@@ -63,7 +63,7 @@
this.offsetContext = offsetContext;
this.schema = schema;
this.dispatcher = dispatcher;
this.transactionalBufferMetrics = transactionalBufferMetrics;
this.transactionalBufferMetrics = transactionalBuffer.getMetrics();
this.catalogName = catalogName;
this.clock = clock;
this.historyRecorder = historyRecorder;
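
Net effect of these two hunks: the processor no longer receives the buffer's metrics as a separate constructor argument but derives them from the buffer itself. A minimal sketch of the resulting wiring (other fields and parameters omitted; the surrounding Debezium types are assumed):

    class LogMinerQueryResultProcessor {
        private final TransactionalBuffer transactionalBuffer;
        private final TransactionalBufferMetrics transactionalBufferMetrics;

        LogMinerQueryResultProcessor(TransactionalBuffer transactionalBuffer) {
            this.transactionalBuffer = transactionalBuffer;
            // The buffer owns its metrics; the processor only borrows the
            // reference, so the two can never be wired to different instances.
            this.transactionalBufferMetrics = transactionalBuffer.getMetrics();
        }
    }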

@@ -17,7 +17,6 @@
import static io.debezium.connector.oracle.logminer.LogMinerHelper.getTimeDifference;
import static io.debezium.connector.oracle.logminer.LogMinerHelper.instantiateFlushConnections;
import static io.debezium.connector.oracle.logminer.LogMinerHelper.logError;
import static io.debezium.connector.oracle.logminer.LogMinerHelper.logWarn;
import static io.debezium.connector.oracle.logminer.LogMinerHelper.setNlsSessionParameters;
import static io.debezium.connector.oracle.logminer.LogMinerHelper.setRedoLogFilesForMining;
import static io.debezium.connector.oracle.logminer.LogMinerHelper.startLogMining;
@@ -31,7 +30,6 @@
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
import org.slf4j.Logger;
@@ -79,9 +77,7 @@ public class LogMinerStreamingChangeEventSource implements StreamingChangeEventS
private final boolean isContinuousMining;
private OracleConnectorConfig connectorConfig;
private TransactionalBufferMetrics transactionalBufferMetrics;
private LogMinerMetrics logMinerMetrics;
private TransactionalBuffer transactionalBuffer;
private long startScn;
private long endScn;
private Duration archiveLogRetention;
@@ -119,126 +115,106 @@ public LogMinerStreamingChangeEventSource(OracleConnectorConfig connectorConfig,
*/
@Override
public void execute(ChangeEventSourceContext context) {
try {
// Perform registration
registerTransactionalBuffer();
registerLogMinerMetrics();
try (TransactionalBuffer transactionalBuffer = new TransactionalBuffer(taskContext, errorHandler)) {
try {
// Perform registration
registerLogMinerMetrics();
try (Connection connection = jdbcConnection.connection(false)) {
long databaseTimeMs = getTimeDifference(connection).toMillis();
try (Connection connection = jdbcConnection.connection(false)) {
long databaseTimeMs = getTimeDifference(connection).toMillis();
LOGGER.trace("Current time {} ms, database difference {} ms", System.currentTimeMillis(), databaseTimeMs);
transactionalBufferMetrics.setTimeDifference(new AtomicLong(databaseTimeMs));
LOGGER.trace("Current time {} ms, database difference {} ms", System.currentTimeMillis(), databaseTimeMs);
transactionalBuffer.setDatabaseTimeDifference(databaseTimeMs);
startScn = offsetContext.getScn();
createFlushTable(connection);
startScn = offsetContext.getScn();
createFlushTable(connection);
if (!isContinuousMining && startScn < getFirstOnlineLogScn(connection, archiveLogRetention)) {
throw new DebeziumException(
"Online REDO LOG files or archive log files do not contain the offset scn " + startScn + ". Please perform a new snapshot.");
}
if (!isContinuousMining && startScn < getFirstOnlineLogScn(connection, archiveLogRetention)) {
throw new DebeziumException(
"Online REDO LOG files or archive log files do not contain the offset scn " + startScn + ". Please perform a new snapshot.");
}
setNlsSessionParameters(jdbcConnection);
checkSupplementalLogging(jdbcConnection, connectorConfig.getPdbName(), schema);
setNlsSessionParameters(jdbcConnection);
checkSupplementalLogging(jdbcConnection, connectorConfig.getPdbName(), schema);
initializeRedoLogsForMining(connection, false, archiveLogRetention);
initializeRedoLogsForMining(connection, false, archiveLogRetention);
HistoryRecorder historyRecorder = connectorConfig.getLogMiningHistoryRecorder();
try {
// todo: why can't OracleConnection be used rather than a Factory+JdbcConfiguration?
historyRecorder.prepare(logMinerMetrics, jdbcConfiguration, connectorConfig.getLogMinerHistoryRetentionHours());
HistoryRecorder historyRecorder = connectorConfig.getLogMiningHistoryRecorder();
try {
// todo: why can't OracleConnection be used rather than a Factory+JdbcConfiguration?
historyRecorder.prepare(logMinerMetrics, jdbcConfiguration, connectorConfig.getLogMinerHistoryRetentionHours());
final LogMinerQueryResultProcessor processor = new LogMinerQueryResultProcessor(context, logMinerMetrics,
transactionalBuffer, dmlParser, offsetContext, schema, dispatcher, transactionalBufferMetrics,
catalogName, clock, historyRecorder);
final LogMinerQueryResultProcessor processor = new LogMinerQueryResultProcessor(context, logMinerMetrics,
transactionalBuffer, dmlParser, offsetContext, schema, dispatcher, catalogName, clock, historyRecorder);
try (PreparedStatement miningView = connection
.prepareStatement(SqlUtils.logMinerContentsQuery(connectorConfig.getSchemaName(), jdbcConnection.username(), schema))) {
Set<String> currentRedoLogFiles = getCurrentRedoLogFiles(connection, logMinerMetrics);
try (PreparedStatement miningView = connection
.prepareStatement(SqlUtils.logMinerContentsQuery(connectorConfig.getSchemaName(), jdbcConnection.username(), schema))) {
Set<String> currentRedoLogFiles = getCurrentRedoLogFiles(connection, logMinerMetrics);
Stopwatch stopwatch = Stopwatch.reusable();
while (context.isRunning()) {
endScn = getEndScn(connection, startScn, logMinerMetrics, connectorConfig.getLogMiningBatchSizeDefault());
flushLogWriter(connection, jdbcConfiguration, isRac, racHosts);
Stopwatch stopwatch = Stopwatch.reusable();
while (context.isRunning()) {
endScn = getEndScn(connection, startScn, logMinerMetrics, connectorConfig.getLogMiningBatchSizeDefault());
flushLogWriter(connection, jdbcConfiguration, isRac, racHosts);
pauseBetweenMiningSessions();
pauseBetweenMiningSessions();
Set<String> possibleNewCurrentLogFile = getCurrentRedoLogFiles(connection, logMinerMetrics);
if (!currentRedoLogFiles.equals(possibleNewCurrentLogFile)) {
LOGGER.debug("Redo log switch detected, from {} to {}", currentRedoLogFiles, possibleNewCurrentLogFile);
Set<String> possibleNewCurrentLogFile = getCurrentRedoLogFiles(connection, logMinerMetrics);
if (!currentRedoLogFiles.equals(possibleNewCurrentLogFile)) {
LOGGER.debug("Redo log switch detected, from {} to {}", currentRedoLogFiles, possibleNewCurrentLogFile);
// Mitigates PGA memory leaks: with a single long-lived mining session the PGA
// keeps growing (there may be another way to flush it), so for now a new
// mining session is started after each redo log switch.
LOGGER.trace("Ending log mining startScn={}, endScn={}, offsetContext.getScn={}, strategy={}, continuous={}",
startScn, endScn, offsetContext.getScn(), strategy, isContinuousMining);
endMining(connection);
// Mitigates PGA memory leaks: with a single long-lived mining session the PGA
// keeps growing (there may be another way to flush it), so for now a new
// mining session is started after each redo log switch.
LOGGER.trace("Ending log mining startScn={}, endScn={}, offsetContext.getScn={}, strategy={}, continuous={}",
startScn, endScn, offsetContext.getScn(), strategy, isContinuousMining);
endMining(connection);
initializeRedoLogsForMining(connection, true, archiveLogRetention);
initializeRedoLogsForMining(connection, true, archiveLogRetention);
abandonOldTransactionsIfExist(connection);
currentRedoLogFiles = getCurrentRedoLogFiles(connection, logMinerMetrics);
}
abandonOldTransactionsIfExist(connection, transactionalBuffer);
currentRedoLogFiles = getCurrentRedoLogFiles(connection, logMinerMetrics);
}
startLogMining(jdbcConnection, startScn, endScn, strategy, isContinuousMining);
startLogMining(jdbcConnection, startScn, endScn, strategy, isContinuousMining);
stopwatch.start();
miningView.setFetchSize(LOG_MINING_VIEW_FETCH_SIZE);
miningView.setLong(1, startScn);
miningView.setLong(2, endScn);
try (ResultSet rs = miningView.executeQuery()) {
Duration lastDurationOfBatchCapturing = stopwatch.stop().durations().statistics().getTotal();
logMinerMetrics.setLastDurationOfBatchCapturing(lastDurationOfBatchCapturing);
processor.processResult(rs);
stopwatch.start();
miningView.setFetchSize(LOG_MINING_VIEW_FETCH_SIZE);
miningView.setLong(1, startScn);
miningView.setLong(2, endScn);
try (ResultSet rs = miningView.executeQuery()) {
Duration lastDurationOfBatchCapturing = stopwatch.stop().durations().statistics().getTotal();
logMinerMetrics.setLastDurationOfBatchCapturing(lastDurationOfBatchCapturing);
processor.processResult(rs);
updateStartScn();
updateStartScn(transactionalBuffer);
if (transactionalBuffer.isEmpty()) {
LOGGER.debug("Transactional buffer empty, updating offset's SCN {}", startScn);
offsetContext.setScn(startScn);
transactionalBuffer.resetLargestScn(null);
if (transactionalBuffer.isEmpty()) {
LOGGER.debug("Transactional buffer empty, updating offset's SCN {}", startScn);
offsetContext.setScn(startScn);
transactionalBuffer.resetLargestScn(null);
}
}
}
}
}
}
finally {
historyRecorder.close();
finally {
historyRecorder.close();
}
}
}
}
catch (Throwable t) {
logError(transactionalBufferMetrics, "Mining session stopped due to the {}", t);
errorHandler.setProducerThrowable(t);
}
finally {
LOGGER.info("startScn={}, endScn={}, offsetContext.getScn()={}", startScn, endScn, offsetContext.getScn());
LOGGER.info("Transactional buffer metrics dump: {}", transactionalBufferMetrics.toString());
LOGGER.info("Transactional buffer dump: {}", transactionalBuffer.toString());
LOGGER.info("LogMiner metrics dump: {}", logMinerMetrics.toString());
catch (Throwable t) {
logError(transactionalBuffer.getMetrics(), "Mining session stopped due to the {}", t);
errorHandler.setProducerThrowable(t);
}
finally {
LOGGER.info("startScn={}, endScn={}, offsetContext.getScn()={}", startScn, endScn, offsetContext.getScn());
LOGGER.info("Transactional buffer metrics dump: {}", transactionalBuffer.getMetrics().toString());
LOGGER.info("Transactional buffer dump: {}", transactionalBuffer.toString());
LOGGER.info("LogMiner metrics dump: {}", logMinerMetrics.toString());
// Perform unregistration
unregisterLogMinerMetrics();
unregisterTransactionalBuffer();
}
}
private void registerTransactionalBuffer() {
// Create transactional buffer metrics
transactionalBufferMetrics = new TransactionalBufferMetrics(taskContext);
transactionalBufferMetrics.register(LOGGER);
// Create transactional buffer
transactionalBuffer = new TransactionalBuffer(connectorConfig.getLogicalName(), errorHandler,
transactionalBufferMetrics, connectorConfig.getMaxQueueSize());
}
private void unregisterTransactionalBuffer() {
if (transactionalBuffer != null) {
transactionalBuffer.close();
}
if (transactionalBufferMetrics != null) {
transactionalBufferMetrics.unregister(LOGGER);
// Perform unregistration
unregisterLogMinerMetrics();
}
}
}
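
The paired registerTransactionalBuffer()/unregisterTransactionalBuffer() methods above are replaced by try-with-resources: the buffer now lives exactly as long as execute(), registering its metrics MBean in the constructor and unregistering it in close() on every exit path. A condensed sketch of that lifecycle (runMiningLoop is a hypothetical stand-in for the loop shown above):

    try (TransactionalBuffer transactionalBuffer = new TransactionalBuffer(taskContext, errorHandler)) {
        // The constructor has already created and registered the metrics MBean.
        runMiningLoop(transactionalBuffer);
    }
    // close() has cleared pending transactions and unregistered the MBean,
    // even when the mining loop exits exceptionally.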
@@ -256,19 +232,18 @@ private void unregisterLogMinerMetrics() {
}
}
private void abandonOldTransactionsIfExist(Connection connection) {
private void abandonOldTransactionsIfExist(Connection connection, TransactionalBuffer transactionalBuffer) {
Optional<Long> lastScnToAbandonTransactions = getLastScnToAbandon(connection, offsetContext.getScn(), connectorConfig.getLogMiningTransactionRetention());
lastScnToAbandonTransactions.ifPresent(thresholdScn -> {
logWarn(transactionalBufferMetrics, "All transactions with first SCN <= {} will be abandoned, offset: {}", thresholdScn, offsetContext.getScn());
transactionalBuffer.abandonLongTransactions(thresholdScn);
transactionalBuffer.abandonLongTransactions(thresholdScn, offsetContext);
offsetContext.setScn(thresholdScn);
updateStartScn();
updateStartScn(transactionalBuffer);
});
}
// TODO computing the largest scn in the buffer is a left-over from previous incarnations, remove it.
// TODO We don't need to keep largestScn in the buffer at all. clean it
private void updateStartScn() {
private void updateStartScn(TransactionalBuffer transactionalBuffer) {
long nextStartScn = transactionalBuffer.getLargestScn().equals(Scn.ZERO) ? endScn : transactionalBuffer.getLargestScn().longValue();
if (nextStartScn <= startScn) {
LOGGER.trace("Resetting largest SCN in transaction buffer to {}, nextStartScn={}, startScn={}", endScn, nextStartScn, startScn);

@@ -15,45 +15,35 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.kafka.connect.errors.DataException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import io.debezium.annotation.NotThreadSafe;
import io.debezium.connector.oracle.OracleConnector;
import io.debezium.connector.oracle.OracleOffsetContext;
import io.debezium.connector.oracle.OracleTaskContext;
import io.debezium.pipeline.ErrorHandler;
import io.debezium.pipeline.EventDispatcher;
import io.debezium.pipeline.source.spi.ChangeEventSource;
import io.debezium.util.Threads;
/**
* Buffer that stores transactions and related callbacks that will be executed when a transaction commits or discarded
* when a transaction has been rolled back.
*
* @author Andrey Pustovetov
* <p>
* Transactional buffer is designed to register callbacks, to execute them when a transaction commits and to clear them
* when a transaction rolls back.
*/
@NotThreadSafe
public final class TransactionalBuffer {
public final class TransactionalBuffer implements AutoCloseable {
private static final Logger LOGGER = LoggerFactory.getLogger(TransactionalBuffer.class);
private final Map<String, Transaction> transactions;
private final ExecutorService executor;
private final AtomicInteger taskCounter;
private final ErrorHandler errorHandler;
private final Supplier<Integer> commitQueueCapacity;
private TransactionalBufferMetrics metrics;
private final Set<String> abandonedTransactionIds;
private final Set<String> rolledBackTransactionIds;
private final TransactionalBufferMetrics metrics;
// It holds the latest captured SCN.
// This number tracks the starting point for the next mining cycle.
@@ -63,27 +53,27 @@ public final class TransactionalBuffer {
/**
* Constructor to create a new instance.
*
* @param logicalName logical name
* @param errorHandler logError handler
* @param metrics metrics MBean
* @param inCommitQueueCapacity commit queue capacity. On overflow, caller runs task
* @param taskContext the task context
* @param errorHandler the connector error handler
*/
TransactionalBuffer(String logicalName, ErrorHandler errorHandler, TransactionalBufferMetrics metrics, int inCommitQueueCapacity) {
TransactionalBuffer(OracleTaskContext taskContext, ErrorHandler errorHandler) {
this.transactions = new HashMap<>();
final BlockingQueue<Runnable> workQueue = new ArrayBlockingQueue<>(inCommitQueueCapacity);
executor = new ThreadPoolExecutor(1, 1,
Integer.MAX_VALUE, TimeUnit.MILLISECONDS,
workQueue,
Threads.threadFactory(OracleConnector.class, logicalName, "transactional-buffer", true, false),
new ThreadPoolExecutor.CallerRunsPolicy());
commitQueueCapacity = workQueue::remainingCapacity;
this.taskCounter = new AtomicInteger();
this.errorHandler = errorHandler;
this.metrics = metrics;
this.largestScn = Scn.ZERO;
this.lastCommittedScn = Scn.ZERO;
this.abandonedTransactionIds = new HashSet<>();
this.rolledBackTransactionIds = new HashSet<>();
// create metrics and register them
this.metrics = new TransactionalBufferMetrics(taskContext);
this.metrics.register(LOGGER);
}
/**
* @return the transactional buffer's metrics
*/
TransactionalBufferMetrics getMetrics() {
return metrics;
}
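
Because the buffer now constructs and registers its own TransactionalBufferMetrics, getMetrics() becomes the single way for callers to reach them. A caller-side sketch under that assumption (databaseTimeMs as computed in the streaming source above):

    TransactionalBuffer transactionalBuffer = new TransactionalBuffer(taskContext, errorHandler);
    // Returns the same instance the constructor registered as an MBean.
    TransactionalBufferMetrics metrics = transactionalBuffer.getMetrics();
    // The time difference is now set through the buffer rather than on the
    // metrics directly (see setDatabaseTimeDifference later in this diff).
    transactionalBuffer.setDatabaseTimeDifference(databaseTimeMs);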
/**
@@ -148,8 +138,8 @@ void registerCommitCallback(String transactionId, Scn scn, Instant changeTime, C
}
/**
* If the commit executor queue is full, back-pressure will be applied by letting execution of the callback
* be performed by the calling thread.
* Commits a transaction by looking up the transaction in the buffer and if exists, all registered callbacks
* will be executed in chronological order, emitting events for each followed by a transaction commit event.
*
* @param transactionId transaction identifier
* @param scn SCN of the commit.
@@ -174,7 +164,6 @@ boolean commit(String transactionId, Scn scn, OracleOffsetContext offsetContext,
transaction = transactions.remove(transactionId);
Scn smallestScn = calculateSmallestScn();
taskCounter.incrementAndGet();
abandonedTransactionIds.remove(transactionId);
// When the connector restarts, we start from the SCN in the offset, so it is possible to encounter transactions that were already committed.
@@ -189,45 +178,45 @@ boolean commit(String transactionId, Scn scn, OracleOffsetContext offsetContext,
List<CommitCallback> commitCallbacks = transaction.commitCallbacks;
LOGGER.trace("COMMIT, {}, smallest SCN: {}, largest SCN {}", debugMessage, smallestScn, largestScn);
executor.execute(() -> {
try {
int counter = commitCallbacks.size();
for (CommitCallback callback : commitCallbacks) {
if (!context.isRunning()) {
return;
}
callback.execute(timestamp, smallestScn, scn, --counter);
}
lastCommittedScn = Scn.fromLong(scn.longValue());
if (!commitCallbacks.isEmpty()) {
dispatcher.dispatchTransactionCommittedEvent(offsetContext);
}
}
catch (InterruptedException e) {
LogMinerHelper.logError(metrics, "Thread interrupted during running", e);
Thread.currentThread().interrupt();
}
catch (Exception e) {
errorHandler.setProducerThrowable(e);
}
finally {
metrics.incrementCommittedTransactions();
metrics.setActiveTransactions(transactions.size());
metrics.incrementCommittedDmlCounter(commitCallbacks.size());
metrics.setCommittedScn(scn.longValue());
metrics.setOffsetScn(offsetContext.getScn());
metrics.setCommitQueueCapacity(commitQueueCapacity.get());
metrics.setLastCommitDuration(Duration.between(start, Instant.now()).toMillis());
taskCounter.decrementAndGet();
}
});
metrics.setCommitQueueCapacity(commitQueueCapacity.get());
commit(context, offsetContext, start, commitCallbacks, timestamp, smallestScn, scn, dispatcher);
return true;
}
private void commit(ChangeEventSource.ChangeEventSourceContext context, OracleOffsetContext offsetContext, Instant start,
List<CommitCallback> commitCallbacks, Timestamp timestamp, Scn smallestScn, Scn scn, EventDispatcher<?> dispatcher) {
try {
int counter = commitCallbacks.size();
for (CommitCallback callback : commitCallbacks) {
if (!context.isRunning()) {
return;
}
callback.execute(timestamp, smallestScn, scn, --counter);
}
lastCommittedScn = Scn.fromLong(scn.longValue());
if (!commitCallbacks.isEmpty()) {
dispatcher.dispatchTransactionCommittedEvent(offsetContext);
}
}
catch (InterruptedException e) {
LogMinerHelper.logError(metrics, "Thread interrupted during running", e);
Thread.currentThread().interrupt();
}
catch (Exception e) {
errorHandler.setProducerThrowable(e);
}
finally {
metrics.incrementCommittedTransactions();
metrics.setActiveTransactions(transactions.size());
metrics.incrementCommittedDmlCounter(commitCallbacks.size());
metrics.setCommittedScn(scn.longValue());
metrics.setOffsetScn(offsetContext.getScn());
metrics.setLastCommitDuration(Duration.between(start, Instant.now()).toMillis());
}
}
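
This hunk is the heart of DBZ-3090: the executor.execute(...) hand-off disappears and the extracted private commit(...) overload runs every callback on the thread that invoked the public commit(...), i.e. the thread processing LogMiner results. A caller-side sketch of the resulting guarantee (argument shapes as in the unit test changes below):

    // When this returns true, every DML event and the transaction commit event
    // have already been dispatched on the current thread; no background commit
    // task can still be running.
    boolean committed = transactionalBuffer.commit(transactionId, scn, offsetContext,
            timestamp, () -> true, debugMessage, dispatcher);

This also explains the removals elsewhere in the commit: the commit queue, its capacity metric, and the taskCounter that tracked in-flight commit tasks are all obsolete.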
/**
* Clears registered callbacks for given transaction identifier.
*
@@ -265,8 +254,10 @@ boolean rollback(String transactionId, String debugMessage) {
* In case of an abandonment, all DMLs/Commits/Rollbacks for this transaction will be ignored
*
* @param thresholdScn the smallest SCN of any transaction to keep in the buffer. All others will be removed.
* @param offsetContext the offset context
*/
void abandonLongTransactions(Long thresholdScn) {
void abandonLongTransactions(Long thresholdScn, OracleOffsetContext offsetContext) {
LogMinerHelper.logWarn(metrics, "All transactions with first SCN <= {} will be abandoned, offset: {}", thresholdScn, offsetContext.getScn());
Scn threshold = Scn.fromLong(thresholdScn);
Scn smallestScn = calculateSmallestScn();
if (smallestScn == null) {
@@ -322,7 +313,16 @@ private void calculateLargestScn() {
* @return {@code true} if buffer is empty, otherwise {@code false}
*/
boolean isEmpty() {
return transactions.isEmpty() && taskCounter.get() == 0;
return transactions.isEmpty();
}
/**
* Set the database time difference.
*
* @param difference the time difference in milliseconds
*/
void setDatabaseTimeDifference(long difference) {
metrics.setTimeDifference(new AtomicLong(difference));
}
@Override
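
Since commits execute inline, there are no in-flight tasks left to count and emptiness reduces to the map check. The streaming loop relies on exactly this when it advances the offset (compare the execute() hunk earlier):

    if (transactionalBuffer.isEmpty()) {
        // Nothing is pending and no asynchronous commit can still be running,
        // so the offset SCN may safely move forward.
        offsetContext.setScn(startScn);
    }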
@@ -332,19 +332,13 @@ public String toString() {
return result.toString();
}
/**
* Closes buffer.
*/
void close() {
@Override
public void close() {
transactions.clear();
executor.shutdown();
try {
if (!executor.awaitTermination(1000L, TimeUnit.MILLISECONDS)) {
executor.shutdownNow();
}
}
catch (InterruptedException e) {
LogMinerHelper.logError(metrics, "Thread interrupted during shutdown", e);
if (this.metrics != null) {
// if metrics registered, unregister them
this.metrics.unregister(LOGGER);
}
}

@@ -35,7 +35,6 @@ public class TransactionalBufferMetrics extends Metrics implements Transactional
private final AtomicLong committedDmlCounter = new AtomicLong();
private final AtomicLong lastCommitDuration = new AtomicLong();
private final AtomicLong maxCommitDuration = new AtomicLong();
private final AtomicInteger commitQueueCapacity = new AtomicInteger();
private final AtomicReference<Duration> lagFromTheSource = new AtomicReference<>();
private final AtomicReference<Duration> maxLagFromTheSource = new AtomicReference<>();
private final AtomicReference<Duration> minLagFromTheSource = new AtomicReference<>();
@@ -235,15 +234,6 @@ public int getScnFreezeCounter() {
return scnFreezeCounter.get();
}
@Override
public int getCommitQueueCapacity() {
return commitQueueCapacity.get();
}
void setCommitQueueCapacity(int commitQueueCapacity) {
this.commitQueueCapacity.set(commitQueueCapacity);
}
public Long getLastCommitDuration() {
return lastCommitDuration.get();
}
@@ -267,7 +257,6 @@ public void reset() {
errorCounter.set(0);
warningCounter.set(0);
scnFreezeCounter.set(0);
commitQueueCapacity.set(0);
}
@Override
@@ -290,7 +279,6 @@ public String toString() {
", errorCounter=" + errorCounter.get() +
", warningCounter=" + warningCounter.get() +
", scnFreezeCounter=" + scnFreezeCounter.get() +
", commitQueueCapacity=" + commitQueueCapacity.get() +
'}';
}
}

@@ -103,13 +103,6 @@ public interface TransactionalBufferMetricsMXBean {
*/
Set<String> getRolledBackTransactionIds();
/**
* Gets commit queue capacity. As the queue fills up, this reduces to zero
*
* @return the commit queue capacity
*/
int getCommitQueueCapacity();
/**
* Reset metrics
*/

@@ -148,9 +148,5 @@ public void testOtherMetrics() {
metrics.setOffsetScn(10L);
assertThat(metrics.getOldestScn() == 10).isTrue();
metrics.setCommitQueueCapacity(1000);
assertThat(metrics.getCommitQueueCapacity()).isEqualTo(1000);
}
}

@@ -7,9 +7,6 @@
import static io.debezium.config.CommonConnectorConfig.DEFAULT_MAX_BATCH_SIZE;
import static io.debezium.config.CommonConnectorConfig.DEFAULT_MAX_QUEUE_SIZE;
import static junit.framework.TestCase.assertNotSame;
import static junit.framework.TestCase.assertSame;
import static junit.framework.TestCase.assertTrue;
import static org.fest.assertions.Assertions.assertThat;
import static org.mockito.Mockito.mock;
@@ -19,7 +16,6 @@
import java.time.temporal.ChronoUnit;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import org.junit.After;
@@ -27,12 +23,14 @@
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TestRule;
import org.mockito.Mockito;
import io.debezium.config.Configuration;
import io.debezium.connector.base.ChangeEventQueue;
import io.debezium.connector.oracle.OracleConnector;
import io.debezium.connector.oracle.OracleConnectorConfig;
import io.debezium.connector.oracle.OracleOffsetContext;
import io.debezium.connector.oracle.OracleTaskContext;
import io.debezium.connector.oracle.junit.SkipTestDependingOnAdapterNameRule;
import io.debezium.connector.oracle.junit.SkipWhenAdapterNameIsNot;
import io.debezium.connector.oracle.junit.SkipWhenAdapterNameIsNot.AdapterName;
@@ -72,6 +70,7 @@ public String getString(String key) {
private static final OracleConnectorConfig connectorConfig = new OracleConnectorConfig(config);
private static OracleOffsetContext offsetContext;
private OracleTaskContext taskContext;
private ErrorHandler errorHandler;
private TransactionalBuffer transactionalBuffer;
private TransactionalBufferMetrics metrics;
@@ -88,10 +87,15 @@ public void before() {
.maxQueueSize(DEFAULT_MAX_QUEUE_SIZE)
.build();
errorHandler = new ErrorHandler(OracleConnector.class, SERVER_NAME, queue);
metrics = mock(TransactionalBufferMetrics.class);
taskContext = mock(OracleTaskContext.class);
Mockito.when(taskContext.getConnectorName()).thenReturn("connector name");
Mockito.when(taskContext.getConnectorType()).thenReturn("connector type");
dispatcher = mock(EventDispatcher.class);
transactionalBuffer = new TransactionalBuffer(SERVER_NAME, errorHandler, metrics,
DEFAULT_MAX_QUEUE_SIZE);
transactionalBuffer = new TransactionalBuffer(taskContext, errorHandler);
metrics = transactionalBuffer.getMetrics();
}
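
The fixture now mirrors the production wiring: only the OracleTaskContext the buffer needs for MBean naming is mocked, and the test reads the real metrics back from the buffer instead of mocking them, so assertions observe actual counter updates. Condensed, under the same test constants:

    OracleTaskContext taskContext = mock(OracleTaskContext.class);
    Mockito.when(taskContext.getConnectorName()).thenReturn("connector name");
    Mockito.when(taskContext.getConnectorType()).thenReturn("connector type");

    TransactionalBuffer transactionalBuffer = new TransactionalBuffer(taskContext, errorHandler);
    // Real, registered instance created by the buffer's constructor.
    TransactionalBufferMetrics metrics = transactionalBuffer.getMetrics();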
@After
@@ -111,14 +115,6 @@ public void testIsNotEmptyWhenTransactionIsRegistered() {
assertThat(transactionalBuffer.isEmpty()).isEqualTo(false);
}
@Test
public void testIsNotEmptyWhenTransactionIsCommitting() {
transactionalBuffer.registerCommitCallback(TRANSACTION_ID, SCN, Instant.now(), (timestamp, smallestScn, commitScn, counter) -> Thread.sleep(1000));
offsetContext = new OracleOffsetContext(connectorConfig, SCN.longValue(), SCN.longValue(), (LcrPosition) null, false, true, new TransactionContext());
transactionalBuffer.commit(TRANSACTION_ID, SCN.add(Scn.ONE), offsetContext, TIMESTAMP, () -> true, MESSAGE, dispatcher);
assertThat(transactionalBuffer.isEmpty()).isEqualTo(false);
}
@Test
public void testIsEmptyWhenTransactionIsCommitted() throws InterruptedException {
CountDownLatch commitLatch = new CountDownLatch(1);
@@ -246,7 +242,8 @@ public void testResetLargestScn() {
public void testAbandoningOneTransaction() {
transactionalBuffer.registerCommitCallback(TRANSACTION_ID, SCN, Instant.now(), (timestamp, smallestScn, commitScn, counter) -> {
});
transactionalBuffer.abandonLongTransactions(SCN.longValue());
offsetContext = new OracleOffsetContext(connectorConfig, SCN.longValue(), SCN.longValue(), (LcrPosition) null, false, true, new TransactionContext());
transactionalBuffer.abandonLongTransactions(SCN.longValue(), offsetContext);
assertThat(transactionalBuffer.isEmpty()).isEqualTo(true);
assertThat(transactionalBuffer.getLargestScn()).isEqualTo(Scn.ZERO);
}
@@ -257,7 +254,7 @@ public void testAbandoningTransactionHavingAnotherOne() {
});
transactionalBuffer.registerCommitCallback(OTHER_TRANSACTION_ID, OTHER_SCN, Instant.now(), (timestamp, smallestScn, commitScn, counter) -> {
});
transactionalBuffer.abandonLongTransactions(SCN.longValue());
transactionalBuffer.abandonLongTransactions(SCN.longValue(), offsetContext);
assertThat(transactionalBuffer.isEmpty()).isEqualTo(false);
assertThat(transactionalBuffer.getLargestScn()).isEqualTo(OTHER_SCN);
}
@@ -274,40 +271,6 @@ public void testTransactionDump() {
assertThat(transactionalBuffer.toString()).contains(String.valueOf(OTHER_SCN));
}
@Test
public void testCommitQueueOverflowProcessedOnCaller() throws InterruptedException {
Thread mainThread = Thread.currentThread();
int commitQueueCapacity = 10;
transactionalBuffer = new TransactionalBuffer(SERVER_NAME, errorHandler, metrics,
commitQueueCapacity);
int transactionToCommitCount = commitQueueCapacity + 1;
CountDownLatch countDownLatch = new CountDownLatch(transactionToCommitCount + 1);
for (int i = 0; i <= commitQueueCapacity; i++) {
commitTransaction((timestamp, smallestScn, commitScn, counter) -> {
assertNotSame(mainThread, Thread.currentThread());
TimeUnit.MILLISECONDS.sleep(100);
countDownLatch.countDown();
});
}
// Commit one more over the capacity. This should process in the test thread, applying backpressure
// to the caller
commitTransaction((timestamp, smallestScn, commitScn, counter) -> {
assertSame(mainThread, Thread.currentThread());
countDownLatch.countDown();
});
TimeUnit.SECONDS.sleep(2);
// Commit one more over the capacity. After delay, the executor had time to recover and empty its queue
// This should go back to processing in the executor thread
commitTransaction((timestamp, smallestScn, commitScn, counter) -> {
assertNotSame(mainThread, Thread.currentThread());
countDownLatch.countDown();
});
assertTrue(countDownLatch.await(10, TimeUnit.SECONDS));
}
private void commitTransaction(TransactionalBuffer.CommitCallback commitCallback) {
transactionalBuffer.registerCommitCallback(TRANSACTION_ID, SCN, Instant.now(), commitCallback);
offsetContext = new OracleOffsetContext(connectorConfig, SCN.longValue(), SCN.longValue(), null, false, true, new TransactionContext());