DBZ-3645 Adjust metrics calculations & logging levels

This commit is contained in:
Chris Cranford 2021-06-23 00:43:22 -04:00 committed by Gunnar Morling
parent 16eb60d8f1
commit 61b22d9574

View File

@ -129,11 +129,9 @@ boolean isDdlOperationRegistered(Scn scn) {
*/
void registerDmlOperation(int operation, String transactionId, Scn scn, TableId tableId, LogMinerDmlEntry parseEntry,
Instant changeTime, String rowId, Object rsId, long hash) {
registerEvent(transactionId, scn, hash, () -> new DmlEvent(operation, parseEntry, scn, tableId, rowId, rsId));
streamingMetrics.setActiveTransactions(transactions.size());
streamingMetrics.incrementRegisteredDmlCount();
streamingMetrics.calculateLagMetrics(changeTime);
if (registerEvent(transactionId, scn, hash, changeTime, () -> new DmlEvent(operation, parseEntry, scn, tableId, rowId, rsId))) {
streamingMetrics.incrementRegisteredDmlCount();
}
}
/**
@ -151,10 +149,8 @@ void registerDmlOperation(int operation, String transactionId, Scn scn, TableId
*/
void registerSelectLobOperation(int operation, String transactionId, Scn scn, TableId tableId, LogMinerDmlEntry parseEntry,
String columnName, boolean binaryData, Instant changeTime, String rowId, Object rsId, long hash) {
registerEvent(transactionId, scn, hash, () -> new SelectLobLocatorEvent(operation, parseEntry, columnName, binaryData, scn, tableId, rowId, rsId));
streamingMetrics.setActiveTransactions(transactions.size());
streamingMetrics.calculateLagMetrics(changeTime);
registerEvent(transactionId, scn, hash, changeTime,
() -> new SelectLobLocatorEvent(operation, parseEntry, columnName, binaryData, scn, tableId, rowId, rsId));
}
/**
@ -174,10 +170,9 @@ void registerLobWriteOperation(int operation, String transactionId, Scn scn, Tab
Instant changeTime, String rowId, Object rsId, long hash) {
if (data != null) {
final String sql = parseLobWriteSql(data);
registerEvent(transactionId, scn, hash, () -> new LobWriteEvent(operation, sql, scn, tableId, rowId, rsId));
registerEvent(transactionId, scn, hash, changeTime,
() -> new LobWriteEvent(operation, sql, scn, tableId, rowId, rsId));
streamingMetrics.setActiveTransactions(transactions.size());
streamingMetrics.calculateLagMetrics(changeTime);
}
}
@ -195,9 +190,7 @@ void registerLobWriteOperation(int operation, String transactionId, Scn scn, Tab
*/
void registerLobEraseOperation(int operation, String transactionId, Scn scn, TableId tableId, Instant changeTime,
String rowId, Object rsId, long hash) {
registerEvent(transactionId, scn, hash, () -> new LobEraseEvent(operation, scn, tableId, rowId, rsId));
streamingMetrics.setActiveTransactions(transactions.size());
streamingMetrics.calculateLagMetrics(changeTime);
registerEvent(transactionId, scn, hash, changeTime, () -> new LobEraseEvent(operation, scn, tableId, rowId, rsId));
}
/**
@ -270,8 +263,7 @@ boolean commit(String transactionId, Scn scn, OracleOffsetContext offsetContext,
// On the restarting connector, we start from SCN in the offset. There is possibility to commit a transaction(s) which were already committed.
// Currently we cannot use ">=", because we may lose normal commit which may happen at the same time. TODO use audit table to prevent duplications
if ((offsetContext.getCommitScn() != null && offsetContext.getCommitScn().compareTo(scn) > 0) || lastCommittedScn.compareTo(scn) > 0) {
LogMinerHelper.logWarn(streamingMetrics,
"Transaction {} was already processed, ignore. Committed SCN in offset is {}, commit SCN of the transaction is {}, last committed SCN is {}",
LOGGER.debug("Transaction {} already processed, ignored. Committed SCN in offset is {}, commit SCN of the transaction is {}, last committed SCN is {}",
transactionId, offsetContext.getCommitScn(), scn, lastCommittedScn);
streamingMetrics.setActiveTransactions(transactions.size());
return false;
@ -486,23 +478,27 @@ public void close() {
/**
* Helper method to register a given {@link LogMinerEvent} implementation with the buffer.
* If the event is registered, the underlying metrics active transactions and lag will be re-calculated.
*
* @param transactionId transaction id that contained the given event
* @param scn system change number for the event
* @param hash unique hash that identifies the row in a transaction
* @param changeTime the time the event occurred
* @param supplier supplier function to generate the event if validity checks pass
* @return true if the event was registered, false otherwise
*/
private void registerEvent(String transactionId, Scn scn, long hash, Supplier<LogMinerEvent> supplier) {
private boolean registerEvent(String transactionId, Scn scn, long hash, Instant changeTime, Supplier<LogMinerEvent> supplier) {
if (abandonedTransactionIds.contains(transactionId)) {
LogMinerHelper.logWarn(streamingMetrics, "Event for abandoned transaction {}, ignored.", transactionId);
return;
return false;
}
if (rolledBackTransactionIds.contains(transactionId)) {
LogMinerHelper.logWarn(streamingMetrics, "Event for rolled back transaction {}, ignored.", transactionId);
return;
return false;
}
if (isRecentlyCommitted(transactionId)) {
LOGGER.trace("Event for transaction {} skipped, transaction already committed.", transactionId);
return;
return false;
}
Transaction transaction = transactions.computeIfAbsent(transactionId, s -> new Transaction(transactionId, scn));
@ -512,7 +508,12 @@ private void registerEvent(String transactionId, Scn scn, long hash, Supplier<Lo
if (!transaction.eventHashes.contains(hash)) {
transaction.eventHashes.add(hash);
transaction.events.add(supplier.get());
streamingMetrics.setActiveTransactions(transactions.size());
streamingMetrics.calculateLagMetrics(changeTime);
return true;
}
return false;
}
/**