DBZ-2543 Add some streaming metrics

This commit is contained in:
Chris Cranford 2023-08-02 02:26:40 -04:00 committed by Chris Cranford
parent d977e9b0f1
commit 2d4d34428d

View File

@ -69,7 +69,6 @@ public class OpenLogReplicatorStreamingChangeEventSource implements StreamingCha
private final ErrorHandler errorHandler; private final ErrorHandler errorHandler;
private final Clock clock; private final Clock clock;
private final OracleDatabaseSchema schema; private final OracleDatabaseSchema schema;
// todo: need to wire up metrics for this adapter
private final OracleStreamingChangeEventSourceMetrics streamingMetrics; private final OracleStreamingChangeEventSourceMetrics streamingMetrics;
private OraclePartition partition; private OraclePartition partition;
@ -168,7 +167,7 @@ private void onEvent(StreamingEvent event) throws Exception {
} }
} }
private void onBeginEvent(StreamingEvent event) throws Exception { private void onBeginEvent(StreamingEvent event) {
final String transactionId = event.getXid(); final String transactionId = event.getXid();
final Instant timestamp = Instant.ofEpochMilli(Long.parseLong(event.getTimestamp())); final Instant timestamp = Instant.ofEpochMilli(Long.parseLong(event.getTimestamp()));
@ -178,6 +177,9 @@ private void onBeginEvent(StreamingEvent event) throws Exception {
offsetContext.setSourceTime(timestamp); offsetContext.setSourceTime(timestamp);
transactionEvents = 0; transactionEvents = 0;
streamingMetrics.setOffsetScn(offsetContext.getScn());
streamingMetrics.setActiveTransactions(1);
// We do not specifically start a transaction boundary here. // We do not specifically start a transaction boundary here.
// //
// This is delayed until the data change event on the first data change that is to be // This is delayed until the data change event on the first data change that is to be
@ -185,7 +187,7 @@ private void onBeginEvent(StreamingEvent event) throws Exception {
// of interest to the connector. // of interest to the connector.
} }
private void onCommitEvent(StreamingEvent event) throws Exception { private void onCommitEvent(StreamingEvent event) throws InterruptedException {
final String transactionId = event.getXid(); final String transactionId = event.getXid();
final Instant timestamp = Instant.ofEpochMilli(Long.parseLong(event.getTimestamp())); final Instant timestamp = Instant.ofEpochMilli(Long.parseLong(event.getTimestamp()));
@ -194,6 +196,11 @@ private void onCommitEvent(StreamingEvent event) throws Exception {
offsetContext.setTransactionId(transactionId); offsetContext.setTransactionId(transactionId);
offsetContext.setSourceTime(timestamp); offsetContext.setSourceTime(timestamp);
streamingMetrics.setOffsetScn(offsetContext.getScn());
streamingMetrics.setCommittedScn(offsetContext.getScn());
streamingMetrics.setActiveTransactions(0);
streamingMetrics.incrementCommittedTransactions();
// We may see empty transactions and in this case we don't want to emit a transaction boundary // We may see empty transactions and in this case we don't want to emit a transaction boundary
// record for these cases. Only trigger commit when there are valid changes. // record for these cases. Only trigger commit when there are valid changes.
if (transactionEvents > 0) { if (transactionEvents > 0) {
@ -209,6 +216,9 @@ private void onCheckpointEvent(StreamingEvent event) {
offsetContext.setEventScn(Scn.valueOf(event.getScn())); offsetContext.setEventScn(Scn.valueOf(event.getScn()));
offsetContext.setTransactionId(transactionId); offsetContext.setTransactionId(transactionId);
offsetContext.setSourceTime(timestamp); offsetContext.setSourceTime(timestamp);
streamingMetrics.setOffsetScn(offsetContext.getScn());
streamingMetrics.setCommittedScn(offsetContext.getScn());
} }
private void onMutationEvent(StreamingEvent event, AbstractMutationEvent mutationEvent) throws Exception { private void onMutationEvent(StreamingEvent event, AbstractMutationEvent mutationEvent) throws Exception {
@ -253,6 +263,12 @@ private void onMutationEvent(StreamingEvent event, AbstractMutationEvent mutatio
offsetContext.setEventScn(Scn.valueOf(event.getScn())); offsetContext.setEventScn(Scn.valueOf(event.getScn()));
offsetContext.setTransactionId(event.getXid()); offsetContext.setTransactionId(event.getXid());
offsetContext.tableEvent(tableId, timestamp); offsetContext.tableEvent(tableId, timestamp);
streamingMetrics.setOffsetScn(offsetContext.getScn());
streamingMetrics.addProcessedRows(1L);
streamingMetrics.setLastCapturedDmlCount(1);
streamingMetrics.incrementRegisteredDmlCount();
transactionEvents++; transactionEvents++;
final Object[] oldValues = toColumnValuesArray(table, mutationEvent.getBefore()); final Object[] oldValues = toColumnValuesArray(table, mutationEvent.getBefore());
@ -295,6 +311,10 @@ private void onSchemaChangeEvent(StreamingEvent event, SchemaChangeEvent schemaE
offsetContext.setTransactionId(event.getXid()); offsetContext.setTransactionId(event.getXid());
offsetContext.tableEvent(tableId, timestamp); offsetContext.tableEvent(tableId, timestamp);
streamingMetrics.setOffsetScn(offsetContext.getScn());
streamingMetrics.setCommittedScn(offsetContext.getScn());
streamingMetrics.addProcessedRows(1L);
final String sqlStatement = schemaEvent.getSql().toLowerCase().trim(); final String sqlStatement = schemaEvent.getSql().toLowerCase().trim();
if (isRecycleBinAlterStatement(sqlStatement)) { if (isRecycleBinAlterStatement(sqlStatement)) {
LOGGER.trace("Skipping internal DDL: {}", schemaEvent.getSql()); LOGGER.trace("Skipping internal DDL: {}", schemaEvent.getSql());