DBZ-4169 Suggested changes

This commit is contained in:
Chris Cranford 2021-11-15 11:31:40 -05:00 committed by Gunnar Morling
parent 5599938669
commit e0065536f0
4 changed files with 84 additions and 34 deletions

View File

@ -30,8 +30,13 @@
import io.debezium.connector.AbstractSourceInfo;
import io.debezium.connector.SourceInfoStructMaker;
import io.debezium.connector.oracle.logminer.logwriter.LogWriterFlushStrategy;
import io.debezium.connector.oracle.logminer.processor.LogMinerEventProcessor;
import io.debezium.connector.oracle.logminer.processor.infinispan.EmbeddedInfinispanLogMinerEventProcessor;
import io.debezium.connector.oracle.logminer.processor.infinispan.RemoteInfinispanLogMinerEventProcessor;
import io.debezium.connector.oracle.logminer.processor.memory.MemoryLogMinerEventProcessor;
import io.debezium.jdbc.JdbcConfiguration;
import io.debezium.pipeline.EventDispatcher;
import io.debezium.pipeline.source.spi.ChangeEventSource.ChangeEventSourceContext;
import io.debezium.relational.ColumnFilterMode;
import io.debezium.relational.HistorizedRelationalDatabaseConnectorConfig;
import io.debezium.relational.RelationalDatabaseConnectorConfig;
@ -872,19 +877,80 @@ public static LogMiningStrategy parse(String value, String defaultValue) {
}
public enum LogMiningBufferType implements EnumeratedValue {
MEMORY("memory"),
MEMORY("memory") {
@Override
public LogMinerEventProcessor createProcessor(ChangeEventSourceContext context,
OracleConnectorConfig connectorConfig,
OracleConnection connection,
EventDispatcher<TableId> dispatcher,
OraclePartition partition,
OracleOffsetContext offsetContext,
OracleDatabaseSchema schema,
OracleStreamingChangeEventSourceMetrics metrics) {
return new MemoryLogMinerEventProcessor(context, connectorConfig, connection, dispatcher, partition,
offsetContext, schema, metrics);
}
},
/**
* @deprecated use either {@link #INFINISPAN_EMBEDDED} or {@link #INFINISPAN_REMOTE}.
*/
@Deprecated
INFINISPAN("infinispan"),
INFINISPAN("infinispan") {
@Override
public LogMinerEventProcessor createProcessor(ChangeEventSourceContext context,
OracleConnectorConfig connectorConfig,
OracleConnection connection,
EventDispatcher<TableId> dispatcher,
OraclePartition partition,
OracleOffsetContext offsetContext,
OracleDatabaseSchema schema,
OracleStreamingChangeEventSourceMetrics metrics) {
return new EmbeddedInfinispanLogMinerEventProcessor(context, connectorConfig, connection, dispatcher,
partition, offsetContext, schema, metrics);
}
},
INFINISPAN_EMBEDDED("infinispan_embedded"),
INFINISPAN_REMOTE("infinispan_remote");
INFINISPAN_EMBEDDED("infinispan_embedded") {
@Override
public LogMinerEventProcessor createProcessor(ChangeEventSourceContext context,
OracleConnectorConfig connectorConfig,
OracleConnection connection,
EventDispatcher<TableId> dispatcher,
OraclePartition partition,
OracleOffsetContext offsetContext,
OracleDatabaseSchema schema,
OracleStreamingChangeEventSourceMetrics metrics) {
return new EmbeddedInfinispanLogMinerEventProcessor(context, connectorConfig, connection, dispatcher,
partition, offsetContext, schema, metrics);
}
},
INFINISPAN_REMOTE("infinispan_remote") {
@Override
public LogMinerEventProcessor createProcessor(ChangeEventSourceContext context,
OracleConnectorConfig connectorConfig,
OracleConnection connection,
EventDispatcher<TableId> dispatcher,
OraclePartition partition,
OracleOffsetContext offsetContext,
OracleDatabaseSchema schema,
OracleStreamingChangeEventSourceMetrics metrics) {
return new RemoteInfinispanLogMinerEventProcessor(context, connectorConfig, connection, dispatcher,
partition, offsetContext, schema, metrics);
}
};
private final String value;
/**
* Creates the buffer type's specific processor implementation
*/
public abstract LogMinerEventProcessor createProcessor(ChangeEventSourceContext context, OracleConnectorConfig connectorConfig,
OracleConnection connection, EventDispatcher<TableId> dispatcher, OraclePartition partition,
OracleOffsetContext offsetContext, OracleDatabaseSchema schema,
OracleStreamingChangeEventSourceMetrics metrics);
LogMiningBufferType(String value) {
this.value = value;
}

View File

@ -40,9 +40,6 @@
import io.debezium.connector.oracle.logminer.logwriter.LogWriterFlushStrategy;
import io.debezium.connector.oracle.logminer.logwriter.RacCommitLogWriterFlushStrategy;
import io.debezium.connector.oracle.logminer.processor.LogMinerEventProcessor;
import io.debezium.connector.oracle.logminer.processor.infinispan.EmbeddedInfinispanLogMinerEventProcessor;
import io.debezium.connector.oracle.logminer.processor.infinispan.RemoteInfinispanLogMinerEventProcessor;
import io.debezium.connector.oracle.logminer.processor.memory.MemoryLogMinerEventProcessor;
import io.debezium.jdbc.JdbcConfiguration;
import io.debezium.pipeline.ErrorHandler;
import io.debezium.pipeline.EventDispatcher;
@ -209,18 +206,7 @@ private LogMinerEventProcessor createProcessor(ChangeEventSourceContext context,
OraclePartition partition,
OracleOffsetContext offsetContext) {
final LogMiningBufferType bufferType = connectorConfig.getLogMiningBufferType();
if (bufferType.isInfinispanEmbedded()) {
return new EmbeddedInfinispanLogMinerEventProcessor(context, connectorConfig, jdbcConnection, dispatcher,
partition, offsetContext, schema, streamingMetrics);
}
else if (bufferType.isInfinispan()) {
return new RemoteInfinispanLogMinerEventProcessor(context, connectorConfig, jdbcConnection, dispatcher,
partition, offsetContext, schema, streamingMetrics);
}
else {
return new MemoryLogMinerEventProcessor(context, connectorConfig, jdbcConnection, dispatcher,
partition, offsetContext, schema, streamingMetrics);
}
return bufferType.createProcessor(context, connectorConfig, jdbcConnection, dispatcher, partition, offsetContext, schema, streamingMetrics);
}
/**

View File

@ -59,6 +59,10 @@ public class MemoryLogMinerEventProcessor extends AbstractLogMinerEventProcessor
private final OraclePartition partition;
private final OracleOffsetContext offsetContext;
private final OracleStreamingChangeEventSourceMetrics metrics;
/**
* Cache of transactions, keyed based on the transaction's unique identifier
*/
private final Map<String, MemoryTransaction> transactionCache = new HashMap<>();
private final Map<String, Scn> recentlyCommittedTransactionsCache = new HashMap<>();
private final Set<Scn> schemaChangesCache = new HashSet<>();

View File

@ -490,23 +490,12 @@ If you use the `memory` buffer setting, be sure that the amount of memory that y
ifdef::community[]
[[oracle-event-buffering-infinispan]]
==== Infinispan
The {prodname} Oracle connector can also be configured to use Infinispan as it's cache provider, supporting cache stores both locally with embedded mode or remotely on a server cluster.
The {prodname} Oracle connector can also be configured to use Infinispan as its cache provider, supporting cache stores both locally with embedded mode or remotely on a server cluster.
In order to use Infinispan, the xref:oracle-property-log-mining-buffer-type[`log.mining.buffer.type`] must be configured using either `infinispan_embedded` or `infinispan_remote`.
In order to allow flexibility with Infinispan cache configurations, the connector expects a series of cache configuration properties to be supplied when using Infinispan to buffer event data.
`log.mining.buffer.infinispan.cache.transactions`::
Specifies the XML configuration for the transaction cache.
`log.mining.buffer.infinispan.cache.events`::
Specifies the XML configuration for the event cache.
`log.mining.buffer.infinispan.cache.committed_transactions`::
Specifies the XML configuration for the committed transactions cache.
`log.mining.buffer.infinispan.cache.rollback_transactions`::
Specifies the XML configuration for the rollback transactions cache.
`log.mining.buffer.infinispan.cache.schema_changes`::
Specifies the XML configuration for the schema changes cache.
The contents of each of these configuration properties depend on whether the connector is to integrate with a remote Infinispan cluster or to use the embedded engine.
See the xref:oracle-connector-properties[configuration properties] in the `log.mining.buffer.infinispan.cache` namespace.
The contents of these configuration properties depend on whether the connector is to integrate with a remote Infinispan cluster or to use the embedded engine.
For example, the following illustrates what an embedded configuration would look like for the transaction cache property when using Infinispan in embedded mode:
@ -523,6 +512,11 @@ Looking at the configuration in-depth, the cache is configured to be persistent.
All caches should be configured this way to avoid loss of transaction events across connector restarts if a transaction is in-progress.
Additionally, the location where the cache is kept is defined by the `path` attribute and this should be a shared location accessible all possible runtime environments.
[NOTE]
====
When supplying XML configuration as a JSON connector property value, line breaks must be omitted or replaced with a `\n` character.
====
Another example, the following illustrates the same cache configured with an Infinispan cluster:
[source,xml]
@ -2451,7 +2445,7 @@ See xref:oracle-event-buffering-infinispan[Infinispan event buffering] for more
|[[oracle-property-log-mining-buffer-infinispan-cache-schema-changes]]<<oracle-property-log-mining-buffer-infinispan-cache-schema-changes, `+log.mining.buffer.infinispan.cache.schema_changes+`>>
|No default
|The XML configuration for the Infinispan schema changes cachen.
|The XML configuration for the Infinispan schema changes cache.
|[[oracle-property-log-mining-buffer-drop-on-stop]]<<oracle-property-log-mining-buffer-drop-on-stop, `+log.mining.buffer.drop.on.stop+`>>
|`false`