From 4dbd9f3e4cac2190d95a052e9aa0fe3abb3ba919 Mon Sep 17 00:00:00 2001 From: Chris Cranford Date: Fri, 12 Feb 2021 13:50:33 -0500 Subject: [PATCH] DBZ-3090 Commit events in same thread that processes results for LogMiner --- .../LogMinerQueryResultProcessor.java | 4 +- .../LogMinerStreamingChangeEventSource.java | 181 ++++++++---------- .../oracle/logminer/TransactionalBuffer.java | 160 ++++++++-------- .../logminer/TransactionalBufferMetrics.java | 12 -- .../TransactionalBufferMetricsMXBean.java | 7 - .../TransactionalBufferMetricsTest.java | 4 - .../logminer/TransactionalBufferTest.java | 65 ++----- 7 files changed, 171 insertions(+), 262 deletions(-) diff --git a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/LogMinerQueryResultProcessor.java b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/LogMinerQueryResultProcessor.java index 2a7758f3e..3d2b540fc 100644 --- a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/LogMinerQueryResultProcessor.java +++ b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/LogMinerQueryResultProcessor.java @@ -54,7 +54,7 @@ class LogMinerQueryResultProcessor { LogMinerQueryResultProcessor(ChangeEventSource.ChangeEventSourceContext context, LogMinerMetrics metrics, TransactionalBuffer transactionalBuffer, SimpleDmlParser dmlParser, OracleOffsetContext offsetContext, OracleDatabaseSchema schema, - EventDispatcher dispatcher, TransactionalBufferMetrics transactionalBufferMetrics, + EventDispatcher dispatcher, String catalogName, Clock clock, HistoryRecorder historyRecorder) { this.context = context; this.metrics = metrics; @@ -63,7 +63,7 @@ class LogMinerQueryResultProcessor { this.offsetContext = offsetContext; this.schema = schema; this.dispatcher = dispatcher; - this.transactionalBufferMetrics = transactionalBufferMetrics; + this.transactionalBufferMetrics = transactionalBuffer.getMetrics(); this.catalogName = catalogName; this.clock = clock; this.historyRecorder = historyRecorder; diff --git a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/LogMinerStreamingChangeEventSource.java b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/LogMinerStreamingChangeEventSource.java index c6b788d01..8088d3bd6 100644 --- a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/LogMinerStreamingChangeEventSource.java +++ b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/LogMinerStreamingChangeEventSource.java @@ -17,7 +17,6 @@ import static io.debezium.connector.oracle.logminer.LogMinerHelper.getTimeDifference; import static io.debezium.connector.oracle.logminer.LogMinerHelper.instantiateFlushConnections; import static io.debezium.connector.oracle.logminer.LogMinerHelper.logError; -import static io.debezium.connector.oracle.logminer.LogMinerHelper.logWarn; import static io.debezium.connector.oracle.logminer.LogMinerHelper.setNlsSessionParameters; import static io.debezium.connector.oracle.logminer.LogMinerHelper.setRedoLogFilesForMining; import static io.debezium.connector.oracle.logminer.LogMinerHelper.startLogMining; @@ -31,7 +30,6 @@ import java.util.Map; import java.util.Optional; import java.util.Set; -import java.util.concurrent.atomic.AtomicLong; import java.util.stream.Collectors; import org.slf4j.Logger; @@ -79,9 +77,7 @@ public class LogMinerStreamingChangeEventSource implements StreamingChangeEventS private final boolean isContinuousMining; private OracleConnectorConfig connectorConfig; - private TransactionalBufferMetrics transactionalBufferMetrics; private LogMinerMetrics logMinerMetrics; - private TransactionalBuffer transactionalBuffer; private long startScn; private long endScn; private Duration archiveLogRetention; @@ -119,126 +115,106 @@ public LogMinerStreamingChangeEventSource(OracleConnectorConfig connectorConfig, */ @Override public void execute(ChangeEventSourceContext context) { - try { - // Perform registration - registerTransactionalBuffer(); - registerLogMinerMetrics(); + try (TransactionalBuffer transactionalBuffer = new TransactionalBuffer(taskContext, errorHandler)) { + try { + // Perform registration + registerLogMinerMetrics(); - try (Connection connection = jdbcConnection.connection(false)) { - long databaseTimeMs = getTimeDifference(connection).toMillis(); + try (Connection connection = jdbcConnection.connection(false)) { + long databaseTimeMs = getTimeDifference(connection).toMillis(); - LOGGER.trace("Current time {} ms, database difference {} ms", System.currentTimeMillis(), databaseTimeMs); - transactionalBufferMetrics.setTimeDifference(new AtomicLong(databaseTimeMs)); + LOGGER.trace("Current time {} ms, database difference {} ms", System.currentTimeMillis(), databaseTimeMs); + transactionalBuffer.setDatabaseTimeDifference(databaseTimeMs); - startScn = offsetContext.getScn(); - createFlushTable(connection); + startScn = offsetContext.getScn(); + createFlushTable(connection); - if (!isContinuousMining && startScn < getFirstOnlineLogScn(connection, archiveLogRetention)) { - throw new DebeziumException( - "Online REDO LOG files or archive log files do not contain the offset scn " + startScn + ". Please perform a new snapshot."); - } + if (!isContinuousMining && startScn < getFirstOnlineLogScn(connection, archiveLogRetention)) { + throw new DebeziumException( + "Online REDO LOG files or archive log files do not contain the offset scn " + startScn + ". Please perform a new snapshot."); + } - setNlsSessionParameters(jdbcConnection); - checkSupplementalLogging(jdbcConnection, connectorConfig.getPdbName(), schema); + setNlsSessionParameters(jdbcConnection); + checkSupplementalLogging(jdbcConnection, connectorConfig.getPdbName(), schema); - initializeRedoLogsForMining(connection, false, archiveLogRetention); + initializeRedoLogsForMining(connection, false, archiveLogRetention); - HistoryRecorder historyRecorder = connectorConfig.getLogMiningHistoryRecorder(); - try { - // todo: why can't OracleConnection be used rather than a Factory+JdbcConfiguration? - historyRecorder.prepare(logMinerMetrics, jdbcConfiguration, connectorConfig.getLogMinerHistoryRetentionHours()); + HistoryRecorder historyRecorder = connectorConfig.getLogMiningHistoryRecorder(); + try { + // todo: why can't OracleConnection be used rather than a Factory+JdbcConfiguration? + historyRecorder.prepare(logMinerMetrics, jdbcConfiguration, connectorConfig.getLogMinerHistoryRetentionHours()); - final LogMinerQueryResultProcessor processor = new LogMinerQueryResultProcessor(context, logMinerMetrics, - transactionalBuffer, dmlParser, offsetContext, schema, dispatcher, transactionalBufferMetrics, - catalogName, clock, historyRecorder); + final LogMinerQueryResultProcessor processor = new LogMinerQueryResultProcessor(context, logMinerMetrics, + transactionalBuffer, dmlParser, offsetContext, schema, dispatcher, catalogName, clock, historyRecorder); - try (PreparedStatement miningView = connection - .prepareStatement(SqlUtils.logMinerContentsQuery(connectorConfig.getSchemaName(), jdbcConnection.username(), schema))) { - Set currentRedoLogFiles = getCurrentRedoLogFiles(connection, logMinerMetrics); + try (PreparedStatement miningView = connection + .prepareStatement(SqlUtils.logMinerContentsQuery(connectorConfig.getSchemaName(), jdbcConnection.username(), schema))) { + Set currentRedoLogFiles = getCurrentRedoLogFiles(connection, logMinerMetrics); - Stopwatch stopwatch = Stopwatch.reusable(); - while (context.isRunning()) { - endScn = getEndScn(connection, startScn, logMinerMetrics, connectorConfig.getLogMiningBatchSizeDefault()); - flushLogWriter(connection, jdbcConfiguration, isRac, racHosts); + Stopwatch stopwatch = Stopwatch.reusable(); + while (context.isRunning()) { + endScn = getEndScn(connection, startScn, logMinerMetrics, connectorConfig.getLogMiningBatchSizeDefault()); + flushLogWriter(connection, jdbcConfiguration, isRac, racHosts); - pauseBetweenMiningSessions(); + pauseBetweenMiningSessions(); - Set possibleNewCurrentLogFile = getCurrentRedoLogFiles(connection, logMinerMetrics); - if (!currentRedoLogFiles.equals(possibleNewCurrentLogFile)) { - LOGGER.debug("Redo log switch detected, from {} to {}", currentRedoLogFiles, possibleNewCurrentLogFile); + Set possibleNewCurrentLogFile = getCurrentRedoLogFiles(connection, logMinerMetrics); + if (!currentRedoLogFiles.equals(possibleNewCurrentLogFile)) { + LOGGER.debug("Redo log switch detected, from {} to {}", currentRedoLogFiles, possibleNewCurrentLogFile); - // This is the way to mitigate PGA leaks. - // With one mining session, it grows and maybe there is another way to flush PGA. - // At this point we use a new mining session - LOGGER.trace("Ending log mining startScn={}, endScn={}, offsetContext.getScn={}, strategy={}, continuous={}", - startScn, endScn, offsetContext.getScn(), strategy, isContinuousMining); - endMining(connection); + // This is the way to mitigate PGA leaks. + // With one mining session, it grows and maybe there is another way to flush PGA. + // At this point we use a new mining session + LOGGER.trace("Ending log mining startScn={}, endScn={}, offsetContext.getScn={}, strategy={}, continuous={}", + startScn, endScn, offsetContext.getScn(), strategy, isContinuousMining); + endMining(connection); - initializeRedoLogsForMining(connection, true, archiveLogRetention); + initializeRedoLogsForMining(connection, true, archiveLogRetention); - abandonOldTransactionsIfExist(connection); - currentRedoLogFiles = getCurrentRedoLogFiles(connection, logMinerMetrics); - } + abandonOldTransactionsIfExist(connection, transactionalBuffer); + currentRedoLogFiles = getCurrentRedoLogFiles(connection, logMinerMetrics); + } - startLogMining(jdbcConnection, startScn, endScn, strategy, isContinuousMining); + startLogMining(jdbcConnection, startScn, endScn, strategy, isContinuousMining); - stopwatch.start(); - miningView.setFetchSize(LOG_MINING_VIEW_FETCH_SIZE); - miningView.setLong(1, startScn); - miningView.setLong(2, endScn); - try (ResultSet rs = miningView.executeQuery()) { - Duration lastDurationOfBatchCapturing = stopwatch.stop().durations().statistics().getTotal(); - logMinerMetrics.setLastDurationOfBatchCapturing(lastDurationOfBatchCapturing); - processor.processResult(rs); + stopwatch.start(); + miningView.setFetchSize(LOG_MINING_VIEW_FETCH_SIZE); + miningView.setLong(1, startScn); + miningView.setLong(2, endScn); + try (ResultSet rs = miningView.executeQuery()) { + Duration lastDurationOfBatchCapturing = stopwatch.stop().durations().statistics().getTotal(); + logMinerMetrics.setLastDurationOfBatchCapturing(lastDurationOfBatchCapturing); + processor.processResult(rs); - updateStartScn(); + updateStartScn(transactionalBuffer); - if (transactionalBuffer.isEmpty()) { - LOGGER.debug("Transactional buffer empty, updating offset's SCN {}", startScn); - offsetContext.setScn(startScn); - transactionalBuffer.resetLargestScn(null); + if (transactionalBuffer.isEmpty()) { + LOGGER.debug("Transactional buffer empty, updating offset's SCN {}", startScn); + offsetContext.setScn(startScn); + transactionalBuffer.resetLargestScn(null); + } } } } } - } - finally { - historyRecorder.close(); + finally { + historyRecorder.close(); + } } } - } - catch (Throwable t) { - logError(transactionalBufferMetrics, "Mining session stopped due to the {}", t); - errorHandler.setProducerThrowable(t); - } - finally { - LOGGER.info("startScn={}, endScn={}, offsetContext.getScn()={}", startScn, endScn, offsetContext.getScn()); - LOGGER.info("Transactional buffer metrics dump: {}", transactionalBufferMetrics.toString()); - LOGGER.info("Transactional buffer dump: {}", transactionalBuffer.toString()); - LOGGER.info("LogMiner metrics dump: {}", logMinerMetrics.toString()); + catch (Throwable t) { + logError(transactionalBuffer.getMetrics(), "Mining session stopped due to the {}", t); + errorHandler.setProducerThrowable(t); + } + finally { + LOGGER.info("startScn={}, endScn={}, offsetContext.getScn()={}", startScn, endScn, offsetContext.getScn()); + LOGGER.info("Transactional buffer metrics dump: {}", transactionalBuffer.getMetrics().toString()); + LOGGER.info("Transactional buffer dump: {}", transactionalBuffer.toString()); + LOGGER.info("LogMiner metrics dump: {}", logMinerMetrics.toString()); - // Perform unregistration - unregisterLogMinerMetrics(); - unregisterTransactionalBuffer(); - } - } - - private void registerTransactionalBuffer() { - // Create transactional buffer metrics - transactionalBufferMetrics = new TransactionalBufferMetrics(taskContext); - transactionalBufferMetrics.register(LOGGER); - - // Create transactional buffer - transactionalBuffer = new TransactionalBuffer(connectorConfig.getLogicalName(), errorHandler, - transactionalBufferMetrics, connectorConfig.getMaxQueueSize()); - } - - private void unregisterTransactionalBuffer() { - if (transactionalBuffer != null) { - transactionalBuffer.close(); - } - if (transactionalBufferMetrics != null) { - transactionalBufferMetrics.unregister(LOGGER); + // Perform unregistration + unregisterLogMinerMetrics(); + } } } @@ -256,19 +232,18 @@ private void unregisterLogMinerMetrics() { } } - private void abandonOldTransactionsIfExist(Connection connection) { + private void abandonOldTransactionsIfExist(Connection connection, TransactionalBuffer transactionalBuffer) { Optional lastScnToAbandonTransactions = getLastScnToAbandon(connection, offsetContext.getScn(), connectorConfig.getLogMiningTransactionRetention()); lastScnToAbandonTransactions.ifPresent(thresholdScn -> { - logWarn(transactionalBufferMetrics, "All transactions with first SCN <= {} will be abandoned, offset: {}", thresholdScn, offsetContext.getScn()); - transactionalBuffer.abandonLongTransactions(thresholdScn); + transactionalBuffer.abandonLongTransactions(thresholdScn, offsetContext); offsetContext.setScn(thresholdScn); - updateStartScn(); + updateStartScn(transactionalBuffer); }); } // TODO computing the largest scn in the buffer is a left-over from previous incarnations, remove it. // TODO We don't need to keep largestScn in the buffer at all. clean it - private void updateStartScn() { + private void updateStartScn(TransactionalBuffer transactionalBuffer) { long nextStartScn = transactionalBuffer.getLargestScn().equals(Scn.ZERO) ? endScn : transactionalBuffer.getLargestScn().longValue(); if (nextStartScn <= startScn) { LOGGER.trace("Resetting largest SCN in transaction buffer to {}, nextStartScn={}, startScn={}", endScn, nextStartScn, startScn); diff --git a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/TransactionalBuffer.java b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/TransactionalBuffer.java index cf2c13af2..8f9d7328a 100644 --- a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/TransactionalBuffer.java +++ b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/TransactionalBuffer.java @@ -15,45 +15,35 @@ import java.util.List; import java.util.Map; import java.util.Set; -import java.util.concurrent.ArrayBlockingQueue; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.ThreadPoolExecutor; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.function.Supplier; +import java.util.concurrent.atomic.AtomicLong; import org.apache.kafka.connect.errors.DataException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import io.debezium.annotation.NotThreadSafe; -import io.debezium.connector.oracle.OracleConnector; import io.debezium.connector.oracle.OracleOffsetContext; +import io.debezium.connector.oracle.OracleTaskContext; import io.debezium.pipeline.ErrorHandler; import io.debezium.pipeline.EventDispatcher; import io.debezium.pipeline.source.spi.ChangeEventSource; -import io.debezium.util.Threads; /** + * Buffer that stores transactions and related callbacks that will be executed when a transaction commits or discarded + * when a transaction has been rolled back. + * * @author Andrey Pustovetov - *

- * Transactional buffer is designed to register callbacks, to execute them when transaction commits and to clear them - * when transaction rollbacks. */ @NotThreadSafe -public final class TransactionalBuffer { +public final class TransactionalBuffer implements AutoCloseable { private static final Logger LOGGER = LoggerFactory.getLogger(TransactionalBuffer.class); private final Map transactions; - private final ExecutorService executor; - private final AtomicInteger taskCounter; private final ErrorHandler errorHandler; - private final Supplier commitQueueCapacity; - private TransactionalBufferMetrics metrics; private final Set abandonedTransactionIds; private final Set rolledBackTransactionIds; + private final TransactionalBufferMetrics metrics; // It holds the latest captured SCN. // This number tracks starting point for the next mining cycle. @@ -63,27 +53,27 @@ public final class TransactionalBuffer { /** * Constructor to create a new instance. * - * @param logicalName logical name - * @param errorHandler logError handler - * @param metrics metrics MBean - * @param inCommitQueueCapacity commit queue capacity. On overflow, caller runs task + * @param taskContext the task context + * @param errorHandler the connector error handler */ - TransactionalBuffer(String logicalName, ErrorHandler errorHandler, TransactionalBufferMetrics metrics, int inCommitQueueCapacity) { + TransactionalBuffer(OracleTaskContext taskContext, ErrorHandler errorHandler) { this.transactions = new HashMap<>(); - final BlockingQueue workQueue = new ArrayBlockingQueue<>(inCommitQueueCapacity); - executor = new ThreadPoolExecutor(1, 1, - Integer.MAX_VALUE, TimeUnit.MILLISECONDS, - workQueue, - Threads.threadFactory(OracleConnector.class, logicalName, "transactional-buffer", true, false), - new ThreadPoolExecutor.CallerRunsPolicy()); - commitQueueCapacity = workQueue::remainingCapacity; - this.taskCounter = new AtomicInteger(); this.errorHandler = errorHandler; - this.metrics = metrics; this.largestScn = Scn.ZERO; this.lastCommittedScn = Scn.ZERO; this.abandonedTransactionIds = new HashSet<>(); this.rolledBackTransactionIds = new HashSet<>(); + + // create metrics and register them + this.metrics = new TransactionalBufferMetrics(taskContext); + this.metrics.register(LOGGER); + } + + /** + * @return the transactional buffer's metrics + */ + TransactionalBufferMetrics getMetrics() { + return metrics; } /** @@ -148,8 +138,8 @@ void registerCommitCallback(String transactionId, Scn scn, Instant changeTime, C } /** - * If the commit executor queue is full, back-pressure will be applied by letting execution of the callback - * be performed by the calling thread. + * Commits a transaction by looking up the transaction in the buffer and if exists, all registered callbacks + * will be executed in chronological order, emitting events for each followed by a transaction commit event. * * @param transactionId transaction identifier * @param scn SCN of the commit. @@ -174,7 +164,6 @@ boolean commit(String transactionId, Scn scn, OracleOffsetContext offsetContext, transaction = transactions.remove(transactionId); Scn smallestScn = calculateSmallestScn(); - taskCounter.incrementAndGet(); abandonedTransactionIds.remove(transactionId); // On the restarting connector, we start from SCN in the offset. There is possibility to commit a transaction(s) which were already committed. @@ -189,45 +178,45 @@ boolean commit(String transactionId, Scn scn, OracleOffsetContext offsetContext, List commitCallbacks = transaction.commitCallbacks; LOGGER.trace("COMMIT, {}, smallest SCN: {}, largest SCN {}", debugMessage, smallestScn, largestScn); - executor.execute(() -> { - try { - int counter = commitCallbacks.size(); - for (CommitCallback callback : commitCallbacks) { - if (!context.isRunning()) { - return; - } - callback.execute(timestamp, smallestScn, scn, --counter); - } - - lastCommittedScn = Scn.fromLong(scn.longValue()); - - if (!commitCallbacks.isEmpty()) { - dispatcher.dispatchTransactionCommittedEvent(offsetContext); - } - } - catch (InterruptedException e) { - LogMinerHelper.logError(metrics, "Thread interrupted during running", e); - Thread.currentThread().interrupt(); - } - catch (Exception e) { - errorHandler.setProducerThrowable(e); - } - finally { - metrics.incrementCommittedTransactions(); - metrics.setActiveTransactions(transactions.size()); - metrics.incrementCommittedDmlCounter(commitCallbacks.size()); - metrics.setCommittedScn(scn.longValue()); - metrics.setOffsetScn(offsetContext.getScn()); - metrics.setCommitQueueCapacity(commitQueueCapacity.get()); - metrics.setLastCommitDuration(Duration.between(start, Instant.now()).toMillis()); - taskCounter.decrementAndGet(); - } - }); - metrics.setCommitQueueCapacity(commitQueueCapacity.get()); + commit(context, offsetContext, start, commitCallbacks, timestamp, smallestScn, scn, dispatcher); return true; } + private void commit(ChangeEventSource.ChangeEventSourceContext context, OracleOffsetContext offsetContext, Instant start, + List commitCallbacks, Timestamp timestamp, Scn smallestScn, Scn scn, EventDispatcher dispatcher) { + try { + int counter = commitCallbacks.size(); + for (CommitCallback callback : commitCallbacks) { + if (!context.isRunning()) { + return; + } + callback.execute(timestamp, smallestScn, scn, --counter); + } + + lastCommittedScn = Scn.fromLong(scn.longValue()); + + if (!commitCallbacks.isEmpty()) { + dispatcher.dispatchTransactionCommittedEvent(offsetContext); + } + } + catch (InterruptedException e) { + LogMinerHelper.logError(metrics, "Thread interrupted during running", e); + Thread.currentThread().interrupt(); + } + catch (Exception e) { + errorHandler.setProducerThrowable(e); + } + finally { + metrics.incrementCommittedTransactions(); + metrics.setActiveTransactions(transactions.size()); + metrics.incrementCommittedDmlCounter(commitCallbacks.size()); + metrics.setCommittedScn(scn.longValue()); + metrics.setOffsetScn(offsetContext.getScn()); + metrics.setLastCommitDuration(Duration.between(start, Instant.now()).toMillis()); + } + } + /** * Clears registered callbacks for given transaction identifier. * @@ -265,8 +254,10 @@ boolean rollback(String transactionId, String debugMessage) { * In case of an abandonment, all DMLs/Commits/Rollbacks for this transaction will be ignored * * @param thresholdScn the smallest SVN of any transaction to keep in the buffer. All others will be removed. + * @param offsetContext the offset context */ - void abandonLongTransactions(Long thresholdScn) { + void abandonLongTransactions(Long thresholdScn, OracleOffsetContext offsetContext) { + LogMinerHelper.logWarn(metrics, "All transactions with first SCN <= {} will be abandoned, offset: {}", thresholdScn, offsetContext.getScn()); Scn threshold = Scn.fromLong(thresholdScn); Scn smallestScn = calculateSmallestScn(); if (smallestScn == null) { @@ -322,7 +313,16 @@ private void calculateLargestScn() { * @return {@code true} if buffer is empty, otherwise {@code false} */ boolean isEmpty() { - return transactions.isEmpty() && taskCounter.get() == 0; + return transactions.isEmpty(); + } + + /** + * Set the database time difference. + * + * @param difference the time difference in milliseconds + */ + void setDatabaseTimeDifference(long difference) { + metrics.setTimeDifference(new AtomicLong(difference)); } @Override @@ -332,19 +332,13 @@ public String toString() { return result.toString(); } - /** - * Closes buffer. - */ - void close() { + @Override + public void close() { transactions.clear(); - executor.shutdown(); - try { - if (!executor.awaitTermination(1000L, TimeUnit.MILLISECONDS)) { - executor.shutdownNow(); - } - } - catch (InterruptedException e) { - LogMinerHelper.logError(metrics, "Thread interrupted during shutdown", e); + + if (this.metrics != null) { + // if metrics registered, unregister them + this.metrics.unregister(LOGGER); } } diff --git a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/TransactionalBufferMetrics.java b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/TransactionalBufferMetrics.java index 2b05870c2..d555094fe 100644 --- a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/TransactionalBufferMetrics.java +++ b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/TransactionalBufferMetrics.java @@ -35,7 +35,6 @@ public class TransactionalBufferMetrics extends Metrics implements Transactional private final AtomicLong committedDmlCounter = new AtomicLong(); private final AtomicLong lastCommitDuration = new AtomicLong(); private final AtomicLong maxCommitDuration = new AtomicLong(); - private final AtomicInteger commitQueueCapacity = new AtomicInteger(); private final AtomicReference lagFromTheSource = new AtomicReference<>(); private final AtomicReference maxLagFromTheSource = new AtomicReference<>(); private final AtomicReference minLagFromTheSource = new AtomicReference<>(); @@ -235,15 +234,6 @@ public int getScnFreezeCounter() { return scnFreezeCounter.get(); } - @Override - public int getCommitQueueCapacity() { - return commitQueueCapacity.get(); - } - - void setCommitQueueCapacity(int commitQueueCapacity) { - this.commitQueueCapacity.set(commitQueueCapacity); - } - public Long getLastCommitDuration() { return lastCommitDuration.get(); } @@ -267,7 +257,6 @@ public void reset() { errorCounter.set(0); warningCounter.set(0); scnFreezeCounter.set(0); - commitQueueCapacity.set(0); } @Override @@ -290,7 +279,6 @@ public String toString() { ", errorCounter=" + errorCounter.get() + ", warningCounter=" + warningCounter.get() + ", scnFreezeCounter=" + scnFreezeCounter.get() + - ", commitQueueCapacity=" + commitQueueCapacity.get() + '}'; } } diff --git a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/TransactionalBufferMetricsMXBean.java b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/TransactionalBufferMetricsMXBean.java index be4117498..a9b7da81c 100644 --- a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/TransactionalBufferMetricsMXBean.java +++ b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/TransactionalBufferMetricsMXBean.java @@ -103,13 +103,6 @@ public interface TransactionalBufferMetricsMXBean { */ Set getRolledBackTransactionIds(); - /** - * Gets commit queue capacity. As the queue fills up, this reduces to zero - * - * @return the commit queue capacity - */ - int getCommitQueueCapacity(); - /** * Reset metrics */ diff --git a/debezium-connector-oracle/src/test/java/io/debezium/connector/oracle/logminer/TransactionalBufferMetricsTest.java b/debezium-connector-oracle/src/test/java/io/debezium/connector/oracle/logminer/TransactionalBufferMetricsTest.java index 7686c4f27..b97d13e93 100644 --- a/debezium-connector-oracle/src/test/java/io/debezium/connector/oracle/logminer/TransactionalBufferMetricsTest.java +++ b/debezium-connector-oracle/src/test/java/io/debezium/connector/oracle/logminer/TransactionalBufferMetricsTest.java @@ -148,9 +148,5 @@ public void testOtherMetrics() { metrics.setOffsetScn(10L); assertThat(metrics.getOldestScn() == 10).isTrue(); - - metrics.setCommitQueueCapacity(1000); - assertThat(metrics.getCommitQueueCapacity()).isEqualTo(1000); - } } diff --git a/debezium-connector-oracle/src/test/java/io/debezium/connector/oracle/logminer/TransactionalBufferTest.java b/debezium-connector-oracle/src/test/java/io/debezium/connector/oracle/logminer/TransactionalBufferTest.java index 45c68016e..f4fd16536 100644 --- a/debezium-connector-oracle/src/test/java/io/debezium/connector/oracle/logminer/TransactionalBufferTest.java +++ b/debezium-connector-oracle/src/test/java/io/debezium/connector/oracle/logminer/TransactionalBufferTest.java @@ -7,9 +7,6 @@ import static io.debezium.config.CommonConnectorConfig.DEFAULT_MAX_BATCH_SIZE; import static io.debezium.config.CommonConnectorConfig.DEFAULT_MAX_QUEUE_SIZE; -import static junit.framework.TestCase.assertNotSame; -import static junit.framework.TestCase.assertSame; -import static junit.framework.TestCase.assertTrue; import static org.fest.assertions.Assertions.assertThat; import static org.mockito.Mockito.mock; @@ -19,7 +16,6 @@ import java.time.temporal.ChronoUnit; import java.util.Set; import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; import org.junit.After; @@ -27,12 +23,14 @@ import org.junit.Rule; import org.junit.Test; import org.junit.rules.TestRule; +import org.mockito.Mockito; import io.debezium.config.Configuration; import io.debezium.connector.base.ChangeEventQueue; import io.debezium.connector.oracle.OracleConnector; import io.debezium.connector.oracle.OracleConnectorConfig; import io.debezium.connector.oracle.OracleOffsetContext; +import io.debezium.connector.oracle.OracleTaskContext; import io.debezium.connector.oracle.junit.SkipTestDependingOnAdapterNameRule; import io.debezium.connector.oracle.junit.SkipWhenAdapterNameIsNot; import io.debezium.connector.oracle.junit.SkipWhenAdapterNameIsNot.AdapterName; @@ -72,6 +70,7 @@ public String getString(String key) { private static final OracleConnectorConfig connectorConfig = new OracleConnectorConfig(config); private static OracleOffsetContext offsetContext; + private OracleTaskContext taskContext; private ErrorHandler errorHandler; private TransactionalBuffer transactionalBuffer; private TransactionalBufferMetrics metrics; @@ -88,10 +87,15 @@ public void before() { .maxQueueSize(DEFAULT_MAX_QUEUE_SIZE) .build(); errorHandler = new ErrorHandler(OracleConnector.class, SERVER_NAME, queue); - metrics = mock(TransactionalBufferMetrics.class); + + taskContext = mock(OracleTaskContext.class); + Mockito.when(taskContext.getConnectorName()).thenReturn("connector name"); + Mockito.when(taskContext.getConnectorType()).thenReturn("connector type"); + dispatcher = mock(EventDispatcher.class); - transactionalBuffer = new TransactionalBuffer(SERVER_NAME, errorHandler, metrics, - DEFAULT_MAX_QUEUE_SIZE); + + transactionalBuffer = new TransactionalBuffer(taskContext, errorHandler); + metrics = transactionalBuffer.getMetrics(); } @After @@ -111,14 +115,6 @@ public void testIsNotEmptyWhenTransactionIsRegistered() { assertThat(transactionalBuffer.isEmpty()).isEqualTo(false); } - @Test - public void testIsNotEmptyWhenTransactionIsCommitting() { - transactionalBuffer.registerCommitCallback(TRANSACTION_ID, SCN, Instant.now(), (timestamp, smallestScn, commitScn, counter) -> Thread.sleep(1000)); - offsetContext = new OracleOffsetContext(connectorConfig, SCN.longValue(), SCN.longValue(), (LcrPosition) null, false, true, new TransactionContext()); - transactionalBuffer.commit(TRANSACTION_ID, SCN.add(Scn.ONE), offsetContext, TIMESTAMP, () -> true, MESSAGE, dispatcher); - assertThat(transactionalBuffer.isEmpty()).isEqualTo(false); - } - @Test public void testIsEmptyWhenTransactionIsCommitted() throws InterruptedException { CountDownLatch commitLatch = new CountDownLatch(1); @@ -246,7 +242,8 @@ public void testResetLargestScn() { public void testAbandoningOneTransaction() { transactionalBuffer.registerCommitCallback(TRANSACTION_ID, SCN, Instant.now(), (timestamp, smallestScn, commitScn, counter) -> { }); - transactionalBuffer.abandonLongTransactions(SCN.longValue()); + offsetContext = new OracleOffsetContext(connectorConfig, SCN.longValue(), SCN.longValue(), (LcrPosition) null, false, true, new TransactionContext()); + transactionalBuffer.abandonLongTransactions(SCN.longValue(), offsetContext); assertThat(transactionalBuffer.isEmpty()).isEqualTo(true); assertThat(transactionalBuffer.getLargestScn()).isEqualTo(Scn.ZERO); } @@ -257,7 +254,7 @@ public void testAbandoningTransactionHavingAnotherOne() { }); transactionalBuffer.registerCommitCallback(OTHER_TRANSACTION_ID, OTHER_SCN, Instant.now(), (timestamp, smallestScn, commitScn, counter) -> { }); - transactionalBuffer.abandonLongTransactions(SCN.longValue()); + transactionalBuffer.abandonLongTransactions(SCN.longValue(), offsetContext); assertThat(transactionalBuffer.isEmpty()).isEqualTo(false); assertThat(transactionalBuffer.getLargestScn()).isEqualTo(OTHER_SCN); } @@ -274,40 +271,6 @@ public void testTransactionDump() { assertThat(transactionalBuffer.toString()).contains(String.valueOf(OTHER_SCN)); } - @Test - public void testCommitQueueOverflowProcessedOnCaller() throws InterruptedException { - Thread mainThread = Thread.currentThread(); - int commitQueueCapacity = 10; - transactionalBuffer = new TransactionalBuffer(SERVER_NAME, errorHandler, metrics, - commitQueueCapacity); - int transactionToCommitCount = commitQueueCapacity + 1; - CountDownLatch countDownLatch = new CountDownLatch(transactionToCommitCount + 1); - for (int i = 0; i <= commitQueueCapacity; i++) { - commitTransaction((timestamp, smallestScn, commitScn, counter) -> { - assertNotSame(mainThread, Thread.currentThread()); - TimeUnit.MILLISECONDS.sleep(100); - countDownLatch.countDown(); - }); - } - // Commit one more over the capacity. This should process in the test thread, applying backpressure - // to the caller - commitTransaction((timestamp, smallestScn, commitScn, counter) -> { - assertSame(mainThread, Thread.currentThread()); - countDownLatch.countDown(); - }); - - TimeUnit.SECONDS.sleep(2); - - // Commit one more over the capacity. After delay, the executor had time to recover and empty its queue - // This should go back to processing in the executor thread - commitTransaction((timestamp, smallestScn, commitScn, counter) -> { - assertNotSame(mainThread, Thread.currentThread()); - countDownLatch.countDown(); - }); - - assertTrue(countDownLatch.await(10, TimeUnit.SECONDS)); - } - private void commitTransaction(TransactionalBuffer.CommitCallback commitCallback) { transactionalBuffer.registerCommitCallback(TRANSACTION_ID, SCN, Instant.now(), commitCallback); offsetContext = new OracleOffsetContext(connectorConfig, SCN.longValue(), SCN.longValue(), null, false, true, new TransactionContext());