DBZ-3365 Allow building Oracle connector on CI

This commit is contained in:
Chris Cranford 2021-03-30 14:29:16 -04:00 committed by Jiri Pechanec
parent 83b4a47f99
commit 0f35b0455d
22 changed files with 518 additions and 173 deletions

52
.github/workflows/oracle-workflow.yml vendored Normal file
View File

@ -0,0 +1,52 @@
name: Build Oracle Connector
on:
push:
paths:
- 'support/checkstyle/**'
- 'support/ide-configs/**'
- 'support/revapi/**'
- 'debezium-api/**'
- 'debezium-ddl-parser/**'
- 'debezium-assembly-descriptors/**'
- 'debezium-core/**'
- 'debezium-embedded/**'
- 'debezium-connector-oracle/**'
- 'debezium-microbenchmark/**'
- 'debezium-e2e-benchmark/**'
- 'debezium-scripting/**'
- 'debezium-parent/pom.xml'
- 'debezium-bom/pom.xml'
- 'pom.xml'
pull_request:
paths:
- 'support/checkstyle/**'
- 'support/ide-configs/**'
- 'support/revapi/**'
- 'debezium-api/**'
- 'debezium-ddl-parser/**'
- 'debezium-assembly-descriptors/**'
- 'debezium-core/**'
- 'debezium-embedded/**'
- 'debezium-connector-oracle/**'
- 'debezium-microbenchmark/**'
- 'debezium-e2e-benchmark/**'
- 'debezium-scripting/**'
- 'debezium-parent/pom.xml'
- 'debezium-bom/pom.xml'
- 'pom.xml'
jobs:
build:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v2
- name: Cache local Maven repository
uses: actions/cache@v2
with:
path: ~/.m2/repository
key: ${{ runner.os }}-maven-${{ hashFiles('**/pom.xml') }}
restore-keys: |
${{ runner.os }}-maven-
- name: Check changes in Debezium Connector Oracle LogMiner
run: mvn clean install -B -pl debezium-connector-oracle -am -Poracle,oracle-ci,-xstream-dependency -Dformat.formatter.goal=validate -Dformat.imports.goal=check -Dorg.slf4j.simpleLogger.log.org.apache.maven.cli.transfer.Slf4jMavenTransferListener=warn -Dmaven.wagon.http.pool=false -Dmaven.wagon.httpconnectionManager.ttlSeconds=120

View File

@ -35,11 +35,6 @@
<artifactId>ojdbc8</artifactId> <artifactId>ojdbc8</artifactId>
<scope>provided</scope> <scope>provided</scope>
</dependency> </dependency>
<dependency>
<groupId>com.oracle.instantclient</groupId>
<artifactId>xstreams</artifactId>
<scope>provided</scope>
</dependency>
<dependency> <dependency>
<groupId>org.antlr</groupId> <groupId>org.antlr</groupId>
<artifactId>antlr4-runtime</artifactId> <artifactId>antlr4-runtime</artifactId>
@ -180,13 +175,79 @@
Define several useful profiles Define several useful profiles
--> -->
<profiles> <profiles>
<profile>
<id>xstream-dependency</id>
<activation>
<activeByDefault>true</activeByDefault>
</activation>
<dependencies>
<dependency>
<groupId>com.oracle.instantclient</groupId>
<artifactId>xstreams</artifactId>
</dependency>
</dependencies>
</profile>
<profile> <profile>
<id>assembly</id> <id>assembly</id>
<activation> <activation>
<activeByDefault>false</activeByDefault> <activeByDefault>false</activeByDefault>
</activation> </activation>
<dependencies>
<dependency>
<groupId>com.oracle.instantclient</groupId>
<artifactId>xstreams</artifactId>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-assembly-plugin</artifactId>
<dependencies>
<dependency>
<groupId>io.debezium</groupId>
<artifactId>debezium-assembly-descriptors</artifactId>
<version>${project.version}</version>
</dependency>
</dependencies>
<executions>
<execution>
<id>default</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
<configuration>
<finalName>${project.artifactId}-${project.version}</finalName>
<attach>true</attach> <!-- we want attach & deploy these to Maven -->
<descriptorRefs>
<descriptorRef>connector-distribution</descriptorRef>
</descriptorRefs>
<tarLongFileMode>posix</tarLongFileMode>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
</profile>
<profile>
<!-- Allows CI to build Oracle without the xstream.jar dependency -->
<id>oracle-ci</id>
<activation>
<activeByDefault>false</activeByDefault>
</activation>
<build> <build>
<plugins> <plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<configuration combine.self="override">
<excludes>
<exclude>**/io/debezium/connector/oracle/xstream/**</exclude>
</excludes>
</configuration>
</plugin>
<plugin> <plugin>
<groupId>org.apache.maven.plugins</groupId> <groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-assembly-plugin</artifactId> <artifactId>maven-assembly-plugin</artifactId>
@ -223,6 +284,12 @@
<properties> <properties>
<adapter.name>xstream</adapter.name> <adapter.name>xstream</adapter.name>
</properties> </properties>
<dependencies>
<dependency>
<groupId>com.oracle.instantclient</groupId>
<artifactId>xstreams</artifactId>
</dependency>
</dependencies>
</profile> </profile>
</profiles> </profiles>
</project> </project>

View File

@ -0,0 +1,24 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.connector.oracle;
import io.debezium.document.Document;
/**
* Abstract implementation of the {@link StreamingAdapter} for which all streaming adapters are derived.
*
* @author Chris Cranford
*/
public abstract class AbstractStreamingAdapter implements StreamingAdapter {
protected Scn resolveScn(Document document) {
final String scn = document.getString(SourceInfo.SCN_KEY);
if (scn == null) {
Long scnValue = document.getLong(SourceInfo.SCN_KEY);
return Scn.valueOf(scnValue == null ? 0 : scnValue);
}
return Scn.valueOf(scn);
}
}

View File

@ -6,8 +6,6 @@
package io.debezium.connector.oracle; package io.debezium.connector.oracle;
import io.debezium.config.Configuration; import io.debezium.config.Configuration;
import io.debezium.connector.oracle.logminer.LogMinerStreamingChangeEventSource;
import io.debezium.connector.oracle.xstream.XstreamStreamingChangeEventSource;
import io.debezium.pipeline.ErrorHandler; import io.debezium.pipeline.ErrorHandler;
import io.debezium.pipeline.EventDispatcher; import io.debezium.pipeline.EventDispatcher;
import io.debezium.pipeline.source.spi.ChangeEventSourceFactory; import io.debezium.pipeline.source.spi.ChangeEventSourceFactory;
@ -53,20 +51,9 @@ public SnapshotChangeEventSource getSnapshotChangeEventSource(OffsetContext offs
@Override @Override
public StreamingChangeEventSource getStreamingChangeEventSource(OffsetContext offsetContext) { public StreamingChangeEventSource getStreamingChangeEventSource(OffsetContext offsetContext) {
OracleConnectorConfig.ConnectorAdapter adapter = configuration.getAdapter(); return configuration.getAdapter().getSource(
if (adapter == OracleConnectorConfig.ConnectorAdapter.XSTREAM) {
return new XstreamStreamingChangeEventSource(
configuration, configuration,
(OracleOffsetContext) offsetContext, offsetContext,
jdbcConnection,
dispatcher,
errorHandler,
clock,
schema);
}
return new LogMinerStreamingChangeEventSource(
configuration,
(OracleOffsetContext) offsetContext,
jdbcConnection, jdbcConnection,
dispatcher, dispatcher,
errorHandler, errorHandler,

View File

@ -27,6 +27,7 @@
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import io.debezium.DebeziumException;
import io.debezium.config.Configuration; import io.debezium.config.Configuration;
import io.debezium.config.Field; import io.debezium.config.Field;
import io.debezium.connector.oracle.OracleConnectorConfig.ConnectorAdapter; import io.debezium.connector.oracle.OracleConnectorConfig.ConnectorAdapter;
@ -341,6 +342,22 @@ public boolean getTablenameCaseInsensitivity(OracleConnectorConfig connectorConf
return configValue.orElse(getOracleVersion().getMajor() == 11); return configValue.orElse(getOracleVersion().getMajor() == 11);
} }
/**
* Get the current, most recent system change number.
*
* @return the current system change number
* @throws SQLException if an exception occurred
* @throws IllegalStateException if the query does not return at least one row
*/
public Scn getCurrentScn() throws SQLException {
return queryAndMap("SELECT CURRENT_SCN FROM V$DATABASE", (rs) -> {
if (rs.next()) {
return Scn.valueOf(rs.getString(1));
}
throw new IllegalStateException("Could not get SCN");
});
}
public OracleConnection executeLegacy(String... sqlStatements) throws SQLException { public OracleConnection executeLegacy(String... sqlStatements) throws SQLException {
return executeLegacy(statement -> { return executeLegacy(statement -> {
for (String sqlStatement : sqlStatements) { for (String sqlStatement : sqlStatements) {

View File

@ -19,6 +19,7 @@
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import io.debezium.DebeziumException;
import io.debezium.config.CommonConnectorConfig; import io.debezium.config.CommonConnectorConfig;
import io.debezium.config.ConfigDefinition; import io.debezium.config.ConfigDefinition;
import io.debezium.config.Configuration; import io.debezium.config.Configuration;
@ -30,8 +31,6 @@
import io.debezium.connector.oracle.logminer.HistoryRecorder; import io.debezium.connector.oracle.logminer.HistoryRecorder;
import io.debezium.connector.oracle.logminer.NeverHistoryRecorder; import io.debezium.connector.oracle.logminer.NeverHistoryRecorder;
import io.debezium.connector.oracle.logminer.SqlUtils; import io.debezium.connector.oracle.logminer.SqlUtils;
import io.debezium.connector.oracle.xstream.LcrPosition;
import io.debezium.document.Document;
import io.debezium.jdbc.JdbcConfiguration; import io.debezium.jdbc.JdbcConfiguration;
import io.debezium.relational.ColumnFilterMode; import io.debezium.relational.ColumnFilterMode;
import io.debezium.relational.HistorizedRelationalDatabaseConnectorConfig; import io.debezium.relational.HistorizedRelationalDatabaseConnectorConfig;
@ -347,6 +346,7 @@ public static ConfigDef configDef() {
private final HistoryRecorder logMiningHistoryRecorder; private final HistoryRecorder logMiningHistoryRecorder;
private final Configuration jdbcConfig; private final Configuration jdbcConfig;
private final ConnectorAdapter connectorAdapter; private final ConnectorAdapter connectorAdapter;
private final StreamingAdapter streamingAdapter;
private final String snapshotEnhancementToken; private final String snapshotEnhancementToken;
// LogMiner options // LogMiner options
@ -379,9 +379,17 @@ public OracleConnectorConfig(Configuration config) {
this.logMiningHistoryRecorder = resolveLogMiningHistoryRecorder(config); this.logMiningHistoryRecorder = resolveLogMiningHistoryRecorder(config);
this.jdbcConfig = config.subset(DATABASE_CONFIG_PREFIX, true); this.jdbcConfig = config.subset(DATABASE_CONFIG_PREFIX, true);
this.snapshotEnhancementToken = config.getString(SNAPSHOT_ENHANCEMENT_TOKEN); this.snapshotEnhancementToken = config.getString(SNAPSHOT_ENHANCEMENT_TOKEN);
this.connectorAdapter = ConnectorAdapter.parse(config.getString(CONNECTOR_ADAPTER));
final String adapterClassName = this.connectorAdapter.getImplementationName();
try {
this.streamingAdapter = (StreamingAdapter) Class.forName(adapterClassName).getDeclaredConstructor().newInstance();
}
catch (Exception e) {
throw new DebeziumException("Failed to load connector adapter implementation: " + adapterClassName, e);
}
// LogMiner // LogMiner
this.connectorAdapter = ConnectorAdapter.parse(config.getString(CONNECTOR_ADAPTER));
this.logMiningStrategy = LogMiningStrategy.parse(config.getString(LOG_MINING_STRATEGY)); this.logMiningStrategy = LogMiningStrategy.parse(config.getString(LOG_MINING_STRATEGY));
this.logMiningHistoryRetentionHours = config.getLong(LOG_MINING_HISTORY_RETENTION); this.logMiningHistoryRetentionHours = config.getLong(LOG_MINING_HISTORY_RETENTION);
this.racNodes = Strings.setOf(config.getString(RAC_NODES), String::new); this.racNodes = Strings.setOf(config.getString(RAC_NODES), String::new);
@ -447,38 +455,7 @@ public String getOracleVersion() {
@Override @Override
protected HistoryRecordComparator getHistoryRecordComparator() { protected HistoryRecordComparator getHistoryRecordComparator() {
return new HistoryRecordComparator() { return getAdapter().getHistoryRecordComparator();
@Override
protected boolean isPositionAtOrBefore(Document recorded, Document desired) {
Scn recordedScn;
Scn desiredScn;
if (getAdapter() == OracleConnectorConfig.ConnectorAdapter.XSTREAM) {
final LcrPosition recordedPosition = LcrPosition.valueOf(recorded.getString(SourceInfo.LCR_POSITION_KEY));
final LcrPosition desiredPosition = LcrPosition.valueOf(desired.getString(SourceInfo.LCR_POSITION_KEY));
recordedScn = recordedPosition != null ? recordedPosition.getScn() : resolveScn(recorded);
desiredScn = desiredPosition != null ? desiredPosition.getScn() : resolveScn(desired);
return (recordedPosition != null && desiredPosition != null)
? recordedPosition.compareTo(desiredPosition) < 1
: recordedScn.compareTo(desiredScn) < 1;
}
else {
recordedScn = resolveScn(recorded);
desiredScn = resolveScn(desired);
return recordedScn.compareTo(desiredScn) < 1;
}
}
private Scn resolveScn(Document document) {
// prioritize reading scn as string and if not found, fallback to long data types
final String scn = document.getString(SourceInfo.SCN_KEY);
if (scn == null) {
Long scnValue = document.getLong(SourceInfo.SCN_KEY);
Scn.valueOf(scnValue == null ? 0 : scnValue);
}
return Scn.valueOf(scn);
}
};
} }
/** /**
@ -566,6 +543,11 @@ public enum ConnectorAdapter implements EnumeratedValue {
public String getConnectionUrl() { public String getConnectionUrl() {
return "jdbc:oracle:oci:@${" + JdbcConfiguration.HOSTNAME + "}:${" + JdbcConfiguration.PORT + "}/${" + JdbcConfiguration.DATABASE + "}"; return "jdbc:oracle:oci:@${" + JdbcConfiguration.HOSTNAME + "}:${" + JdbcConfiguration.PORT + "}/${" + JdbcConfiguration.DATABASE + "}";
} }
@Override
public String getImplementationName() {
return "io.debezium.connector.oracle.xstream.XStreamAdapter";
}
}, },
/** /**
@ -576,9 +558,15 @@ public String getConnectionUrl() {
public String getConnectionUrl() { public String getConnectionUrl() {
return "jdbc:oracle:thin:@${" + JdbcConfiguration.HOSTNAME + "}:${" + JdbcConfiguration.PORT + "}/${" + JdbcConfiguration.DATABASE + "}"; return "jdbc:oracle:thin:@${" + JdbcConfiguration.HOSTNAME + "}:${" + JdbcConfiguration.PORT + "}/${" + JdbcConfiguration.DATABASE + "}";
} }
@Override
public String getImplementationName() {
return "io.debezium.connector.oracle.logminer.LogMinerAdapter";
}
}; };
public abstract String getConnectionUrl(); public abstract String getConnectionUrl();
public abstract String getImplementationName();
private final String value; private final String value;
@ -755,10 +743,10 @@ public String getContextName() {
} }
/** /**
* @return connection adapter * @return the streaming adapter implementation
*/ */
public ConnectorAdapter getAdapter() { public StreamingAdapter getAdapter() {
return connectorAdapter; return streamingAdapter;
} }
/** /**

View File

@ -54,9 +54,7 @@ public ChangeEventSourceCoordinator start(Configuration config) {
this.schema = new OracleDatabaseSchema(connectorConfig, schemaNameAdjuster, topicSelector, jdbcConnection); this.schema = new OracleDatabaseSchema(connectorConfig, schemaNameAdjuster, topicSelector, jdbcConnection);
this.schema.initializeStorage(); this.schema.initializeStorage();
String adapterString = config.getString(OracleConnectorConfig.CONNECTOR_ADAPTER); OffsetContext previousOffset = getPreviousOffset(connectorConfig.getAdapter().getOffsetContextLoader(connectorConfig));
OracleConnectorConfig.ConnectorAdapter adapter = OracleConnectorConfig.ConnectorAdapter.parse(adapterString);
OffsetContext previousOffset = getPreviousOffset(new OracleOffsetContext.Loader(connectorConfig, adapter));
if (previousOffset != null) { if (previousOffset != null) {
schema.recover(previousOffset); schema.recover(previousOffset);

View File

@ -14,7 +14,6 @@
import org.apache.kafka.connect.data.Struct; import org.apache.kafka.connect.data.Struct;
import io.debezium.connector.SnapshotRecord; import io.debezium.connector.SnapshotRecord;
import io.debezium.connector.oracle.xstream.LcrPosition;
import io.debezium.pipeline.spi.OffsetContext; import io.debezium.pipeline.spi.OffsetContext;
import io.debezium.pipeline.txmetadata.TransactionContext; import io.debezium.pipeline.txmetadata.TransactionContext;
import io.debezium.relational.TableId; import io.debezium.relational.TableId;
@ -22,8 +21,8 @@
public class OracleOffsetContext implements OffsetContext { public class OracleOffsetContext implements OffsetContext {
private static final String SERVER_PARTITION_KEY = "server"; public static final String SERVER_PARTITION_KEY = "server";
private static final String SNAPSHOT_COMPLETED_KEY = "snapshot_completed"; public static final String SNAPSHOT_COMPLETED_KEY = "snapshot_completed";
private final Schema sourceInfoSchema; private final Schema sourceInfoSchema;
private final Map<String, String> partition; private final Map<String, String> partition;
@ -36,13 +35,13 @@ public class OracleOffsetContext implements OffsetContext {
*/ */
private boolean snapshotCompleted; private boolean snapshotCompleted;
public OracleOffsetContext(OracleConnectorConfig connectorConfig, Scn scn, Scn commitScn, LcrPosition lcrPosition, public OracleOffsetContext(OracleConnectorConfig connectorConfig, Scn scn, Scn commitScn, String lcrPosition,
boolean snapshot, boolean snapshotCompleted, TransactionContext transactionContext) { boolean snapshot, boolean snapshotCompleted, TransactionContext transactionContext) {
this(connectorConfig, scn, lcrPosition, snapshot, snapshotCompleted, transactionContext); this(connectorConfig, scn, lcrPosition, snapshot, snapshotCompleted, transactionContext);
sourceInfo.setCommitScn(commitScn); sourceInfo.setCommitScn(commitScn);
} }
private OracleOffsetContext(OracleConnectorConfig connectorConfig, Scn scn, LcrPosition lcrPosition, public OracleOffsetContext(OracleConnectorConfig connectorConfig, Scn scn, String lcrPosition,
boolean snapshot, boolean snapshotCompleted, TransactionContext transactionContext) { boolean snapshot, boolean snapshotCompleted, TransactionContext transactionContext) {
partition = Collections.singletonMap(SERVER_PARTITION_KEY, connectorConfig.getLogicalName()); partition = Collections.singletonMap(SERVER_PARTITION_KEY, connectorConfig.getLogicalName());
@ -66,7 +65,7 @@ public static class Builder {
private OracleConnectorConfig connectorConfig; private OracleConnectorConfig connectorConfig;
private Scn scn; private Scn scn;
private LcrPosition lcrPosition; private String lcrPosition;
private boolean snapshot; private boolean snapshot;
private boolean snapshotCompleted; private boolean snapshotCompleted;
private TransactionContext transactionContext; private TransactionContext transactionContext;
@ -81,7 +80,7 @@ public Builder scn(Scn scn) {
return this; return this;
} }
public Builder lcrPosition(LcrPosition lcrPosition) { public Builder lcrPosition(String lcrPosition) {
this.lcrPosition = lcrPosition; this.lcrPosition = lcrPosition;
return this; return this;
} }
@ -130,7 +129,7 @@ public static Builder create() {
else { else {
final Map<String, Object> offset = new HashMap<>(); final Map<String, Object> offset = new HashMap<>();
if (sourceInfo.getLcrPosition() != null) { if (sourceInfo.getLcrPosition() != null) {
offset.put(SourceInfo.LCR_POSITION_KEY, sourceInfo.getLcrPosition().toString()); offset.put(SourceInfo.LCR_POSITION_KEY, sourceInfo.getLcrPosition());
} }
else { else {
final Scn scn = sourceInfo.getScn(); final Scn scn = sourceInfo.getScn();
@ -168,11 +167,11 @@ public Scn getCommitScn() {
return sourceInfo.getCommitScn(); return sourceInfo.getCommitScn();
} }
public void setLcrPosition(LcrPosition lcrPosition) { public void setLcrPosition(String lcrPosition) {
sourceInfo.setLcrPosition(lcrPosition); sourceInfo.setLcrPosition(lcrPosition);
} }
public LcrPosition getLcrPosition() { public String getLcrPosition() {
return sourceInfo.getLcrPosition(); return sourceInfo.getLcrPosition();
} }
@ -239,56 +238,6 @@ public TransactionContext getTransactionContext() {
return transactionContext; return transactionContext;
} }
public static class Loader implements OffsetContext.Loader {
private final OracleConnectorConfig connectorConfig;
private final OracleConnectorConfig.ConnectorAdapter adapter;
// todo resolve adapter from the config rather than passing it
public Loader(OracleConnectorConfig connectorConfig, OracleConnectorConfig.ConnectorAdapter adapter) {
this.connectorConfig = connectorConfig;
this.adapter = adapter;
}
@Override
public Map<String, ?> getPartition() {
return Collections.singletonMap(SERVER_PARTITION_KEY, connectorConfig.getLogicalName());
}
@Override
public OffsetContext load(Map<String, ?> offset) {
boolean snapshot = Boolean.TRUE.equals(offset.get(SourceInfo.SNAPSHOT_KEY));
boolean snapshotCompleted = Boolean.TRUE.equals(offset.get(SNAPSHOT_COMPLETED_KEY));
Scn scn;
if (adapter == OracleConnectorConfig.ConnectorAdapter.LOG_MINER) {
scn = getScnFromOffsetMapByKey(offset, SourceInfo.SCN_KEY);
Scn commitScn = getScnFromOffsetMapByKey(offset, SourceInfo.COMMIT_SCN_KEY);
return new OracleOffsetContext(connectorConfig, scn, commitScn, null, snapshot, snapshotCompleted, TransactionContext.load(offset));
}
else {
LcrPosition lcrPosition = LcrPosition.valueOf((String) offset.get(SourceInfo.LCR_POSITION_KEY));
scn = (lcrPosition != null ? lcrPosition.getScn() : getScnFromOffsetMapByKey(offset, SourceInfo.SCN_KEY));
return new OracleOffsetContext(connectorConfig, scn, lcrPosition, snapshot, snapshotCompleted, TransactionContext.load(offset));
}
}
public static Scn getScnFromOffset(Map<String, ?> offset, LcrPosition lcrPosition) {
if (lcrPosition != null) {
return lcrPosition.getScn();
}
// Prioritize string-based SCN key over the numeric-based SCN key
Object scn = offset.get(SourceInfo.SCN_KEY);
if (scn instanceof String) {
return Scn.valueOf((String) scn);
}
else if (scn != null) {
return Scn.valueOf((Long) scn);
}
return null;
}
}
/** /**
* Helper method to resolve a {@link Scn} by key from the offset map. * Helper method to resolve a {@link Scn} by key from the offset map.
* *

View File

@ -128,19 +128,7 @@ protected void determineSnapshotOffset(RelationalSnapshotContext ctx) throws Exc
} }
private Scn getCurrentScn(SnapshotContext ctx) throws SQLException { private Scn getCurrentScn(SnapshotContext ctx) throws SQLException {
if (connectorConfig.getAdapter().equals(OracleConnectorConfig.ConnectorAdapter.LOG_MINER)) { return jdbcConnection.getCurrentScn();
return LogMinerHelper.getCurrentScn(jdbcConnection);
}
try (Statement statement = jdbcConnection.connection().createStatement();
ResultSet rs = statement.executeQuery("select CURRENT_SCN from V$DATABASE")) {
if (!rs.next()) {
throw new IllegalStateException("Couldn't get SCN");
}
return Scn.valueOf(rs.getString(1));
}
} }
/** /**

View File

@ -45,7 +45,7 @@ public Struct struct(SourceInfo sourceInfo) {
.put(SourceInfo.SCN_KEY, scn); .put(SourceInfo.SCN_KEY, scn);
if (sourceInfo.getLcrPosition() != null) { if (sourceInfo.getLcrPosition() != null) {
ret.put(SourceInfo.LCR_POSITION_KEY, sourceInfo.getLcrPosition().toString()); ret.put(SourceInfo.LCR_POSITION_KEY, sourceInfo.getLcrPosition());
} }
if (commitScn != null) { if (commitScn != null) {
ret.put(SourceInfo.COMMIT_SCN_KEY, commitScn); ret.put(SourceInfo.COMMIT_SCN_KEY, commitScn);

View File

@ -9,7 +9,6 @@
import io.debezium.annotation.NotThreadSafe; import io.debezium.annotation.NotThreadSafe;
import io.debezium.connector.common.BaseSourceInfo; import io.debezium.connector.common.BaseSourceInfo;
import io.debezium.connector.oracle.xstream.LcrPosition;
import io.debezium.relational.TableId; import io.debezium.relational.TableId;
@NotThreadSafe @NotThreadSafe
@ -23,7 +22,7 @@ public class SourceInfo extends BaseSourceInfo {
private Scn scn; private Scn scn;
private Scn commitScn; private Scn commitScn;
private LcrPosition lcrPosition; private String lcrPosition;
private String transactionId; private String transactionId;
private Instant sourceTime; private Instant sourceTime;
private TableId tableId; private TableId tableId;
@ -48,11 +47,11 @@ public void setCommitScn(Scn commitScn) {
this.commitScn = commitScn; this.commitScn = commitScn;
} }
public LcrPosition getLcrPosition() { public String getLcrPosition() {
return lcrPosition; return lcrPosition;
} }
public void setLcrPosition(LcrPosition lcrPosition) { public void setLcrPosition(String lcrPosition) {
this.lcrPosition = lcrPosition; this.lcrPosition = lcrPosition;
} }

View File

@ -0,0 +1,34 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.connector.oracle;
import io.debezium.config.Configuration;
import io.debezium.pipeline.ErrorHandler;
import io.debezium.pipeline.EventDispatcher;
import io.debezium.pipeline.source.spi.StreamingChangeEventSource;
import io.debezium.pipeline.spi.OffsetContext;
import io.debezium.relational.TableId;
import io.debezium.relational.history.HistoryRecordComparator;
import io.debezium.util.Clock;
/**
* Contract that defines unique behavior for each possible {@code connection.adapter}.
*
* @author Chris Cranford
*/
public interface StreamingAdapter {
String getType();
HistoryRecordComparator getHistoryRecordComparator();
OffsetContext.Loader getOffsetContextLoader(OracleConnectorConfig connectorConfig);
StreamingChangeEventSource getSource(OracleConnectorConfig connectorConfig, OffsetContext offsetContext,
OracleConnection connection, EventDispatcher<TableId> dispatcher, ErrorHandler errorHandler, Clock clock,
OracleDatabaseSchema schema, OracleTaskContext taskContext, Configuration jdbcConfig,
OracleStreamingChangeEventSourceMetrics streamingMetrics);
}

View File

@ -0,0 +1,75 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.connector.oracle.logminer;
import io.debezium.config.Configuration;
import io.debezium.connector.oracle.AbstractStreamingAdapter;
import io.debezium.connector.oracle.OracleConnection;
import io.debezium.connector.oracle.OracleConnectorConfig;
import io.debezium.connector.oracle.OracleDatabaseSchema;
import io.debezium.connector.oracle.OracleOffsetContext;
import io.debezium.connector.oracle.OracleStreamingChangeEventSourceMetrics;
import io.debezium.connector.oracle.OracleTaskContext;
import io.debezium.document.Document;
import io.debezium.pipeline.ErrorHandler;
import io.debezium.pipeline.EventDispatcher;
import io.debezium.pipeline.source.spi.StreamingChangeEventSource;
import io.debezium.pipeline.spi.OffsetContext;
import io.debezium.relational.TableId;
import io.debezium.relational.history.HistoryRecordComparator;
import io.debezium.util.Clock;
/**
* @author Chris Cranford
*/
public class LogMinerAdapter extends AbstractStreamingAdapter {
private static final String TYPE = "logminer";
@Override
public String getType() {
return TYPE;
}
@Override
public HistoryRecordComparator getHistoryRecordComparator() {
return new HistoryRecordComparator() {
@Override
protected boolean isPositionAtOrBefore(Document recorded, Document desired) {
return resolveScn(recorded).compareTo(resolveScn(desired)) < 1;
}
};
}
@Override
public OffsetContext.Loader getOffsetContextLoader(OracleConnectorConfig connectorConfig) {
return new LogMinerOracleOffsetContextLoader(connectorConfig);
}
@Override
public StreamingChangeEventSource getSource(OracleConnectorConfig connectorConfig,
OffsetContext offsetContext,
OracleConnection connection,
EventDispatcher<TableId> dispatcher,
ErrorHandler errorHandler,
Clock clock,
OracleDatabaseSchema schema,
OracleTaskContext taskContext,
Configuration jdbcConfig,
OracleStreamingChangeEventSourceMetrics streamingMetrics) {
return new LogMinerStreamingChangeEventSource(
connectorConfig,
(OracleOffsetContext) offsetContext,
connection,
dispatcher,
errorHandler,
clock,
schema,
taskContext,
jdbcConfig,
streamingMetrics);
}
}

View File

@ -97,25 +97,6 @@ static void buildDataDictionary(OracleConnection connection) throws SQLException
executeCallableStatement(connection, SqlUtils.BUILD_DICTIONARY); executeCallableStatement(connection, SqlUtils.BUILD_DICTIONARY);
} }
/**
* This method returns current SCN from the database
*
* @param connection container level database connection
* @return current SCN
* @throws SQLException if anything unexpected happens
*/
public static Scn getCurrentScn(OracleConnection connection) throws SQLException {
try (Statement statement = connection.connection(false).createStatement();
ResultSet rs = statement.executeQuery(SqlUtils.currentScnQuery())) {
if (!rs.next()) {
throw new IllegalStateException("Couldn't get SCN");
}
return Scn.valueOf(rs.getString(1));
}
}
static void createFlushTable(OracleConnection connection) throws SQLException { static void createFlushTable(OracleConnection connection) throws SQLException {
String tableExists = (String) getSingleResult(connection, SqlUtils.tableExistsQuery(SqlUtils.LOGMNR_FLUSH_TABLE), DATATYPE.STRING); String tableExists = (String) getSingleResult(connection, SqlUtils.tableExistsQuery(SqlUtils.LOGMNR_FLUSH_TABLE), DATATYPE.STRING);
if (tableExists == null) { if (tableExists == null) {
@ -143,7 +124,7 @@ static void createFlushTable(OracleConnection connection) throws SQLException {
* @throws SQLException if anything unexpected happens * @throws SQLException if anything unexpected happens
*/ */
static Scn getEndScn(OracleConnection connection, Scn startScn, OracleStreamingChangeEventSourceMetrics streamingMetrics, int defaultBatchSize) throws SQLException { static Scn getEndScn(OracleConnection connection, Scn startScn, OracleStreamingChangeEventSourceMetrics streamingMetrics, int defaultBatchSize) throws SQLException {
Scn currentScn = getCurrentScn(connection); Scn currentScn = connection.getCurrentScn();
streamingMetrics.setCurrentScn(currentScn); streamingMetrics.setCurrentScn(currentScn);
Scn topScnToMine = startScn.add(Scn.valueOf(streamingMetrics.getBatchSize())); Scn topScnToMine = startScn.add(Scn.valueOf(streamingMetrics.getBatchSize()));
@ -183,7 +164,7 @@ static Scn getEndScn(OracleConnection connection, Scn startScn, OracleStreamingC
static void flushLogWriter(OracleConnection connection, JdbcConfiguration config, static void flushLogWriter(OracleConnection connection, JdbcConfiguration config,
boolean isRac, Set<String> racHosts) boolean isRac, Set<String> racHosts)
throws SQLException { throws SQLException {
Scn currentScn = getCurrentScn(connection); Scn currentScn = connection.getCurrentScn();
if (isRac) { if (isRac) {
flushRacLogWriters(currentScn, config, racHosts); flushRacLogWriters(currentScn, config, racHosts);
} }

View File

@ -0,0 +1,44 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.connector.oracle.logminer;
import java.util.Collections;
import java.util.Map;
import io.debezium.connector.oracle.OracleConnectorConfig;
import io.debezium.connector.oracle.OracleOffsetContext;
import io.debezium.connector.oracle.Scn;
import io.debezium.connector.oracle.SourceInfo;
import io.debezium.pipeline.spi.OffsetContext;
import io.debezium.pipeline.txmetadata.TransactionContext;
/**
* @author Chris Cranford
*/
public class LogMinerOracleOffsetContextLoader implements OffsetContext.Loader {
private final OracleConnectorConfig connectorConfig;
public LogMinerOracleOffsetContextLoader(OracleConnectorConfig connectorConfig) {
this.connectorConfig = connectorConfig;
}
@Override
public Map<String, ?> getPartition() {
return Collections.singletonMap(OracleOffsetContext.SERVER_PARTITION_KEY, connectorConfig.getLogicalName());
}
@Override
public OffsetContext load(Map<String, ?> offset) {
boolean snapshot = Boolean.TRUE.equals(offset.get(SourceInfo.SNAPSHOT_KEY));
boolean snapshotCompleted = Boolean.TRUE.equals(offset.get(OracleOffsetContext.SNAPSHOT_COMPLETED_KEY));
Scn scn = OracleOffsetContext.getScnFromOffsetMapByKey(offset, SourceInfo.SCN_KEY);
Scn commitScn = OracleOffsetContext.getScnFromOffsetMapByKey(offset, SourceInfo.COMMIT_SCN_KEY);
return new OracleOffsetContext(connectorConfig, scn, commitScn, null, snapshot, snapshotCompleted, TransactionContext.load(offset));
}
}

View File

@ -63,9 +63,10 @@ public void processLCR(LCR lcr) throws StreamsException {
final LcrPosition lcrPosition = new LcrPosition(lcr.getPosition()); final LcrPosition lcrPosition = new LcrPosition(lcr.getPosition());
// After a restart it may happen we get the event with the last processed LCR again // After a restart it may happen we get the event with the last processed LCR again
if (lcrPosition.compareTo(offsetContext.getLcrPosition()) <= 0) { LcrPosition offsetLcrPosition = LcrPosition.valueOf(offsetContext.getLcrPosition());
if (lcrPosition.compareTo(offsetLcrPosition) <= 0) {
if (LOGGER.isDebugEnabled()) { if (LOGGER.isDebugEnabled()) {
final LcrPosition recPosition = offsetContext.getLcrPosition(); final LcrPosition recPosition = offsetLcrPosition;
LOGGER.debug("Ignoring change event with already processed SCN/LCR Position {}/{}, last recorded {}/{}", LOGGER.debug("Ignoring change event with already processed SCN/LCR Position {}/{}, last recorded {}/{}",
lcrPosition, lcrPosition,
lcrPosition.getScn(), lcrPosition.getScn(),
@ -76,7 +77,7 @@ public void processLCR(LCR lcr) throws StreamsException {
} }
offsetContext.setScn(lcrPosition.getScn()); offsetContext.setScn(lcrPosition.getScn());
offsetContext.setLcrPosition(lcrPosition); offsetContext.setLcrPosition(lcrPosition.toString());
offsetContext.setTransactionId(lcr.getTransactionId()); offsetContext.setTransactionId(lcr.getTransactionId());
offsetContext.setSourceTime(lcr.getSourceTime().timestampValue().toInstant()); offsetContext.setSourceTime(lcr.getSourceTime().timestampValue().toInstant());
offsetContext.setTableId(new TableId(lcr.getSourceDatabaseName(), lcr.getObjectOwner(), lcr.getObjectName())); offsetContext.setTableId(new TableId(lcr.getSourceDatabaseName(), lcr.getObjectOwner(), lcr.getObjectName()));

View File

@ -0,0 +1,83 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.connector.oracle.xstream;
import io.debezium.config.Configuration;
import io.debezium.connector.oracle.AbstractStreamingAdapter;
import io.debezium.connector.oracle.OracleConnection;
import io.debezium.connector.oracle.OracleConnectorConfig;
import io.debezium.connector.oracle.OracleDatabaseSchema;
import io.debezium.connector.oracle.OracleOffsetContext;
import io.debezium.connector.oracle.OracleStreamingChangeEventSourceMetrics;
import io.debezium.connector.oracle.OracleTaskContext;
import io.debezium.connector.oracle.Scn;
import io.debezium.connector.oracle.SourceInfo;
import io.debezium.document.Document;
import io.debezium.pipeline.ErrorHandler;
import io.debezium.pipeline.EventDispatcher;
import io.debezium.pipeline.source.spi.StreamingChangeEventSource;
import io.debezium.pipeline.spi.OffsetContext;
import io.debezium.relational.TableId;
import io.debezium.relational.history.HistoryRecordComparator;
import io.debezium.util.Clock;
/**
* The streaming adapter implementation for Oracle XStream.
*
* @author Chris Cranford
*/
public class XStreamAdapter extends AbstractStreamingAdapter {
private static final String TYPE = "xstream";
@Override
public String getType() {
return TYPE;
}
@Override
public HistoryRecordComparator getHistoryRecordComparator() {
return new HistoryRecordComparator() {
@Override
public boolean isPositionAtOrBefore(Document recorded, Document desired) {
final LcrPosition recordedPosition = LcrPosition.valueOf(recorded.getString(SourceInfo.LCR_POSITION_KEY));
final LcrPosition desiredPosition = LcrPosition.valueOf(desired.getString(SourceInfo.LCR_POSITION_KEY));
final Scn recordedScn = recordedPosition != null ? recordedPosition.getScn() : resolveScn(recorded);
final Scn desiredScn = desiredPosition != null ? desiredPosition.getScn() : resolveScn(desired);
if (recordedPosition != null && desiredPosition != null) {
return recordedPosition.compareTo(desiredPosition) < 1;
}
return recordedScn.compareTo(desiredScn) < 1;
}
};
}
@Override
public OffsetContext.Loader getOffsetContextLoader(OracleConnectorConfig connectorConfig) {
return new XStreamOracleOffsetContextLoader(connectorConfig);
}
@Override
public StreamingChangeEventSource getSource(OracleConnectorConfig connectorConfig,
OffsetContext offsetContext,
OracleConnection connection,
EventDispatcher<TableId> dispatcher,
ErrorHandler errorHandler,
Clock clock,
OracleDatabaseSchema schema,
OracleTaskContext taskContext,
Configuration jdbcConfig,
OracleStreamingChangeEventSourceMetrics streamingMetrics) {
return new XstreamStreamingChangeEventSource(
connectorConfig,
(OracleOffsetContext) offsetContext,
connection,
dispatcher,
errorHandler,
clock,
schema);
}
}

View File

@ -0,0 +1,53 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.connector.oracle.xstream;
import java.util.Collections;
import java.util.Map;
import io.debezium.connector.oracle.OracleConnectorConfig;
import io.debezium.connector.oracle.OracleOffsetContext;
import io.debezium.connector.oracle.Scn;
import io.debezium.connector.oracle.SourceInfo;
import io.debezium.pipeline.spi.OffsetContext;
import io.debezium.pipeline.txmetadata.TransactionContext;
/**
* The {@link OffsetContext} loader implementation for the Oracle XStream adapter
*
* @author Chris Cranford
*/
public class XStreamOracleOffsetContextLoader implements OffsetContext.Loader {
private final OracleConnectorConfig connectorConfig;
public XStreamOracleOffsetContextLoader(OracleConnectorConfig connectorConfig) {
this.connectorConfig = connectorConfig;
}
@Override
public Map<String, ?> getPartition() {
return Collections.singletonMap(OracleOffsetContext.SERVER_PARTITION_KEY, connectorConfig.getLogicalName());
}
@Override
public OffsetContext load(Map<String, ?> offset) {
boolean snapshot = Boolean.TRUE.equals(offset.get(SourceInfo.SNAPSHOT_KEY));
boolean snapshotCompleted = Boolean.TRUE.equals(offset.get(OracleOffsetContext.SNAPSHOT_COMPLETED_KEY));
String lcrPosition = (String) offset.get(SourceInfo.LCR_POSITION_KEY);
final Scn scn;
if (lcrPosition != null) {
scn = LcrPosition.valueOf(lcrPosition).getScn();
}
else {
scn = OracleOffsetContext.getScnFromOffsetMapByKey(offset, SourceInfo.SCN_KEY);
}
return new OracleOffsetContext(connectorConfig, scn, lcrPosition, snapshot, snapshotCompleted, TransactionContext.load(offset));
}
}

View File

@ -78,9 +78,14 @@ public void execute(ChangeEventSourceContext context) throws InterruptedExceptio
try (OracleConnection xsConnection = new OracleConnection(jdbcConnection.config(), () -> getClass().getClassLoader())) { try (OracleConnection xsConnection = new OracleConnection(jdbcConnection.config(), () -> getClass().getClassLoader())) {
try { try {
// 1. connect // 1. connect
final byte[] startPosition = offsetContext.getLcrPosition() != null final byte[] startPosition;
? offsetContext.getLcrPosition().getRawPosition() String lcrPosition = offsetContext.getLcrPosition();
: convertScnToPosition(offsetContext.getScn()); if (lcrPosition != null) {
startPosition = LcrPosition.valueOf(lcrPosition).getRawPosition();
}
else {
startPosition = convertScnToPosition(offsetContext.getScn());
}
xsOut = XStreamOut.attach((oracle.jdbc.OracleConnection) xsConnection.connection(), xStreamServerName, xsOut = XStreamOut.attach((oracle.jdbc.OracleConnection) xsConnection.connection(), xStreamServerName,
startPosition, 1, 1, XStreamOut.DEFAULT_MODE); startPosition, 1, 1, XStreamOut.DEFAULT_MODE);

View File

@ -72,7 +72,7 @@ public void before() throws SQLException {
@Test @Test
public void shouldAddRightArchivedRedoFiles() throws Exception { public void shouldAddRightArchivedRedoFiles() throws Exception {
// case 1 : oldest scn = current scn // case 1 : oldest scn = current scn
Scn currentScn = LogMinerHelper.getCurrentScn(conn); Scn currentScn = conn.getCurrentScn();
Map<String, String> archivedRedoFiles = LogMinerHelper.getMap(conn, SqlUtils.archiveLogsQuery(currentScn, Duration.ofHours(0L)), "-1"); Map<String, String> archivedRedoFiles = LogMinerHelper.getMap(conn, SqlUtils.archiveLogsQuery(currentScn, Duration.ofHours(0L)), "-1");
assertThat(archivedRedoFiles.size() == 0).isTrue(); assertThat(archivedRedoFiles.size() == 0).isTrue();

View File

@ -15,6 +15,7 @@
import io.debezium.connector.oracle.util.TestHelper; import io.debezium.connector.oracle.util.TestHelper;
import io.debezium.doc.FixFor; import io.debezium.doc.FixFor;
import io.debezium.pipeline.spi.OffsetContext;
/** /**
* Unit test that validates the behavior of the {@link OracleOffsetContext} and its friends. * Unit test that validates the behavior of the {@link OracleOffsetContext} and its friends.
@ -24,12 +25,12 @@
public class OracleOffsetContextTest { public class OracleOffsetContextTest {
private OracleConnectorConfig connectorConfig; private OracleConnectorConfig connectorConfig;
private OracleOffsetContext.Loader offsetLoader; private OffsetContext.Loader offsetLoader;
@Before @Before
public void beforeEach() throws Exception { public void beforeEach() throws Exception {
this.connectorConfig = new OracleConnectorConfig(TestHelper.defaultConfig().build()); this.connectorConfig = new OracleConnectorConfig(TestHelper.defaultConfig().build());
this.offsetLoader = new OracleOffsetContext.Loader(connectorConfig, TestHelper.adapter()); this.offsetLoader = connectorConfig.getAdapter().getOffsetContextLoader(connectorConfig);
} }
@Test @Test

View File

@ -38,7 +38,6 @@
import io.debezium.connector.oracle.junit.SkipTestDependingOnAdapterNameRule; import io.debezium.connector.oracle.junit.SkipTestDependingOnAdapterNameRule;
import io.debezium.connector.oracle.junit.SkipWhenAdapterNameIsNot; import io.debezium.connector.oracle.junit.SkipWhenAdapterNameIsNot;
import io.debezium.connector.oracle.junit.SkipWhenAdapterNameIsNot.AdapterName; import io.debezium.connector.oracle.junit.SkipWhenAdapterNameIsNot.AdapterName;
import io.debezium.connector.oracle.xstream.LcrPosition;
import io.debezium.pipeline.DataChangeEvent; import io.debezium.pipeline.DataChangeEvent;
import io.debezium.pipeline.ErrorHandler; import io.debezium.pipeline.ErrorHandler;
import io.debezium.pipeline.EventDispatcher; import io.debezium.pipeline.EventDispatcher;
@ -124,7 +123,7 @@ public void testIsNotEmptyWhenTransactionIsRegistered() {
public void testIsEmptyWhenTransactionIsCommitted() throws InterruptedException { public void testIsEmptyWhenTransactionIsCommitted() throws InterruptedException {
CountDownLatch commitLatch = new CountDownLatch(1); CountDownLatch commitLatch = new CountDownLatch(1);
transactionalBuffer.registerCommitCallback(TRANSACTION_ID, SCN, Instant.now(), (timestamp, smallestScn, commitScn, counter) -> commitLatch.countDown()); transactionalBuffer.registerCommitCallback(TRANSACTION_ID, SCN, Instant.now(), (timestamp, smallestScn, commitScn, counter) -> commitLatch.countDown());
offsetContext = new OracleOffsetContext(connectorConfig, SCN, SCN, (LcrPosition) null, false, true, new TransactionContext()); offsetContext = new OracleOffsetContext(connectorConfig, SCN, SCN, (String) null, false, true, new TransactionContext());
transactionalBuffer.commit(TRANSACTION_ID, SCN.add(SCN_ONE), offsetContext, TIMESTAMP, () -> true, MESSAGE, dispatcher); transactionalBuffer.commit(TRANSACTION_ID, SCN.add(SCN_ONE), offsetContext, TIMESTAMP, () -> true, MESSAGE, dispatcher);
commitLatch.await(); commitLatch.await();
Thread.sleep(1000); Thread.sleep(1000);
@ -221,7 +220,7 @@ public void testCalculateScnWhenSecondTransactionIsCommitted() throws Interrupte
public void testAbandoningOneTransaction() { public void testAbandoningOneTransaction() {
transactionalBuffer.registerCommitCallback(TRANSACTION_ID, SCN, Instant.now(), (timestamp, smallestScn, commitScn, counter) -> { transactionalBuffer.registerCommitCallback(TRANSACTION_ID, SCN, Instant.now(), (timestamp, smallestScn, commitScn, counter) -> {
}); });
offsetContext = new OracleOffsetContext(connectorConfig, SCN, SCN, (LcrPosition) null, false, true, new TransactionContext()); offsetContext = new OracleOffsetContext(connectorConfig, SCN, SCN, (String) null, false, true, new TransactionContext());
transactionalBuffer.abandonLongTransactions(SCN, offsetContext); transactionalBuffer.abandonLongTransactions(SCN, offsetContext);
assertThat(transactionalBuffer.isEmpty()).isEqualTo(true); assertThat(transactionalBuffer.isEmpty()).isEqualTo(true);
} }