DBZ-4159 Cache LogMinerEvent instances in separate cache

This commit is contained in:
Chris Cranford 2021-10-29 06:46:49 -04:00 committed by Gunnar Morling
parent a4e4851b02
commit 954b81a75f
22 changed files with 1130 additions and 850 deletions

View File

@ -1,126 +0,0 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.connector.oracle.logminer.events;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import io.debezium.connector.oracle.Scn;
import io.debezium.connector.oracle.logminer.processor.infinispan.marshalling.VisibleForMarshalling;
/**
 * A logical database transaction observed via Oracle LogMiner.
 *
 * Identity is defined solely by the transaction id; all other fields are
 * descriptive metadata or the accumulated list of captured events.
 *
 * @author Chris Cranford
 */
public class Transaction {

    private static final Logger LOGGER = LoggerFactory.getLogger(Transaction.class);

    /** Sentinel Oracle reports when the authenticated user name is not known. */
    private static final String UNKNOWN = "UNKNOWN";

    private final String transactionId;
    private final Scn startScn;
    private final Instant changeTime;
    private final List<LogMinerEvent> events;
    private final String userName;
    // Monotonically increasing counter handed out by getNextEventId()
    private int numberOfEvents;

    @VisibleForMarshalling
    public Transaction(String transactionId, Scn startScn, Instant changeTime, List<LogMinerEvent> events, String userName, int numberOfEvents) {
        this.transactionId = transactionId;
        this.startScn = startScn;
        this.changeTime = changeTime;
        this.events = events;
        // Normalize the "UNKNOWN" sentinel to null so callers only need a null check
        this.userName = UNKNOWN.equalsIgnoreCase(userName) ? null : userName;
        this.numberOfEvents = numberOfEvents;
    }

    public Transaction(String transactionId, Scn startScn, Instant changeTime, String userName) {
        this(transactionId, startScn, changeTime, new ArrayList<>(), userName, 0);
    }

    public String getTransactionId() {
        return transactionId;
    }

    public Scn getStartScn() {
        return startScn;
    }

    public Instant getChangeTime() {
        return changeTime;
    }

    public List<LogMinerEvent> getEvents() {
        return events;
    }

    public int getNumberOfEvents() {
        return numberOfEvents;
    }

    /**
     * Returns the current event id and advances the internal counter.
     */
    public int getNextEventId() {
        final int eventId = numberOfEvents;
        numberOfEvents = eventId + 1;
        return eventId;
    }

    /**
     * Should be called when a transaction start is detected; resets the event counter.
     */
    public void started() {
        numberOfEvents = 0;
    }

    /**
     * Removes all events within the transaction with the specified {@code rowId}.
     *
     * @param rowId the row id for the SQL event that should be removed
     */
    public void removeEventWithRowId(String rowId) {
        events.removeIf(event -> {
            final boolean undone = event.getRowId().equals(rowId);
            if (undone) {
                LOGGER.trace("Undo applied for event {}.", event);
            }
            return undone;
        });
    }

    /**
     * Returns the authenticated user, or {@code null} when Oracle reported it as unknown.
     */
    public String getUserName() {
        return userName;
    }

    @Override
    public boolean equals(Object o) {
        if (o == this) {
            return true;
        }
        if (o == null || o.getClass() != getClass()) {
            return false;
        }
        return Objects.equals(transactionId, ((Transaction) o).transactionId);
    }

    @Override
    public int hashCode() {
        return Objects.hash(transactionId);
    }

    @Override
    public String toString() {
        return new StringBuilder("Transaction{")
                .append("transactionId='").append(transactionId).append('\'')
                .append(", startScn=").append(startScn)
                .append(", userName='").append(userName)
                .append(", numberOfEvents=").append(numberOfEvents)
                .append("'}")
                .toString();
    }
}

View File

@ -30,7 +30,6 @@
import io.debezium.connector.oracle.logminer.events.LogMinerEvent;
import io.debezium.connector.oracle.logminer.events.LogMinerEventRow;
import io.debezium.connector.oracle.logminer.events.SelectLobLocatorEvent;
import io.debezium.connector.oracle.logminer.events.Transaction;
import io.debezium.connector.oracle.logminer.parser.DmlParserException;
import io.debezium.connector.oracle.logminer.parser.LogMinerDmlEntry;
import io.debezium.connector.oracle.logminer.parser.LogMinerDmlParser;
@ -45,7 +44,7 @@
*
* @author Chris Cranford
*/
public abstract class AbstractLogMinerEventProcessor implements LogMinerEventProcessor {
public abstract class AbstractLogMinerEventProcessor<T extends AbstractTransaction> implements LogMinerEventProcessor {
private static final Logger LOGGER = LoggerFactory.getLogger(AbstractLogMinerEventProcessor.class);
@ -56,7 +55,6 @@ public abstract class AbstractLogMinerEventProcessor implements LogMinerEventPro
private final OracleOffsetContext offsetContext;
private final EventDispatcher<TableId> dispatcher;
private final OracleStreamingChangeEventSourceMetrics metrics;
private final TransactionReconciliation reconciliation;
private final LogMinerDmlParser dmlParser;
private final SelectLobParser selectLobParser;
@ -78,7 +76,6 @@ public AbstractLogMinerEventProcessor(ChangeEventSourceContext context,
this.offsetContext = offsetContext;
this.dispatcher = dispatcher;
this.metrics = metrics;
this.reconciliation = new TransactionReconciliation(connectorConfig, schema);
this.counters = new Counters();
this.dmlParser = new LogMinerDmlParser();
this.selectLobParser = new SelectLobParser();
@ -92,10 +89,6 @@ protected OracleDatabaseSchema getSchema() {
return schema;
}
protected TransactionReconciliation getReconciliation() {
return reconciliation;
}
/**
* Check whether a transaction has been recently committed.
* Any implementation that does not support recently-committed tracking should return false.
@ -141,7 +134,13 @@ protected Scn getLastProcessedScn() {
* Returns the {@code TransactionCache} implementation.
* @return the transaction cache, never {@code null}
*/
protected abstract TransactionCache<?> getTransactionCache();
protected abstract TransactionCache<T, ?> getTransactionCache();
protected abstract T createTransaction(LogMinerEventRow row);
protected abstract void removeEventWithRowId(LogMinerEventRow row);
protected abstract int getTransactionEventCount(T transaction);
// todo: can this be removed in favor of a single implementation?
protected boolean isTrxIdRawValue() {
@ -221,10 +220,9 @@ protected void handleMissingScn(LogMinerEventRow row) {
*/
protected void handleStart(LogMinerEventRow row) {
final String transactionId = row.getTransactionId();
final Transaction transaction = getTransactionCache().get(transactionId);
final AbstractTransaction transaction = getTransactionCache().get(transactionId);
if (transaction == null && !isRecentlyCommitted(transactionId)) {
Transaction newTransaction = new Transaction(transactionId, row.getScn(), row.getChangeTime(), row.getUserName());
getTransactionCache().put(transactionId, newTransaction);
getTransactionCache().put(transactionId, createTransaction(row));
metrics.setActiveTransactions(getTransactionCache().size());
}
else if (transaction != null && !isRecentlyCommitted(transactionId)) {
@ -402,13 +400,7 @@ protected void handleDataEvent(LogMinerEventRow row) throws SQLException, Interr
// with a rollback flag to indicate that the prior event should be omitted. In this
// use case, the transaction can still be committed, so we need to manually rollback
// the previous DML event when this use case occurs.
final Transaction transaction = getTransactionCache().get(row.getTransactionId());
if (transaction == null) {
LOGGER.warn("Cannot undo change '{}' since transaction was not found.", row);
}
else {
transaction.removeEventWithRowId(row.getRowId());
}
removeEventWithRowId(row);
return;
}
@ -485,26 +477,7 @@ private boolean hasNextWithMetricsUpdate(ResultSet resultSet) throws SQLExceptio
* @param row the LogMiner event row
* @param eventSupplier the supplier of the event to create if the event is allowed to be added
*/
protected void addToTransaction(String transactionId, LogMinerEventRow row, Supplier<LogMinerEvent> eventSupplier) {
if (isTransactionIdAllowed(transactionId)) {
Transaction transaction = getTransactionCache().get(transactionId);
if (transaction == null) {
LOGGER.trace("Transaction {} not in cache for DML, creating.", transactionId);
transaction = new Transaction(transactionId, row.getScn(), row.getChangeTime(), row.getUserName());
getTransactionCache().put(transactionId, transaction);
}
int eventId = transaction.getNextEventId();
if (transaction.getEvents().size() <= eventId) {
// Add new event at eventId offset
LOGGER.trace("Transaction {}, adding event reference at index {}", transactionId, eventId);
transaction.getEvents().add(eventSupplier.get());
metrics.calculateLagMetrics(row.getChangeTime());
}
metrics.setActiveTransactions(getTransactionCache().size());
}
}
protected abstract void addToTransaction(String transactionId, LogMinerEventRow row, Supplier<LogMinerEvent> eventSupplier);
/**
* Dispatch a schema change event for a new table and get the newly created relational table model.

View File

@ -0,0 +1,80 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.connector.oracle.logminer.processor;
import java.time.Instant;
import java.util.Objects;
import io.debezium.connector.oracle.Scn;
/**
 * An abstract implementation of an Oracle {@link Transaction}.
 *
 * Carries the identity and metadata common to every transaction: id, starting
 * SCN, change time, and authenticated user. Identity for equality purposes is
 * the transaction id alone.
 *
 * @author Chris Cranford
 */
public abstract class AbstractTransaction implements Transaction {

    /** Sentinel Oracle reports when the authenticated user name is not known. */
    private static final String UNKNOWN = "UNKNOWN";

    private final String transactionId;
    private final Scn startScn;
    private final Instant changeTime;
    private final String userName;

    public AbstractTransaction(String transactionId, Scn startScn, Instant changeTime, String userName) {
        this.transactionId = transactionId;
        this.startScn = startScn;
        this.changeTime = changeTime;
        // Normalize the "UNKNOWN" sentinel to null so callers only need a null check
        this.userName = UNKNOWN.equalsIgnoreCase(userName) ? null : userName;
    }

    @Override
    public String getTransactionId() {
        return transactionId;
    }

    @Override
    public Scn getStartScn() {
        return startScn;
    }

    @Override
    public Instant getChangeTime() {
        return changeTime;
    }

    @Override
    public String getUserName() {
        return userName;
    }

    @Override
    public boolean equals(Object o) {
        if (o == this) {
            return true;
        }
        if (o == null || o.getClass() != getClass()) {
            return false;
        }
        final AbstractTransaction other = (AbstractTransaction) o;
        return Objects.equals(transactionId, other.transactionId);
    }

    @Override
    public int hashCode() {
        return Objects.hash(transactionId);
    }

    @Override
    public String toString() {
        return new StringBuilder("AbstractTransaction{")
                .append("transactionId='").append(transactionId).append('\'')
                .append(", startScn=").append(startScn)
                .append(", changeTime=").append(changeTime)
                .append(", userName='").append(userName).append('\'')
                .append('}')
                .toString();
    }
}

View File

@ -0,0 +1,32 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.connector.oracle.logminer.processor;
import java.time.Instant;
import io.debezium.connector.oracle.Scn;
/**
 * Contract for an Oracle transaction.
 *
 * @author Chris Cranford
 */
public interface Transaction {

    /**
     * Returns the unique identifier of this transaction.
     */
    String getTransactionId();

    /**
     * Returns the system change number at which this transaction started.
     */
    Scn getStartScn();

    /**
     * Returns the time at which the transaction change was observed.
     */
    Instant getChangeTime();

    /**
     * Returns the authenticated user name; may be {@code null} when Oracle reported it as unknown.
     */
    String getUserName();

    /**
     * Returns the number of events that participate in this transaction.
     */
    int getNumberOfEvents();

    /**
     * Returns the identifier to use for the next event and advances the internal event counter.
     */
    int getNextEventId();

    /**
     * Should be called when a transaction start is detected; resets the event counter.
     */
    void started();
}

View File

@ -8,19 +8,18 @@
import java.util.Iterator;
import io.debezium.connector.oracle.Scn;
import io.debezium.connector.oracle.logminer.events.Transaction;
/**
* Generalized contract that all transaction cache implementations should implement.
*
* @author Chris Cranford
*/
public interface TransactionCache<I> extends AutoCloseable {
Transaction get(String transactionId);
public interface TransactionCache<T extends AbstractTransaction, I> extends AutoCloseable {
T get(String transactionId);
void put(String transactionId, Transaction transaction);
void put(String transactionId, T transaction);
Transaction remove(String transactionId);
T remove(String transactionId);
int size();

View File

@ -0,0 +1,301 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.connector.oracle.logminer.processor;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import io.debezium.DebeziumException;
import io.debezium.connector.oracle.BlobChunkList;
import io.debezium.connector.oracle.OracleConnectorConfig;
import io.debezium.connector.oracle.OracleDatabaseSchema;
import io.debezium.connector.oracle.logminer.LogMinerHelper;
import io.debezium.connector.oracle.logminer.events.DmlEvent;
import io.debezium.connector.oracle.logminer.events.EventType;
import io.debezium.connector.oracle.logminer.events.LobWriteEvent;
import io.debezium.connector.oracle.logminer.events.LogMinerEvent;
import io.debezium.connector.oracle.logminer.events.SelectLobLocatorEvent;
import io.debezium.function.BlockingConsumer;
import io.debezium.relational.Table;
/**
 * A consumer of transaction events at commit time that is capable of inspecting the event stream,
 * merging events that should be merged when LOB support is enabled, and then delegating the final
 * stream of events to a delegate consumer.
 *
 * When LOB support is disabled every event is dispatched immediately. When enabled, one event is
 * buffered at a time ({@code lastEvent}) so that follow-up LOB events (SEL_LOB_LOCATOR, LOB_WRITE,
 * LOB_ERASE, LOB_TRIM) and inline-LOB UPDATEs can be folded into it before dispatch.
 *
 * @author Chris Cranford
 */
public class TransactionCommitConsumer implements AutoCloseable, BlockingConsumer<LogMinerEvent> {

    private static final Logger LOGGER = LoggerFactory.getLogger(TransactionCommitConsumer.class);

    private final BlockingConsumer<LogMinerEvent> delegate;
    private final OracleConnectorConfig connectorConfig;
    private final OracleDatabaseSchema schema;
    // Accumulates LOB_WRITE chunk data until it is merged into the owning DML event
    private final List<String> lobWriteData;

    /**
     * The kind of LOB operation currently being accumulated from the event stream.
     */
    enum LobState {
        WRITE,
        ERASE,
        OTHER
    }

    // Last event seen but not yet dispatched; candidate for merging with the next event
    private LogMinerEvent lastEvent;
    // Most recent SEL_LOB_LOCATOR event; identifies the table/column targeted by LOB operations
    private SelectLobLocatorEvent lastSelectLobLocatorEvent;
    private LobState lobState;

    public TransactionCommitConsumer(BlockingConsumer<LogMinerEvent> delegate, OracleConnectorConfig connectorConfig, OracleDatabaseSchema schema) {
        this.delegate = delegate;
        this.lobState = LobState.OTHER;
        this.lobWriteData = new ArrayList<>();
        this.connectorConfig = connectorConfig;
        this.schema = schema;
    }

    @Override
    public void close() throws InterruptedException {
        // Flush the final buffered event, folding any pending LOB chunk data into it first
        if (lastEvent != null) {
            if (!lobWriteData.isEmpty()) {
                mergeLobWriteData(lastEvent);
            }
            dispatchChangeEvent(lastEvent);
        }
    }

    @Override
    public void accept(LogMinerEvent event) throws InterruptedException {
        if (!connectorConfig.isLobEnabled()) {
            // LOB support is not enabled, perform immediate dispatch
            dispatchChangeEvent(event);
            return;
        }
        if (lastEvent == null) {
            // Always cache first event, follow-up events will dictate merge/dispatch status
            this.lastEvent = event;
            if (EventType.SELECT_LOB_LOCATOR == event.getEventType()) {
                this.lastSelectLobLocatorEvent = (SelectLobLocatorEvent) event;
            }
        }
        else {
            // Check whether the LOB data queue needs to be drained to the last event
            LobState currentLobState = resolveLobStateByCurrentEvent(event);
            if (currentLobState != this.lobState) {
                if (this.lobState == LobState.WRITE) {
                    mergeLobWriteData(lastEvent);
                }
                this.lobState = currentLobState;
            }
            if (!isMerged(event, lastEvent)) {
                LOGGER.trace("\tMerged skipped.");
                // Events were not merged, dispatch last one and cache new
                dispatchChangeEvent(lastEvent);
                this.lastEvent = event;
            }
            else {
                LOGGER.trace("\tMerged successfully.");
            }
        }
    }

    /**
     * Hands the event to the delegate consumer.
     */
    private void dispatchChangeEvent(LogMinerEvent event) throws InterruptedException {
        LOGGER.trace("\tEmitting event {} {}", event.getEventType(), event);
        delegate.accept(event);
    }

    /**
     * Maps the event type to the LOB accumulation state it represents.
     */
    private LobState resolveLobStateByCurrentEvent(LogMinerEvent event) {
        switch (event.getEventType()) {
            case LOB_WRITE:
                return LobState.WRITE;
            case LOB_ERASE:
                return LobState.ERASE;
            default:
                return LobState.OTHER;
        }
    }

    /**
     * Determines whether {@code event} can be merged into {@code prevEvent} (or consumed outright),
     * applying the merge as a side effect when it can.
     *
     * @param event the current event, never {@code null}
     * @param prevEvent the buffered previous event, never {@code null}
     * @return true if the event was merged/consumed; false if the caller should dispatch {@code prevEvent}
     */
    private boolean isMerged(LogMinerEvent event, LogMinerEvent prevEvent) {
        LOGGER.trace("\tVerifying merge eligibility for event {} with {}", event.getEventType(), prevEvent.getEventType());
        if (EventType.SELECT_LOB_LOCATOR == event.getEventType()) {
            SelectLobLocatorEvent locatorEvent = (SelectLobLocatorEvent) event;
            this.lastSelectLobLocatorEvent = locatorEvent;

            if (EventType.INSERT == prevEvent.getEventType()) {
                // Previous event is an INSERT
                // Only merge the SEL_LOB_LOCATOR event if the previous INSERT is for the same table/row
                // and if the INSERT's column value is either EMPTY_CLOB() or EMPTY_BLOB()
                // NOTE(review): the empty-LOB value is not actually verified here — confirm intent
                if (isForSameTableOrScn(event, prevEvent)) {
                    LOGGER.trace("\tMerging SEL_LOB_LOCATOR with previous INSERT event");
                    return true;
                }
            }
            else if (EventType.UPDATE == prevEvent.getEventType()) {
                if (isForSameTableOrScn(event, prevEvent) && isSameTableRow(event, prevEvent)) {
                    LOGGER.trace("\tUpdating SEL_LOB_LOCATOR column '{}' to previous UPDATE event", locatorEvent.getColumnName());
                    return true;
                }
            }
            else if (EventType.SELECT_LOB_LOCATOR == prevEvent.getEventType()) {
                if (isForSameTableOrScn(event, prevEvent) && isSameTableRow(event, prevEvent)) {
                    LOGGER.trace("\tAdding column '{}' to previous SEL_LOB_LOCATOR event", locatorEvent.getColumnName());
                    return true;
                }
            }
        }
        else if (EventType.LOB_WRITE == event.getEventType()) {
            final LobWriteEvent writeEvent = (LobWriteEvent) event;
            if (lastSelectLobLocatorEvent.isBinary()) {
                // BLOB chunks must arrive wrapped as HEXTORAW('...'). Reject the chunk when EITHER
                // delimiter is missing; the previous '&&' only rejected chunks missing both, silently
                // accepting partially malformed data.
                if (!writeEvent.getData().startsWith("HEXTORAW('") || !writeEvent.getData().endsWith("')")) {
                    throw new DebeziumException("Unexpected LOB data chunk: " + writeEvent.getData());
                }
            }
            LOGGER.trace("\tAdded LOB_WRITE data to internal LOB queue.");
            lobWriteData.add(writeEvent.getData());
            return true;
        }
        else if (EventType.LOB_ERASE == event.getEventType()) {
            // nothing is done with the event, it's just consumed and treated as merged.
            LOGGER.warn("\tLOB_ERASE for table '{}' column '{}' is not supported.",
                    lastSelectLobLocatorEvent.getTableId(), lastSelectLobLocatorEvent.getColumnName());
            LOGGER.trace("\tSkipped LOB_ERASE, treated as merged.");
            return true;
        }
        else if (EventType.LOB_TRIM == event.getEventType()) {
            // nothing is done with the event, it's just consumed and treated as merged.
            LOGGER.trace("\tSkipped LOB_TRIM, treated as merged.");
            return true;
        }
        else if (EventType.INSERT == event.getEventType() || EventType.UPDATE == event.getEventType()) {
            if (EventType.INSERT == prevEvent.getEventType()) {
                // Previous event is an INSERT
                // The only valid combination here would be if the current event is an UPDATE since an INSERT
                // cannot be merged with a prior INSERT.
                if (EventType.UPDATE == event.getEventType()) {
                    if (isForSameTableOrScn(event, prevEvent) && isSameTableRow(event, prevEvent)) {
                        LOGGER.trace("\tMerging UPDATE event with previous INSERT event");
                        mergeNewColumns((DmlEvent) event, (DmlEvent) prevEvent);
                        return true;
                    }
                }
            }
            else if (EventType.UPDATE == prevEvent.getEventType()) {
                // Previous event is an UPDATE
                // This will happen if there are non LOB and inline LOB fields updated in the same SQL.
                // The inline LOB values should be merged with the previous UPDATE event
                if (EventType.UPDATE == event.getEventType()) {
                    if (isForSameTableOrScn(event, prevEvent) && isSameTableRow(event, prevEvent)) {
                        LOGGER.trace("\tMerging UPDATE event with previous UPDATE event");
                        mergeNewColumns((DmlEvent) event, (DmlEvent) prevEvent);
                        return true;
                    }
                }
            }
            else if (EventType.SELECT_LOB_LOCATOR == prevEvent.getEventType()) {
                // Previous event is a SEL_LOB_LOCATOR
                // SQL contains both non-inline LOB and inline-LOB field changes
                if (EventType.UPDATE == event.getEventType()) {
                    if (isForSameTableOrScn(event, prevEvent) && isSameTableRow(event, prevEvent)) {
                        // Copy only the column values the locator event did not already supply
                        for (int i = 0; i < ((DmlEvent) event).getDmlEntry().getNewValues().length; ++i) {
                            Object value = ((DmlEvent) event).getDmlEntry().getNewValues()[i];
                            Object prevValue = ((DmlEvent) prevEvent).getDmlEntry().getNewValues()[i];
                            if (prevValue == null && value != null) {
                                LOGGER.trace("\tAdding column index {} to previous SEL_LOB_LOCATOR event", i);
                                ((DmlEvent) prevEvent).getDmlEntry().getNewValues()[i] = value;
                            }
                        }
                        return true;
                    }
                }
            }
        }
        LOGGER.trace("\tEvent {} is for a different row, merge skipped.", event.getEventType());
        return false;
    }

    /**
     * Folds the accumulated LOB_WRITE chunks into the LOB column of the given DML event
     * and clears the chunk queue.
     */
    private void mergeLobWriteData(LogMinerEvent event) {
        final Object data;
        if (this.lastSelectLobLocatorEvent.isBinary()) {
            // For BLOB we pass the list of chunks as-is to the value converter
            data = new BlobChunkList(lobWriteData);
        }
        else {
            // For CLOB we go ahead and pre-process the list into a single string
            data = String.join("", lobWriteData);
        }
        final DmlEvent dmlEvent = (DmlEvent) event;
        final String columnName = lastSelectLobLocatorEvent.getColumnName();
        final int columnIndex = getSelectLobLocatorColumnIndex();
        LOGGER.trace("\tSet LOB data for column '{}' on table {} in event {}", columnName, event.getTableId(), event.getEventType());
        dmlEvent.getDmlEntry().getNewValues()[columnIndex] = data;
        lobWriteData.clear();
    }

    /**
     * Resolves the column index of the LOB column identified by the last SEL_LOB_LOCATOR event.
     */
    private int getSelectLobLocatorColumnIndex() {
        final String columnName = lastSelectLobLocatorEvent.getColumnName();
        return LogMinerHelper.getColumnIndexByName(columnName, schema.tableFor(lastSelectLobLocatorEvent.getTableId()));
    }

    /**
     * Returns true when both events target the same table, or share the same SCN and rollback segment id.
     */
    private boolean isForSameTableOrScn(LogMinerEvent event, LogMinerEvent prevEvent) {
        if (prevEvent != null) {
            if (event.getTableId().equals(prevEvent.getTableId())) {
                return true;
            }
            return event.getScn().equals(prevEvent.getScn()) && event.getRsId().equals(prevEvent.getRsId());
        }
        return false;
    }

    /**
     * Returns true when both DML events refer to the same row, compared by primary key column values.
     *
     * @throws DebeziumException if either event lacks a value for a primary key column
     */
    private boolean isSameTableRow(LogMinerEvent event, LogMinerEvent prevEvent) {
        final Table table = schema.tableFor(event.getTableId());
        if (table == null) {
            LOGGER.trace("Unable to locate table '{}' schema, unable to detect if same row.", event.getTableId());
            return false;
        }
        for (String columnName : table.primaryKeyColumnNames()) {
            int position = LogMinerHelper.getColumnIndexByName(columnName, table);
            Object prevValue = ((DmlEvent) prevEvent).getDmlEntry().getNewValues()[position];
            if (prevValue == null) {
                throw new DebeziumException("Could not find column " + columnName + " in previous event");
            }
            Object value = ((DmlEvent) event).getDmlEntry().getNewValues()[position];
            if (value == null) {
                throw new DebeziumException("Could not find column " + columnName + " in event");
            }
            if (!Objects.equals(value, prevValue)) {
                return false;
            }
        }
        return true;
    }

    /**
     * Copies new column values from {@code event} into {@code prevEvent}. When the previous event
     * is an INSERT, only empty-LOB placeholder columns are overwritten; otherwise any non-null
     * value from the current event wins.
     */
    private void mergeNewColumns(DmlEvent event, DmlEvent prevEvent) {
        final boolean prevEventIsInsert = EventType.INSERT == prevEvent.getEventType();

        for (int i = 0; i < event.getDmlEntry().getNewValues().length; ++i) {
            Object value = event.getDmlEntry().getNewValues()[i];
            Object prevValue = prevEvent.getDmlEntry().getNewValues()[i];
            if (prevEventIsInsert && "EMPTY_CLOB()".equals(prevValue)) {
                LOGGER.trace("\tAssigning column index {} with updated CLOB value.", i);
                prevEvent.getDmlEntry().getNewValues()[i] = value;
            }
            else if (prevEventIsInsert && "EMPTY_BLOB()".equals(prevValue)) {
                LOGGER.trace("\tAssigning column index {} with updated BLOB value.", i);
                prevEvent.getDmlEntry().getNewValues()[i] = value;
            }
            else if (!prevEventIsInsert && value != null) {
                LOGGER.trace("\tUpdating column index {} in previous event", i);
                prevEvent.getDmlEntry().getNewValues()[i] = value;
            }
        }
    }
}

View File

@ -1,419 +0,0 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.connector.oracle.logminer.processor;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import io.debezium.DebeziumException;
import io.debezium.connector.oracle.BlobChunkList;
import io.debezium.connector.oracle.OracleConnectorConfig;
import io.debezium.connector.oracle.OracleDatabaseSchema;
import io.debezium.connector.oracle.logminer.LogMinerHelper;
import io.debezium.connector.oracle.logminer.events.DmlEvent;
import io.debezium.connector.oracle.logminer.events.EventType;
import io.debezium.connector.oracle.logminer.events.LobEraseEvent;
import io.debezium.connector.oracle.logminer.events.LobWriteEvent;
import io.debezium.connector.oracle.logminer.events.LogMinerEvent;
import io.debezium.connector.oracle.logminer.events.SelectLobLocatorEvent;
import io.debezium.connector.oracle.logminer.events.Transaction;
import io.debezium.relational.Table;
/**
* Helper class that performs common transaction reconciliation.
*
* Transactions read from Oracle LogMiner are subject to containing events that need to be merged
* together to reflect a single logical SQL operation, such as events that pertain to LOB fields.
* This class facilities all the steps needed to merge events and reconcile a transaction.
*
* @author Chris Cranford
*/
public class TransactionReconciliation {
private static final Logger LOGGER = LoggerFactory.getLogger(TransactionReconciliation.class);
private final OracleConnectorConfig connectorConfig;
private final OracleDatabaseSchema schema;
    /**
     * Creates a reconciliation helper.
     *
     * @param connectorConfig connector configuration, used to check whether LOB support is enabled
     * @param schema the database schema, used to resolve table/column metadata during merges
     */
    public TransactionReconciliation(OracleConnectorConfig connectorConfig, OracleDatabaseSchema schema) {
        this.connectorConfig = connectorConfig;
        this.schema = schema;
    }
    /**
     * Reconcile the specified transaction by merging multiple events that should be emitted as a single
     * logical event, such as changes made to LOB column types that involve multiple events.
     *
     * @param transaction transaction to be reconciled, never {@code null}
     */
    public void reconcile(Transaction transaction) {
        // Do not perform reconciliation if LOB support is not enabled.
        if (!connectorConfig.isLobEnabled()) {
            return;
        }

        final String transactionId = transaction.getTransactionId();
        LOGGER.trace("Reconciling transaction {}", transactionId);

        DmlEvent prevEvent = null;

        int prevEventSize = transaction.getEvents().size();
        // The index is only advanced when no merge occurred: a successful merge removes the
        // current event from the list, so the same index then refers to the next event.
        for (int i = 0; i < transaction.getEvents().size();) {
            final LogMinerEvent event = transaction.getEvents().get(i);
            LOGGER.trace("Processing event {}", event);

            switch (event.getEventType()) {
                case SELECT_LOB_LOCATOR:
                    if (shouldMergeSelectLobLocatorEvent(transaction, i, (SelectLobLocatorEvent) event, prevEvent)) {
                        continue;
                    }
                    break;
                case INSERT:
                case UPDATE:
                    if (shouldMergeDmlEvent(transaction, i, (DmlEvent) event, prevEvent)) {
                        continue;
                    }
                    break;
            }

            ++i;
            // NOTE(review): unconditional cast — assumes every non-merged event reaching here is a DmlEvent
            prevEvent = (DmlEvent) event;
            LOGGER.trace("Previous event is now {}", prevEvent);
        }

        int eventSize = transaction.getEvents().size();
        if (eventSize != prevEventSize) {
            LOGGER.trace("Reconciled transaction {} from {} events to {}.", transactionId, prevEventSize, eventSize);
        }
        else {
            LOGGER.trace("Transaction {} event queue was unmodified.", transactionId);
        }
    }
    /**
     * Attempts to merge the provided SEL_LOB_LOCATOR event with the previous event in the transaction.
     *
     * A SEL_LOB_LOCATOR event may be folded into a preceding INSERT, UPDATE, or SEL_LOB_LOCATOR for the
     * same table/row, discarded entirely when it carries no LOB operations, or kept as a stand-alone
     * UPDATE-like event when no prior event can absorb it.
     *
     * @param transaction transaction being processed, never {@code null}
     * @param index event index being processed
     * @param event event being processed, never {@code null}
     * @param prevEvent previous event in the transaction, can be {@code null}
     * @return true if the event is merged, false if the event was not merged.
     */
    protected boolean shouldMergeSelectLobLocatorEvent(Transaction transaction, int index, SelectLobLocatorEvent event, DmlEvent prevEvent) {
        LOGGER.trace("\tDetected SelectLobLocatorEvent for column '{}'", event.getColumnName());

        final int columnIndex = LogMinerHelper.getColumnIndexByName(event.getColumnName(), schema.tableFor(event.getTableId()));

        // Read and combine all LOB_WRITE events that follow SEL_LOB_LOCATOR
        Object lobData = null;
        final List<String> lobWrites = readAndCombineLobWriteEvents(transaction, index, event.isBinary());
        if (!lobWrites.isEmpty()) {
            if (event.isBinary()) {
                // For BLOB we pass the list of string chunks as-is to the value converter
                lobData = new BlobChunkList(lobWrites);
            }
            else {
                // For CLOB we go ahead and pre-process the List into a single string.
                lobData = String.join("", lobWrites);
            }
        }

        // Read and consume all LOB_ERASE events that follow SEL_LOB_LOCATOR
        final int lobEraseEvents = readAndConsumeLobEraseEvents(transaction, index);
        if (lobEraseEvents > 0) {
            LOGGER.warn("LOB_ERASE for table '{}' column '{}' is not supported, use DML operations to manipulate LOB columns only.", event.getTableId(),
                    event.getColumnName());
            if (lobWrites.isEmpty()) {
                // There are no write and only erase events, discard entire SEL_LOB_LOCATOR
                // To simulate this, we treat this as a "merge" op so caller doesn't modify previous event
                transaction.getEvents().remove(index);
                return true;
            }
        }
        else if (lobEraseEvents == 0 && lobWrites.isEmpty()) {
            // There were no LOB operations present, discard entire SEL_LOB_LOCATOR
            // To simulate this, we treat this as a "merge" op so caller doesn't modify previous event
            transaction.getEvents().remove(index);
            return true;
        }

        // SelectLobLocatorEvent can be treated as a parent DML operation where an update occurs on any
        // LOB-based column. In this case, the event will be treated as an UPDATE event when emitted.

        if (prevEvent == null) {
            // There is no prior event, add column to this SelectLobLocatorEvent and don't merge.
            LOGGER.trace("\tAdding column '{}' to current event", event.getColumnName());
            event.getDmlEntry().getNewValues()[columnIndex] = lobData;
            return false;
        }

        if (EventType.INSERT == prevEvent.getEventType()) {
            // Previous event is an INSERT operation.
            // Only merge the SEL_LOB_LOCATOR event if the previous INSERT is for the same table/row
            // and if the INSERT's column value is EMPTY_CLOB() or EMPTY_BLOB()
            if (isForSameTableOrScn(event, prevEvent)) {
                LOGGER.trace("\tMerging SEL_LOB_LOCATOR with previous INSERT event");
                Object prevValue = prevEvent.getDmlEntry().getNewValues()[columnIndex];
                if (!"EMPTY_CLOB()".equals(prevValue) && !"EMPTY_BLOB()".equals(prevValue)) {
                    throw new DebeziumException("Expected to find column '" + event.getColumnName() + "' in table '"
                            + prevEvent.getTableId() + "' to be initialized as an empty LOB value.'");
                }

                prevEvent.getDmlEntry().getNewValues()[columnIndex] = lobData;

                // Remove the SEL_LOB_LOCATOR event from event list and indicate merged.
                transaction.getEvents().remove(index);
                return true;
            }
        }
        else if (EventType.UPDATE == prevEvent.getEventType()) {
            // Previous event is an UPDATE operation.
            // Only merge the SEL_LOB_LOCATOR event if the previous UPDATE is for the same table/row
            if (isForSameTableOrScn(event, prevEvent) && isSameTableRow(event, prevEvent)) {
                LOGGER.trace("\tUpdating SEL_LOB_LOCATOR column '{}' to previous UPDATE event", event.getColumnName());
                prevEvent.getDmlEntry().getNewValues()[columnIndex] = lobData;

                // Remove the SEL_LOB_LOCATOR event from event list and indicate merged.
                transaction.getEvents().remove(index);
                return true;
            }
        }
        else if (EventType.SELECT_LOB_LOCATOR == prevEvent.getEventType()) {
            // Previous event is a SEL_LOB_LOCATOR operation.
            // Only merge the two SEL_LOB_LOCATOR events if they're for the same table/row
            if (isForSameTableOrScn(event, prevEvent) && isSameTableRow(event, prevEvent)) {
                LOGGER.trace("\tAdding column '{}' to previous SEL_LOB_LOCATOR event", event.getColumnName());
                prevEvent.getDmlEntry().getNewValues()[columnIndex] = lobData;

                // Remove the SEL_LOB_LOCATOR event from event list and indicate merged.
                transaction.getEvents().remove(index);
                return true;
            }
        }
        else {
            throw new DebeziumException("Unexpected previous event operation: " + prevEvent.getEventType());
        }

        LOGGER.trace("\tSEL_LOB_LOCATOR event is for different row, merge skipped.");
        LOGGER.trace("\tAdding column '{}' to current event", event.getColumnName());
        event.getDmlEntry().getNewValues()[columnIndex] = lobData;
        return false;
    }
    /**
     * Attempts to merge the provided DML event with the previous event in the transaction.
     *
     * Only UPDATE events are ever merged: into a prior INSERT (filling empty-LOB placeholders),
     * into a prior UPDATE (inline LOB values), or into a prior SEL_LOB_LOCATOR (non-inline plus
     * inline LOB changes from one SQL statement).
     *
     * @param transaction transaction being processed, never {@code null}
     * @param index event index being processed
     * @param event event being processed, never {@code null}
     * @param prevEvent previous event in the transaction, can be {@code null}
     * @return true if the event is merged, false if the event was not merged
     */
    protected boolean shouldMergeDmlEvent(Transaction transaction, int index, DmlEvent event, DmlEvent prevEvent) {
        LOGGER.trace("\tDetected DmlEvent {}", event.getEventType());

        if (prevEvent == null) {
            // There is no prior event, therefore there is no reason to perform any merge.
            return false;
        }

        if (EventType.INSERT == prevEvent.getEventType()) {
            // Previous event is an INSERT operation.
            // The only valid combination here would be if the current event is an UPDATE since an INSERT cannot
            // be merged with a prior INSERT with how LogMiner materializes the rows.
            if (EventType.UPDATE == event.getEventType()) {
                if (isForSameTableOrScn(event, prevEvent) && isSameTableRow(event, prevEvent)) {
                    LOGGER.trace("\tMerging UPDATE event with previous INSERT event");
                    mergeNewColumns(event, prevEvent);

                    // Remove the UPDATE event from event list and indicate merged.
                    transaction.getEvents().remove(index);
                    return true;
                }
            }
        }
        else if (EventType.UPDATE == prevEvent.getEventType()) {
            // Previous event is an UPDATE operation.
            // This will happen if there are non-CLOB and inline-CLOB fields updated in the same SQL.
            // The inline-CLOB values should be merged with the previous UPDATE event.
            if (EventType.UPDATE == event.getEventType()) {
                if (isForSameTableOrScn(event, prevEvent) && isSameTableRow(event, prevEvent)) {
                    LOGGER.trace("\tMerging UPDATE event with previous UPDATE event");
                    mergeNewColumns(event, prevEvent);

                    // Remove the UPDATE event from event list and indicate merged.
                    transaction.getEvents().remove(index);
                    return true;
                }
            }
        }
        else if (EventType.SELECT_LOB_LOCATOR == prevEvent.getEventType()) {
            // Previous event is a SEL_LOB_LOCATOR operation.
            // SQL contained both non-inline CLOB and inline-CLOB field changes.
            if (EventType.UPDATE == event.getEventType()) {
                if (isForSameTableOrScn(event, prevEvent) && isSameTableRow(event, prevEvent)) {
                    LOGGER.trace("\tMerging UPDATE event with previous SEL_LOB_LOCATOR event");
                    // Copy only the column values the locator event did not already supply
                    for (int i = 0; i < event.getDmlEntry().getNewValues().length; ++i) {
                        Object value = event.getDmlEntry().getNewValues()[i];
                        Object prevValue = prevEvent.getDmlEntry().getNewValues()[i];
                        if (prevValue == null && value != null) {
                            LOGGER.trace("\tAdding column index {} to previous SEL_LOB_LOCATOR event", i);
                            prevEvent.getDmlEntry().getNewValues()[i] = value;
                        }
                    }

                    // Remove the UPDATE event from event list and indicate merged.
                    transaction.getEvents().remove(index);
                    return true;
                }
            }
        }

        LOGGER.trace("\tDmlEvent {} event is for different row, merge skipped.", event.getEventType());
        return false;
    }
/**
 * Reads the transaction event queue and combines all LOB_WRITE events starting at the provided
 * index for a SEL_LOB_LOCATOR event which is for binary data (BLOB) data types.
 *
 * @param transaction transaction being processed, never {@code null}
 * @param index index to the first LOB_WRITE operation
 * @param binaryData whether the LOB chunks carry binary (BLOB) data and must be HEXTORAW-wrapped
 * @return list of string-based values for each LOB_WRITE operation
 * @throws DebeziumException if a binary chunk is not fully wrapped as {@code HEXTORAW('...')}
 */
protected List<String> readAndCombineLobWriteEvents(Transaction transaction, int index, boolean binaryData) {
    List<String> chunks = new ArrayList<>();
    for (int i = index + 1; i < transaction.getEvents().size(); ++i) {
        final LogMinerEvent event = transaction.getEvents().get(i);
        if (!(event instanceof LobWriteEvent)) {
            // First non-LOB_WRITE event terminates the chunk run.
            break;
        }
        final LobWriteEvent writeEvent = (LobWriteEvent) event;
        // A valid binary chunk must carry BOTH the "HEXTORAW('" prefix and the "')" suffix.
        // The previous check (!startsWith && !endsWith) only rejected chunks missing both
        // delimiters, letting half-wrapped chunks through; reject when either is missing.
        if (binaryData && (!writeEvent.getData().startsWith("HEXTORAW('") || !writeEvent.getData().endsWith("')"))) {
            throw new DebeziumException("Unexpected BLOB data chunk: " + writeEvent.getData());
        }
        chunks.add(writeEvent.getData());
    }
    if (!chunks.isEmpty()) {
        LOGGER.trace("\tCombined {} LobWriteEvent events", chunks.size());
        // Remove the consumed events from the transaction queue; each removal shifts the
        // remaining chunks down to position index + 1.
        for (int i = 0; i < chunks.size(); ++i) {
            transaction.getEvents().remove(index + 1);
        }
    }
    return chunks;
}
/**
 * Read and remove all LobErase events detected in the transaction event queue.
 *
 * @param transaction transaction being processed, never {@code null}
 * @param index index to the first LOB_ERASE operation
 * @return number of LOB_ERASE events consumed and removed from the event queue
 */
protected int readAndConsumeLobEraseEvents(Transaction transaction, int index) {
    final List<LogMinerEvent> eventList = transaction.getEvents();
    // Walk forward from the event after index, counting the contiguous run of LobEraseEvents.
    int consumed = 0;
    int cursor = index + 1;
    while (cursor < eventList.size() && eventList.get(cursor) instanceof LobEraseEvent) {
        consumed++;
        cursor++;
    }
    if (consumed > 0) {
        LOGGER.trace("\tConsumed {} LobErase events", consumed);
        // Each removal shifts the run down, so removing at index + 1 repeatedly drains it.
        int remaining = consumed;
        while (remaining-- > 0) {
            eventList.remove(index + 1);
        }
    }
    return consumed;
}
/**
 * Checks whether the two events are for the same table or participate in the same system change.
 *
 * @param event current event being processed, never {@code null}
 * @param prevEvent previous/parent event that has been processed, may be {@code null}
 * @return true if the two events are for the same table or system change number, false otherwise
 */
protected boolean isForSameTableOrScn(LogMinerEvent event, LogMinerEvent prevEvent) {
    if (prevEvent == null) {
        // Nothing to compare against.
        return false;
    }
    // Same table is sufficient on its own.
    if (event.getTableId().equals(prevEvent.getTableId())) {
        return true;
    }
    // Otherwise both events must belong to the same system change (SCN + rollback segment id).
    return event.getScn().equals(prevEvent.getScn()) && event.getRsId().equals(prevEvent.getRsId());
}
/**
 * Checks whether the two events are for the same table row, comparing primary key column values.
 *
 * @param event current event being processed, never {@code null}
 * @param prevEvent previous/parent event that has been processed, never {@code null}
 * @return true if the two events are for the same table row, false otherwise
 * @throws DebeziumException if a primary key column value is absent from either event
 */
protected boolean isSameTableRow(DmlEvent event, DmlEvent prevEvent) {
    final Table table = schema.tableFor(event.getTableId());
    if (table == null) {
        // Without the relational schema the row identity cannot be established.
        LOGGER.trace("Unable to locate table '{}' schema, unable to detect if same row.", event.getTableId());
        return false;
    }
    final Object[] newValues = event.getDmlEntry().getNewValues();
    final Object[] prevNewValues = prevEvent.getDmlEntry().getNewValues();
    // Two events describe the same row when every primary key column value matches.
    for (String keyColumn : table.primaryKeyColumnNames()) {
        final int position = LogMinerHelper.getColumnIndexByName(keyColumn, table);
        final Object prevKeyValue = prevNewValues[position];
        if (prevKeyValue == null) {
            throw new DebeziumException("Could not find column " + keyColumn + " in previous event");
        }
        final Object keyValue = newValues[position];
        if (keyValue == null) {
            throw new DebeziumException("Could not find column " + keyColumn + " in event");
        }
        if (!Objects.equals(keyValue, prevKeyValue)) {
            return false;
        }
    }
    return true;
}
/**
 * Merge column values from {@code event} with {@code prevEvent}.
 *
 * @param event current event being processed, never {@code null}
 * @param prevEvent previous/parent event that has been processed, never {@code null}
 */
protected void mergeNewColumns(DmlEvent event, DmlEvent prevEvent) {
    final boolean prevEventIsInsert = EventType.INSERT == prevEvent.getEventType();
    final Object[] newValues = event.getDmlEntry().getNewValues();
    final Object[] prevValues = prevEvent.getDmlEntry().getNewValues();
    for (int position = 0; position < newValues.length; ++position) {
        final Object newValue = newValues[position];
        final Object oldValue = prevValues[position];
        if (prevEventIsInsert) {
            // For an INSERT predecessor only the LOB placeholder markers are replaced.
            if ("EMPTY_CLOB()".equals(oldValue)) {
                LOGGER.trace("\tAssigning column index {} with updated CLOB value.", position);
                prevValues[position] = newValue;
            }
            else if ("EMPTY_BLOB()".equals(oldValue)) {
                LOGGER.trace("\tAssigning column index {} with updated BLOB value.", position);
                prevValues[position] = newValue;
            }
        }
        else if (newValue != null) {
            // For a non-INSERT predecessor, any non-null value overwrites the prior one.
            LOGGER.trace("\tUpdating column index {} in previous event", position);
            prevValues[position] = newValue;
        }
    }
}
}

View File

@ -11,6 +11,7 @@
import java.time.Duration;
import java.time.Instant;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import org.infinispan.Cache;
import org.infinispan.configuration.cache.Configuration;
@ -32,9 +33,11 @@
import io.debezium.connector.oracle.logminer.events.DmlEvent;
import io.debezium.connector.oracle.logminer.events.LogMinerEvent;
import io.debezium.connector.oracle.logminer.events.LogMinerEventRow;
import io.debezium.connector.oracle.logminer.events.Transaction;
import io.debezium.connector.oracle.logminer.processor.AbstractLogMinerEventProcessor;
import io.debezium.connector.oracle.logminer.processor.Transaction;
import io.debezium.connector.oracle.logminer.processor.TransactionCache;
import io.debezium.connector.oracle.logminer.processor.TransactionCommitConsumer;
import io.debezium.function.BlockingConsumer;
import io.debezium.pipeline.EventDispatcher;
import io.debezium.pipeline.source.spi.ChangeEventSource.ChangeEventSourceContext;
import io.debezium.relational.TableId;
@ -46,7 +49,7 @@
*
* @author Chris Cranford
*/
public class InfinispanLogMinerEventProcessor extends AbstractLogMinerEventProcessor {
public class InfinispanLogMinerEventProcessor extends AbstractLogMinerEventProcessor<InfinispanTransaction> {
private static final Logger LOGGER = LoggerFactory.getLogger(InfinispanLogMinerEventProcessor.class);
@ -62,6 +65,13 @@ public class InfinispanLogMinerEventProcessor extends AbstractLogMinerEventProce
*/
private final InfinispanTransactionCache transactionCache;
/**
* A cache storing each of the raw events read from the LogMiner contents view.
* This cache is keyed using the format of "<transaction-id>-<sequence>" where the sequence
* is obtained from the {@link InfinispanTransaction#getEventId(int)} method.
*/
private final Cache<String, LogMinerEvent> eventCache;
/**
* A cache storing recently committed transactions keyed by the unique transaction id and
* the event's system change number. This cache is used to filter events during re-mining
@ -113,9 +123,26 @@ public InfinispanLogMinerEventProcessor(ChangeEventSourceContext context,
final EmbeddedCacheManager manager = new DefaultCacheManager();
this.transactionCache = new InfinispanTransactionCache(createCache(manager, connectorConfig, "transactions"));
this.eventCache = createCache(manager, connectorConfig, "events");
this.recentlyCommittedTransactionsCache = createCache(manager, connectorConfig, "committed-transactions");
this.rollbackTransactionsCache = createCache(manager, connectorConfig, "rollback-transactions");
this.schemaChangesCache = createCache(manager, connectorConfig, "schema-changes");
displayCacheStatistics();
}
private void displayCacheStatistics() {
LOGGER.info("Cache Statistics:");
LOGGER.info("\tTransactions : {}", transactionCache.size());
LOGGER.info("\tCommitted Trxs : {}", recentlyCommittedTransactionsCache.size());
LOGGER.info("\tRollback Trxs : {}", rollbackTransactionsCache.size());
LOGGER.info("\tSchema Changes : {}", schemaChangesCache.size());
LOGGER.info("\tEvents : {}", eventCache.size());
if (!eventCache.isEmpty()) {
for (String eventKey : eventCache.keySet()) {
LOGGER.debug("\t\tFound Key: {}", eventKey);
}
}
}
private <K, V> Cache<K, V> createCache(EmbeddedCacheManager manager, OracleConnectorConfig connectorConfig, String name) {
@ -137,14 +164,31 @@ private <K, V> Cache<K, V> createCache(EmbeddedCacheManager manager, OracleConne
}
@Override
protected TransactionCache<?> getTransactionCache() {
protected TransactionCache<InfinispanTransaction, ?> getTransactionCache() {
return transactionCache;
}
@Override
protected InfinispanTransaction createTransaction(LogMinerEventRow row) {
return new InfinispanTransaction(row.getTransactionId(), row.getScn(), row.getChangeTime(), row.getUserName());
}
@Override
protected void removeEventWithRowId(LogMinerEventRow row) {
for (String eventKey : eventCache.keySet().stream().filter(k -> k.startsWith(row.getTransactionId() + "-")).collect(Collectors.toList())) {
final LogMinerEvent event = eventCache.get(eventKey);
if (event != null && event.getRowId().equals(row.getRowId())) {
LOGGER.trace("Undo applied for event {}.", event);
eventCache.remove(eventKey);
}
}
}
@Override
public void close() throws Exception {
if (getConfig().isLogMiningBufferDropOnStop()) {
// Since the buffers are to be dropped on stop, clear them before stopping provider.
eventCache.clear();
transactionCache.clear();
recentlyCommittedTransactionsCache.clear();
rollbackTransactionsCache.clear();
@ -154,6 +198,7 @@ public void close() throws Exception {
recentlyCommittedTransactionsCache.stop();
rollbackTransactionsCache.stop();
schemaChangesCache.stop();
eventCache.stop();
transactionCache.close();
}
@ -240,20 +285,24 @@ protected void handleCommit(LogMinerEventRow row) throws InterruptedException {
return;
}
final Transaction transaction = transactionCache.remove(transactionId);
final InfinispanTransaction transaction = transactionCache.remove(transactionId);
if (transaction == null) {
LOGGER.trace("Transaction {} not found.", transactionId);
return;
}
boolean skipExcludedUserName = false;
if (transaction.getUserName() == null && !transaction.getEvents().isEmpty()) {
final boolean skipExcludedUserName;
if (transaction.getUserName() == null && transaction.getNumberOfEvents() > 0) {
LOGGER.debug("Got transaction with null username {}", transaction);
skipExcludedUserName = false;
}
else if (getConfig().getLogMiningUsernameExcludes().contains(transaction.getUserName())) {
LOGGER.trace("Skipping transaction with excluded username {}", transaction);
skipExcludedUserName = true;
}
else {
skipExcludedUserName = false;
}
final Scn smallestScn = transactionCache.getMinimumScn();
metrics.setOldestScn(smallestScn.isNull() ? Scn.valueOf(-1) : smallestScn);
@ -263,24 +312,25 @@ else if (getConfig().getLogMiningUsernameExcludes().contains(transaction.getUser
if ((offsetCommitScn != null && offsetCommitScn.compareTo(commitScn) > 0) || lastCommittedScn.compareTo(commitScn) > 0) {
LOGGER.debug("Transaction {} has already been processed. Commit SCN in offset is {} while commit SCN of transaction is {} and last seen committed SCN is {}.",
transactionId, offsetCommitScn, commitScn, lastCommittedScn);
transactionCache.remove(transactionId);
metrics.setActiveTransactions(transactionCache.size());
removeEventsWithTransaction(transaction);
return;
}
counters.commitCount++;
Instant start = Instant.now();
getReconciliation().reconcile(transaction);
int numEvents = transaction.getEvents().size();
int numEvents = getTransactionEventCount(transaction);
LOGGER.trace("Commit: (smallest SCN {}) {}", smallestScn, row);
LOGGER.trace("Transaction {} has {} events", transactionId, numEvents);
for (LogMinerEvent event : transaction.getEvents()) {
if (!context.isRunning()) {
return;
}
BlockingConsumer<LogMinerEvent> delegate = new BlockingConsumer<LogMinerEvent>() {
private int numEvents = getTransactionEventCount(transaction);
@Override
public void accept(LogMinerEvent event) throws InterruptedException {
// Update SCN in offset context only if processed SCN less than SCN of other transactions
if (smallestScn.isNull() || commitScn.compareTo(smallestScn) < 0) {
offsetContext.setScn(event.getScn());
@ -295,8 +345,6 @@ else if (getConfig().getLogMiningUsernameExcludes().contains(transaction.getUser
offsetContext.setCommitScn(commitScn);
}
// after reconciliation all events should be DML
// todo: do we want to move dml entry up and just let it be null to avoid cast?
final DmlEvent dmlEvent = (DmlEvent) event;
if (!skipExcludedUserName) {
dispatcher.dispatchDataChangeEvent(event.getTableId(),
@ -310,9 +358,31 @@ else if (getConfig().getLogMiningUsernameExcludes().contains(transaction.getUser
Clock.system()));
}
}
};
int eventCount = 0;
try (TransactionCommitConsumer commitConsumer = new TransactionCommitConsumer(delegate, getConfig(), getSchema())) {
for (int i = 0; i < transaction.getNumberOfEvents(); ++i) {
if (!context.isRunning()) {
return;
}
final LogMinerEvent event = eventCache.get(transaction.getEventId(i));
if (event == null) {
// If an event is undone, it gets removed from the cache at undo time.
// This means that the call to get could return a null event and we
// should silently ignore it.
continue;
}
eventCount++;
LOGGER.trace("Dispatching event {} {}", transaction.getEventId(i), event.getEventType());
commitConsumer.accept(event);
}
}
lastCommittedScn = Scn.valueOf(commitScn.longValue());
if (!transaction.getEvents().isEmpty() && !skipExcludedUserName) {
if (transaction.getNumberOfEvents() > 0 && !skipExcludedUserName) {
dispatcher.dispatchTransactionCommittedEvent(partition, offsetContext);
}
else {
@ -327,9 +397,12 @@ else if (getConfig().getLogMiningUsernameExcludes().contains(transaction.getUser
// cache recently committed transactions by transaction id
recentlyCommittedTransactionsCache.put(transactionId, commitScn.toString());
// Clear the event queue for the transaction
removeEventsWithTransaction(transaction);
metrics.incrementCommittedTransactions();
metrics.setActiveTransactions(transactionCache.size());
metrics.incrementCommittedDmlCount(transaction.getEvents().size());
metrics.incrementCommittedDmlCount(eventCount);
metrics.setCommittedScn(commitScn);
metrics.setOffsetScn(offsetContext.getScn());
metrics.setLastCommitDuration(Duration.between(start, Instant.now()));
@ -337,8 +410,9 @@ else if (getConfig().getLogMiningUsernameExcludes().contains(transaction.getUser
@Override
protected void handleRollback(LogMinerEventRow row) {
final Transaction transaction = transactionCache.get(row.getTransactionId());
final InfinispanTransaction transaction = transactionCache.get(row.getTransactionId());
if (transaction != null) {
removeEventsWithTransaction(transaction);
transactionCache.remove(row.getTransactionId());
rollbackTransactionsCache.put(row.getTransactionId(), row.getScn().toString());
@ -361,16 +435,16 @@ protected void handleSchemaChange(LogMinerEventRow row) throws InterruptedExcept
@Override
protected void addToTransaction(String transactionId, LogMinerEventRow row, Supplier<LogMinerEvent> eventSupplier) {
if (isTransactionIdAllowed(transactionId)) {
Transaction transaction = getTransactionCache().get(transactionId);
InfinispanTransaction transaction = getTransactionCache().get(transactionId);
if (transaction == null) {
LOGGER.trace("Transaction {} is not in cache, creating.", transactionId);
transaction = new Transaction(transactionId, row.getScn(), row.getChangeTime(), row.getUserName());
transaction = createTransaction(row);
}
int eventId = transaction.getNextEventId();
if (transaction.getEvents().size() <= eventId) {
String eventKey = transaction.getEventId(transaction.getNextEventId());
if (!eventCache.containsKey(eventKey)) {
// Add new event at eventId offset
LOGGER.trace("Transaction {}, adding event reference at index {}", transactionId, eventId);
transaction.getEvents().add(eventSupplier.get());
LOGGER.trace("Transaction {}, adding event reference at key {}", transactionId, eventKey);
eventCache.put(eventKey, eventSupplier.get());
metrics.calculateLagMetrics(row.getChangeTime());
}
// When using Infinispan, this extra put is required so that the state is properly synchronized
@ -379,6 +453,15 @@ protected void addToTransaction(String transactionId, LogMinerEventRow row, Supp
}
}
@Override
protected int getTransactionEventCount(InfinispanTransaction transaction) {
// todo: implement indexed keys when ISPN supports them
return (int) eventCache.keySet()
.parallelStream()
.filter(k -> k.startsWith(transaction.getTransactionId() + "-"))
.count();
}
private PreparedStatement createQueryStatement() throws SQLException {
final String query = LogMinerQueryBuilder.build(getConfig(), getSchema());
return jdbcConnection.connection().prepareStatement(query,
@ -434,4 +517,11 @@ private Scn calculateNewStartScn(Scn endScn) throws InterruptedException {
return endScn;
}
}
private void removeEventsWithTransaction(InfinispanTransaction transaction) {
    // Evict every event registered under this transaction's key space from the event cache.
    final int eventCount = transaction.getNumberOfEvents();
    for (int eventIndex = 0; eventIndex < eventCount; ++eventIndex) {
        eventCache.remove(transaction.getEventId(eventIndex));
    }
}
}

View File

@ -0,0 +1,62 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.connector.oracle.logminer.processor.infinispan;
import java.time.Instant;
import io.debezium.connector.oracle.Scn;
import io.debezium.connector.oracle.logminer.processor.AbstractTransaction;
import io.debezium.connector.oracle.logminer.processor.infinispan.marshalling.VisibleForMarshalling;
/**
 * A concrete implementation of {@link AbstractTransaction} for the Infinispan processor.
 *
 * @author Chris Cranford
 */
public class InfinispanTransaction extends AbstractTransaction {
    // Number of events registered against this transaction; doubles as the next
    // event sequence number handed out by getNextEventId().
    private int numberOfEvents;

    public InfinispanTransaction(String transactionId, Scn startScn, Instant changeTime, String userName) {
        super(transactionId, startScn, changeTime, userName);
        started();
    }

    /**
     * Constructor used when re-hydrating a transaction from the marshalled cache entry,
     * restoring the persisted event counter.
     */
    @VisibleForMarshalling
    public InfinispanTransaction(String transactionId, Scn startScn, Instant changeTime, String userName, int numberOfEvents) {
        this(transactionId, startScn, changeTime, userName);
        this.numberOfEvents = numberOfEvents;
    }

    @Override
    public int getNumberOfEvents() {
        return numberOfEvents;
    }

    @Override
    public int getNextEventId() {
        // Post-increment: returns the current sequence and advances the counter.
        return numberOfEvents++;
    }

    @Override
    public void started() {
        numberOfEvents = 0;
    }

    /**
     * Returns the event cache key, {@code "<transaction-id>-<index>"}, for the given event index.
     *
     * @param index event index, must be within {@code [0, numberOfEvents)}
     * @return the event cache key
     * @throws IndexOutOfBoundsException if the index is outside the event list bounds
     */
    public String getEventId(int index) {
        if (index < 0 || index >= numberOfEvents) {
            // Fixed: the original message was missing the space after the index value,
            // producing e.g. "Index 5outside the transaction ...".
            throw new IndexOutOfBoundsException("Index " + index + " outside the transaction " + getTransactionId() + " event list bounds");
        }
        return getTransactionId() + "-" + index;
    }

    @Override
    public String toString() {
        return "InfinispanTransaction{" +
                "numberOfEvents=" + numberOfEvents +
                "} " + super.toString();
    }
}

View File

@ -11,7 +11,6 @@
import org.infinispan.commons.util.CloseableIterator;
import io.debezium.connector.oracle.Scn;
import io.debezium.connector.oracle.logminer.events.Transaction;
import io.debezium.connector.oracle.logminer.processor.TransactionCache;
/**
@ -19,26 +18,26 @@
*
* @author Chris Cranford
*/
public class InfinispanTransactionCache implements TransactionCache<Cache.Entry<String, Transaction>> {
public class InfinispanTransactionCache implements TransactionCache<InfinispanTransaction, Cache.Entry<String, InfinispanTransaction>> {
private final Cache<String, Transaction> cache;
private final Cache<String, InfinispanTransaction> cache;
public InfinispanTransactionCache(Cache<String, Transaction> cache) {
public InfinispanTransactionCache(Cache<String, InfinispanTransaction> cache) {
this.cache = cache;
}
@Override
public Transaction get(String transactionId) {
public InfinispanTransaction get(String transactionId) {
return cache.get(transactionId);
}
@Override
public void put(String transactionId, Transaction transaction) {
public void put(String transactionId, InfinispanTransaction transaction) {
cache.put(transactionId, transaction);
}
@Override
public Transaction remove(String transactionId) {
public InfinispanTransaction remove(String transactionId) {
return cache.remove(transactionId);
}
@ -58,14 +57,14 @@ public boolean isEmpty() {
}
@Override
public Iterator<Cache.Entry<String, Transaction>> iterator() {
public Iterator<Cache.Entry<String, InfinispanTransaction>> iterator() {
return cache.entrySet().iterator();
}
@Override
public Scn getMinimumScn() {
Scn minimumScn = Scn.NULL;
try (CloseableIterator<Transaction> iterator = cache.values().iterator()) {
try (CloseableIterator<InfinispanTransaction> iterator = cache.values().iterator()) {
while (iterator.hasNext()) {
final Scn transactionScn = iterator.next().getStartScn();
if (minimumScn.isNull()) {

View File

@ -0,0 +1,67 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.connector.oracle.logminer.processor.infinispan.marshalling;
import java.time.Instant;
import org.infinispan.protostream.annotations.ProtoAdapter;
import org.infinispan.protostream.annotations.ProtoFactory;
import org.infinispan.protostream.annotations.ProtoField;
import io.debezium.connector.oracle.Scn;
import io.debezium.connector.oracle.logminer.events.DmlEvent;
import io.debezium.connector.oracle.logminer.events.EventType;
import io.debezium.connector.oracle.logminer.parser.LogMinerDmlEntryImpl;
import io.debezium.relational.TableId;
/**
 * An Infinispan ProtoStream adapter to marshall {@link DmlEvent} instances.
 *
 * This class defines a factory for creating {@link DmlEvent} instances when hydrating
 * records from the persisted datastore as well as field handlers to extract values
 * to be marshalled to the protocol buffer stream.
 *
 * The underlying protocol buffer record consists of the following structure:
 * <pre>
 *     message DmlEvent {
 *         // structure of the super type, LogMinerEventAdapter
 *         required LogMinerDmlEntryImpl entry = 7;
 *     }
 * </pre>
 *
 * @author Chris Cranford
 */
@ProtoAdapter(DmlEvent.class)
public class DmlEventAdapter extends LogMinerEventAdapter {

    /**
     * A ProtoStream factory that creates {@link DmlEvent} instances.
     *
     * @param eventType the event type
     * @param scn the system change number, must not be {@code null}
     * @param tableId the fully-qualified table name
     * @param rowId the Oracle row-id the change is associated with
     * @param rsId the Oracle rollback segment identifier
     * @param changeTime the time the change occurred
     * @param entry the parsed SQL statement entry
     * @return the constructed DmlEvent
     */
    @ProtoFactory
    public DmlEvent factory(int eventType, String scn, String tableId, String rowId, String rsId, String changeTime, LogMinerDmlEntryImpl entry) {
        // Re-hydrate the string-serialized fields back into their rich types.
        final EventType type = EventType.from(eventType);
        final Scn eventScn = Scn.valueOf(scn);
        final TableId table = TableId.parse(tableId);
        final Instant time = Instant.parse(changeTime);
        return new DmlEvent(type, eventScn, table, rowId, rsId, time, entry);
    }

    /**
     * A ProtoStream handler to extract the {@code entry} field from the {@link DmlEvent}.
     *
     * @param event the event instance, must not be {@code null}
     * @return the LogMinerDmlEntryImpl instance
     */
    @ProtoField(number = 7, required = true)
    public LogMinerDmlEntryImpl getEntry(DmlEvent event) {
        return (LogMinerDmlEntryImpl) event.getDmlEntry();
    }
}

View File

@ -0,0 +1,46 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.connector.oracle.logminer.processor.infinispan.marshalling;
import java.time.Instant;
import org.infinispan.protostream.annotations.ProtoAdapter;
import org.infinispan.protostream.annotations.ProtoFactory;
import io.debezium.connector.oracle.Scn;
import io.debezium.connector.oracle.logminer.events.EventType;
import io.debezium.connector.oracle.logminer.events.LobEraseEvent;
import io.debezium.relational.TableId;
/**
 * An Infinispan ProtoStream adapter to marshall {@link LobEraseEvent} instances.
 *
 * This class defines a factory for creating {@link LobEraseEvent} instances when hydrating
 * records from the persisted datastore as well as field handlers to extract values
 * to be marshalled to the protocol buffer stream.
 *
 * @see LogMinerEventAdapter for the structure format of this adapter.
 * @author Chris Cranford
 */
@ProtoAdapter(LobEraseEvent.class)
public class LobEraseEventAdapter extends LogMinerEventAdapter {

    /**
     * A ProtoStream factory that creates {@link LobEraseEvent} instances.
     *
     * @param eventType the event type
     * @param scn the system change number, must not be {@code null}
     * @param tableId the fully-qualified table name
     * @param rowId the Oracle row-id the change is associated with
     * @param rsId the Oracle rollback segment identifier
     * @param changeTime the time the change occurred
     * @return the constructed LobEraseEvent
     */
    @ProtoFactory
    public LobEraseEvent factory(int eventType, String scn, String tableId, String rowId, String rsId, String changeTime) {
        // Re-hydrate the string-serialized fields back into their rich types.
        final EventType type = EventType.from(eventType);
        final Scn eventScn = Scn.valueOf(scn);
        final TableId table = TableId.parse(tableId);
        final Instant time = Instant.parse(changeTime);
        return new LobEraseEvent(type, eventScn, table, rowId, rsId, time);
    }
}

View File

@ -0,0 +1,66 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.connector.oracle.logminer.processor.infinispan.marshalling;
import java.time.Instant;
import org.infinispan.protostream.annotations.ProtoAdapter;
import org.infinispan.protostream.annotations.ProtoFactory;
import org.infinispan.protostream.annotations.ProtoField;
import io.debezium.connector.oracle.Scn;
import io.debezium.connector.oracle.logminer.events.EventType;
import io.debezium.connector.oracle.logminer.events.LobWriteEvent;
import io.debezium.relational.TableId;
/**
 * An Infinispan ProtoStream adapter to marshall {@link LobWriteEvent} instances.
 *
 * This class defines a factory for creating {@link LobWriteEvent} instances when hydrating
 * records from the persisted datastore as well as field handlers to extract values
 * to be marshalled to the protocol buffer stream.
 *
 * The underlying protocol buffer record consists of the following structure:
 * <pre>
 *     message LobWriteEvent {
 *         // structure of the super type, LogMinerEventAdapter
 *         string data = 7;
 *     }
 * </pre>
 *
 * @author Chris Cranford
 */
@ProtoAdapter(LobWriteEvent.class)
public class LobWriteEventAdapter extends LogMinerEventAdapter {

    /**
     * A ProtoStream factory that creates {@link LobWriteEvent} instances.
     *
     * @param eventType the event type
     * @param scn the system change number, must not be {@code null}
     * @param tableId the fully-qualified table name
     * @param rowId the Oracle row-id the change is associated with
     * @param rsId the Oracle rollback segment identifier
     * @param changeTime the time the change occurred
     * @param data the LOB data
     * @return the constructed LobWriteEvent
     */
    @ProtoFactory
    public LobWriteEvent factory(int eventType, String scn, String tableId, String rowId, String rsId, String changeTime, String data) {
        // Re-hydrate the string-serialized fields back into their rich types.
        final EventType type = EventType.from(eventType);
        final Scn eventScn = Scn.valueOf(scn);
        final TableId table = TableId.parse(tableId);
        final Instant time = Instant.parse(changeTime);
        return new LobWriteEvent(type, eventScn, table, rowId, rsId, time, data);
    }

    /**
     * A ProtoStream handler to extract the {@code data} field from a {@link LobWriteEvent} type.
     *
     * @param event the event instance, must not be {@code null}
     * @return the data to be written for a LOB field, may be {@code null}
     */
    @ProtoField(number = 7)
    public String getData(LobWriteEvent event) {
        return event.getData();
    }
}

View File

@ -11,23 +11,16 @@
import org.infinispan.protostream.annotations.ProtoFactory;
import org.infinispan.protostream.annotations.ProtoField;
import io.debezium.DebeziumException;
import io.debezium.connector.oracle.Scn;
import io.debezium.connector.oracle.logminer.events.DmlEvent;
import io.debezium.connector.oracle.logminer.events.EventType;
import io.debezium.connector.oracle.logminer.events.LobEraseEvent;
import io.debezium.connector.oracle.logminer.events.LobWriteEvent;
import io.debezium.connector.oracle.logminer.events.LogMinerEvent;
import io.debezium.connector.oracle.logminer.events.SelectLobLocatorEvent;
import io.debezium.connector.oracle.logminer.parser.LogMinerDmlEntryImpl;
import io.debezium.relational.TableId;
import io.debezium.util.Strings;
/**
* An Infinispan ProtoStream adapter to marshall {@link LogMinerEvent} instances and its subclasses.
* An Infinispan ProtoStream adapter to marshall {@link LogMinerEvent} instances.
*
* This class defines a factory for creating {@link LogMinerEvent} instances and its subclasses for when
* hydrating records from the persisted datastore as well as field handlers to extract instance values
* This class defines a factory for creating {@link LogMinerEvent} instances when hydrating
* records from the persisted event datastore as well as field handlers to extract values
* to be marshalled to the protocol buffer stream.
*
* The underlying protocol buffer record consists of the following structure:
@ -39,19 +32,16 @@
* required string rowId = 4;
* required object rsId = 5;
* required string changeTime = 6;
* LogMinerDmlEntryImpl entry = 7;
* string columnName = 8;
* boolean binary = 9;
* string data = 10;
* }
* </pre>
*
* @author Chris Cranford
*/
@ProtoAdapter(LogMinerEvent.class)
public class LogMinerEventAdapter {
/**
* A ProtoStream factory that creates {@link LogMinerEvent} instances or one of its subclasses from field values.
* A ProtoStream factory that creates {@link LogMinerEvent} instances.
*
* @param eventType the event type
* @param scn the system change number, must not be {@code null}
@ -59,50 +49,11 @@ public class LogMinerEventAdapter {
* @param rowId the Oracle row-id the change is associated with
* @param rsId the Oracle rollback segment identifier
* @param changeTime the time the change occurred
* @param entry the parsed SQL statement entry
* @param columnName the column name for a {@code SEL_LOB_LOCATOR} event type
* @param binary whether the data is binary for a {@code SEL_LOB_LOCATOR} event type
* @param data the data to be written by a {@code LOB_WRITE} event type
* @return the constructed LogMinerEvent or one of its subclasses
* @return the constructed DmlEvent
*/
@ProtoFactory
public LogMinerEvent factory(int eventType, String scn, String tableId, String rowId, String rsId, String changeTime,
LogMinerDmlEntryImpl entry, String columnName, Boolean binary, String data) {
final EventType type = EventType.from(eventType);
final Scn eventScn = Scn.valueOf(scn);
final TableId id;
if (Strings.isNullOrEmpty(tableId)) {
id = null;
}
else {
id = TableId.parse(tableId);
}
final Instant time;
if (Strings.isNullOrEmpty(changeTime)) {
time = null;
}
else {
time = Instant.parse(changeTime);
}
switch (type) {
case INSERT:
case UPDATE:
case DELETE:
return new DmlEvent(type, eventScn, id, rowId, rsId, time, entry);
case SELECT_LOB_LOCATOR:
return new SelectLobLocatorEvent(type, eventScn, id, rowId, rsId, time, entry, columnName, binary);
case LOB_WRITE:
return new LobWriteEvent(type, eventScn, id, rowId, rsId, time, data);
case LOB_ERASE:
return new LobEraseEvent(type, eventScn, id, rowId, rsId, time);
case LOB_TRIM:
return new LogMinerEvent(type, eventScn, id, rowId, rsId, time);
default:
throw new DebeziumException("Unknown event type: " + eventType);
}
public LogMinerEvent factory(int eventType, String scn, String tableId, String rowId, String rsId, String changeTime) {
return new LogMinerEvent(EventType.from(eventType), Scn.valueOf(scn), TableId.parse(tableId), rowId, rsId, Instant.parse(changeTime));
}
/**
@ -138,8 +89,6 @@ public String getScn(LogMinerEvent event) {
*/
@ProtoField(number = 3, required = true)
public String getTableId(LogMinerEvent event) {
// We intentionally serialize the TableId as a string since string values are natively supported
// by protocol buffers and we don't need to write a special marshaller for the TableId class.
return event.getTableId().identifier();
}
@ -173,69 +122,6 @@ public String getRsId(LogMinerEvent event) {
*/
@ProtoField(number = 6, required = true)
public String getChangeTime(LogMinerEvent event) {
// Serialized using Instant's normal toString() format since strings as handled natively by
// protocol buffers and will be parsed using Instant#parse in the factory method.
return event.getChangeTime().toString();
}
    /**
     * A ProtoStream handler to extract the {@code entry} field from the {@link DmlEvent} and subclass types.
     *
     * @param event the event instance, must not be {@code null}
     * @return the LogMinerDmlEntryImpl instance or {@code null} if the event type isn't related to DML events
     */
    @ProtoField(number = 7)
    public LogMinerDmlEntryImpl getEntry(LogMinerEvent event) {
        switch (event.getEventType()) {
            case INSERT:
            case UPDATE:
            case DELETE:
            case SELECT_LOB_LOCATOR:
                // SELECT_LOB_LOCATOR events are DmlEvent subclasses and carry a parsed entry too,
                // so they are serialized alongside the plain DML event types.
                return (LogMinerDmlEntryImpl) ((DmlEvent) event).getDmlEntry();
            default:
                // Non-DML events (LOB_WRITE, LOB_ERASE, etc.) have no SQL entry; the optional
                // protocol buffer field is simply left unset.
                return null;
        }
    }
/**
* A ProtoStream handler to extract the {@code columnName} field from a {@link SelectLobLocatorEvent} type.
*
* @param event the event instance, must not be {@code null}
* @return the column name or {@code null} if the event is not a SelectLobLocatorEvent type
*/
@ProtoField(number = 8)
public String getColumnName(LogMinerEvent event) {
if (EventType.SELECT_LOB_LOCATOR.equals(event.getEventType())) {
return ((SelectLobLocatorEvent) event).getColumnName();
}
return null;
}
/**
* A ProtoStream handler to extract the {@code binary} field from a {@link SelectLobLocatorEvent} type.
*
* @param event the event instance, must not be {@code null}
* @return the binary data flag or {@code null} if the event is not a SelectLobLocatorEvent type
*/
@ProtoField(number = 9)
public Boolean getBinary(LogMinerEvent event) {
if (EventType.SELECT_LOB_LOCATOR.equals(event.getEventType())) {
return ((SelectLobLocatorEvent) event).isBinary();
}
return null;
}
/**
* A ProtoStream handler to extract the {@code data} field from a {@link LobWriteEvent} type.
*
* @param event the event instance, must not be {@code null}
* @return the data to be written for a LOB field or {@code null} if the event is not a LobWriteEvent type
*/
@ProtoField(number = 10)
public String getData(LogMinerEvent event) {
if (EventType.LOB_WRITE.equals(event.getEventType())) {
return ((LobWriteEvent) event).getData();
}
return null;
}
}

View File

@ -0,0 +1,21 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.connector.oracle.logminer.processor.infinispan.marshalling;
import org.infinispan.protostream.SerializationContextInitializer;
import org.infinispan.protostream.annotations.AutoProtoSchemaBuilder;
/**
 * An interface that is used by the ProtoStream framework to designate the adapters and the path
 * to where the Protocol Buffers .proto file will be generated based on the adapters
 * at compile time.
 *
 * @author Chris Cranford
 */
@AutoProtoSchemaBuilder(includeClasses = { LogMinerEventAdapter.class, DmlEventAdapter.class, SelectLobLocatorEventAdapter.class, LobWriteEventAdapter.class,
        LobEraseEventAdapter.class, LogMinerDmlEntryImplAdapter.class }, schemaFilePath = "/")
public interface LogMinerEventMarshaller extends SerializationContextInitializer {
}

View File

@ -0,0 +1,83 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.connector.oracle.logminer.processor.infinispan.marshalling;
import java.time.Instant;
import org.infinispan.protostream.annotations.ProtoAdapter;
import org.infinispan.protostream.annotations.ProtoFactory;
import org.infinispan.protostream.annotations.ProtoField;
import io.debezium.connector.oracle.Scn;
import io.debezium.connector.oracle.logminer.events.EventType;
import io.debezium.connector.oracle.logminer.events.SelectLobLocatorEvent;
import io.debezium.connector.oracle.logminer.parser.LogMinerDmlEntryImpl;
import io.debezium.relational.TableId;
/**
 * An Infinispan ProtoStream adapter to marshall {@link SelectLobLocatorEvent} instances.
 *
 * This class defines a factory for creating {@link SelectLobLocatorEvent} instances when hydrating
 * records from the persisted datastore as well as field handlers to extract values
 * to be marshalled to the protocol buffer stream.
 *
 * The underlying protocol buffer record consists of the following structure:
 * <pre>
 *     message SelectLobLocatorEvent {
 *         // structure of the super type, DmlEventAdapter
 *         required string columnName = 8;
 *         required boolean binary = 9;
 *     }
 * </pre>
 *
 * @author Chris Cranford
 */
@ProtoAdapter(SelectLobLocatorEvent.class)
public class SelectLobLocatorEventAdapter extends DmlEventAdapter {

    /**
     * A ProtoStream factory that creates {@link SelectLobLocatorEvent} instances.
     *
     * @param eventType the event type
     * @param scn the system change number, must not be {@code null}
     * @param tableId the fully-qualified table name
     * @param rowId the Oracle row-id the change is associated with
     * @param rsId the Oracle rollback segment identifier
     * @param changeTime the time the change occurred
     * @param entry the parsed SQL statement entry
     * @param columnName the column name referenced by the SelectLobLocatorEvent
     * @param binary whether the data is binary- or character- based
     * @return the constructed SelectLobLocatorEvent
     */
    @ProtoFactory
    public SelectLobLocatorEvent factory(int eventType, String scn, String tableId, String rowId, String rsId, String changeTime, LogMinerDmlEntryImpl entry,
                                         String columnName, Boolean binary) {
        // The Scn, TableId, and Instant values are persisted as plain strings (natively supported
        // by protocol buffers) and are rehydrated here via their respective parse/valueOf factories.
        return new SelectLobLocatorEvent(EventType.from(eventType), Scn.valueOf(scn), TableId.parse(tableId), rowId, rsId, Instant.parse(changeTime), entry, columnName,
                binary);
    }

    /**
     * A ProtoStream handler to extract the {@code columnName} field from a {@link SelectLobLocatorEvent} type.
     *
     * @param event the event instance, must not be {@code null}
     * @return the column name
     */
    @ProtoField(number = 8, required = true)
    public String getColumnName(SelectLobLocatorEvent event) {
        return event.getColumnName();
    }

    /**
     * A ProtoStream handler to extract the {@code binary} field from a {@link SelectLobLocatorEvent} type.
     *
     * @param event the event instance, must not be {@code null}
     * @return the binary data flag
     */
    @ProtoField(number = 9, required = true)
    public Boolean getBinary(SelectLobLocatorEvent event) {
        return event.isBinary();
    }
}

View File

@ -6,63 +6,60 @@
package io.debezium.connector.oracle.logminer.processor.infinispan.marshalling;
import java.time.Instant;
import java.util.List;
import org.infinispan.protostream.annotations.ProtoAdapter;
import org.infinispan.protostream.annotations.ProtoFactory;
import org.infinispan.protostream.annotations.ProtoField;
import io.debezium.connector.oracle.Scn;
import io.debezium.connector.oracle.logminer.events.LogMinerEvent;
import io.debezium.connector.oracle.logminer.events.Transaction;
import io.debezium.connector.oracle.logminer.processor.infinispan.InfinispanTransaction;
/**
* An Infinispan ProtoStream adapter for marshalling a {@link Transaction} instance.
* An Infinispan ProtoStream adapter for marshalling a {@link InfinispanTransaction} instance.
*
* This class defines a factory for creating {@link Transaction} instances when hydrating a transaction
* This class defines a factory for creating {@link InfinispanTransaction} instances when hydrating a transaction
* record from the protocol buffer datastore as well as field handlers to extract values from a given
* transaction instance for serializing the instance to a protocol buffer stream.
*
* @author Chris Cranford
*/
@ProtoAdapter(Transaction.class)
@ProtoAdapter(InfinispanTransaction.class)
public class TransactionAdapter {
/**
* A ProtoStream factory that creates a {@link Transaction} instance from field values.
* A ProtoStream factory that creates a {@link InfinispanTransaction} instance from field values.
*
* @param transactionId the transaction identifier
* @param scn the starting system change number of the transaction
* @param changeTime the starting time of the transaction
* @param events list of events that are part of the transaction
* @param userName the user name
* @param numberOfEvents the number of events in the transaction
* @return the constructed Transaction instance
*/
@ProtoFactory
public Transaction factory(String transactionId, String scn, String changeTime, List<LogMinerEvent> events, String userName, int numberOfEvents) {
return new Transaction(transactionId, Scn.valueOf(scn), Instant.parse(changeTime), events, userName, numberOfEvents);
public InfinispanTransaction factory(String transactionId, String scn, String changeTime, String userName, int numberOfEvents) {
return new InfinispanTransaction(transactionId, Scn.valueOf(scn), Instant.parse(changeTime), userName, numberOfEvents);
}
/**
* A ProtoStream handler to extract the {@code transactionId} field from the {@link Transaction}.
* A ProtoStream handler to extract the {@code transactionId} field from the {@link InfinispanTransaction}.
*
* @param transaction the transaction instance, must not be {@code null}
* @return the transaction identifier, never {@code null}
*/
@ProtoField(number = 1, required = true)
public String getTransactionId(Transaction transaction) {
public String getTransactionId(InfinispanTransaction transaction) {
return transaction.getTransactionId();
}
/**
* A ProtoStream handler to extract the {@code startScn} field from the {@link Transaction}.
* A ProtoStream handler to extract the {@code startScn} field from the {@link InfinispanTransaction}.
*
* @param transaction the transaction instance, must not be {@code null}
* @return the starting system change number, never {@code null}
*/
@ProtoField(number = 2, required = true)
public String getScn(Transaction transaction) {
public String getScn(InfinispanTransaction transaction) {
// We intentionally serialize the Scn class as a string to the protocol buffer datastore
// and so the factory method also accepts a string parameter and converts the value to a
// Scn instance during instantiation. This avoids the need for an additional adapter.
@ -70,40 +67,29 @@ public String getScn(Transaction transaction) {
}
/**
* A ProtoStream handler to extract the {@code changeTime} field from the {@link Transaction}.
* A ProtoStream handler to extract the {@code changeTime} field from the {@link InfinispanTransaction}.
*
* @param transaction the transaction instance, must not be {@code null}
* @return the starting time of the transaction, never {@code null}
*/
@ProtoField(number = 3, required = true)
public String getChangeTime(Transaction transaction) {
public String getChangeTime(InfinispanTransaction transaction) {
return transaction.getChangeTime().toString();
}
/**
* A ProtoStream handler to extract the {@code events} field from the {@link Transaction}.
*
* @param transaction the transaction instance, must not be {@code null}
* @return list of events within the transaction
*/
@ProtoField(number = 4)
public List<LogMinerEvent> getEvents(Transaction transaction) {
return transaction.getEvents();
}
@ProtoField(number = 5)
public String getUserName(Transaction transaction) {
public String getUserName(InfinispanTransaction transaction) {
return transaction.getUserName();
}
/**
 * A ProtoStream handler to extract the {@code numberOfEvents} field from the {@link InfinispanTransaction}.
* A ProtoStream handler to extract the {@code eventIds} field from the {@link InfinispanTransaction}.
*
* @param transaction the transaction instance, must not be {@code null}
* @return the number of events in the transaction
*/
@ProtoField(number = 6, defaultValue = "0")
public int getNumberOfEvents(Transaction transaction) {
@ProtoField(number = 5, defaultValue = "0")
public int getNumberOfEvents(InfinispanTransaction transaction) {
return transaction.getNumberOfEvents();
}
}

View File

@ -15,6 +15,6 @@
*
* @author Chris Cranford
*/
@AutoProtoSchemaBuilder(includeClasses = { TransactionAdapter.class, LogMinerEventAdapter.class, LogMinerDmlEntryImplAdapter.class }, schemaFilePath = "/")
@AutoProtoSchemaBuilder(includeClasses = { TransactionAdapter.class }, schemaFilePath = "/")
public interface TransactionMarshaller extends SerializationContextInitializer {
}

View File

@ -16,6 +16,7 @@
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.function.Supplier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -33,10 +34,11 @@
import io.debezium.connector.oracle.logminer.events.DmlEvent;
import io.debezium.connector.oracle.logminer.events.LogMinerEvent;
import io.debezium.connector.oracle.logminer.events.LogMinerEventRow;
import io.debezium.connector.oracle.logminer.events.Transaction;
import io.debezium.connector.oracle.logminer.processor.AbstractLogMinerEventProcessor;
import io.debezium.connector.oracle.logminer.processor.LogMinerEventProcessor;
import io.debezium.connector.oracle.logminer.processor.TransactionCache;
import io.debezium.connector.oracle.logminer.processor.TransactionCommitConsumer;
import io.debezium.function.BlockingConsumer;
import io.debezium.pipeline.EventDispatcher;
import io.debezium.pipeline.source.spi.ChangeEventSource.ChangeEventSourceContext;
import io.debezium.relational.TableId;
@ -48,7 +50,7 @@
*
* @author Chris Cranford
*/
public class MemoryLogMinerEventProcessor extends AbstractLogMinerEventProcessor {
public class MemoryLogMinerEventProcessor extends AbstractLogMinerEventProcessor<MemoryTransaction> {
private static final Logger LOGGER = LoggerFactory.getLogger(MemoryLogMinerEventProcessor.class);
@ -88,10 +90,26 @@ public MemoryLogMinerEventProcessor(ChangeEventSourceContext context,
}
@Override
protected TransactionCache<?> getTransactionCache() {
protected TransactionCache<MemoryTransaction, ?> getTransactionCache() {
return transactionCache;
}
@Override
protected MemoryTransaction createTransaction(LogMinerEventRow row) {
return new MemoryTransaction(row.getTransactionId(), row.getScn(), row.getChangeTime(), row.getUserName());
}
@Override
protected void removeEventWithRowId(LogMinerEventRow row) {
final MemoryTransaction transaction = getTransactionCache().get(row.getTransactionId());
if (transaction == null) {
LOGGER.warn("Cannot undo change '{}' since transaction was not found.", row);
}
else {
transaction.removeEventWithRowId(row.getRowId());
}
}
@Override
public void close() throws Exception {
// close any resources used here
@ -153,9 +171,9 @@ public void abandonTransactions(Duration retention) {
thresholdScn = smallestScn;
}
Iterator<Map.Entry<String, Transaction>> iterator = transactionCache.iterator();
Iterator<Map.Entry<String, MemoryTransaction>> iterator = transactionCache.iterator();
while (iterator.hasNext()) {
Map.Entry<String, Transaction> entry = iterator.next();
Map.Entry<String, MemoryTransaction> entry = iterator.next();
if (entry.getValue().getStartScn().compareTo(thresholdScn) <= 0) {
LOGGER.warn("Transaction {} is being abandoned.", entry.getKey());
abandonedTransactionsCache.add(entry.getKey());
@ -210,20 +228,24 @@ protected void handleCommit(LogMinerEventRow row) throws InterruptedException {
return;
}
final Transaction transaction = transactionCache.remove(transactionId);
final MemoryTransaction transaction = transactionCache.remove(transactionId);
if (transaction == null) {
LOGGER.trace("Transaction {} not found.", transactionId);
return;
}
boolean skipExcludedUserName = false;
final boolean skipExcludedUserName;
if (transaction.getUserName() == null && !transaction.getEvents().isEmpty()) {
LOGGER.debug("Got transaction with null username {}", transaction);
skipExcludedUserName = false;
}
else if (getConfig().getLogMiningUsernameExcludes().contains(transaction.getUserName())) {
LOGGER.trace("Skipping transaction with excluded username {}", transaction);
skipExcludedUserName = true;
}
else {
skipExcludedUserName = false;
}
final Scn smallestScn = transactionCache.getMinimumScn();
metrics.setOldestScn(smallestScn.isNull() ? Scn.valueOf(-1) : smallestScn);
@ -241,18 +263,17 @@ else if (getConfig().getLogMiningUsernameExcludes().contains(transaction.getUser
counters.commitCount++;
Instant start = Instant.now();
getReconciliation().reconcile(transaction);
int numEvents = transaction.getEvents().size();
LOGGER.trace("Commit: (smallest SCN {}) {}", smallestScn, row);
LOGGER.trace("Transaction {} has {} events", transactionId, numEvents);
for (LogMinerEvent event : transaction.getEvents()) {
if (!context.isRunning()) {
return;
}
BlockingConsumer<LogMinerEvent> delegate = new BlockingConsumer<LogMinerEvent>() {
private int numEvents = getTransactionEventCount(transaction);
@Override
public void accept(LogMinerEvent event) throws InterruptedException {
// Update SCN in offset context only if processed SCN less than SCN of other transactions
if (smallestScn.isNull() || commitScn.compareTo(smallestScn) < 0) {
offsetContext.setScn(event.getScn());
@ -267,7 +288,6 @@ else if (getConfig().getLogMiningUsernameExcludes().contains(transaction.getUser
offsetContext.setCommitScn(commitScn);
}
// after reconciliation all events should be DML
final DmlEvent dmlEvent = (DmlEvent) event;
if (!skipExcludedUserName) {
dispatcher.dispatchDataChangeEvent(event.getTableId(),
@ -281,6 +301,20 @@ else if (getConfig().getLogMiningUsernameExcludes().contains(transaction.getUser
Clock.system()));
}
}
};
int eventCount = 0;
try (TransactionCommitConsumer commitConsumer = new TransactionCommitConsumer(delegate, getConfig(), getSchema())) {
for (LogMinerEvent event : transaction.getEvents()) {
if (!context.isRunning()) {
return;
}
eventCount++;
LOGGER.trace("Dispatching event {} {}", eventCount, event.getEventType());
commitConsumer.accept(event);
}
}
lastCommittedScn = Scn.valueOf(commitScn.longValue());
if (!transaction.getEvents().isEmpty() && !skipExcludedUserName) {
@ -302,7 +336,7 @@ else if (getConfig().getLogMiningUsernameExcludes().contains(transaction.getUser
metrics.incrementCommittedTransactions();
metrics.setActiveTransactions(transactionCache.size());
metrics.incrementCommittedDmlCount(transaction.getEvents().size());
metrics.incrementCommittedDmlCount(eventCount);
metrics.setCommittedScn(commitScn);
metrics.setOffsetScn(offsetContext.getScn());
metrics.setLastCommitDuration(Duration.between(start, Instant.now()));
@ -310,7 +344,7 @@ else if (getConfig().getLogMiningUsernameExcludes().contains(transaction.getUser
@Override
protected void handleRollback(LogMinerEventRow row) {
final Transaction transaction = transactionCache.get(row.getTransactionId());
final MemoryTransaction transaction = transactionCache.get(row.getTransactionId());
if (transaction != null) {
transactionCache.remove(row.getTransactionId());
abandonedTransactionsCache.remove(row.getTransactionId());
@ -332,6 +366,33 @@ protected void handleSchemaChange(LogMinerEventRow row) throws InterruptedExcept
}
}
@Override
protected void addToTransaction(String transactionId, LogMinerEventRow row, Supplier<LogMinerEvent> eventSupplier) {
if (isTransactionIdAllowed(transactionId)) {
MemoryTransaction transaction = getTransactionCache().get(transactionId);
if (transaction == null) {
LOGGER.trace("Transaction {} not in cache for DML, creating.", transactionId);
transaction = createTransaction(row);
getTransactionCache().put(transactionId, transaction);
}
int eventId = transaction.getNextEventId();
if (transaction.getEvents().size() <= eventId) {
// Add new event at eventId offset
LOGGER.trace("Transaction {}, adding event reference at index {}", transactionId, eventId);
transaction.getEvents().add(eventSupplier.get());
metrics.calculateLagMetrics(row.getChangeTime());
}
metrics.setActiveTransactions(getTransactionCache().size());
}
}
@Override
protected int getTransactionEventCount(MemoryTransaction transaction) {
return transaction.getEvents().size();
}
private PreparedStatement createQueryStatement() throws SQLException {
final String query = LogMinerQueryBuilder.build(getConfig(), getSchema());
return jdbcConnection.connection().prepareStatement(query,
@ -402,4 +463,5 @@ protected Optional<Scn> getLastScnToAbandon(OracleConnection connection, Scn off
return Optional.of(offsetScn);
}
}
}

View File

@ -0,0 +1,72 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.connector.oracle.logminer.processor.memory;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import io.debezium.connector.oracle.Scn;
import io.debezium.connector.oracle.logminer.events.LogMinerEvent;
import io.debezium.connector.oracle.logminer.processor.AbstractTransaction;
/**
 * A concrete implementation of a {@link AbstractTransaction} for the JVM heap memory processor.
 *
 * Events are buffered in an in-memory list. Note that {@code numberOfEvents} is a
 * monotonically increasing event-id counter (see {@link #getNextEventId()}); it is reset by
 * {@link #started()} but intentionally NOT decremented by {@link #removeEventWithRowId(String)},
 * so it may differ from {@code events.size()} after an undo.
 *
 * @author Chris Cranford
 */
public class MemoryTransaction extends AbstractTransaction {

    private static final Logger LOGGER = LoggerFactory.getLogger(MemoryTransaction.class);

    // Next event id to hand out; restarts at 0 each time started() is invoked.
    private int numberOfEvents;
    // Events buffered for this transaction; assigned once in the constructor, hence final.
    private final List<LogMinerEvent> events;

    public MemoryTransaction(String transactionId, Scn startScn, Instant changeTime, String userName) {
        super(transactionId, startScn, changeTime, userName);
        this.events = new ArrayList<>();
        started();
    }

    @Override
    public int getNumberOfEvents() {
        return numberOfEvents;
    }

    @Override
    public int getNextEventId() {
        return numberOfEvents++;
    }

    @Override
    public void started() {
        // Restart event-id numbering whenever the transaction is (re)started.
        numberOfEvents = 0;
    }

    /**
     * @return the mutable list of events buffered for this transaction; never {@code null}
     */
    public List<LogMinerEvent> getEvents() {
        return events;
    }

    /**
     * Removes every buffered event whose row-id matches, applying an undo of a prior change.
     *
     * @param rowId the Oracle row-id of the event(s) to remove
     */
    public void removeEventWithRowId(String rowId) {
        events.removeIf(event -> {
            if (event.getRowId().equals(rowId)) {
                LOGGER.trace("Undo applied for event {}.", event);
                return true;
            }
            return false;
        });
    }

    @Override
    public String toString() {
        return "MemoryTransaction{" +
                "numberOfEvents=" + numberOfEvents +
                "} " + super.toString();
    }
}

View File

@ -10,7 +10,6 @@
import java.util.Map;
import io.debezium.connector.oracle.Scn;
import io.debezium.connector.oracle.logminer.events.Transaction;
import io.debezium.connector.oracle.logminer.processor.TransactionCache;
/**
@ -18,22 +17,22 @@
*
* @author Chris Cranford
*/
public class MemoryTransactionCache implements TransactionCache<Map.Entry<String, Transaction>> {
public class MemoryTransactionCache implements TransactionCache<MemoryTransaction, Map.Entry<String, MemoryTransaction>> {
public final Map<String, Transaction> cache = new HashMap<>();
public final Map<String, MemoryTransaction> cache = new HashMap<>();
@Override
public Transaction get(String transactionId) {
public MemoryTransaction get(String transactionId) {
return cache.get(transactionId);
}
@Override
public void put(String transactionId, Transaction transaction) {
public void put(String transactionId, MemoryTransaction transaction) {
cache.put(transactionId, transaction);
}
@Override
public Transaction remove(String transactionId) {
public MemoryTransaction remove(String transactionId) {
return cache.remove(transactionId);
}
@ -53,14 +52,14 @@ public boolean isEmpty() {
}
@Override
public Iterator<Map.Entry<String, Transaction>> iterator() {
public Iterator<Map.Entry<String, MemoryTransaction>> iterator() {
return cache.entrySet().iterator();
}
@Override
public Scn getMinimumScn() {
return cache.values().stream()
.map(Transaction::getStartScn)
.map(MemoryTransaction::getStartScn)
.min(Scn::compareTo)
.orElse(Scn.NULL);
}

View File

@ -13,6 +13,7 @@ log4j.logger.io.debezium.embedded.EmbeddedEngine$EmbeddedConfig=WARN
#log4j.logger.io.debezium.embedded.EmbeddedEngine=DEBUG
log4j.logger.io.debezium.core=DEBUG
log4j.logger.io.debezium.connector.oracle=DEBUG
log4j.logger.io.debezium.connector.oracle.logminer.processor=TRACE
# Avoid the fallback property spam
log4j.logger.io.debezium.config.Configuration=ERROR