diff --git a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/olr/OpenLogReplicatorStreamingChangeEventSource.java b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/olr/OpenLogReplicatorStreamingChangeEventSource.java index 788163376..6d6548d64 100644 --- a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/olr/OpenLogReplicatorStreamingChangeEventSource.java +++ b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/olr/OpenLogReplicatorStreamingChangeEventSource.java @@ -69,7 +69,6 @@ public class OpenLogReplicatorStreamingChangeEventSource implements StreamingCha private final ErrorHandler errorHandler; private final Clock clock; private final OracleDatabaseSchema schema; - // todo: need to wire up metrics for this adapter private final OracleStreamingChangeEventSourceMetrics streamingMetrics; private OraclePartition partition; @@ -168,7 +167,7 @@ private void onEvent(StreamingEvent event) throws Exception { } } - private void onBeginEvent(StreamingEvent event) throws Exception { + private void onBeginEvent(StreamingEvent event) { final String transactionId = event.getXid(); final Instant timestamp = Instant.ofEpochMilli(Long.parseLong(event.getTimestamp())); @@ -178,6 +177,9 @@ private void onBeginEvent(StreamingEvent event) throws Exception { offsetContext.setSourceTime(timestamp); transactionEvents = 0; + streamingMetrics.setOffsetScn(offsetContext.getScn()); + streamingMetrics.setActiveTransactions(1); + // We do not specifically start a transaction boundary here. // // This is delayed until the data change event on the first data change that is to be @@ -185,7 +187,7 @@ private void onBeginEvent(StreamingEvent event) throws Exception { // of interest to the connector. } - private void onCommitEvent(StreamingEvent event) throws Exception { + private void onCommitEvent(StreamingEvent event) throws InterruptedException { final String transactionId = event.getXid(); final Instant timestamp = Instant.ofEpochMilli(Long.parseLong(event.getTimestamp())); @@ -194,6 +196,11 @@ private void onCommitEvent(StreamingEvent event) throws Exception { offsetContext.setTransactionId(transactionId); offsetContext.setSourceTime(timestamp); + streamingMetrics.setOffsetScn(offsetContext.getScn()); + streamingMetrics.setCommittedScn(offsetContext.getScn()); + streamingMetrics.setActiveTransactions(0); + streamingMetrics.incrementCommittedTransactions(); + // We may see empty transactions and in this case we don't want to emit a transaction boundary // record for these cases. Only trigger commit when there are valid changes. if (transactionEvents > 0) { @@ -209,6 +216,9 @@ private void onCheckpointEvent(StreamingEvent event) { offsetContext.setEventScn(Scn.valueOf(event.getScn())); offsetContext.setTransactionId(transactionId); offsetContext.setSourceTime(timestamp); + + streamingMetrics.setOffsetScn(offsetContext.getScn()); + streamingMetrics.setCommittedScn(offsetContext.getScn()); } private void onMutationEvent(StreamingEvent event, AbstractMutationEvent mutationEvent) throws Exception { @@ -253,6 +263,12 @@ private void onMutationEvent(StreamingEvent event, AbstractMutationEvent mutatio offsetContext.setEventScn(Scn.valueOf(event.getScn())); offsetContext.setTransactionId(event.getXid()); offsetContext.tableEvent(tableId, timestamp); + + streamingMetrics.setOffsetScn(offsetContext.getScn()); + streamingMetrics.addProcessedRows(1L); + streamingMetrics.setLastCapturedDmlCount(1); + streamingMetrics.incrementRegisteredDmlCount(); + transactionEvents++; final Object[] oldValues = toColumnValuesArray(table, mutationEvent.getBefore()); @@ -295,6 +311,10 @@ private void onSchemaChangeEvent(StreamingEvent event, SchemaChangeEvent schemaE offsetContext.setTransactionId(event.getXid()); offsetContext.tableEvent(tableId, timestamp); + streamingMetrics.setOffsetScn(offsetContext.getScn()); + streamingMetrics.setCommittedScn(offsetContext.getScn()); + streamingMetrics.addProcessedRows(1L); + final String sqlStatement = schemaEvent.getSql().toLowerCase().trim(); if (isRecycleBinAlterStatement(sqlStatement)) { LOGGER.trace("Skipping internal DDL: {}", schemaEvent.getSql());