DBZ-3692 Oracle incremental snapshot support

This commit is contained in:
Chris Cranford 2021-07-22 17:58:09 -04:00 committed by Jiri Pechanec
parent ade2e41aef
commit 393a44a55f
13 changed files with 309 additions and 14 deletions

View File

@ -5,14 +5,19 @@
*/
package io.debezium.connector.oracle;
import java.util.Optional;
import io.debezium.config.Configuration;
import io.debezium.pipeline.ErrorHandler;
import io.debezium.pipeline.EventDispatcher;
import io.debezium.pipeline.source.snapshot.incremental.IncrementalSnapshotChangeEventSource;
import io.debezium.pipeline.source.spi.ChangeEventSourceFactory;
import io.debezium.pipeline.source.spi.DataChangeEventListener;
import io.debezium.pipeline.source.spi.SnapshotChangeEventSource;
import io.debezium.pipeline.source.spi.SnapshotProgressListener;
import io.debezium.pipeline.source.spi.StreamingChangeEventSource;
import io.debezium.relational.TableId;
import io.debezium.schema.DataCollectionId;
import io.debezium.util.Clock;
public class OracleChangeEventSourceFactory implements ChangeEventSourceFactory<OraclePartition, OracleOffsetContext> {
@ -60,4 +65,26 @@ public StreamingChangeEventSource<OraclePartition, OracleOffsetContext> getStrea
jdbcConfig,
streamingMetrics);
}
@Override
public Optional<IncrementalSnapshotChangeEventSource<? extends DataCollectionId>> getIncrementalSnapshotChangeEventSource(
OracleOffsetContext offsetContext,
SnapshotProgressListener snapshotProgressListener,
DataChangeEventListener dataChangeEventListener) {
// Incremental snapshots requires a secondary database connection
// This is because Xstream does not allow any work on the connection while the LCR handler may be invoked
// and LogMiner streams results from the CDB$ROOT container but we will need to stream changes from the
// PDB when reading snapshot records.
//
// todo: consider adding a hook so that the connection can be lazily opened & closed when we're done with
// performing any and all incremental snapshot operations.
return Optional.of(new OracleSignalBasedIncrementalSnapshotChangeEventSource(
configuration,
new OracleConnection(jdbcConnection.config(), () -> getClass().getClassLoader()),
dispatcher,
schema,
clock,
snapshotProgressListener,
dataChangeEventListener));
}
}

View File

@ -18,6 +18,7 @@
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.function.Supplier;
import java.util.regex.Pattern;
@ -406,6 +407,32 @@ public <T> T singleOptionalValue(String query, ResultSetExtractor<T> extractor)
return queryAndMap(query, rs -> rs.next() ? extractor.apply(rs) : null);
}
@Override
public String buildSelectWithRowLimits(TableId tableId,
int limit,
String projection,
Optional<String> condition,
String orderBy) {
final TableId table = new TableId(null, tableId.schema(), tableId.table());
final StringBuilder sql = new StringBuilder("SELECT ");
sql
.append(projection)
.append(" FROM ");
sql.append(quotedTableIdString(table));
if (condition.isPresent()) {
sql
.append(" WHERE ")
.append(condition.get());
}
sql
.append(" ORDER BY ")
.append(orderBy)
.append(" FETCH NEXT ")
.append(limit)
.append(" ROWS ONLY");
return sql.toString();
}
public static String connectionString(Configuration config) {
return config.getString(URL) != null ? config.getString(URL)
: ConnectorAdapter.parse(config.getString("connection.adapter")).getConnectionUrl();

View File

@ -15,6 +15,7 @@
import org.apache.kafka.connect.data.Struct;
import io.debezium.connector.SnapshotRecord;
import io.debezium.pipeline.source.snapshot.incremental.IncrementalSnapshotContext;
import io.debezium.pipeline.spi.OffsetContext;
import io.debezium.pipeline.txmetadata.TransactionContext;
import io.debezium.relational.TableId;
@ -30,6 +31,7 @@ public class OracleOffsetContext implements OffsetContext {
private final SourceInfo sourceInfo;
private final TransactionContext transactionContext;
private final IncrementalSnapshotContext<TableId> incrementalSnapshotContext;
/**
* Whether a snapshot has been completed or not.
@ -37,13 +39,15 @@ public class OracleOffsetContext implements OffsetContext {
private boolean snapshotCompleted;
public OracleOffsetContext(OracleConnectorConfig connectorConfig, Scn scn, Scn commitScn, String lcrPosition,
boolean snapshot, boolean snapshotCompleted, TransactionContext transactionContext) {
this(connectorConfig, scn, lcrPosition, snapshot, snapshotCompleted, transactionContext);
boolean snapshot, boolean snapshotCompleted, TransactionContext transactionContext,
IncrementalSnapshotContext<TableId> incrementalSnapshotContext) {
this(connectorConfig, scn, lcrPosition, snapshot, snapshotCompleted, transactionContext, incrementalSnapshotContext);
sourceInfo.setCommitScn(commitScn);
}
public OracleOffsetContext(OracleConnectorConfig connectorConfig, Scn scn, String lcrPosition,
boolean snapshot, boolean snapshotCompleted, TransactionContext transactionContext) {
boolean snapshot, boolean snapshotCompleted, TransactionContext transactionContext,
IncrementalSnapshotContext<TableId> incrementalSnapshotContext) {
partition = Collections.singletonMap(SERVER_PARTITION_KEY, connectorConfig.getLogicalName());
sourceInfo = new SourceInfo(connectorConfig);
@ -52,6 +56,7 @@ public OracleOffsetContext(OracleConnectorConfig connectorConfig, Scn scn, Strin
sourceInfoSchema = sourceInfo.schema();
this.transactionContext = transactionContext;
this.incrementalSnapshotContext = incrementalSnapshotContext;
this.snapshotCompleted = snapshotCompleted;
if (this.snapshotCompleted) {
@ -70,6 +75,7 @@ public static class Builder {
private boolean snapshot;
private boolean snapshotCompleted;
private TransactionContext transactionContext;
private IncrementalSnapshotContext<TableId> incrementalSnapshotContext;
public Builder logicalName(OracleConnectorConfig connectorConfig) {
this.connectorConfig = connectorConfig;
@ -101,8 +107,13 @@ public Builder transactionContext(TransactionContext transactionContext) {
return this;
}
public Builder incrementalSnapshotContext(IncrementalSnapshotContext<TableId> incrementalSnapshotContext) {
this.incrementalSnapshotContext = incrementalSnapshotContext;
return this;
}
OracleOffsetContext build() {
return new OracleOffsetContext(connectorConfig, scn, lcrPosition, snapshot, snapshotCompleted, transactionContext);
return new OracleOffsetContext(connectorConfig, scn, lcrPosition, snapshot, snapshotCompleted, transactionContext, incrementalSnapshotContext);
}
}
@ -138,7 +149,7 @@ public static Builder create() {
offset.put(SourceInfo.SCN_KEY, scn != null ? scn.toString() : null);
offset.put(SourceInfo.COMMIT_SCN_KEY, commitScn != null ? commitScn.toString() : null);
}
return transactionContext.store(offset);
return incrementalSnapshotContext.store(transactionContext.store(offset));
}
}
@ -249,6 +260,16 @@ public TransactionContext getTransactionContext() {
return transactionContext;
}
@Override
public void incrementalSnapshotEvents() {
sourceInfo.setSnapshot(SnapshotRecord.INCREMENTAL);
}
@Override
public IncrementalSnapshotContext<?> getIncrementalSnapshotContext() {
return incrementalSnapshotContext;
}
/**
* Helper method to resolve a {@link Scn} by key from the offset map.
*

View File

@ -0,0 +1,58 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.connector.oracle;
import io.debezium.config.CommonConnectorConfig;
import io.debezium.jdbc.JdbcConnection;
import io.debezium.pipeline.EventDispatcher;
import io.debezium.pipeline.source.snapshot.incremental.IncrementalSnapshotContext;
import io.debezium.pipeline.source.snapshot.incremental.SignalBasedIncrementalSnapshotChangeEventSource;
import io.debezium.pipeline.source.spi.DataChangeEventListener;
import io.debezium.pipeline.source.spi.SnapshotProgressListener;
import io.debezium.relational.TableId;
import io.debezium.schema.DatabaseSchema;
import io.debezium.util.Clock;
/**
* @author Chris Cranford
*/
public class OracleSignalBasedIncrementalSnapshotChangeEventSource extends SignalBasedIncrementalSnapshotChangeEventSource<TableId> {
private final String pdbName;
private final JdbcConnection connection;
public OracleSignalBasedIncrementalSnapshotChangeEventSource(CommonConnectorConfig config,
JdbcConnection jdbcConnection,
EventDispatcher<TableId> dispatcher,
DatabaseSchema<?> databaseSchema,
Clock clock,
SnapshotProgressListener progressListener,
DataChangeEventListener dataChangeEventListener) {
super(config, jdbcConnection, dispatcher, databaseSchema, clock, progressListener, dataChangeEventListener);
this.pdbName = ((OracleConnectorConfig) config).getPdbName();
this.connection = jdbcConnection;
}
@Override
protected String getSignalTableName(String dataCollectionId) {
final TableId tableId = TableId.parse(dataCollectionId);
return tableId.schema() + "." + tableId.table();
}
@Override
protected void preReadChunk(IncrementalSnapshotContext<TableId> context) {
if (pdbName != null) {
((OracleConnection) connection).setSessionToPdb(pdbName);
}
}
@Override
protected void postReadChunk(IncrementalSnapshotContext<TableId> context) {
if (pdbName != null) {
((OracleConnection) connection).resetSessionToCdb();
}
}
}

View File

@ -17,6 +17,7 @@
import org.slf4j.LoggerFactory;
import io.debezium.pipeline.EventDispatcher;
import io.debezium.pipeline.source.snapshot.incremental.SignalBasedIncrementalSnapshotContext;
import io.debezium.pipeline.source.spi.SnapshotProgressListener;
import io.debezium.pipeline.source.spi.StreamingChangeEventSource;
import io.debezium.pipeline.txmetadata.TransactionContext;
@ -126,6 +127,7 @@ protected void determineSnapshotOffset(RelationalSnapshotContext<OracleOffsetCon
.logicalName(connectorConfig)
.scn(currentScn)
.transactionContext(new TransactionContext())
.incrementalSnapshotContext(new SignalBasedIncrementalSnapshotContext<>())
.build();
}

View File

@ -11,6 +11,7 @@
import io.debezium.connector.oracle.OracleOffsetContext;
import io.debezium.connector.oracle.Scn;
import io.debezium.connector.oracle.SourceInfo;
import io.debezium.pipeline.source.snapshot.incremental.SignalBasedIncrementalSnapshotContext;
import io.debezium.pipeline.spi.OffsetContext;
import io.debezium.pipeline.txmetadata.TransactionContext;
@ -32,7 +33,8 @@ public OracleOffsetContext load(Map<String, ?> offset) {
Scn scn = OracleOffsetContext.getScnFromOffsetMapByKey(offset, SourceInfo.SCN_KEY);
Scn commitScn = OracleOffsetContext.getScnFromOffsetMapByKey(offset, SourceInfo.COMMIT_SCN_KEY);
return new OracleOffsetContext(connectorConfig, scn, commitScn, null, snapshot, snapshotCompleted, TransactionContext.load(offset));
return new OracleOffsetContext(connectorConfig, scn, commitScn, null, snapshot, snapshotCompleted, TransactionContext.load(offset),
SignalBasedIncrementalSnapshotContext.load(offset));
}
}

View File

@ -11,6 +11,7 @@
import io.debezium.connector.oracle.OracleOffsetContext;
import io.debezium.connector.oracle.Scn;
import io.debezium.connector.oracle.SourceInfo;
import io.debezium.pipeline.source.snapshot.incremental.SignalBasedIncrementalSnapshotContext;
import io.debezium.pipeline.spi.OffsetContext;
import io.debezium.pipeline.txmetadata.TransactionContext;
@ -42,6 +43,7 @@ public OracleOffsetContext load(Map<String, ?> offset) {
scn = OracleOffsetContext.getScnFromOffsetMapByKey(offset, SourceInfo.SCN_KEY);
}
return new OracleOffsetContext(connectorConfig, scn, lcrPosition, snapshot, snapshotCompleted, TransactionContext.load(offset));
return new OracleOffsetContext(connectorConfig, scn, lcrPosition, snapshot, snapshotCompleted, TransactionContext.load(offset),
SignalBasedIncrementalSnapshotContext.load(offset));
}
}

View File

@ -0,0 +1,128 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.connector.oracle;
import java.sql.SQLException;
import java.util.concurrent.TimeUnit;
import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
import io.debezium.config.Configuration;
import io.debezium.connector.oracle.util.TestHelper;
import io.debezium.jdbc.JdbcConnection;
import io.debezium.junit.SkipTestRule;
import io.debezium.pipeline.source.snapshot.incremental.AbstractIncrementalSnapshotTest;
import io.debezium.relational.history.DatabaseHistory;
import io.debezium.util.Testing;
/**
* Incremental Snapshots tests for the Oracle connector.
*
* @author Chris Cranford
*/
public class IncrementalSnapshotIT extends AbstractIncrementalSnapshotTest<OracleConnector> {
private OracleConnection connection;
@Rule
public SkipTestRule skipRule = new SkipTestRule();
@Before
public void before() throws Exception {
connection = TestHelper.testConnection();
TestHelper.dropTable(connection, "a");
connection.execute("CREATE TABLE a (pk numeric(9,0) primary key, aa numeric(9,0))");
// todo: creates signal table in the PDB, do we want it to be in the CDB?
TestHelper.dropTable(connection, "debezium_signal");
connection.execute("CREATE TABLE debezium_signal (id varchar2(64), type varchar2(32), data varchar2(2048))");
connection.execute("GRANT INSERT on debezium_signal to " + TestHelper.getConnectorUserName());
TestHelper.streamTable(connection, "debezium_signal");
setConsumeTimeout(TestHelper.defaultMessageConsumerPollTimeout(), TimeUnit.SECONDS);
initializeConnectorTestFramework();
Testing.Files.delete(TestHelper.DB_HISTORY_PATH);
}
@After
public void after() throws Exception {
stopConnector();
if (connection != null) {
TestHelper.dropTable(connection, "a");
TestHelper.dropTable(connection, "debezium_signal");
connection.close();
}
}
@Override
protected void waitForConnectorToStart() {
super.waitForConnectorToStart();
try {
waitForStreamingRunning(TestHelper.CONNECTOR_NAME, TestHelper.SERVER_NAME);
}
catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
@Override
protected void populateTable() throws SQLException {
super.populateTable();
TestHelper.streamTable(connection, "a");
}
@Override
protected Class<OracleConnector> connectorClass() {
return OracleConnector.class;
}
@Override
protected JdbcConnection databaseConnection() {
return connection;
}
@Override
protected String topicName() {
return "server1.DEBEZIUM.A";
}
@Override
protected String tableName() {
return "DEBEZIUM.A";
}
@Override
protected String tableDataCollectionId() {
return "ORCLPDB1.DEBEZIUM.A";
}
@Override
protected String signalTableName() {
return "DEBEZIUM.DEBEZIUM_SIGNAL";
}
@Override
protected Configuration.Builder config() {
return TestHelper.defaultConfig()
.with(OracleConnectorConfig.SNAPSHOT_MODE, OracleConnectorConfig.SnapshotMode.SCHEMA_ONLY)
.with(OracleConnectorConfig.SIGNAL_DATA_COLLECTION, "ORCLPDB1.DEBEZIUM.DEBEZIUM_SIGNAL")
.with(OracleConnectorConfig.TABLE_INCLUDE_LIST, "DEBEZIUM\\.A,DEBEZIUM\\.DEBEZIUM_SIGNAL")
.with(DatabaseHistory.STORE_ONLY_CAPTURED_TABLES_DDL, true);
}
@Override
protected String valueFieldName() {
return "AA";
}
@Override
protected String pkFieldName() {
return "PK";
}
}

View File

@ -43,6 +43,7 @@
import io.debezium.pipeline.DataChangeEvent;
import io.debezium.pipeline.ErrorHandler;
import io.debezium.pipeline.EventDispatcher;
import io.debezium.pipeline.source.snapshot.incremental.SignalBasedIncrementalSnapshotContext;
import io.debezium.pipeline.txmetadata.TransactionContext;
import io.debezium.relational.TableId;
import io.debezium.util.Clock;
@ -132,7 +133,7 @@ public void testIsNotEmptyWhenTransactionIsRegistered() {
@Test
public void testIsEmptyWhenTransactionIsCommitted() throws InterruptedException {
registerDmlOperation(TRANSACTION_ID, SCN, ROW_ID);
offsetContext = new OracleOffsetContext(connectorConfig, SCN, SCN, (String) null, false, true, new TransactionContext());
offsetContext = createOffsetContext(SCN, SCN);
transactionalBuffer.commit(TRANSACTION_ID, SCN.add(SCN_ONE), offsetContext, TIMESTAMP, () -> true, MESSAGE, dispatcher);
assertThat(transactionalBuffer.isEmpty()).isTrue();
}
@ -167,7 +168,7 @@ public void testNonEmptySecondTransactionIsRolledBack() {
@Test
public void testCalculateScnWhenTransactionIsCommitted() throws InterruptedException {
registerDmlOperation(TRANSACTION_ID, SCN, ROW_ID);
offsetContext = new OracleOffsetContext(connectorConfig, SCN, SCN, null, false, true, new TransactionContext());
offsetContext = createOffsetContext(SCN, SCN);
transactionalBuffer.commit(TRANSACTION_ID, SCN.add(SCN_ONE), offsetContext, TIMESTAMP, () -> true, MESSAGE, dispatcher);
assertThat(streamingMetrics.getOldestScn()).isEqualTo(SCN.toString());
assertThat(transactionalBuffer.getRolledBackTransactionIds().isEmpty()).isTrue();
@ -177,7 +178,7 @@ public void testCalculateScnWhenTransactionIsCommitted() throws InterruptedExcep
public void testCalculateScnWhenFirstTransactionIsCommitted() throws InterruptedException {
registerDmlOperation(TRANSACTION_ID, SCN, ROW_ID);
registerDmlOperation(OTHER_TRANSACTION_ID, OTHER_SCN, OTHER_ROW_ID);
offsetContext = new OracleOffsetContext(connectorConfig, SCN, SCN, null, false, true, new TransactionContext());
offsetContext = createOffsetContext(SCN, SCN);
transactionalBuffer.commit(TRANSACTION_ID, SCN.add(SCN_ONE), offsetContext, TIMESTAMP, () -> true, MESSAGE, dispatcher);
// after commit, it stays the same because OTHER_TRANSACTION_ID is not committed yet
@ -192,7 +193,7 @@ public void testCalculateScnWhenFirstTransactionIsCommitted() throws Interrupted
public void testCalculateScnWhenSecondTransactionIsCommitted() throws InterruptedException {
registerDmlOperation(TRANSACTION_ID, SCN, ROW_ID);
registerDmlOperation(OTHER_TRANSACTION_ID, OTHER_SCN, OTHER_ROW_ID);
offsetContext = new OracleOffsetContext(connectorConfig, OTHER_SCN, OTHER_SCN, null, false, true, new TransactionContext());
offsetContext = createOffsetContext(OTHER_SCN, OTHER_SCN);
transactionalBuffer.commit(OTHER_TRANSACTION_ID, OTHER_SCN.add(SCN_ONE), offsetContext, TIMESTAMP, () -> true, MESSAGE, dispatcher);
assertThat(streamingMetrics.getOldestScn()).isEqualTo(SCN.toString());
@ -203,7 +204,7 @@ public void testCalculateScnWhenSecondTransactionIsCommitted() throws Interrupte
@Test
public void testAbandoningOneTransaction() {
registerDmlOperation(TRANSACTION_ID, SCN, ROW_ID);
offsetContext = new OracleOffsetContext(connectorConfig, SCN, SCN, (String) null, false, true, new TransactionContext());
offsetContext = createOffsetContext(SCN, SCN);
transactionalBuffer.abandonLongTransactions(SCN, offsetContext);
assertThat(transactionalBuffer.isEmpty()).isTrue();
}
@ -228,4 +229,9 @@ public void testTransactionDump() {
private void registerDmlOperation(String txId, Scn scn, String rowId) {
transactionalBuffer.registerDmlOperation(RowMapper.INSERT, txId, scn, TABLE_ID, DML_ENTRY, Instant.now(), rowId, null, 0L);
}
private OracleOffsetContext createOffsetContext(Scn scn, Scn commitScn) {
return new OracleOffsetContext(connectorConfig, scn, commitScn, null, false, true, new TransactionContext(),
new SignalBasedIncrementalSnapshotContext<TableId>());
}
}

View File

@ -89,6 +89,10 @@ public void closeWindow(String id, OffsetContext offsetContext) throws Interrupt
readChunk();
}
protected String getSignalTableName(String dataCollectionId) {
return dataCollectionId;
}
protected void sendWindowEvents(OffsetContext offsetContext) throws InterruptedException {
LOGGER.debug("Sending {} events from window buffer", window.size());
offsetContext.incrementalSnapshotEvents();
@ -200,6 +204,7 @@ protected void readChunk() throws InterruptedException {
try {
// This commit should be unnecessary and might be removed later
jdbcConnection.commit();
preReadChunk(context);
context.startNewChunk();
emitWindowOpen();
while (context.snapshotRunning()) {
@ -252,6 +257,9 @@ protected void readChunk() throws InterruptedException {
catch (SQLException e) {
throw new DebeziumException(String.format("Database error while executing incremental snapshot for table '%s'", context.currentDataCollectionId()), e);
}
finally {
postReadChunk(context);
}
}
@Override
@ -376,4 +384,12 @@ private Object[] keyFromRow(Object[] row) {
protected void setContext(IncrementalSnapshotContext<T> context) {
this.context = context;
}
protected void preReadChunk(IncrementalSnapshotContext<T> context) {
// no-op
}
protected void postReadChunk(IncrementalSnapshotContext<T> context) {
// no-op
}
}

View File

@ -32,7 +32,7 @@ public SignalBasedIncrementalSnapshotChangeEventSource(CommonConnectorConfig con
SnapshotProgressListener progressListener,
DataChangeEventListener dataChangeEventListener) {
super(config, jdbcConnection, dispatcher, databaseSchema, clock, progressListener, dataChangeEventListener);
signalWindowStatement = "INSERT INTO " + config.getSignalingDataCollectionId()
signalWindowStatement = "INSERT INTO " + getSignalTableName(config.getSignalingDataCollectionId())
+ " VALUES (?, ?, null)";
}

View File

@ -51,6 +51,10 @@ public abstract class AbstractIncrementalSnapshotTest<T extends SourceConnector>
protected abstract Configuration.Builder config();
protected String tableDataCollectionId() {
return tableName();
}
protected void populateTable(JdbcConnection connection) throws SQLException {
connection.setAutoCommit(false);
for (int i = 0; i < ROW_COUNT; i++) {
@ -118,7 +122,7 @@ protected void sendAdHocSnapshotSignal() throws SQLException {
try (final JdbcConnection connection = databaseConnection()) {
String query = String.format(
"INSERT INTO %s VALUES('ad-hoc', 'execute-snapshot', '{\"data-collections\": [\"%s\"]}')",
signalTableName(), tableName());
signalTableName(), tableDataCollectionId());
logger.info("Sending signal with query {}", query);
connection.execute(query);
}

View File

@ -58,6 +58,8 @@ If the connector stops again for any reason, upon restart, the connector continu
|===
include::{partialsdir}/modules/all-connectors/ref-connector-incremental-snapshot.adoc[leveloffset=+3]
[[oracle-topic-names]]
=== Topics Names