From 61b22d9574454d720a4a58036e38824988a8a76a Mon Sep 17 00:00:00 2001
From: Chris Cranford
Date: Wed, 23 Jun 2021 00:43:22 -0400
Subject: [PATCH] DBZ-3645 Adjust metrics calculations & logging levels

---
 .../oracle/logminer/TransactionalBuffer.java  | 43 ++++++++++---------
 1 file changed, 22 insertions(+), 21 deletions(-)

diff --git a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/TransactionalBuffer.java b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/TransactionalBuffer.java
index 93dca6ea1..7177827a6 100644
--- a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/TransactionalBuffer.java
+++ b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/TransactionalBuffer.java
@@ -129,11 +129,9 @@ boolean isDdlOperationRegistered(Scn scn) {
      */
     void registerDmlOperation(int operation, String transactionId, Scn scn, TableId tableId, LogMinerDmlEntry parseEntry,
                               Instant changeTime, String rowId, Object rsId, long hash) {
-        registerEvent(transactionId, scn, hash, () -> new DmlEvent(operation, parseEntry, scn, tableId, rowId, rsId));
-
-        streamingMetrics.setActiveTransactions(transactions.size());
-        streamingMetrics.incrementRegisteredDmlCount();
-        streamingMetrics.calculateLagMetrics(changeTime);
+        if (registerEvent(transactionId, scn, hash, changeTime, () -> new DmlEvent(operation, parseEntry, scn, tableId, rowId, rsId))) {
+            streamingMetrics.incrementRegisteredDmlCount();
+        }
     }
 
     /**
@@ -151,10 +149,8 @@ void registerDmlOperation(int operation, String transactionId, Scn scn, TableId
      */
     void registerSelectLobOperation(int operation, String transactionId, Scn scn, TableId tableId, LogMinerDmlEntry parseEntry,
                                     String columnName, boolean binaryData, Instant changeTime, String rowId, Object rsId, long hash) {
-        registerEvent(transactionId, scn, hash, () -> new SelectLobLocatorEvent(operation, parseEntry, columnName, binaryData, scn, tableId, rowId, rsId));
-
-        streamingMetrics.setActiveTransactions(transactions.size());
-        streamingMetrics.calculateLagMetrics(changeTime);
+        registerEvent(transactionId, scn, hash, changeTime,
+                () -> new SelectLobLocatorEvent(operation, parseEntry, columnName, binaryData, scn, tableId, rowId, rsId));
     }
 
     /**
@@ -174,10 +170,9 @@ void registerLobWriteOperation(int operation, String transactionId, Scn scn, Tab
                                    Instant changeTime, String rowId, Object rsId, long hash) {
         if (data != null) {
             final String sql = parseLobWriteSql(data);
-            registerEvent(transactionId, scn, hash, () -> new LobWriteEvent(operation, sql, scn, tableId, rowId, rsId));
+            registerEvent(transactionId, scn, hash, changeTime,
+                    () -> new LobWriteEvent(operation, sql, scn, tableId, rowId, rsId));
 
-            streamingMetrics.setActiveTransactions(transactions.size());
-            streamingMetrics.calculateLagMetrics(changeTime);
         }
     }
     /**
@@ -195,9 +190,7 @@ void registerLobWriteOperation(int operation, String transactionId, Scn scn, Tab
      */
     void registerLobEraseOperation(int operation, String transactionId, Scn scn, TableId tableId, Instant changeTime,
                                    String rowId, Object rsId, long hash) {
-        registerEvent(transactionId, scn, hash, () -> new LobEraseEvent(operation, scn, tableId, rowId, rsId));
-        streamingMetrics.setActiveTransactions(transactions.size());
-        streamingMetrics.calculateLagMetrics(changeTime);
+        registerEvent(transactionId, scn, hash, changeTime, () -> new LobEraseEvent(operation, scn, tableId, rowId, rsId));
     }
 
     /**
@@ -270,8 +263,7 @@ boolean commit(String transactionId, Scn scn, OracleOffsetContext offsetContext,
         // On the restarting connector, we start from SCN in the offset. There is possibility to commit a transaction(s) which were already committed.
         // Currently we cannot use ">=", because we may lose normal commit which may happen at the same time. TODO use audit table to prevent duplications
         if ((offsetContext.getCommitScn() != null && offsetContext.getCommitScn().compareTo(scn) > 0) || lastCommittedScn.compareTo(scn) > 0) {
-            LogMinerHelper.logWarn(streamingMetrics,
-                    "Transaction {} was already processed, ignore. Committed SCN in offset is {}, commit SCN of the transaction is {}, last committed SCN is {}",
+            LOGGER.debug("Transaction {} already processed, ignored. Committed SCN in offset is {}, commit SCN of the transaction is {}, last committed SCN is {}",
                     transactionId, offsetContext.getCommitScn(), scn, lastCommittedScn);
             streamingMetrics.setActiveTransactions(transactions.size());
             return false;
@@ -486,23 +478,27 @@ public void close() {
 
     /**
      * Helper method to register a given {@link LogMinerEvent} implementation with the buffer.
+     * If the event is registered, the underlying metrics active transactions and lag will be re-calculated.
      *
      * @param transactionId transaction id that contained the given event
      * @param scn system change number for the event
+     * @param hash unique hash that identifies the row in a transaction
+     * @param changeTime the time the event occurred
      * @param supplier supplier function to generate the event if validity checks pass
+     * @return true if the event was registered, false otherwise
      */
-    private void registerEvent(String transactionId, Scn scn, long hash, Supplier<LogMinerEvent> supplier) {
+    private boolean registerEvent(String transactionId, Scn scn, long hash, Instant changeTime, Supplier<LogMinerEvent> supplier) {
         if (abandonedTransactionIds.contains(transactionId)) {
             LogMinerHelper.logWarn(streamingMetrics, "Event for abandoned transaction {}, ignored.", transactionId);
-            return;
+            return false;
         }
         if (rolledBackTransactionIds.contains(transactionId)) {
             LogMinerHelper.logWarn(streamingMetrics, "Event for rolled back transaction {}, ignored.", transactionId);
-            return;
+            return false;
         }
         if (isRecentlyCommitted(transactionId)) {
             LOGGER.trace("Event for transaction {} skipped, transaction already committed.", transactionId);
-            return;
+            return false;
         }
 
         Transaction transaction = transactions.computeIfAbsent(transactionId, s -> new Transaction(transactionId, scn));
@@ -512,7 +508,12 @@ private void registerEvent(String transactionId, Scn scn, long hash, Supplier
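
Below is a compact, illustrative sketch of the pattern this patch moves to. It is not the Debezium code itself: the class, field and helper names (MetricsFlowSketch, Metrics, shouldSkip) are stand-ins, and the bodies are trimmed to the control flow. The point, per the diff above, is that registerEvent now reports whether the event was actually buffered and owns the metrics common to every event type (active transaction count, lag), while callers such as registerDmlOperation add their event-specific counter only when registration succeeded.

    // Illustrative sketch only -- simplified stand-ins, not the Debezium classes.
    import java.time.Instant;
    import java.util.HashMap;
    import java.util.Map;
    import java.util.function.Supplier;

    class MetricsFlowSketch {

        // Stand-in for the connector's streaming metrics object: only the calls used here.
        static class Metrics {
            void setActiveTransactions(int count) { /* update gauge */ }
            void calculateLagMetrics(Instant changeTime) { /* update lag */ }
            void incrementRegisteredDmlCount() { /* update counter */ }
        }

        private final Map<String, Object> transactions = new HashMap<>();
        private final Metrics streamingMetrics = new Metrics();

        // After the patch: registerEvent reports whether the event was buffered and,
        // only in that case, recalculates the metrics shared by all event types.
        boolean registerEvent(String transactionId, Instant changeTime, Supplier<Object> supplier) {
            if (shouldSkip(transactionId)) {
                return false; // abandoned, rolled back, or already committed transaction
            }
            transactions.put(transactionId, supplier.get());
            streamingMetrics.setActiveTransactions(transactions.size());
            streamingMetrics.calculateLagMetrics(changeTime);
            return true;
        }

        // Callers layer on their event-specific metric only when registration succeeded.
        void registerDmlOperation(String transactionId, Instant changeTime, Supplier<Object> supplier) {
            if (registerEvent(transactionId, changeTime, supplier)) {
                streamingMetrics.incrementRegisteredDmlCount();
            }
        }

        private boolean shouldSkip(String transactionId) {
            return false; // placeholder for the abandoned / rolled-back / committed checks
        }
    }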