diff --git a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/events/Transaction.java b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/events/Transaction.java deleted file mode 100644 index f0c7f0a0f..000000000 --- a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/events/Transaction.java +++ /dev/null @@ -1,126 +0,0 @@ -/* - * Copyright Debezium Authors. - * - * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 - */ -package io.debezium.connector.oracle.logminer.events; - -import java.time.Instant; -import java.util.ArrayList; -import java.util.List; -import java.util.Objects; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import io.debezium.connector.oracle.Scn; -import io.debezium.connector.oracle.logminer.processor.infinispan.marshalling.VisibleForMarshalling; - -/** - * A logical database transaction - * - * @author Chris Cranford - */ -public class Transaction { - - private static final Logger LOGGER = LoggerFactory.getLogger(Transaction.class); - private static final String UNKNOWN = "UNKNOWN"; - - private final String transactionId; - private final Scn startScn; - private final Instant changeTime; - private final List events; - private final String userName; - private int numberOfEvents; - - @VisibleForMarshalling - public Transaction(String transactionId, Scn startScn, Instant changeTime, List events, String userName, int numberOfEvents) { - this.transactionId = transactionId; - this.startScn = startScn; - this.changeTime = changeTime; - this.events = events; - this.userName = !UNKNOWN.equalsIgnoreCase(userName) ? userName : null; - this.numberOfEvents = numberOfEvents; - } - - public Transaction(String transactionId, Scn startScn, Instant changeTime, String userName) { - this(transactionId, startScn, changeTime, new ArrayList<>(), userName, 0); - } - - public String getTransactionId() { - return transactionId; - } - - public Scn getStartScn() { - return startScn; - } - - public Instant getChangeTime() { - return changeTime; - } - - public List getEvents() { - return events; - } - - public int getNumberOfEvents() { - return numberOfEvents; - } - - public int getNextEventId() { - return numberOfEvents++; - } - - /** - * Should be called when a transaction start is detected. - */ - public void started() { - numberOfEvents = 0; - } - - /** - * Removes any all events within the transaction with the specified {code rowId}. - * - * @param rowId the row id for the SQL event that should be removed - */ - public void removeEventWithRowId(String rowId) { - events.removeIf(event -> { - if (event.getRowId().equals(rowId)) { - LOGGER.trace("Undo applied for event {}.", event); - return true; - } - return false; - }); - } - - public String getUserName() { - return userName; - } - - @Override - public boolean equals(Object o) { - if (this == o) { - return true; - } - if (o == null || getClass() != o.getClass()) { - return false; - } - Transaction that = (Transaction) o; - return Objects.equals(transactionId, that.transactionId); - } - - @Override - public int hashCode() { - return Objects.hash(transactionId); - } - - @Override - public String toString() { - return "Transaction{" + - "transactionId='" + transactionId + '\'' + - ", startScn=" + startScn + - ", userName='" + userName + - ", numberOfEvents=" + numberOfEvents + - "'}"; - } -} diff --git a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/processor/AbstractLogMinerEventProcessor.java b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/processor/AbstractLogMinerEventProcessor.java index 7f169ceb0..8d2b60dc1 100644 --- a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/processor/AbstractLogMinerEventProcessor.java +++ b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/processor/AbstractLogMinerEventProcessor.java @@ -30,7 +30,6 @@ import io.debezium.connector.oracle.logminer.events.LogMinerEvent; import io.debezium.connector.oracle.logminer.events.LogMinerEventRow; import io.debezium.connector.oracle.logminer.events.SelectLobLocatorEvent; -import io.debezium.connector.oracle.logminer.events.Transaction; import io.debezium.connector.oracle.logminer.parser.DmlParserException; import io.debezium.connector.oracle.logminer.parser.LogMinerDmlEntry; import io.debezium.connector.oracle.logminer.parser.LogMinerDmlParser; @@ -45,7 +44,7 @@ * * @author Chris Cranford */ -public abstract class AbstractLogMinerEventProcessor implements LogMinerEventProcessor { +public abstract class AbstractLogMinerEventProcessor implements LogMinerEventProcessor { private static final Logger LOGGER = LoggerFactory.getLogger(AbstractLogMinerEventProcessor.class); @@ -56,7 +55,6 @@ public abstract class AbstractLogMinerEventProcessor implements LogMinerEventPro private final OracleOffsetContext offsetContext; private final EventDispatcher dispatcher; private final OracleStreamingChangeEventSourceMetrics metrics; - private final TransactionReconciliation reconciliation; private final LogMinerDmlParser dmlParser; private final SelectLobParser selectLobParser; @@ -78,7 +76,6 @@ public AbstractLogMinerEventProcessor(ChangeEventSourceContext context, this.offsetContext = offsetContext; this.dispatcher = dispatcher; this.metrics = metrics; - this.reconciliation = new TransactionReconciliation(connectorConfig, schema); this.counters = new Counters(); this.dmlParser = new LogMinerDmlParser(); this.selectLobParser = new SelectLobParser(); @@ -92,10 +89,6 @@ protected OracleDatabaseSchema getSchema() { return schema; } - protected TransactionReconciliation getReconciliation() { - return reconciliation; - } - /** * Check whether a transaction has been recently committed. * Any implementation that does not support recently-committed tracking should return false. @@ -141,7 +134,13 @@ protected Scn getLastProcessedScn() { * Returns the {@code TransactionCache} implementation. * @return the transaction cache, never {@code null} */ - protected abstract TransactionCache getTransactionCache(); + protected abstract TransactionCache getTransactionCache(); + + protected abstract T createTransaction(LogMinerEventRow row); + + protected abstract void removeEventWithRowId(LogMinerEventRow row); + + protected abstract int getTransactionEventCount(T transaction); // todo: can this be removed in favor of a single implementation? protected boolean isTrxIdRawValue() { @@ -221,10 +220,9 @@ protected void handleMissingScn(LogMinerEventRow row) { */ protected void handleStart(LogMinerEventRow row) { final String transactionId = row.getTransactionId(); - final Transaction transaction = getTransactionCache().get(transactionId); + final AbstractTransaction transaction = getTransactionCache().get(transactionId); if (transaction == null && !isRecentlyCommitted(transactionId)) { - Transaction newTransaction = new Transaction(transactionId, row.getScn(), row.getChangeTime(), row.getUserName()); - getTransactionCache().put(transactionId, newTransaction); + getTransactionCache().put(transactionId, createTransaction(row)); metrics.setActiveTransactions(getTransactionCache().size()); } else if (transaction != null && !isRecentlyCommitted(transactionId)) { @@ -402,13 +400,7 @@ protected void handleDataEvent(LogMinerEventRow row) throws SQLException, Interr // with a rollback flag to indicate that the prior event should be omitted. In this // use case, the transaction can still be committed, so we need to manually rollback // the previous DML event when this use case occurs. - final Transaction transaction = getTransactionCache().get(row.getTransactionId()); - if (transaction == null) { - LOGGER.warn("Cannot undo change '{}' since transaction was not found.", row); - } - else { - transaction.removeEventWithRowId(row.getRowId()); - } + removeEventWithRowId(row); return; } @@ -485,26 +477,7 @@ private boolean hasNextWithMetricsUpdate(ResultSet resultSet) throws SQLExceptio * @param row the LogMiner event row * @param eventSupplier the supplier of the event to create if the event is allowed to be added */ - protected void addToTransaction(String transactionId, LogMinerEventRow row, Supplier eventSupplier) { - if (isTransactionIdAllowed(transactionId)) { - Transaction transaction = getTransactionCache().get(transactionId); - if (transaction == null) { - LOGGER.trace("Transaction {} not in cache for DML, creating.", transactionId); - transaction = new Transaction(transactionId, row.getScn(), row.getChangeTime(), row.getUserName()); - getTransactionCache().put(transactionId, transaction); - } - - int eventId = transaction.getNextEventId(); - if (transaction.getEvents().size() <= eventId) { - // Add new event at eventId offset - LOGGER.trace("Transaction {}, adding event reference at index {}", transactionId, eventId); - transaction.getEvents().add(eventSupplier.get()); - metrics.calculateLagMetrics(row.getChangeTime()); - } - - metrics.setActiveTransactions(getTransactionCache().size()); - } - } + protected abstract void addToTransaction(String transactionId, LogMinerEventRow row, Supplier eventSupplier); /** * Dispatch a schema change event for a new table and get the newly created relational table model. diff --git a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/processor/AbstractTransaction.java b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/processor/AbstractTransaction.java new file mode 100644 index 000000000..8b7b0a1cf --- /dev/null +++ b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/processor/AbstractTransaction.java @@ -0,0 +1,80 @@ +/* + * Copyright Debezium Authors. + * + * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 + */ +package io.debezium.connector.oracle.logminer.processor; + +import java.time.Instant; +import java.util.Objects; + +import io.debezium.connector.oracle.Scn; + +/** + * An abstract implementation of an Oracle {@link Transaction}. + * + * @author Chris Cranford + */ +public abstract class AbstractTransaction implements Transaction { + + private static final String UNKNOWN = "UNKNOWN"; + + private final String transactionId; + private final Scn startScn; + private final Instant changeTime; + private final String userName; + + public AbstractTransaction(String transactionId, Scn startScn, Instant changeTime, String userName) { + this.transactionId = transactionId; + this.startScn = startScn; + this.changeTime = changeTime; + this.userName = !UNKNOWN.equalsIgnoreCase(userName) ? userName : null; + } + + @Override + public String getTransactionId() { + return transactionId; + } + + @Override + public Scn getStartScn() { + return startScn; + } + + @Override + public Instant getChangeTime() { + return changeTime; + } + + @Override + public String getUserName() { + return userName; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + AbstractTransaction that = (AbstractTransaction) o; + return Objects.equals(transactionId, that.transactionId); + } + + @Override + public int hashCode() { + return Objects.hash(transactionId); + } + + @Override + public String toString() { + return "AbstractTransaction{" + + "transactionId='" + transactionId + '\'' + + ", startScn=" + startScn + + ", changeTime=" + changeTime + + ", userName='" + userName + '\'' + + '}'; + } +} diff --git a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/processor/Transaction.java b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/processor/Transaction.java new file mode 100644 index 000000000..b42fccee9 --- /dev/null +++ b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/processor/Transaction.java @@ -0,0 +1,32 @@ +/* + * Copyright Debezium Authors. + * + * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 + */ +package io.debezium.connector.oracle.logminer.processor; + +import java.time.Instant; + +import io.debezium.connector.oracle.Scn; + +/** + * Contract for an Oracle transaction. + * + * @author Chris Cranford + */ +public interface Transaction { + + String getTransactionId(); + + Scn getStartScn(); + + Instant getChangeTime(); + + String getUserName(); + + int getNumberOfEvents(); + + int getNextEventId(); + + void started(); +} diff --git a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/processor/TransactionCache.java b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/processor/TransactionCache.java index 1b4c922f9..21b5c958a 100644 --- a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/processor/TransactionCache.java +++ b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/processor/TransactionCache.java @@ -8,19 +8,18 @@ import java.util.Iterator; import io.debezium.connector.oracle.Scn; -import io.debezium.connector.oracle.logminer.events.Transaction; /** * Generalized contract that all transaction cache implementations should implement. * * @author Chris Cranford */ -public interface TransactionCache extends AutoCloseable { - Transaction get(String transactionId); +public interface TransactionCache extends AutoCloseable { + T get(String transactionId); - void put(String transactionId, Transaction transaction); + void put(String transactionId, T transaction); - Transaction remove(String transactionId); + T remove(String transactionId); int size(); diff --git a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/processor/TransactionCommitConsumer.java b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/processor/TransactionCommitConsumer.java new file mode 100644 index 000000000..ae561a59a --- /dev/null +++ b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/processor/TransactionCommitConsumer.java @@ -0,0 +1,301 @@ +/* + * Copyright Debezium Authors. + * + * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 + */ +package io.debezium.connector.oracle.logminer.processor; + +import java.util.ArrayList; +import java.util.List; +import java.util.Objects; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import io.debezium.DebeziumException; +import io.debezium.connector.oracle.BlobChunkList; +import io.debezium.connector.oracle.OracleConnectorConfig; +import io.debezium.connector.oracle.OracleDatabaseSchema; +import io.debezium.connector.oracle.logminer.LogMinerHelper; +import io.debezium.connector.oracle.logminer.events.DmlEvent; +import io.debezium.connector.oracle.logminer.events.EventType; +import io.debezium.connector.oracle.logminer.events.LobWriteEvent; +import io.debezium.connector.oracle.logminer.events.LogMinerEvent; +import io.debezium.connector.oracle.logminer.events.SelectLobLocatorEvent; +import io.debezium.function.BlockingConsumer; +import io.debezium.relational.Table; + +/** + * A consumer of transaction events at commit time that is capable of inspecting the event stream, + * merging events that should be merged when LOB support is enabled, and then delegating the final + * stream of events to a delegate consumer. + * + * @author Chris Cranford + */ +public class TransactionCommitConsumer implements AutoCloseable, BlockingConsumer { + + private static final Logger LOGGER = LoggerFactory.getLogger(TransactionCommitConsumer.class); + + private final BlockingConsumer delegate; + private final OracleConnectorConfig connectorConfig; + private final OracleDatabaseSchema schema; + private final List lobWriteData; + + enum LobState { + WRITE, + ERASE, + OTHER + }; + + private LogMinerEvent lastEvent; + private SelectLobLocatorEvent lastSelectLobLocatorEvent; + private LobState lobState; + + public TransactionCommitConsumer(BlockingConsumer delegate, OracleConnectorConfig connectorConfig, OracleDatabaseSchema schema) { + this.delegate = delegate; + this.lobState = LobState.OTHER; + this.lobWriteData = new ArrayList<>(); + this.connectorConfig = connectorConfig; + this.schema = schema; + } + + @Override + public void close() throws InterruptedException { + if (lastEvent != null) { + if (!lobWriteData.isEmpty()) { + mergeLobWriteData(lastEvent); + } + dispatchChangeEvent(lastEvent); + } + } + + @Override + public void accept(LogMinerEvent event) throws InterruptedException { + if (!connectorConfig.isLobEnabled()) { + // LOB support is not enabled, perform immediate dispatch + dispatchChangeEvent(event); + return; + } + + if (lastEvent == null) { + // Always cache first event, follow-up events will dictate merge/dispatch status + this.lastEvent = event; + if (EventType.SELECT_LOB_LOCATOR == event.getEventType()) { + this.lastSelectLobLocatorEvent = (SelectLobLocatorEvent) event; + } + } + else { + + // Check whether the LOB data queue needs to be drained to the last event + LobState currentLobState = resolveLobStateByCurrentEvent(event); + if (currentLobState != this.lobState) { + if (this.lobState == LobState.WRITE) { + mergeLobWriteData(lastEvent); + } + this.lobState = currentLobState; + } + + if (!isMerged(event, lastEvent)) { + LOGGER.trace("\tMerged skipped."); + // Events were not merged, dispatch last one and cache new + dispatchChangeEvent(lastEvent); + this.lastEvent = event; + } + else { + LOGGER.trace("\tMerged successfully."); + } + } + } + + private void dispatchChangeEvent(LogMinerEvent event) throws InterruptedException { + LOGGER.trace("\tEmitting event {} {}", event.getEventType(), event); + delegate.accept(event); + } + + private LobState resolveLobStateByCurrentEvent(LogMinerEvent event) { + switch (event.getEventType()) { + case LOB_WRITE: + return LobState.WRITE; + case LOB_ERASE: + return LobState.ERASE; + default: + return LobState.OTHER; + } + } + + private boolean isMerged(LogMinerEvent event, LogMinerEvent prevEvent) { + LOGGER.trace("\tVerifying merge eligibility for event {} with {}", event.getEventType(), prevEvent.getEventType()); + if (EventType.SELECT_LOB_LOCATOR == event.getEventType()) { + SelectLobLocatorEvent locatorEvent = (SelectLobLocatorEvent) event; + this.lastSelectLobLocatorEvent = locatorEvent; + if (EventType.INSERT == prevEvent.getEventType()) { + // Previous event is an INSERT + // Only merge the SEL_LOB_LOCATOR event if the previous INSERT is for the same table/row + // and if the INSERT's column value is either EMPTY_CLOB() or EMPTY_BLOB() + if (isForSameTableOrScn(event, prevEvent)) { + LOGGER.trace("\tMerging SEL_LOB_LOCATOR with previous INSERT event"); + return true; + } + } + else if (EventType.UPDATE == prevEvent.getEventType()) { + if (isForSameTableOrScn(event, prevEvent) && isSameTableRow(event, prevEvent)) { + LOGGER.trace("\tUpdating SEL_LOB_LOCATOR column '{}' to previous UPDATE event", locatorEvent.getColumnName()); + return true; + } + } + else if (EventType.SELECT_LOB_LOCATOR == prevEvent.getEventType()) { + if (isForSameTableOrScn(event, prevEvent) && isSameTableRow(event, prevEvent)) { + LOGGER.trace("\tAdding column '{}' to previous SEL_LOB_LOCATOR event", locatorEvent.getColumnName()); + return true; + } + } + } + else if (EventType.LOB_WRITE == event.getEventType()) { + final LobWriteEvent writeEvent = (LobWriteEvent) event; + if (lastSelectLobLocatorEvent.isBinary()) { + if (!writeEvent.getData().startsWith("HEXTORAW('") && !writeEvent.getData().endsWith("')")) { + throw new DebeziumException("Unexpected LOB data chunk: " + writeEvent.getData()); + } + } + LOGGER.trace("\tAdded LOB_WRITE data to internal LOB queue."); + lobWriteData.add(writeEvent.getData()); + return true; + } + else if (EventType.LOB_ERASE == event.getEventType()) { + // nothing is done with the event, its just consumed and treated as merged. + LOGGER.warn("\tLOB_ERASE for table '{}' column '{}' is not supported.", + lastSelectLobLocatorEvent.getTableId(), lastSelectLobLocatorEvent.getColumnName()); + LOGGER.trace("\tSkipped LOB_ERASE, treated as merged."); + return true; + } + else if (EventType.LOB_TRIM == event.getEventType()) { + // nothing is done with the event, its just consumed and treated as merged. + LOGGER.trace("\tSkipped LOB_TRIM, treated as merged."); + return true; + } + else if (EventType.INSERT == event.getEventType() || EventType.UPDATE == event.getEventType()) { + // Previous event is an INSERT + // The only valid combination here would be if the current event is an UPDATE since an INSERT + // cannot be merged with a prior INSERT. + if (EventType.INSERT == prevEvent.getEventType()) { + if (EventType.UPDATE == event.getEventType()) { + if (isForSameTableOrScn(event, prevEvent) && isSameTableRow(event, prevEvent)) { + LOGGER.trace("\tMerging UPDATE event with previous INSERT event"); + mergeNewColumns((DmlEvent) event, (DmlEvent) prevEvent); + return true; + } + } + } + else if (EventType.UPDATE == prevEvent.getEventType()) { + // Previous event is an UPDATE + // This will happen if there are non LOB and inline LOB fields updated in the same SQL. + // The inline LOB values should be merged with the previous UPDATE event + if (EventType.UPDATE == event.getEventType()) { + if (isForSameTableOrScn(event, prevEvent) && isSameTableRow(event, prevEvent)) { + LOGGER.trace("\tMerging UPDATE event with previous UPDATE event"); + mergeNewColumns((DmlEvent) event, (DmlEvent) prevEvent); + return true; + } + } + } + else if (EventType.SELECT_LOB_LOCATOR == prevEvent.getEventType()) { + // Previous event is a SEL_LOB_LOCATOR + // SQL contains both non-inline LOB and inline-LOB field changes + if (EventType.UPDATE == event.getEventType()) { + if (isForSameTableOrScn(event, prevEvent) && isSameTableRow(event, prevEvent)) { + for (int i = 0; i < ((DmlEvent) event).getDmlEntry().getNewValues().length; ++i) { + Object value = ((DmlEvent) event).getDmlEntry().getNewValues()[i]; + Object prevValue = ((DmlEvent) prevEvent).getDmlEntry().getNewValues()[i]; + if (prevValue == null && value != null) { + LOGGER.trace("\tAdding column index {} to previous SEL_LOB_LOCATOR event", i); + ((DmlEvent) prevEvent).getDmlEntry().getNewValues()[i] = value; + } + } + return true; + } + } + } + } + + LOGGER.trace("\tEvent {} is for a different row, merge skipped.", event.getEventType()); + return false; + } + + private void mergeLobWriteData(LogMinerEvent event) { + final Object data; + if (this.lastSelectLobLocatorEvent.isBinary()) { + // For BLOB we pass the list of chunks as-is to the value converter + data = new BlobChunkList(lobWriteData); + } + else { + // For CLOB we go ahead and pre-process the list into a single string + data = String.join("", lobWriteData); + } + + final DmlEvent dmlEvent = (DmlEvent) event; + final String columnName = lastSelectLobLocatorEvent.getColumnName(); + final int columnIndex = getSelectLobLocatorColumnIndex(); + + LOGGER.trace("\tSet LOB data for column '{}' on table {} in event {}", columnName, event.getTableId(), event.getEventType()); + dmlEvent.getDmlEntry().getNewValues()[columnIndex] = data; + lobWriteData.clear(); + } + + private int getSelectLobLocatorColumnIndex() { + final String columnName = lastSelectLobLocatorEvent.getColumnName(); + return LogMinerHelper.getColumnIndexByName(columnName, schema.tableFor(lastSelectLobLocatorEvent.getTableId())); + } + + private boolean isForSameTableOrScn(LogMinerEvent event, LogMinerEvent prevEvent) { + if (prevEvent != null) { + if (event.getTableId().equals(prevEvent.getTableId())) { + return true; + } + return event.getScn().equals(prevEvent.getScn()) && event.getRsId().equals(prevEvent.getRsId()); + } + return false; + } + + private boolean isSameTableRow(LogMinerEvent event, LogMinerEvent prevEvent) { + final Table table = schema.tableFor(event.getTableId()); + if (table == null) { + LOGGER.trace("Unable to locate table '{}' schema, unable to detect if same row.", event.getTableId()); + return false; + } + for (String columnName : table.primaryKeyColumnNames()) { + int position = LogMinerHelper.getColumnIndexByName(columnName, table); + Object prevValue = ((DmlEvent) prevEvent).getDmlEntry().getNewValues()[position]; + if (prevValue == null) { + throw new DebeziumException("Could not find column " + columnName + " in previous event"); + } + Object value = ((DmlEvent) event).getDmlEntry().getNewValues()[position]; + if (value == null) { + throw new DebeziumException("Could not find column " + columnName + " in event"); + } + if (!Objects.equals(value, prevValue)) { + return false; + } + } + return true; + } + + private void mergeNewColumns(DmlEvent event, DmlEvent prevEvent) { + final boolean prevEventIsInsert = EventType.INSERT == prevEvent.getEventType(); + for (int i = 0; i < event.getDmlEntry().getNewValues().length; ++i) { + Object value = event.getDmlEntry().getNewValues()[i]; + Object prevValue = prevEvent.getDmlEntry().getNewValues()[i]; + if (prevEventIsInsert && "EMPTY_CLOB()".equals(prevValue)) { + LOGGER.trace("\tAssigning column index {} with updated CLOB value.", i); + prevEvent.getDmlEntry().getNewValues()[i] = value; + } + else if (prevEventIsInsert && "EMPTY_BLOB()".equals(prevValue)) { + LOGGER.trace("\tAssigning column index {} with updated BLOB value.", i); + prevEvent.getDmlEntry().getNewValues()[i] = value; + } + else if (!prevEventIsInsert && value != null) { + LOGGER.trace("\tUpdating column index {} in previous event", i); + prevEvent.getDmlEntry().getNewValues()[i] = value; + } + } + } +} diff --git a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/processor/TransactionReconciliation.java b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/processor/TransactionReconciliation.java deleted file mode 100644 index 951fb2b18..000000000 --- a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/processor/TransactionReconciliation.java +++ /dev/null @@ -1,419 +0,0 @@ -/* - * Copyright Debezium Authors. - * - * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 - */ -package io.debezium.connector.oracle.logminer.processor; - -import java.util.ArrayList; -import java.util.List; -import java.util.Objects; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import io.debezium.DebeziumException; -import io.debezium.connector.oracle.BlobChunkList; -import io.debezium.connector.oracle.OracleConnectorConfig; -import io.debezium.connector.oracle.OracleDatabaseSchema; -import io.debezium.connector.oracle.logminer.LogMinerHelper; -import io.debezium.connector.oracle.logminer.events.DmlEvent; -import io.debezium.connector.oracle.logminer.events.EventType; -import io.debezium.connector.oracle.logminer.events.LobEraseEvent; -import io.debezium.connector.oracle.logminer.events.LobWriteEvent; -import io.debezium.connector.oracle.logminer.events.LogMinerEvent; -import io.debezium.connector.oracle.logminer.events.SelectLobLocatorEvent; -import io.debezium.connector.oracle.logminer.events.Transaction; -import io.debezium.relational.Table; - -/** - * Helper class that performs common transaction reconciliation. - * - * Transactions read from Oracle LogMiner are subject to containing events that need to be merged - * together to reflect a single logical SQL operation, such as events that pertain to LOB fields. - * This class facilities all the steps needed to merge events and reconcile a transaction. - * - * @author Chris Cranford - */ -public class TransactionReconciliation { - - private static final Logger LOGGER = LoggerFactory.getLogger(TransactionReconciliation.class); - - private final OracleConnectorConfig connectorConfig; - private final OracleDatabaseSchema schema; - - public TransactionReconciliation(OracleConnectorConfig connectorConfig, OracleDatabaseSchema schema) { - this.connectorConfig = connectorConfig; - this.schema = schema; - } - - /** - * Reconcile the specified transaction by merging multiple events that should be emitted as a single - * logical event, such as changes made to LOB column types that involve multiple events. - * - * @param transaction transaction to be reconciled, never {@code null} - */ - public void reconcile(Transaction transaction) { - // Do not perform reconciliation if LOB support is not enabled. - if (!connectorConfig.isLobEnabled()) { - return; - } - - final String transactionId = transaction.getTransactionId(); - LOGGER.trace("Reconciling transaction {}", transactionId); - - DmlEvent prevEvent = null; - int prevEventSize = transaction.getEvents().size(); - for (int i = 0; i < transaction.getEvents().size();) { - - final LogMinerEvent event = transaction.getEvents().get(i); - LOGGER.trace("Processing event {}", event); - - switch (event.getEventType()) { - case SELECT_LOB_LOCATOR: - if (shouldMergeSelectLobLocatorEvent(transaction, i, (SelectLobLocatorEvent) event, prevEvent)) { - continue; - } - break; - case INSERT: - case UPDATE: - if (shouldMergeDmlEvent(transaction, i, (DmlEvent) event, prevEvent)) { - continue; - } - break; - } - - ++i; - prevEvent = (DmlEvent) event; - LOGGER.trace("Previous event is now {}", prevEvent); - } - - int eventSize = transaction.getEvents().size(); - if (eventSize != prevEventSize) { - LOGGER.trace("Reconciled transaction {} from {} events to {}.", transactionId, prevEventSize, eventSize); - } - else { - LOGGER.trace("Transaction {} event queue was unmodified.", transactionId); - } - } - - /** - * Attempts to merge the provided SEL_LOB_LOCATOR event with the previous event in the transaction. - * - * @param transaction transaction being processed, never {@code null} - * @param index event index being processed - * @param event event being processed, never {@code null} - * @param prevEvent previous event in the transaction, can be {@code null} - * @return true if the event is merged, false if the event was not merged. - */ - protected boolean shouldMergeSelectLobLocatorEvent(Transaction transaction, int index, SelectLobLocatorEvent event, DmlEvent prevEvent) { - LOGGER.trace("\tDetected SelectLobLocatorEvent for column '{}'", event.getColumnName()); - - final int columnIndex = LogMinerHelper.getColumnIndexByName(event.getColumnName(), schema.tableFor(event.getTableId())); - - // Read and combine all LOB_WRITE events that follow SEL_LOB_LOCATOR - Object lobData = null; - final List lobWrites = readAndCombineLobWriteEvents(transaction, index, event.isBinary()); - if (!lobWrites.isEmpty()) { - if (event.isBinary()) { - // For BLOB we pass the list of string chunks as-is to the value converter - lobData = new BlobChunkList(lobWrites); - } - else { - // For CLOB we go ahead and pre-process the List into a single string. - lobData = String.join("", lobWrites); - } - } - - // Read and consume all LOB_ERASE events that follow SEL_LOB_LOCATOR - final int lobEraseEvents = readAndConsumeLobEraseEvents(transaction, index); - if (lobEraseEvents > 0) { - LOGGER.warn("LOB_ERASE for table '{}' column '{}' is not supported, use DML operations to manipulate LOB columns only.", event.getTableId(), - event.getColumnName()); - if (lobWrites.isEmpty()) { - // There are no write and only erase events, discard entire SEL_LOB_LOCATOR - // To simulate this, we treat this as a "merge" op so caller doesn't modify previous event - transaction.getEvents().remove(index); - return true; - } - } - else if (lobEraseEvents == 0 && lobWrites.isEmpty()) { - // There were no LOB operations present, discard entire SEL_LOB_LOCATOR - // To simulate this, we treat this as a "merge" op so caller doesn't modify previous event - transaction.getEvents().remove(index); - return true; - } - - // SelectLobLocatorEvent can be treated as a parent DML operation where an update occurs on any - // LOB-based column. In this case, the event will be treated as an UPDATE event when emitted. - - if (prevEvent == null) { - // There is no prior event, add column to this SelectLobLocatorEvent and don't merge. - LOGGER.trace("\tAdding column '{}' to current event", event.getColumnName()); - event.getDmlEntry().getNewValues()[columnIndex] = lobData; - return false; - } - - if (EventType.INSERT == prevEvent.getEventType()) { - // Previous event is an INSERT operation. - // Only merge the SEL_LOB_LOCATOR event if the previous INSERT is for the same table/row - // and if the INSERT's column value is EMPTY_CLOB() or EMPTY_BLOB() - if (isForSameTableOrScn(event, prevEvent)) { - LOGGER.trace("\tMerging SEL_LOB_LOCATOR with previous INSERT event"); - Object prevValue = prevEvent.getDmlEntry().getNewValues()[columnIndex]; - if (!"EMPTY_CLOB()".equals(prevValue) && !"EMPTY_BLOB()".equals(prevValue)) { - throw new DebeziumException("Expected to find column '" + event.getColumnName() + "' in table '" - + prevEvent.getTableId() + "' to be initialized as an empty LOB value.'"); - } - - prevEvent.getDmlEntry().getNewValues()[columnIndex] = lobData; - - // Remove the SEL_LOB_LOCATOR event from event list and indicate merged. - transaction.getEvents().remove(index); - return true; - } - } - else if (EventType.UPDATE == prevEvent.getEventType()) { - // Previous event is an UPDATE operation. - // Only merge the SEL_LOB_LOCATOR event if the previous UPDATE is for the same table/row - if (isForSameTableOrScn(event, prevEvent) && isSameTableRow(event, prevEvent)) { - LOGGER.trace("\tUpdating SEL_LOB_LOCATOR column '{}' to previous UPDATE event", event.getColumnName()); - prevEvent.getDmlEntry().getNewValues()[columnIndex] = lobData; - - // Remove the SEL_LOB_LOCATOR event from event list and indicate merged. - transaction.getEvents().remove(index); - return true; - } - } - else if (EventType.SELECT_LOB_LOCATOR == prevEvent.getEventType()) { - // Previous event is a SEL_LOB_LOCATOR operation. - // Only merge the two SEL_LOB_LOCATOR events if they're for the same table/row - if (isForSameTableOrScn(event, prevEvent) && isSameTableRow(event, prevEvent)) { - LOGGER.trace("\tAdding column '{}' to previous SEL_LOB_LOCATOR event", event.getColumnName()); - prevEvent.getDmlEntry().getNewValues()[columnIndex] = lobData; - - // Remove the SEL_LOB_LOCATOR event from event list and indicate merged. - transaction.getEvents().remove(index); - return true; - } - } - else { - throw new DebeziumException("Unexpected previous event operation: " + prevEvent.getEventType()); - } - - LOGGER.trace("\tSEL_LOB_LOCATOR event is for different row, merge skipped."); - LOGGER.trace("\tAdding column '{}' to current event", event.getColumnName()); - event.getDmlEntry().getNewValues()[columnIndex] = lobData; - return false; - } - - /** - * Attempts to merge the provided DML event with the previous event in the transaction. - * - * @param transaction transaction being processed, never {@code null} - * @param index event index being processed - * @param event event being processed, never {@code null} - * @param prevEvent previous event in the transaction, can be {@code null} - * @return true if the event is merged, false if the event was not merged - */ - protected boolean shouldMergeDmlEvent(Transaction transaction, int index, DmlEvent event, DmlEvent prevEvent) { - LOGGER.trace("\tDetected DmlEvent {}", event.getEventType()); - - if (prevEvent == null) { - // There is no prior event, therefore there is no reason to perform any merge. - return false; - } - - if (EventType.INSERT == prevEvent.getEventType()) { - // Previous event is an INSERT operation. - // The only valid combination here would be if the current event is an UPDATE since an INSERT cannot - // be merged with a prior INSERT with how LogMiner materializes the rows. - if (EventType.UPDATE == event.getEventType()) { - if (isForSameTableOrScn(event, prevEvent) && isSameTableRow(event, prevEvent)) { - LOGGER.trace("\tMerging UPDATE event with previous INSERT event"); - mergeNewColumns(event, prevEvent); - - // Remove the UPDATE event from event list and indicate merged. - transaction.getEvents().remove(index); - return true; - } - } - } - else if (EventType.UPDATE == prevEvent.getEventType()) { - // Previous event is an UPDATE operation. - // This will happen if there are non-CLOB and inline-CLOB fields updated in the same SQL. - // The inline-CLOB values should be merged with the previous UPDATE event. - if (EventType.UPDATE == event.getEventType()) { - if (isForSameTableOrScn(event, prevEvent) && isSameTableRow(event, prevEvent)) { - LOGGER.trace("\tMerging UPDATE event with previous UPDATE event"); - mergeNewColumns(event, prevEvent); - - // Remove the UPDATE event from event list and indicate merged. - transaction.getEvents().remove(index); - return true; - } - } - } - else if (EventType.SELECT_LOB_LOCATOR == prevEvent.getEventType()) { - // Previous event is a SEL_LOB_LOCATOR operation. - // SQL contained both non-inline CLOB and inline-CLOB field changes. - if (EventType.UPDATE == event.getEventType()) { - if (isForSameTableOrScn(event, prevEvent) && isSameTableRow(event, prevEvent)) { - LOGGER.trace("\tMerging UPDATE event with previous SEL_LOB_LOCATOR event"); - for (int i = 0; i < event.getDmlEntry().getNewValues().length; ++i) { - Object value = event.getDmlEntry().getNewValues()[i]; - Object prevValue = prevEvent.getDmlEntry().getNewValues()[i]; - if (prevValue == null && value != null) { - LOGGER.trace("\tAdding column index {} to previous SEL_LOB_LOCATOR event", i); - prevEvent.getDmlEntry().getNewValues()[i] = value; - } - } - - // Remove the UPDATE event from event list and indicate merged. - transaction.getEvents().remove(index); - return true; - } - } - } - - LOGGER.trace("\tDmlEvent {} event is for different row, merge skipped.", event.getEventType()); - return false; - } - - /** - * Reads the transaction event queue and combines all LOB_WRITE events starting at the provided index. - * for a SEL_LOB_LOCATOR event which is for binary data (BLOB) data types. - * - * @param transaction transaction being processed, never {@code null} - * @param index index to the first LOB_WRITE operation - * @return list of string-based values for each LOB_WRITE operation - */ - protected List readAndCombineLobWriteEvents(Transaction transaction, int index, boolean binaryData) { - List chunks = new ArrayList<>(); - for (int i = index + 1; i < transaction.getEvents().size(); ++i) { - final LogMinerEvent event = transaction.getEvents().get(i); - if (!(event instanceof LobWriteEvent)) { - break; - } - - final LobWriteEvent writeEvent = (LobWriteEvent) event; - if (binaryData && !writeEvent.getData().startsWith("HEXTORAW('") && !writeEvent.getData().endsWith("')")) { - throw new DebeziumException("Unexpected BLOB data chunk: " + writeEvent.getData()); - } - - chunks.add(writeEvent.getData()); - } - - if (!chunks.isEmpty()) { - LOGGER.trace("\tCombined {} LobWriteEvent events", chunks.size()); - // Remove events from the transaction queue queue - for (int i = 0; i < chunks.size(); ++i) { - transaction.getEvents().remove(index + 1); - } - } - - return chunks; - } - - /** - * Read and remove all LobErase events detected in the transaction event queue. - * - * @param transaction transaction being processed, never {@code null} - * @param index index to the first LOB_ERASE operation - * @return number of LOB_ERASE events consumed and removed from the event queue - */ - protected int readAndConsumeLobEraseEvents(Transaction transaction, int index) { - int events = 0; - for (int i = index + 1; i < transaction.getEvents().size(); ++i) { - final LogMinerEvent event = transaction.getEvents().get(i); - if (!(event instanceof LobEraseEvent)) { - break; - } - events++; - } - - if (events > 0) { - LOGGER.trace("\tConsumed {} LobErase events", events); - for (int i = 0; i < events; ++i) { - transaction.getEvents().remove(index + 1); - } - } - - return events; - } - - /** - * Checks whether the two events are for the same table or participate in the same system change. - * - * @param event current event being processed, never {@code null} - * @param prevEvent previous/parent event that has been processed, may be {@code null} - * @return true if the two events are for the same table or system change number, false otherwise - */ - protected boolean isForSameTableOrScn(LogMinerEvent event, LogMinerEvent prevEvent) { - if (prevEvent != null) { - if (event.getTableId().equals(prevEvent.getTableId())) { - return true; - } - return event.getScn().equals(prevEvent.getScn()) && event.getRsId().equals(prevEvent.getRsId()); - } - return false; - } - - /** - * Checks whether the two events are for the same table row. - * - * @param event current event being processed, never {@code null} - * @param prevEvent previous/parent event that has been processed, never {@code null} - * @return true if the two events are for the same table row, false otherwise - */ - protected boolean isSameTableRow(DmlEvent event, DmlEvent prevEvent) { - final Table table = schema.tableFor(event.getTableId()); - if (table == null) { - LOGGER.trace("Unable to locate table '{}' schema, unable to detect if same row.", event.getTableId()); - return false; - } - for (String columnName : table.primaryKeyColumnNames()) { - int position = LogMinerHelper.getColumnIndexByName(columnName, table); - Object prevValue = prevEvent.getDmlEntry().getNewValues()[position]; - if (prevValue == null) { - throw new DebeziumException("Could not find column " + columnName + " in previous event"); - } - Object value = event.getDmlEntry().getNewValues()[position]; - if (value == null) { - throw new DebeziumException("Could not find column " + columnName + " in event"); - } - if (!Objects.equals(value, prevValue)) { - return false; - } - } - return true; - } - - /** - * Merge column values from {@code event} with {@code prevEvent}. - * - * @param event current event being processed, never {@code null} - * @param prevEvent previous/parent parent that has been processed, never {@code null} - */ - protected void mergeNewColumns(DmlEvent event, DmlEvent prevEvent) { - final boolean prevEventIsInsert = EventType.INSERT == prevEvent.getEventType(); - - for (int i = 0; i < event.getDmlEntry().getNewValues().length; ++i) { - Object value = event.getDmlEntry().getNewValues()[i]; - Object prevValue = prevEvent.getDmlEntry().getNewValues()[i]; - if (prevEventIsInsert && "EMPTY_CLOB()".equals(prevValue)) { - LOGGER.trace("\tAssigning column index {} with updated CLOB value.", i); - prevEvent.getDmlEntry().getNewValues()[i] = value; - } - else if (prevEventIsInsert && "EMPTY_BLOB()".equals(prevValue)) { - LOGGER.trace("\tAssigning column index {} with updated BLOB value.", i); - prevEvent.getDmlEntry().getNewValues()[i] = value; - } - else if (!prevEventIsInsert && value != null) { - LOGGER.trace("\tUpdating column index {} in previous event", i); - prevEvent.getDmlEntry().getNewValues()[i] = value; - } - } - } - -} diff --git a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/processor/infinispan/InfinispanLogMinerEventProcessor.java b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/processor/infinispan/InfinispanLogMinerEventProcessor.java index 291452707..5ac28acde 100644 --- a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/processor/infinispan/InfinispanLogMinerEventProcessor.java +++ b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/processor/infinispan/InfinispanLogMinerEventProcessor.java @@ -11,6 +11,7 @@ import java.time.Duration; import java.time.Instant; import java.util.function.Supplier; +import java.util.stream.Collectors; import org.infinispan.Cache; import org.infinispan.configuration.cache.Configuration; @@ -32,9 +33,11 @@ import io.debezium.connector.oracle.logminer.events.DmlEvent; import io.debezium.connector.oracle.logminer.events.LogMinerEvent; import io.debezium.connector.oracle.logminer.events.LogMinerEventRow; -import io.debezium.connector.oracle.logminer.events.Transaction; import io.debezium.connector.oracle.logminer.processor.AbstractLogMinerEventProcessor; +import io.debezium.connector.oracle.logminer.processor.Transaction; import io.debezium.connector.oracle.logminer.processor.TransactionCache; +import io.debezium.connector.oracle.logminer.processor.TransactionCommitConsumer; +import io.debezium.function.BlockingConsumer; import io.debezium.pipeline.EventDispatcher; import io.debezium.pipeline.source.spi.ChangeEventSource.ChangeEventSourceContext; import io.debezium.relational.TableId; @@ -46,7 +49,7 @@ * * @author Chris Cranford */ -public class InfinispanLogMinerEventProcessor extends AbstractLogMinerEventProcessor { +public class InfinispanLogMinerEventProcessor extends AbstractLogMinerEventProcessor { private static final Logger LOGGER = LoggerFactory.getLogger(InfinispanLogMinerEventProcessor.class); @@ -62,6 +65,13 @@ public class InfinispanLogMinerEventProcessor extends AbstractLogMinerEventProce */ private final InfinispanTransactionCache transactionCache; + /** + * A cache storing each of the raw events read from the LogMiner contents view. + * This cache is keyed using the format of "-" where the sequence + * is obtained from the {@link InfinispanTransaction#getEventId(int)} method. + */ + private final Cache eventCache; + /** * A cache storing recently committed transactions key by the unique transaction id and * the event's system change number. This cache is used to filter events during re-mining @@ -113,9 +123,26 @@ public InfinispanLogMinerEventProcessor(ChangeEventSourceContext context, final EmbeddedCacheManager manager = new DefaultCacheManager(); this.transactionCache = new InfinispanTransactionCache(createCache(manager, connectorConfig, "transactions")); + this.eventCache = createCache(manager, connectorConfig, "events"); this.recentlyCommittedTransactionsCache = createCache(manager, connectorConfig, "committed-transactions"); this.rollbackTransactionsCache = createCache(manager, connectorConfig, "rollback-transactions"); this.schemaChangesCache = createCache(manager, connectorConfig, "schema-changes"); + + displayCacheStatistics(); + } + + private void displayCacheStatistics() { + LOGGER.info("Cache Statistics:"); + LOGGER.info("\tTransactions : {}", transactionCache.size()); + LOGGER.info("\tCommitted Trxs : {}", recentlyCommittedTransactionsCache.size()); + LOGGER.info("\tRollback Trxs : {}", rollbackTransactionsCache.size()); + LOGGER.info("\tSchema Changes : {}", schemaChangesCache.size()); + LOGGER.info("\tEvents : {}", eventCache.size()); + if (!eventCache.isEmpty()) { + for (String eventKey : eventCache.keySet()) { + LOGGER.debug("\t\tFound Key: {}", eventKey); + } + } } private Cache createCache(EmbeddedCacheManager manager, OracleConnectorConfig connectorConfig, String name) { @@ -137,14 +164,31 @@ private Cache createCache(EmbeddedCacheManager manager, OracleConne } @Override - protected TransactionCache getTransactionCache() { + protected TransactionCache getTransactionCache() { return transactionCache; } + @Override + protected InfinispanTransaction createTransaction(LogMinerEventRow row) { + return new InfinispanTransaction(row.getTransactionId(), row.getScn(), row.getChangeTime(), row.getUserName()); + } + + @Override + protected void removeEventWithRowId(LogMinerEventRow row) { + for (String eventKey : eventCache.keySet().stream().filter(k -> k.startsWith(row.getTransactionId() + "-")).collect(Collectors.toList())) { + final LogMinerEvent event = eventCache.get(eventKey); + if (event != null && event.getRowId().equals(row.getRowId())) { + LOGGER.trace("Undo applied for event {}.", event); + eventCache.remove(eventKey); + } + } + } + @Override public void close() throws Exception { if (getConfig().isLogMiningBufferDropOnStop()) { // Since the buffers are to be dropped on stop, clear them before stopping provider. + eventCache.clear(); transactionCache.clear(); recentlyCommittedTransactionsCache.clear(); rollbackTransactionsCache.clear(); @@ -154,6 +198,7 @@ public void close() throws Exception { recentlyCommittedTransactionsCache.stop(); rollbackTransactionsCache.stop(); schemaChangesCache.stop(); + eventCache.stop(); transactionCache.close(); } @@ -240,20 +285,24 @@ protected void handleCommit(LogMinerEventRow row) throws InterruptedException { return; } - final Transaction transaction = transactionCache.remove(transactionId); + final InfinispanTransaction transaction = transactionCache.remove(transactionId); if (transaction == null) { LOGGER.trace("Transaction {} not found.", transactionId); return; } - boolean skipExcludedUserName = false; - if (transaction.getUserName() == null && !transaction.getEvents().isEmpty()) { + final boolean skipExcludedUserName; + if (transaction.getUserName() == null && transaction.getNumberOfEvents() > 0) { LOGGER.debug("Got transaction with null username {}", transaction); + skipExcludedUserName = false; } else if (getConfig().getLogMiningUsernameExcludes().contains(transaction.getUserName())) { LOGGER.trace("Skipping transaction with excluded username {}", transaction); skipExcludedUserName = true; } + else { + skipExcludedUserName = false; + } final Scn smallestScn = transactionCache.getMinimumScn(); metrics.setOldestScn(smallestScn.isNull() ? Scn.valueOf(-1) : smallestScn); @@ -263,56 +312,77 @@ else if (getConfig().getLogMiningUsernameExcludes().contains(transaction.getUser if ((offsetCommitScn != null && offsetCommitScn.compareTo(commitScn) > 0) || lastCommittedScn.compareTo(commitScn) > 0) { LOGGER.debug("Transaction {} has already been processed. Commit SCN in offset is {} while commit SCN of transaction is {} and last seen committed SCN is {}.", transactionId, offsetCommitScn, commitScn, lastCommittedScn); + transactionCache.remove(transactionId); metrics.setActiveTransactions(transactionCache.size()); + removeEventsWithTransaction(transaction); return; } counters.commitCount++; Instant start = Instant.now(); - getReconciliation().reconcile(transaction); - int numEvents = transaction.getEvents().size(); + int numEvents = getTransactionEventCount(transaction); LOGGER.trace("Commit: (smallest SCN {}) {}", smallestScn, row); LOGGER.trace("Transaction {} has {} events", transactionId, numEvents); - for (LogMinerEvent event : transaction.getEvents()) { - if (!context.isRunning()) { - return; - } + BlockingConsumer delegate = new BlockingConsumer() { + private int numEvents = getTransactionEventCount(transaction); - // Update SCN in offset context only if processed SCN less than SCN of other transactions - if (smallestScn.isNull() || commitScn.compareTo(smallestScn) < 0) { - offsetContext.setScn(event.getScn()); - metrics.setOldestScn(event.getScn()); - } + @Override + public void accept(LogMinerEvent event) throws InterruptedException { + // Update SCN in offset context only if processed SCN less than SCN of other transactions + if (smallestScn.isNull() || commitScn.compareTo(smallestScn) < 0) { + offsetContext.setScn(event.getScn()); + metrics.setOldestScn(event.getScn()); + } - offsetContext.setTransactionId(transactionId); - offsetContext.setSourceTime(event.getChangeTime()); - offsetContext.setTableId(event.getTableId()); - if (--numEvents == 0) { - // reached the last event update the commit scn in the offsets - offsetContext.setCommitScn(commitScn); - } + offsetContext.setTransactionId(transactionId); + offsetContext.setSourceTime(event.getChangeTime()); + offsetContext.setTableId(event.getTableId()); + if (--numEvents == 0) { + // reached the last event update the commit scn in the offsets + offsetContext.setCommitScn(commitScn); + } - // after reconciliation all events should be DML - // todo: do we want to move dml entry up and just let it be null to avoid cast? - final DmlEvent dmlEvent = (DmlEvent) event; - if (!skipExcludedUserName) { - dispatcher.dispatchDataChangeEvent(event.getTableId(), - new LogMinerChangeRecordEmitter( - partition, - offsetContext, - dmlEvent.getEventType(), - dmlEvent.getDmlEntry().getOldValues(), - dmlEvent.getDmlEntry().getNewValues(), - getSchema().tableFor(event.getTableId()), - Clock.system())); + final DmlEvent dmlEvent = (DmlEvent) event; + if (!skipExcludedUserName) { + dispatcher.dispatchDataChangeEvent(event.getTableId(), + new LogMinerChangeRecordEmitter( + partition, + offsetContext, + dmlEvent.getEventType(), + dmlEvent.getDmlEntry().getOldValues(), + dmlEvent.getDmlEntry().getNewValues(), + getSchema().tableFor(event.getTableId()), + Clock.system())); + } + } + }; + + int eventCount = 0; + try (TransactionCommitConsumer commitConsumer = new TransactionCommitConsumer(delegate, getConfig(), getSchema())) { + for (int i = 0; i < transaction.getNumberOfEvents(); ++i) { + if (!context.isRunning()) { + return; + } + + final LogMinerEvent event = eventCache.get(transaction.getEventId(i)); + if (event == null) { + // If an event is undone, it gets removed from the cache at undo time. + // This means that the call to get could return a null event and we + // should silently ignore it. + continue; + } + + eventCount++; + LOGGER.trace("Dispatching event {} {}", transaction.getEventId(i), event.getEventType()); + commitConsumer.accept(event); } } lastCommittedScn = Scn.valueOf(commitScn.longValue()); - if (!transaction.getEvents().isEmpty() && !skipExcludedUserName) { + if (transaction.getNumberOfEvents() > 0 && !skipExcludedUserName) { dispatcher.dispatchTransactionCommittedEvent(partition, offsetContext); } else { @@ -327,9 +397,12 @@ else if (getConfig().getLogMiningUsernameExcludes().contains(transaction.getUser // cache recently committed transactions by transaction id recentlyCommittedTransactionsCache.put(transactionId, commitScn.toString()); + // Clear the event queue for the transaction + removeEventsWithTransaction(transaction); + metrics.incrementCommittedTransactions(); metrics.setActiveTransactions(transactionCache.size()); - metrics.incrementCommittedDmlCount(transaction.getEvents().size()); + metrics.incrementCommittedDmlCount(eventCount); metrics.setCommittedScn(commitScn); metrics.setOffsetScn(offsetContext.getScn()); metrics.setLastCommitDuration(Duration.between(start, Instant.now())); @@ -337,8 +410,9 @@ else if (getConfig().getLogMiningUsernameExcludes().contains(transaction.getUser @Override protected void handleRollback(LogMinerEventRow row) { - final Transaction transaction = transactionCache.get(row.getTransactionId()); + final InfinispanTransaction transaction = transactionCache.get(row.getTransactionId()); if (transaction != null) { + removeEventsWithTransaction(transaction); transactionCache.remove(row.getTransactionId()); rollbackTransactionsCache.put(row.getTransactionId(), row.getScn().toString()); @@ -361,16 +435,16 @@ protected void handleSchemaChange(LogMinerEventRow row) throws InterruptedExcept @Override protected void addToTransaction(String transactionId, LogMinerEventRow row, Supplier eventSupplier) { if (isTransactionIdAllowed(transactionId)) { - Transaction transaction = getTransactionCache().get(transactionId); + InfinispanTransaction transaction = getTransactionCache().get(transactionId); if (transaction == null) { LOGGER.trace("Transaction {} is not in cache, creating.", transactionId); - transaction = new Transaction(transactionId, row.getScn(), row.getChangeTime(), row.getUserName()); + transaction = createTransaction(row); } - int eventId = transaction.getNextEventId(); - if (transaction.getEvents().size() <= eventId) { + String eventKey = transaction.getEventId(transaction.getNextEventId()); + if (!eventCache.containsKey(eventKey)) { // Add new event at eventId offset - LOGGER.trace("Transaction {}, adding event reference at index {}", transactionId, eventId); - transaction.getEvents().add(eventSupplier.get()); + LOGGER.trace("Transaction {}, adding event reference at key {}", transactionId, eventKey); + eventCache.put(eventKey, eventSupplier.get()); metrics.calculateLagMetrics(row.getChangeTime()); } // When using Infinispan, this extra put is required so that the state is properly synchronized @@ -379,6 +453,15 @@ protected void addToTransaction(String transactionId, LogMinerEventRow row, Supp } } + @Override + protected int getTransactionEventCount(InfinispanTransaction transaction) { + // todo: implement indexed keys when ISPN supports them + return (int) eventCache.keySet() + .parallelStream() + .filter(k -> k.startsWith(transaction.getTransactionId() + "-")) + .count(); + } + private PreparedStatement createQueryStatement() throws SQLException { final String query = LogMinerQueryBuilder.build(getConfig(), getSchema()); return jdbcConnection.connection().prepareStatement(query, @@ -434,4 +517,11 @@ private Scn calculateNewStartScn(Scn endScn) throws InterruptedException { return endScn; } } + + private void removeEventsWithTransaction(InfinispanTransaction transaction) { + // Clear the event queue for the transaction + for (int i = 0; i < transaction.getNumberOfEvents(); ++i) { + eventCache.remove(transaction.getEventId(i)); + } + } } diff --git a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/processor/infinispan/InfinispanTransaction.java b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/processor/infinispan/InfinispanTransaction.java new file mode 100644 index 000000000..08484ed3a --- /dev/null +++ b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/processor/infinispan/InfinispanTransaction.java @@ -0,0 +1,62 @@ +/* + * Copyright Debezium Authors. + * + * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 + */ +package io.debezium.connector.oracle.logminer.processor.infinispan; + +import java.time.Instant; + +import io.debezium.connector.oracle.Scn; +import io.debezium.connector.oracle.logminer.processor.AbstractTransaction; +import io.debezium.connector.oracle.logminer.processor.infinispan.marshalling.VisibleForMarshalling; + +/** + * A concrete implementation of {@link AbstractTransaction} for the Infinispan processor. + * + * @author Chris Cranford + */ +public class InfinispanTransaction extends AbstractTransaction { + + private int numberOfEvents; + + public InfinispanTransaction(String transactionId, Scn startScn, Instant changeTime, String userName) { + super(transactionId, startScn, changeTime, userName); + started(); + } + + @VisibleForMarshalling + public InfinispanTransaction(String transactionId, Scn startScn, Instant changeTime, String userName, int numberOfEvents) { + this(transactionId, startScn, changeTime, userName); + this.numberOfEvents = numberOfEvents; + } + + @Override + public int getNumberOfEvents() { + return numberOfEvents; + } + + @Override + public int getNextEventId() { + return numberOfEvents++; + } + + @Override + public void started() { + numberOfEvents = 0; + } + + public String getEventId(int index) { + if (index < 0 || index >= numberOfEvents) { + throw new IndexOutOfBoundsException("Index " + index + "outside the transaction " + getTransactionId() + " event list bounds"); + } + return getTransactionId() + "-" + index; + } + + @Override + public String toString() { + return "InfinispanTransaction{" + + "numberOfEvents=" + numberOfEvents + + "} " + super.toString(); + } +} diff --git a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/processor/infinispan/InfinispanTransactionCache.java b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/processor/infinispan/InfinispanTransactionCache.java index 7a02d75cf..de8aeb151 100644 --- a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/processor/infinispan/InfinispanTransactionCache.java +++ b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/processor/infinispan/InfinispanTransactionCache.java @@ -11,7 +11,6 @@ import org.infinispan.commons.util.CloseableIterator; import io.debezium.connector.oracle.Scn; -import io.debezium.connector.oracle.logminer.events.Transaction; import io.debezium.connector.oracle.logminer.processor.TransactionCache; /** @@ -19,26 +18,26 @@ * * @author Chris Cranford */ -public class InfinispanTransactionCache implements TransactionCache> { +public class InfinispanTransactionCache implements TransactionCache> { - private final Cache cache; + private final Cache cache; - public InfinispanTransactionCache(Cache cache) { + public InfinispanTransactionCache(Cache cache) { this.cache = cache; } @Override - public Transaction get(String transactionId) { + public InfinispanTransaction get(String transactionId) { return cache.get(transactionId); } @Override - public void put(String transactionId, Transaction transaction) { + public void put(String transactionId, InfinispanTransaction transaction) { cache.put(transactionId, transaction); } @Override - public Transaction remove(String transactionId) { + public InfinispanTransaction remove(String transactionId) { return cache.remove(transactionId); } @@ -58,14 +57,14 @@ public boolean isEmpty() { } @Override - public Iterator> iterator() { + public Iterator> iterator() { return cache.entrySet().iterator(); } @Override public Scn getMinimumScn() { Scn minimumScn = Scn.NULL; - try (CloseableIterator iterator = cache.values().iterator()) { + try (CloseableIterator iterator = cache.values().iterator()) { while (iterator.hasNext()) { final Scn transactionScn = iterator.next().getStartScn(); if (minimumScn.isNull()) { diff --git a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/processor/infinispan/marshalling/DmlEventAdapter.java b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/processor/infinispan/marshalling/DmlEventAdapter.java new file mode 100644 index 000000000..5d14fa109 --- /dev/null +++ b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/processor/infinispan/marshalling/DmlEventAdapter.java @@ -0,0 +1,67 @@ +/* + * Copyright Debezium Authors. + * + * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 + */ +package io.debezium.connector.oracle.logminer.processor.infinispan.marshalling; + +import java.time.Instant; + +import org.infinispan.protostream.annotations.ProtoAdapter; +import org.infinispan.protostream.annotations.ProtoFactory; +import org.infinispan.protostream.annotations.ProtoField; + +import io.debezium.connector.oracle.Scn; +import io.debezium.connector.oracle.logminer.events.DmlEvent; +import io.debezium.connector.oracle.logminer.events.EventType; +import io.debezium.connector.oracle.logminer.parser.LogMinerDmlEntryImpl; +import io.debezium.relational.TableId; + +/** + * An Infinispan ProtoStream adapter to marshall {@link DmlEvent} instances. + * + * This class defines a factory for creating {@link DmlEvent} instances when hydrating + * records from the persisted datastore as well as field handlers to extract values + * to be marshalled to the protocol buffer stream. + * + * The underlying protocol buffer record consists of the following structure: + *
+ *     message DmlEvent {
+ *         // structure of the super type, LogMinerEventAdapter
+ *         required LogMinerDmlEntryImpl entry = 7;
+ *     }
+ * 
+ * + * @author Chris Cranford + */ +@ProtoAdapter(DmlEvent.class) +public class DmlEventAdapter extends LogMinerEventAdapter { + + /** + * A ProtoStream factory that creates {@link DmlEvent} instances. + * + * @param eventType the event type + * @param scn the system change number, must not be {@code null} + * @param tableId the fully-qualified table name + * @param rowId the Oracle row-id the change is associated with + * @param rsId the Oracle rollback segment identifier + * @param changeTime the time the change occurred + * @param entry the parsed SQL statement entry + * @return the constructed DmlEvent + */ + @ProtoFactory + public DmlEvent factory(int eventType, String scn, String tableId, String rowId, String rsId, String changeTime, LogMinerDmlEntryImpl entry) { + return new DmlEvent(EventType.from(eventType), Scn.valueOf(scn), TableId.parse(tableId), rowId, rsId, Instant.parse(changeTime), entry); + } + + /** + * A ProtoStream handler to extract the {@code entry} field from the {@link DmlEvent}. + * + * @param event the event instance, must not be {@code null} + * @return the LogMinerDmlEntryImpl instance + */ + @ProtoField(number = 7, required = true) + public LogMinerDmlEntryImpl getEntry(DmlEvent event) { + return (LogMinerDmlEntryImpl) event.getDmlEntry(); + } +} diff --git a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/processor/infinispan/marshalling/LobEraseEventAdapter.java b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/processor/infinispan/marshalling/LobEraseEventAdapter.java new file mode 100644 index 000000000..027f2f0be --- /dev/null +++ b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/processor/infinispan/marshalling/LobEraseEventAdapter.java @@ -0,0 +1,46 @@ +/* + * Copyright Debezium Authors. + * + * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 + */ +package io.debezium.connector.oracle.logminer.processor.infinispan.marshalling; + +import java.time.Instant; + +import org.infinispan.protostream.annotations.ProtoAdapter; +import org.infinispan.protostream.annotations.ProtoFactory; + +import io.debezium.connector.oracle.Scn; +import io.debezium.connector.oracle.logminer.events.EventType; +import io.debezium.connector.oracle.logminer.events.LobEraseEvent; +import io.debezium.relational.TableId; + +/** + * An Infinispan ProtoStream adapter to marshall {@link LobEraseEvent} instances. + * + * This class defines a factory for creating {@link LobEraseEvent} instances when hydrating + * records from the persisted datastore asa well as field handlers to extract values + * to be marshalled to the protocol buffer stream. + * + * @see LogMinerEventAdapter for the structure format of this adapter. + * @author Chris Cranford + */ +@ProtoAdapter(LobEraseEvent.class) +public class LobEraseEventAdapter extends LogMinerEventAdapter { + + /** + * A ProtoStream factory that creates {@link LobEraseEvent} instances. + * + * @param eventType the event type + * @param scn the system change number, must not be {@code null} + * @param tableId the fully-qualified table name + * @param rowId the Oracle row-id the change is associated with + * @param rsId the Oracle rollback segment identifier + * @param changeTime the time the change occurred + * @return the constructed LobEraseEvent + */ + @ProtoFactory + public LobEraseEvent factory(int eventType, String scn, String tableId, String rowId, String rsId, String changeTime) { + return new LobEraseEvent(EventType.from(eventType), Scn.valueOf(scn), TableId.parse(tableId), rowId, rsId, Instant.parse(changeTime)); + } +} diff --git a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/processor/infinispan/marshalling/LobWriteEventAdapter.java b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/processor/infinispan/marshalling/LobWriteEventAdapter.java new file mode 100644 index 000000000..4e03507fa --- /dev/null +++ b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/processor/infinispan/marshalling/LobWriteEventAdapter.java @@ -0,0 +1,66 @@ +/* + * Copyright Debezium Authors. + * + * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 + */ +package io.debezium.connector.oracle.logminer.processor.infinispan.marshalling; + +import java.time.Instant; + +import org.infinispan.protostream.annotations.ProtoAdapter; +import org.infinispan.protostream.annotations.ProtoFactory; +import org.infinispan.protostream.annotations.ProtoField; + +import io.debezium.connector.oracle.Scn; +import io.debezium.connector.oracle.logminer.events.EventType; +import io.debezium.connector.oracle.logminer.events.LobWriteEvent; +import io.debezium.relational.TableId; + +/** + * An Infinispan ProtoStream adapter to marshall {@link LobWriteEvent} instances. + * + * This class defines a factory for creating {@link LobWriteEvent} instances when hydrating + * records from the persisted datastore asa well as field handlers to extract values + * to be marshalled to the protocol buffer stream. + * + * The underlying protocol buffer record consists of the following structure: + *
+ *     message LobWriteEvent {
+ *         // structure of the super type, LogMinerEventAdapter
+ *         string data = 7;
+ *     }
+ * 
+ * + * @author Chris Cranford + */ +@ProtoAdapter(LobWriteEvent.class) +public class LobWriteEventAdapter extends LogMinerEventAdapter { + + /** + * A ProtoStream factory that creates {@link LobWriteEvent} instances. + * + * @param eventType the event type + * @param scn the system change number, must not be {@code null} + * @param tableId the fully-qualified table name + * @param rowId the Oracle row-id the change is associated with + * @param rsId the Oracle rollback segment identifier + * @param changeTime the time the change occurred + * @param data the LOB data + * @return the constructed DmlEvent + */ + @ProtoFactory + public LobWriteEvent factory(int eventType, String scn, String tableId, String rowId, String rsId, String changeTime, String data) { + return new LobWriteEvent(EventType.from(eventType), Scn.valueOf(scn), TableId.parse(tableId), rowId, rsId, Instant.parse(changeTime), data); + } + + /** + * A ProtoStream handler to extract the {@code data} field from a {@link LobWriteEvent} type. + * + * @param event the event instance, must not be {@code null} + * @return the data to be written for a LOB field, may be {@code null} + */ + @ProtoField(number = 7) + public String getData(LobWriteEvent event) { + return event.getData(); + } +} diff --git a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/processor/infinispan/marshalling/LogMinerEventAdapter.java b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/processor/infinispan/marshalling/LogMinerEventAdapter.java index b86160cdb..3ff262723 100644 --- a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/processor/infinispan/marshalling/LogMinerEventAdapter.java +++ b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/processor/infinispan/marshalling/LogMinerEventAdapter.java @@ -11,47 +11,37 @@ import org.infinispan.protostream.annotations.ProtoFactory; import org.infinispan.protostream.annotations.ProtoField; -import io.debezium.DebeziumException; import io.debezium.connector.oracle.Scn; -import io.debezium.connector.oracle.logminer.events.DmlEvent; import io.debezium.connector.oracle.logminer.events.EventType; -import io.debezium.connector.oracle.logminer.events.LobEraseEvent; -import io.debezium.connector.oracle.logminer.events.LobWriteEvent; import io.debezium.connector.oracle.logminer.events.LogMinerEvent; -import io.debezium.connector.oracle.logminer.events.SelectLobLocatorEvent; -import io.debezium.connector.oracle.logminer.parser.LogMinerDmlEntryImpl; import io.debezium.relational.TableId; -import io.debezium.util.Strings; /** - * An Infinispan ProtoStream adapter to marshall {@link LogMinerEvent} instances and its subclasses. + * An Infinispan ProtoStream adapter to marshall {@link LogMinerEvent} instances. * - * This class defines a factory for creating {@link LogMinerEvent} instances and its subclasses for when - * hydrating records from the persisted datastore as well as field handlers to extract instance values + * This class defines a factory for creating {@link LogMinerEvent} instances when hydrating + * records from the persisted event datastore as well as field handlers to extract values * to be marshalled to the protocol buffer stream. * * The underlying protocol buffer record consists of the following structure: *
  *     message LogMinerEvent {
- *          required int32 eventType = 1;
- *          required string scn = 2;
- *          required string tableId = 3;
- *          required string rowId = 4;
- *          required object rsId = 5;
- *          required string changeTime = 6;
- *          LogMinerDmlEntryImpl entry = 7;
- *          string columnName = 8;
- *          boolean binary = 9;
- *          string data = 10;
+ *         required int32 eventType = 1;
+ *         required string scn = 2;
+ *         required string tableId = 3;
+ *         required string rowId = 4;
+ *         required object rsId = 5;
+ *         required string changeTime = 6;
  *     }
  * 
+ * * @author Chris Cranford */ @ProtoAdapter(LogMinerEvent.class) public class LogMinerEventAdapter { /** - * A ProtoStream factory that creates {@link LogMinerEvent} instances or one of its subclasses from field values. + * A ProtoStream factory that creates {@link LogMinerEvent} instances. * * @param eventType the event type * @param scn the system change number, must not be {@code null} @@ -59,50 +49,11 @@ public class LogMinerEventAdapter { * @param rowId the Oracle row-id the change is associated with * @param rsId the Oracle rollback segment identifier * @param changeTime the time the change occurred - * @param entry the parsed SQL statement entry - * @param columnName the column name for a {@code SEL_LOB_LOCATOR} event type - * @param binary whether the data is binary for a {@code SEL_LOB_LOCATOR} event type - * @param data the data to be written by a {@code LOB_WRITE} event type - * @return the constructed LogMinerEvent or one of its subclasses + * @return the constructed DmlEvent */ @ProtoFactory - public LogMinerEvent factory(int eventType, String scn, String tableId, String rowId, String rsId, String changeTime, - LogMinerDmlEntryImpl entry, String columnName, Boolean binary, String data) { - final EventType type = EventType.from(eventType); - final Scn eventScn = Scn.valueOf(scn); - - final TableId id; - if (Strings.isNullOrEmpty(tableId)) { - id = null; - } - else { - id = TableId.parse(tableId); - } - - final Instant time; - if (Strings.isNullOrEmpty(changeTime)) { - time = null; - } - else { - time = Instant.parse(changeTime); - } - - switch (type) { - case INSERT: - case UPDATE: - case DELETE: - return new DmlEvent(type, eventScn, id, rowId, rsId, time, entry); - case SELECT_LOB_LOCATOR: - return new SelectLobLocatorEvent(type, eventScn, id, rowId, rsId, time, entry, columnName, binary); - case LOB_WRITE: - return new LobWriteEvent(type, eventScn, id, rowId, rsId, time, data); - case LOB_ERASE: - return new LobEraseEvent(type, eventScn, id, rowId, rsId, time); - case LOB_TRIM: - return new LogMinerEvent(type, eventScn, id, rowId, rsId, time); - default: - throw new DebeziumException("Unknown event type: " + eventType); - } + public LogMinerEvent factory(int eventType, String scn, String tableId, String rowId, String rsId, String changeTime) { + return new LogMinerEvent(EventType.from(eventType), Scn.valueOf(scn), TableId.parse(tableId), rowId, rsId, Instant.parse(changeTime)); } /** @@ -138,8 +89,6 @@ public String getScn(LogMinerEvent event) { */ @ProtoField(number = 3, required = true) public String getTableId(LogMinerEvent event) { - // We intentionally serialize the TableId as a string since string values are natively supported - // by protocol buffers and we don't need to write a special marshaller for the TableId class. return event.getTableId().identifier(); } @@ -173,69 +122,6 @@ public String getRsId(LogMinerEvent event) { */ @ProtoField(number = 6, required = true) public String getChangeTime(LogMinerEvent event) { - // Serialized using Instant's normal toString() format since strings as handled natively by - // protocol buffers and will be parsed using Instant#parse in the factory method. return event.getChangeTime().toString(); } - - /** - * A ProtoStream handler to extract the {@code entry} field from the {@link DmlEvent} and subclass types. - * - * @param event the event instance, must not be {@code null} - * @return the LogMinerDmlEntryImpl instance or {@code null} if the event type isn't related to DML events - */ - @ProtoField(number = 7) - public LogMinerDmlEntryImpl getEntry(LogMinerEvent event) { - switch (event.getEventType()) { - case INSERT: - case UPDATE: - case DELETE: - case SELECT_LOB_LOCATOR: - return (LogMinerDmlEntryImpl) ((DmlEvent) event).getDmlEntry(); - default: - return null; - } - } - - /** - * A ProtoStream handler to extract the {@code columnName} field from a {@link SelectLobLocatorEvent} type. - * - * @param event the event instance, must not be {@code null} - * @return the column name or {@code null} if the event is not a SelectLobLocatorEvent type - */ - @ProtoField(number = 8) - public String getColumnName(LogMinerEvent event) { - if (EventType.SELECT_LOB_LOCATOR.equals(event.getEventType())) { - return ((SelectLobLocatorEvent) event).getColumnName(); - } - return null; - } - - /** - * A ProtoStream handler to extract the {@code binary} field from a {@link SelectLobLocatorEvent} type. - * - * @param event the event instance, must not be {@code null} - * @return the binary data flag or {@code null} if the event is not a SelectLobLocatorEvent type - */ - @ProtoField(number = 9) - public Boolean getBinary(LogMinerEvent event) { - if (EventType.SELECT_LOB_LOCATOR.equals(event.getEventType())) { - return ((SelectLobLocatorEvent) event).isBinary(); - } - return null; - } - - /** - * A ProtoStream handler to extract the {@code data} field from a {@link LobWriteEvent} type. - * - * @param event the event instance, must not be {@code null} - * @return the data to be written for a LOB field or {@code null} if the event is not a LobWriteEvent type - */ - @ProtoField(number = 10) - public String getData(LogMinerEvent event) { - if (EventType.LOB_WRITE.equals(event.getEventType())) { - return ((LobWriteEvent) event).getData(); - } - return null; - } } diff --git a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/processor/infinispan/marshalling/LogMinerEventMarshaller.java b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/processor/infinispan/marshalling/LogMinerEventMarshaller.java new file mode 100644 index 000000000..f00ffb2eb --- /dev/null +++ b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/processor/infinispan/marshalling/LogMinerEventMarshaller.java @@ -0,0 +1,21 @@ +/* + * Copyright Debezium Authors. + * + * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 + */ +package io.debezium.connector.oracle.logminer.processor.infinispan.marshalling; + +import org.infinispan.protostream.SerializationContextInitializer; +import org.infinispan.protostream.annotations.AutoProtoSchemaBuilder; + +/** + * An interface that is used by the ProtoStream framework to designate the adapters and path + * to where the a Protocol Buffers .proto file will be generated based on the adapters + * at compile time. + * + * @author Chris Cranford + */ +@AutoProtoSchemaBuilder(includeClasses = { LogMinerEventAdapter.class, DmlEventAdapter.class, SelectLobLocatorEventAdapter.class, LobWriteEventAdapter.class, + LobEraseEventAdapter.class, LogMinerDmlEntryImplAdapter.class }, schemaFilePath = "/") +public interface LogMinerEventMarshaller extends SerializationContextInitializer { +} diff --git a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/processor/infinispan/marshalling/SelectLobLocatorEventAdapter.java b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/processor/infinispan/marshalling/SelectLobLocatorEventAdapter.java new file mode 100644 index 000000000..483154ce4 --- /dev/null +++ b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/processor/infinispan/marshalling/SelectLobLocatorEventAdapter.java @@ -0,0 +1,83 @@ +/* + * Copyright Debezium Authors. + * + * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 + */ +package io.debezium.connector.oracle.logminer.processor.infinispan.marshalling; + +import java.time.Instant; + +import org.infinispan.protostream.annotations.ProtoAdapter; +import org.infinispan.protostream.annotations.ProtoFactory; +import org.infinispan.protostream.annotations.ProtoField; + +import io.debezium.connector.oracle.Scn; +import io.debezium.connector.oracle.logminer.events.EventType; +import io.debezium.connector.oracle.logminer.events.SelectLobLocatorEvent; +import io.debezium.connector.oracle.logminer.parser.LogMinerDmlEntryImpl; +import io.debezium.relational.TableId; + +/** + * An Infinispan ProtoStream adapter to marshall {@link SelectLobLocatorEvent} instances. + * + * This class defines a factory for creating {@link SelectLobLocatorEvent} instances when hydrating + * records from the persisted datastore asa well as field handlers to extract values + * to be marshalled to the protocol buffer stream. + * + * The underlying protocol buffer record consists of the following structure: + *
+ *     message SelectLobLocatorEvent {
+ *         // structure of the super type, DmlEventAdapter
+ *         required string columnName = 8;
+ *         required boolean binary = 9;
+ *     }
+ * 
+ * + * @author Chris Cranford + */ +@ProtoAdapter(SelectLobLocatorEvent.class) +public class SelectLobLocatorEventAdapter extends DmlEventAdapter { + + /** + * A ProtoStream factory that creates {@link SelectLobLocatorEvent} instances. + * + * @param eventType the event type + * @param scn the system change number, must not be {@code null} + * @param tableId the fully-qualified table name + * @param rowId the Oracle row-id the change is associated with + * @param rsId the Oracle rollback segment identifier + * @param changeTime the time the change occurred + * @param entry the parsed SQL statement entry + * @param columnName the column name references by the SelectLobLocatorEvent + * @param binary whether the data is binary- or character- based + * @return the constructed SelectLobLocatorEvent + */ + @ProtoFactory + public SelectLobLocatorEvent factory(int eventType, String scn, String tableId, String rowId, String rsId, String changeTime, LogMinerDmlEntryImpl entry, + String columnName, Boolean binary) { + return new SelectLobLocatorEvent(EventType.from(eventType), Scn.valueOf(scn), TableId.parse(tableId), rowId, rsId, Instant.parse(changeTime), entry, columnName, + binary); + } + + /** + * A ProtoStream handler to extract the {@code columnName} field from a {@link SelectLobLocatorEvent} type. + * + * @param event the event instance, must not be {@code null} + * @return the column name + */ + @ProtoField(number = 8, required = true) + public String getColumnName(SelectLobLocatorEvent event) { + return event.getColumnName(); + } + + /** + * A ProtoStream handler to extract the {@code binary} field from a {@link SelectLobLocatorEvent} type. + * + * @param event the event instance, must not be {@code null} + * @return the binary data flag + */ + @ProtoField(number = 9, required = true) + public Boolean getBinary(SelectLobLocatorEvent event) { + return event.isBinary(); + } +} diff --git a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/processor/infinispan/marshalling/TransactionAdapter.java b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/processor/infinispan/marshalling/TransactionAdapter.java index 727cae596..829702485 100644 --- a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/processor/infinispan/marshalling/TransactionAdapter.java +++ b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/processor/infinispan/marshalling/TransactionAdapter.java @@ -6,63 +6,60 @@ package io.debezium.connector.oracle.logminer.processor.infinispan.marshalling; import java.time.Instant; -import java.util.List; import org.infinispan.protostream.annotations.ProtoAdapter; import org.infinispan.protostream.annotations.ProtoFactory; import org.infinispan.protostream.annotations.ProtoField; import io.debezium.connector.oracle.Scn; -import io.debezium.connector.oracle.logminer.events.LogMinerEvent; -import io.debezium.connector.oracle.logminer.events.Transaction; +import io.debezium.connector.oracle.logminer.processor.infinispan.InfinispanTransaction; /** - * An Infinispan ProtoStream adapter for marshalling a {@link Transaction} instance. + * An Infinispan ProtoStream adapter for marshalling a {@link InfinispanTransaction} instance. * - * This class defines a factory for creating {@link Transaction} instances when hydrating a transaction + * This class defines a factory for creating {@link InfinispanTransaction} instances when hydrating a transaction * record from the protocol buffer datastore as well as field handlers to extract values from a given * transaction instance for serializing the instance to a protocol buffer stream. * * @author Chris Cranford */ -@ProtoAdapter(Transaction.class) +@ProtoAdapter(InfinispanTransaction.class) public class TransactionAdapter { /** - * A ProtoStream factory that creates a {@link Transaction} instance from field values. + * A ProtoStream factory that creates a {@link InfinispanTransaction} instance from field values. * * @param transactionId the transaction identifier * @param scn the starting system change number of the transaction * @param changeTime the starting time of the transaction - * @param events list of events that are part of the transaction * @param userName the user name * @param numberOfEvents the number of events in the transaction * @return the constructed Transaction instance */ @ProtoFactory - public Transaction factory(String transactionId, String scn, String changeTime, List events, String userName, int numberOfEvents) { - return new Transaction(transactionId, Scn.valueOf(scn), Instant.parse(changeTime), events, userName, numberOfEvents); + public InfinispanTransaction factory(String transactionId, String scn, String changeTime, String userName, int numberOfEvents) { + return new InfinispanTransaction(transactionId, Scn.valueOf(scn), Instant.parse(changeTime), userName, numberOfEvents); } /** - * A ProtoStream handler to extract the {@code transactionId} field from the {@link Transaction}. + * A ProtoStream handler to extract the {@code transactionId} field from the {@link InfinispanTransaction}. * * @param transaction the transaction instance, must not be {@code null} * @return the transaction identifier, never {@code null} */ @ProtoField(number = 1, required = true) - public String getTransactionId(Transaction transaction) { + public String getTransactionId(InfinispanTransaction transaction) { return transaction.getTransactionId(); } /** - * A ProtoStream handler to extract the {@code startScn} field from the {@link Transaction}. + * A ProtoStream handler to extract the {@code startScn} field from the {@link InfinispanTransaction}. * * @param transaction the transaction instance, must not be {@code null} * @return the starting system change number, never {@code null} */ @ProtoField(number = 2, required = true) - public String getScn(Transaction transaction) { + public String getScn(InfinispanTransaction transaction) { // We intentionally serialize the Scn class as a string to the protocol buffer datastore // and so the factory method also accepts a string parameter and converts the value to a // Scn instance during instantiation. This avoids the need for an additional adapter. @@ -70,40 +67,29 @@ public String getScn(Transaction transaction) { } /** - * A ProtoStream handler to extract the {@code changeTime} field from the {@link Transaction}. + * A ProtoStream handler to extract the {@code changeTime} field from the {@link InfinispanTransaction}. * * @param transaction the transaction instance, must not be {@code null} * @return the starting time of the transaction, never {@code null} */ @ProtoField(number = 3, required = true) - public String getChangeTime(Transaction transaction) { + public String getChangeTime(InfinispanTransaction transaction) { return transaction.getChangeTime().toString(); } - /** - * A ProtoStream handler to extract the {@code events} field from the {@link Transaction}. - * - * @param transaction the transaction instance, must not be {@code null} - * @return list of events within the transaction - */ @ProtoField(number = 4) - public List getEvents(Transaction transaction) { - return transaction.getEvents(); - } - - @ProtoField(number = 5) - public String getUserName(Transaction transaction) { + public String getUserName(InfinispanTransaction transaction) { return transaction.getUserName(); } /** - * A ProtoStream handler to extract the {@code eventIds} field from the {@link Transaction}. + * A ProtoStream handler to extract the {@code eventIds} field from the {@link InfinispanTransaction}. * * @param transaction the transaction instance, must not be {@code null} * @return the number of events in the transaction */ - @ProtoField(number = 6, defaultValue = "0") - public int getNumberOfEvents(Transaction transaction) { + @ProtoField(number = 5, defaultValue = "0") + public int getNumberOfEvents(InfinispanTransaction transaction) { return transaction.getNumberOfEvents(); } } diff --git a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/processor/infinispan/marshalling/TransactionMarshaller.java b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/processor/infinispan/marshalling/TransactionMarshaller.java index 8ae292104..8ed61706d 100644 --- a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/processor/infinispan/marshalling/TransactionMarshaller.java +++ b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/processor/infinispan/marshalling/TransactionMarshaller.java @@ -15,6 +15,6 @@ * * @author Chris Cranford */ -@AutoProtoSchemaBuilder(includeClasses = { TransactionAdapter.class, LogMinerEventAdapter.class, LogMinerDmlEntryImplAdapter.class }, schemaFilePath = "/") +@AutoProtoSchemaBuilder(includeClasses = { TransactionAdapter.class }, schemaFilePath = "/") public interface TransactionMarshaller extends SerializationContextInitializer { } diff --git a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/processor/memory/MemoryLogMinerEventProcessor.java b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/processor/memory/MemoryLogMinerEventProcessor.java index 6b4a302bd..3272baaf6 100644 --- a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/processor/memory/MemoryLogMinerEventProcessor.java +++ b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/processor/memory/MemoryLogMinerEventProcessor.java @@ -16,6 +16,7 @@ import java.util.Map; import java.util.Optional; import java.util.Set; +import java.util.function.Supplier; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -33,10 +34,11 @@ import io.debezium.connector.oracle.logminer.events.DmlEvent; import io.debezium.connector.oracle.logminer.events.LogMinerEvent; import io.debezium.connector.oracle.logminer.events.LogMinerEventRow; -import io.debezium.connector.oracle.logminer.events.Transaction; import io.debezium.connector.oracle.logminer.processor.AbstractLogMinerEventProcessor; import io.debezium.connector.oracle.logminer.processor.LogMinerEventProcessor; import io.debezium.connector.oracle.logminer.processor.TransactionCache; +import io.debezium.connector.oracle.logminer.processor.TransactionCommitConsumer; +import io.debezium.function.BlockingConsumer; import io.debezium.pipeline.EventDispatcher; import io.debezium.pipeline.source.spi.ChangeEventSource.ChangeEventSourceContext; import io.debezium.relational.TableId; @@ -48,7 +50,7 @@ * * @author Chris Cranford */ -public class MemoryLogMinerEventProcessor extends AbstractLogMinerEventProcessor { +public class MemoryLogMinerEventProcessor extends AbstractLogMinerEventProcessor { private static final Logger LOGGER = LoggerFactory.getLogger(MemoryLogMinerEventProcessor.class); @@ -88,10 +90,26 @@ public MemoryLogMinerEventProcessor(ChangeEventSourceContext context, } @Override - protected TransactionCache getTransactionCache() { + protected TransactionCache getTransactionCache() { return transactionCache; } + @Override + protected MemoryTransaction createTransaction(LogMinerEventRow row) { + return new MemoryTransaction(row.getTransactionId(), row.getScn(), row.getChangeTime(), row.getUserName()); + } + + @Override + protected void removeEventWithRowId(LogMinerEventRow row) { + final MemoryTransaction transaction = getTransactionCache().get(row.getTransactionId()); + if (transaction == null) { + LOGGER.warn("Cannot undo change '{}' since transaction was not found.", row); + } + else { + transaction.removeEventWithRowId(row.getRowId()); + } + } + @Override public void close() throws Exception { // close any resources used here @@ -153,9 +171,9 @@ public void abandonTransactions(Duration retention) { thresholdScn = smallestScn; } - Iterator> iterator = transactionCache.iterator(); + Iterator> iterator = transactionCache.iterator(); while (iterator.hasNext()) { - Map.Entry entry = iterator.next(); + Map.Entry entry = iterator.next(); if (entry.getValue().getStartScn().compareTo(thresholdScn) <= 0) { LOGGER.warn("Transaction {} is being abandoned.", entry.getKey()); abandonedTransactionsCache.add(entry.getKey()); @@ -210,20 +228,24 @@ protected void handleCommit(LogMinerEventRow row) throws InterruptedException { return; } - final Transaction transaction = transactionCache.remove(transactionId); + final MemoryTransaction transaction = transactionCache.remove(transactionId); if (transaction == null) { LOGGER.trace("Transaction {} not found.", transactionId); return; } - boolean skipExcludedUserName = false; + final boolean skipExcludedUserName; if (transaction.getUserName() == null && !transaction.getEvents().isEmpty()) { LOGGER.debug("Got transaction with null username {}", transaction); + skipExcludedUserName = false; } else if (getConfig().getLogMiningUsernameExcludes().contains(transaction.getUserName())) { LOGGER.trace("Skipping transaction with excluded username {}", transaction); skipExcludedUserName = true; } + else { + skipExcludedUserName = false; + } final Scn smallestScn = transactionCache.getMinimumScn(); metrics.setOldestScn(smallestScn.isNull() ? Scn.valueOf(-1) : smallestScn); @@ -241,44 +263,56 @@ else if (getConfig().getLogMiningUsernameExcludes().contains(transaction.getUser counters.commitCount++; Instant start = Instant.now(); - getReconciliation().reconcile(transaction); int numEvents = transaction.getEvents().size(); LOGGER.trace("Commit: (smallest SCN {}) {}", smallestScn, row); LOGGER.trace("Transaction {} has {} events", transactionId, numEvents); - for (LogMinerEvent event : transaction.getEvents()) { - if (!context.isRunning()) { - return; - } + BlockingConsumer delegate = new BlockingConsumer() { + private int numEvents = getTransactionEventCount(transaction); - // Update SCN in offset context only if processed SCN less than SCN of other transactions - if (smallestScn.isNull() || commitScn.compareTo(smallestScn) < 0) { - offsetContext.setScn(event.getScn()); - metrics.setOldestScn(event.getScn()); - } + @Override + public void accept(LogMinerEvent event) throws InterruptedException { + // Update SCN in offset context only if processed SCN less than SCN of other transactions + if (smallestScn.isNull() || commitScn.compareTo(smallestScn) < 0) { + offsetContext.setScn(event.getScn()); + metrics.setOldestScn(event.getScn()); + } - offsetContext.setTransactionId(transactionId); - offsetContext.setSourceTime(event.getChangeTime()); - offsetContext.setTableId(event.getTableId()); - if (--numEvents == 0) { - // reached the last event update the commit scn in the offsets - offsetContext.setCommitScn(commitScn); - } + offsetContext.setTransactionId(transactionId); + offsetContext.setSourceTime(event.getChangeTime()); + offsetContext.setTableId(event.getTableId()); + if (--numEvents == 0) { + // reached the last event update the commit scn in the offsets + offsetContext.setCommitScn(commitScn); + } - // after reconciliation all events should be DML - final DmlEvent dmlEvent = (DmlEvent) event; - if (!skipExcludedUserName) { - dispatcher.dispatchDataChangeEvent(event.getTableId(), - new LogMinerChangeRecordEmitter( - partition, - offsetContext, - dmlEvent.getEventType(), - dmlEvent.getDmlEntry().getOldValues(), - dmlEvent.getDmlEntry().getNewValues(), - getSchema().tableFor(event.getTableId()), - Clock.system())); + final DmlEvent dmlEvent = (DmlEvent) event; + if (!skipExcludedUserName) { + dispatcher.dispatchDataChangeEvent(event.getTableId(), + new LogMinerChangeRecordEmitter( + partition, + offsetContext, + dmlEvent.getEventType(), + dmlEvent.getDmlEntry().getOldValues(), + dmlEvent.getDmlEntry().getNewValues(), + getSchema().tableFor(event.getTableId()), + Clock.system())); + } + } + }; + + int eventCount = 0; + try (TransactionCommitConsumer commitConsumer = new TransactionCommitConsumer(delegate, getConfig(), getSchema())) { + for (LogMinerEvent event : transaction.getEvents()) { + if (!context.isRunning()) { + return; + } + + eventCount++; + LOGGER.trace("Dispatching event {} {}", eventCount, event.getEventType()); + commitConsumer.accept(event); } } @@ -302,7 +336,7 @@ else if (getConfig().getLogMiningUsernameExcludes().contains(transaction.getUser metrics.incrementCommittedTransactions(); metrics.setActiveTransactions(transactionCache.size()); - metrics.incrementCommittedDmlCount(transaction.getEvents().size()); + metrics.incrementCommittedDmlCount(eventCount); metrics.setCommittedScn(commitScn); metrics.setOffsetScn(offsetContext.getScn()); metrics.setLastCommitDuration(Duration.between(start, Instant.now())); @@ -310,7 +344,7 @@ else if (getConfig().getLogMiningUsernameExcludes().contains(transaction.getUser @Override protected void handleRollback(LogMinerEventRow row) { - final Transaction transaction = transactionCache.get(row.getTransactionId()); + final MemoryTransaction transaction = transactionCache.get(row.getTransactionId()); if (transaction != null) { transactionCache.remove(row.getTransactionId()); abandonedTransactionsCache.remove(row.getTransactionId()); @@ -332,6 +366,33 @@ protected void handleSchemaChange(LogMinerEventRow row) throws InterruptedExcept } } + @Override + protected void addToTransaction(String transactionId, LogMinerEventRow row, Supplier eventSupplier) { + if (isTransactionIdAllowed(transactionId)) { + MemoryTransaction transaction = getTransactionCache().get(transactionId); + if (transaction == null) { + LOGGER.trace("Transaction {} not in cache for DML, creating.", transactionId); + transaction = createTransaction(row); + getTransactionCache().put(transactionId, transaction); + } + + int eventId = transaction.getNextEventId(); + if (transaction.getEvents().size() <= eventId) { + // Add new event at eventId offset + LOGGER.trace("Transaction {}, adding event reference at index {}", transactionId, eventId); + transaction.getEvents().add(eventSupplier.get()); + metrics.calculateLagMetrics(row.getChangeTime()); + } + + metrics.setActiveTransactions(getTransactionCache().size()); + } + } + + @Override + protected int getTransactionEventCount(MemoryTransaction transaction) { + return transaction.getEvents().size(); + } + private PreparedStatement createQueryStatement() throws SQLException { final String query = LogMinerQueryBuilder.build(getConfig(), getSchema()); return jdbcConnection.connection().prepareStatement(query, @@ -402,4 +463,5 @@ protected Optional getLastScnToAbandon(OracleConnection connection, Scn off return Optional.of(offsetScn); } } + } diff --git a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/processor/memory/MemoryTransaction.java b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/processor/memory/MemoryTransaction.java new file mode 100644 index 000000000..bd19040a8 --- /dev/null +++ b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/processor/memory/MemoryTransaction.java @@ -0,0 +1,72 @@ +/* + * Copyright Debezium Authors. + * + * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 + */ +package io.debezium.connector.oracle.logminer.processor.memory; + +import java.time.Instant; +import java.util.ArrayList; +import java.util.List; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import io.debezium.connector.oracle.Scn; +import io.debezium.connector.oracle.logminer.events.LogMinerEvent; +import io.debezium.connector.oracle.logminer.processor.AbstractTransaction; + +/** + * A concrete implementation of a {@link AbstractTransaction} for the JVM heap memory processor. + * + * @author Chris Cranford + */ +public class MemoryTransaction extends AbstractTransaction { + + private static final Logger LOGGER = LoggerFactory.getLogger(MemoryTransaction.class); + + private int numberOfEvents; + private List events; + + public MemoryTransaction(String transactionId, Scn startScn, Instant changeTime, String userName) { + super(transactionId, startScn, changeTime, userName); + this.events = new ArrayList<>(); + started(); + } + + @Override + public int getNumberOfEvents() { + return numberOfEvents; + } + + @Override + public int getNextEventId() { + return numberOfEvents++; + } + + @Override + public void started() { + numberOfEvents = 0; + } + + public List getEvents() { + return events; + } + + public void removeEventWithRowId(String rowId) { + events.removeIf(event -> { + if (event.getRowId().equals(rowId)) { + LOGGER.trace("Undo applied for event {}.", event); + return true; + } + return false; + }); + } + + @Override + public String toString() { + return "MemoryTransaction{" + + "numberOfEvents=" + numberOfEvents + + "} " + super.toString(); + } +} diff --git a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/processor/memory/MemoryTransactionCache.java b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/processor/memory/MemoryTransactionCache.java index 8e250900a..a49f1a94b 100644 --- a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/processor/memory/MemoryTransactionCache.java +++ b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/processor/memory/MemoryTransactionCache.java @@ -10,7 +10,6 @@ import java.util.Map; import io.debezium.connector.oracle.Scn; -import io.debezium.connector.oracle.logminer.events.Transaction; import io.debezium.connector.oracle.logminer.processor.TransactionCache; /** @@ -18,22 +17,22 @@ * * @author Chris Cranford */ -public class MemoryTransactionCache implements TransactionCache> { +public class MemoryTransactionCache implements TransactionCache> { - public final Map cache = new HashMap<>(); + public final Map cache = new HashMap<>(); @Override - public Transaction get(String transactionId) { + public MemoryTransaction get(String transactionId) { return cache.get(transactionId); } @Override - public void put(String transactionId, Transaction transaction) { + public void put(String transactionId, MemoryTransaction transaction) { cache.put(transactionId, transaction); } @Override - public Transaction remove(String transactionId) { + public MemoryTransaction remove(String transactionId) { return cache.remove(transactionId); } @@ -53,14 +52,14 @@ public boolean isEmpty() { } @Override - public Iterator> iterator() { + public Iterator> iterator() { return cache.entrySet().iterator(); } @Override public Scn getMinimumScn() { return cache.values().stream() - .map(Transaction::getStartScn) + .map(MemoryTransaction::getStartScn) .min(Scn::compareTo) .orElse(Scn.NULL); } diff --git a/debezium-connector-oracle/src/test/resources/log4j.properties b/debezium-connector-oracle/src/test/resources/log4j.properties index d21ec2a60..2a35e333a 100644 --- a/debezium-connector-oracle/src/test/resources/log4j.properties +++ b/debezium-connector-oracle/src/test/resources/log4j.properties @@ -13,6 +13,7 @@ log4j.logger.io.debezium.embedded.EmbeddedEngine$EmbeddedConfig=WARN #log4j.logger.io.debezium.embedded.EmbeddedEngine=DEBUG log4j.logger.io.debezium.core=DEBUG log4j.logger.io.debezium.connector.oracle=DEBUG +log4j.logger.io.debezium.connector.oracle.logminer.processor=TRACE # Avoid the fallback property spam log4j.logger.io.debezium.config.Configuration=ERROR