DBZ-3038 Capture additional LogMiner JMX Metrics

* Min/Max duration to start a mining session
* Min/Max number of logs mined in a given session
* Min/Max duration of a mining batch loop
* Some metrics cleanup
This commit is contained in:
Chris Cranford 2021-02-23 09:43:38 -05:00 committed by Gunnar Morling
parent 879449d1c3
commit 2dedf1e27a
3 changed files with 104 additions and 43 deletions

View File

@ -28,7 +28,6 @@
@ThreadSafe @ThreadSafe
public class LogMinerMetrics extends Metrics implements LogMinerMetricsMXBean { public class LogMinerMetrics extends Metrics implements LogMinerMetricsMXBean {
private final static int DEFAULT_HOURS_TO_KEEP_TRANSACTION = 4;
private static final Logger LOGGER = LoggerFactory.getLogger(LogMinerMetrics.class); private static final Logger LOGGER = LoggerFactory.getLogger(LogMinerMetrics.class);
private final AtomicLong currentScn = new AtomicLong(); private final AtomicLong currentScn = new AtomicLong();
@ -44,12 +43,18 @@ public class LogMinerMetrics extends Metrics implements LogMinerMetricsMXBean {
private final AtomicReference<Duration> lastBatchProcessingDuration = new AtomicReference<>(); private final AtomicReference<Duration> lastBatchProcessingDuration = new AtomicReference<>();
private final AtomicReference<Duration> maxBatchProcessingDuration = new AtomicReference<>(); private final AtomicReference<Duration> maxBatchProcessingDuration = new AtomicReference<>();
private final AtomicReference<Duration> totalParseTime = new AtomicReference<>(); private final AtomicReference<Duration> totalParseTime = new AtomicReference<>();
private final AtomicReference<Duration> totalStartLogMiningSession = new AtomicReference<>(); private final AtomicReference<Duration> totalStartLogMiningSessionDuration = new AtomicReference<>();
private final AtomicReference<Duration> lastStartLogMiningSessionDuration = new AtomicReference<>();
private final AtomicReference<Duration> maxStartingLogMiningSessionDuration = new AtomicReference<>();
private final AtomicReference<Duration> totalProcessingTime = new AtomicReference<>(); private final AtomicReference<Duration> totalProcessingTime = new AtomicReference<>();
private final AtomicReference<Duration> minBatchProcessingTime = new AtomicReference<>();
private final AtomicReference<Duration> maxBatchProcessingTime = new AtomicReference<>();
private final AtomicReference<Duration> totalResultSetNextTime = new AtomicReference<>(); private final AtomicReference<Duration> totalResultSetNextTime = new AtomicReference<>();
private final AtomicLong maxBatchProcessingThroughput = new AtomicLong(); private final AtomicLong maxBatchProcessingThroughput = new AtomicLong();
private final AtomicReference<String[]> currentLogFileName; private final AtomicReference<String[]> currentLogFileName;
private final AtomicReference<String[]> redoLogStatus; private final AtomicReference<String[]> redoLogStatus;
private final AtomicLong minimumLogsMined = new AtomicLong();
private final AtomicLong maximumLogsMined = new AtomicLong();
private final AtomicInteger switchCounter = new AtomicInteger(); private final AtomicInteger switchCounter = new AtomicInteger();
private final AtomicInteger batchSize = new AtomicInteger(); private final AtomicInteger batchSize = new AtomicInteger();
@ -75,9 +80,12 @@ public class LogMinerMetrics extends Metrics implements LogMinerMetricsMXBean {
currentScn.set(-1); currentScn.set(-1);
currentLogFileName = new AtomicReference<>(); currentLogFileName = new AtomicReference<>();
minimumLogsMined.set(0L);
maximumLogsMined.set(0L);
redoLogStatus = new AtomicReference<>(); redoLogStatus = new AtomicReference<>();
switchCounter.set(0); switchCounter.set(0);
recordMiningHistory.set(connectorConfig.isLogMiningHistoryRecorded());
batchSizeDefault = connectorConfig.getLogMiningBatchSizeDefault(); batchSizeDefault = connectorConfig.getLogMiningBatchSizeDefault();
batchSizeMin = connectorConfig.getLogMiningBatchSizeMin(); batchSizeMin = connectorConfig.getLogMiningBatchSizeMin();
batchSizeMax = connectorConfig.getLogMiningBatchSizeMax(); batchSizeMax = connectorConfig.getLogMiningBatchSizeMax();
@ -95,7 +103,6 @@ public class LogMinerMetrics extends Metrics implements LogMinerMetricsMXBean {
@Override @Override
public void reset() { public void reset() {
batchSize.set(batchSizeDefault); batchSize.set(batchSizeDefault);
millisecondToSleepBetweenMiningQuery.set(sleepTimeDefault); millisecondToSleepBetweenMiningQuery.set(sleepTimeDefault);
totalCapturedDmlCount.set(0); totalCapturedDmlCount.set(0);
@ -112,18 +119,37 @@ public void reset() {
lastBatchProcessingDuration.set(Duration.ZERO); lastBatchProcessingDuration.set(Duration.ZERO);
networkConnectionProblemsCounter.set(0); networkConnectionProblemsCounter.set(0);
totalParseTime.set(Duration.ZERO); totalParseTime.set(Duration.ZERO);
totalStartLogMiningSession.set(Duration.ZERO); totalStartLogMiningSessionDuration.set(Duration.ZERO);
lastStartLogMiningSessionDuration.set(Duration.ZERO);
maxStartingLogMiningSessionDuration.set(Duration.ZERO);
totalProcessingTime.set(Duration.ZERO); totalProcessingTime.set(Duration.ZERO);
minBatchProcessingTime.set(Duration.ZERO);
maxBatchProcessingTime.set(Duration.ZERO);
totalResultSetNextTime.set(Duration.ZERO); totalResultSetNextTime.set(Duration.ZERO);
} }
// setters
public void setCurrentScn(Long scn) { public void setCurrentScn(Long scn) {
currentScn.set(scn); currentScn.set(scn);
} }
public void setCurrentLogFileName(Set<String> names) { public void setCurrentLogFileName(Set<String> names) {
currentLogFileName.set(names.stream().toArray(String[]::new)); currentLogFileName.set(names.stream().toArray(String[]::new));
if (names.size() < minimumLogsMined.get()) {
minimumLogsMined.set(names.size());
}
if (names.size() > maximumLogsMined.get()) {
maximumLogsMined.set(names.size());
}
}
@Override
public long getMinimumMinedLogCount() {
return minimumLogsMined.get();
}
@Override
public long getMaximumMinedLogCount() {
return maximumLogsMined.get();
} }
public void setRedoLogStatus(Map<String, String> status) { public void setRedoLogStatus(Map<String, String> status) {
@ -167,7 +193,6 @@ public void incrementNetworkConnectionProblemsCounter() {
networkConnectionProblemsCounter.incrementAndGet(); networkConnectionProblemsCounter.incrementAndGet();
} }
// implemented getters
@Override @Override
public Long getCurrentScn() { public Long getCurrentScn() {
return currentScn.get(); return currentScn.get();
@ -290,11 +315,25 @@ public void addCurrentParseTime(Duration currentParseTime) {
@Override @Override
public long getTotalMiningSessionStartTimeInMilliseconds() { public long getTotalMiningSessionStartTimeInMilliseconds() {
return totalStartLogMiningSession.get().toMillis(); return totalStartLogMiningSessionDuration.get().toMillis();
} }
public void addCurrentMiningSessionStart(Duration currentStartLogMiningSession) { public void addCurrentMiningSessionStart(Duration currentStartLogMiningSession) {
totalStartLogMiningSession.accumulateAndGet(currentStartLogMiningSession, Duration::plus); lastStartLogMiningSessionDuration.set(currentStartLogMiningSession);
if (currentStartLogMiningSession.compareTo(maxStartingLogMiningSessionDuration.get()) > 0) {
maxStartingLogMiningSessionDuration.set(currentStartLogMiningSession);
}
totalStartLogMiningSessionDuration.accumulateAndGet(currentStartLogMiningSession, Duration::plus);
}
@Override
public long getLastMiningSessionStartTimeInMilliseconds() {
return lastStartLogMiningSessionDuration.get().toMillis();
}
@Override
public long getMaxMiningSessionStartTimeInMilliseconds() {
return maxStartingLogMiningSessionDuration.get().toMillis();
} }
@Override @Override
@ -302,8 +341,18 @@ public long getTotalProcessingTimeInMilliseconds() {
return totalProcessingTime.get().toMillis(); return totalProcessingTime.get().toMillis();
} }
public void setTotalProcessingTime(Duration processingTime) { @Override
totalProcessingTime.set(processingTime); public long getMinBatchProcessingTimeInMilliseconds() {
return minBatchProcessingTime.get().toMillis();
}
@Override
public long getMaxBatchProcessingTimeInMilliseconds() {
return maxBatchProcessingTime.get().toMillis();
}
public void setCurrentBatchProcessingTime(Duration currentBatchProcessingTime) {
totalProcessingTime.accumulateAndGet(currentBatchProcessingTime, Duration::plus);
} }
public void addCurrentResultSetNext(Duration currentNextTime) { public void addCurrentResultSetNext(Duration currentNextTime) {
@ -314,7 +363,6 @@ public void addProcessedRows(Long rows) {
totalProcessedRows.getAndAdd(rows); totalProcessedRows.getAndAdd(rows);
} }
// MBean accessible setters
@Override @Override
public void setBatchSize(int size) { public void setBatchSize(int size) {
if (size >= batchSizeMin && size <= batchSizeMax) { if (size >= batchSizeMin && size <= batchSizeMax) {
@ -360,11 +408,6 @@ else if (currentBatchSize > batchSizeMin) {
} }
} }
@Override
public void setRecordMiningHistory(boolean doRecording) {
recordMiningHistory.set(doRecording);
}
@Override @Override
public String toString() { public String toString() {
return "LogMinerMetrics{" + return "LogMinerMetrics{" +
@ -382,6 +425,8 @@ public String toString() {
", maxBatchProcessingDuration=" + maxBatchProcessingDuration + ", maxBatchProcessingDuration=" + maxBatchProcessingDuration +
", maxBatchProcessingThroughput=" + maxBatchProcessingThroughput + ", maxBatchProcessingThroughput=" + maxBatchProcessingThroughput +
", currentLogFileName=" + currentLogFileName + ", currentLogFileName=" + currentLogFileName +
", minLogFilesMined=" + minimumLogsMined +
", maxLogFilesMined=" + maximumLogsMined +
", redoLogStatus=" + redoLogStatus + ", redoLogStatus=" + redoLogStatus +
", switchCounter=" + switchCounter + ", switchCounter=" + switchCounter +
", batchSize=" + batchSize + ", batchSize=" + batchSize +
@ -397,8 +442,12 @@ public String toString() {
", sleepTimeMax=" + sleepTimeMax + ", sleepTimeMax=" + sleepTimeMax +
", sleepTimeIncrement=" + sleepTimeIncrement + ", sleepTimeIncrement=" + sleepTimeIncrement +
", totalParseTime=" + totalParseTime + ", totalParseTime=" + totalParseTime +
", totalStartLogMiningSession=" + totalStartLogMiningSession + ", totalStartLogMiningSessionDuration=" + totalStartLogMiningSessionDuration +
", lastStartLogMiningSessionDuration=" + lastStartLogMiningSessionDuration +
", maxStartLogMiningSessionDuration=" + maxStartingLogMiningSessionDuration +
", totalProcessTime=" + totalProcessingTime + ", totalProcessTime=" + totalProcessingTime +
", minBatchProcessTime=" + minBatchProcessingTime +
", maxBatchProcessTime=" + maxBatchProcessingTime +
", totalResultSetNextTime=" + totalResultSetNextTime + ", totalResultSetNextTime=" + totalResultSetNextTime +
'}'; '}';
} }

View File

@ -11,19 +11,25 @@
public interface LogMinerMetricsMXBean { public interface LogMinerMetricsMXBean {
/** /**
* Exposes current SCN in the database. This is very efficient query and will not affect overall performance * @return the current system change number of the database
*
* @return current SCN
*/ */
Long getCurrentScn(); Long getCurrentScn();
/** /**
* Exposes current redo log file. This is very efficient query and will not affect overall performance * @return array of current filenames to be used by the mining session.
*
* @return full path or NULL if an exception occurs.
*/ */
String[] getCurrentRedoLogFileName(); String[] getCurrentRedoLogFileName();
/**
* @return the minimum number of logs used by a mining session
*/
long getMinimumMinedLogCount();
/**
* @return the maximum number of logs used by a mining session
*/
long getMaximumMinedLogCount();
/** /**
* Exposes states of redo logs: current, active, inactive, unused ... * Exposes states of redo logs: current, active, inactive, unused ...
* @return array of: (redo log name | status) elements * @return array of: (redo log name | status) elements
@ -107,12 +113,6 @@ public interface LogMinerMetricsMXBean {
void changeBatchSize(boolean increment); void changeBatchSize(boolean increment);
/**
* this flag turns on recording of captured incremental changes. It creates an overhead on CPU and takes disk space
* @param doRecording true - record
*/
void setRecordMiningHistory(boolean doRecording);
/** /**
* this flag indicates whether log mining is being recorded by {@link HistoryRecorder} * this flag indicates whether log mining is being recorded by {@link HistoryRecorder}
*/ */
@ -157,11 +157,31 @@ public interface LogMinerMetricsMXBean {
*/ */
long getTotalMiningSessionStartTimeInMilliseconds(); long getTotalMiningSessionStartTimeInMilliseconds();
/**
* @return the total number of milliseconds the last mining session took to start
*/
long getLastMiningSessionStartTimeInMilliseconds();
/**
* @return the duration in milliseconds of the longest mining session start
*/
long getMaxMiningSessionStartTimeInMilliseconds();
/** /**
* @return the total number of milliseconds spent mining and processing results * @return the total number of milliseconds spent mining and processing results
*/ */
long getTotalProcessingTimeInMilliseconds(); long getTotalProcessingTimeInMilliseconds();
/**
* @return the minimum time in milliseconds spent processing results from a single LogMiner session
*/
long getMinBatchProcessingTimeInMilliseconds();
/**
* @return the maximum time in milliseconds spent processing results from a single LogMiner session
*/
long getMaxBatchProcessingTimeInMilliseconds();
/** /**
* @return the total number of log miner rows processed. * @return the total number of log miner rows processed.
*/ */
@ -172,15 +192,13 @@ public interface LogMinerMetricsMXBean {
*/ */
long getTotalResultSetNextTimeInMilliseconds(); long getTotalResultSetNextTimeInMilliseconds();
/**
* Resets metrics.
*/
void reset();
// *** following metrics work if RecordMiningHistory is true.
/** /**
* @return the number of hours to keep transaction in buffer before abandoning * @return the number of hours to keep transaction in buffer before abandoning
*/ */
int getHoursToKeepTransactionInBuffer(); int getHoursToKeepTransactionInBuffer();
/**
* Resets metrics.
*/
void reset();
} }

View File

@ -133,7 +133,6 @@ public void execute(ChangeEventSourceContext context) {
HistoryRecorder historyRecorder = connectorConfig.getLogMiningHistoryRecorder(); HistoryRecorder historyRecorder = connectorConfig.getLogMiningHistoryRecorder();
Duration processTime = Duration.ZERO;
try { try {
// todo: why can't OracleConnection be used rather than a Factory+JdbcConfiguration? // todo: why can't OracleConnection be used rather than a Factory+JdbcConfiguration?
historyRecorder.prepare(logMinerMetrics, jdbcConfiguration, connectorConfig.getLogMinerHistoryRetentionHours()); historyRecorder.prepare(logMinerMetrics, jdbcConfiguration, connectorConfig.getLogMinerHistoryRetentionHours());
@ -190,9 +189,7 @@ public void execute(ChangeEventSourceContext context) {
} }
} }
processTime = processTime.plus(Duration.between(start, Instant.now())); logMinerMetrics.setCurrentBatchProcessingTime(Duration.between(start, Instant.now()));
logMinerMetrics.setTotalProcessingTime(processTime);
pauseBetweenMiningSessions(); pauseBetweenMiningSessions();
} }
} }
@ -220,9 +217,6 @@ public void execute(ChangeEventSourceContext context) {
private void registerLogMinerMetrics() { private void registerLogMinerMetrics() {
logMinerMetrics = new LogMinerMetrics(taskContext, connectorConfig); logMinerMetrics = new LogMinerMetrics(taskContext, connectorConfig);
logMinerMetrics.register(LOGGER); logMinerMetrics.register(LOGGER);
if (connectorConfig.isLogMiningHistoryRecorded()) {
logMinerMetrics.setRecordMiningHistory(true);
}
} }
private void unregisterLogMinerMetrics() { private void unregisterLogMinerMetrics() {