From 393a44a55fe2c80971fea6c04c4cb0c00c49e1ef Mon Sep 17 00:00:00 2001 From: Chris Cranford Date: Thu, 22 Jul 2021 17:58:09 -0400 Subject: [PATCH] DBZ-3692 Oracle incremental snapshot support --- .../OracleChangeEventSourceFactory.java | 27 ++++ .../connector/oracle/OracleConnection.java | 27 ++++ .../connector/oracle/OracleOffsetContext.java | 31 ++++- ...dIncrementalSnapshotChangeEventSource.java | 58 ++++++++ .../OracleSnapshotChangeEventSource.java | 2 + .../LogMinerOracleOffsetContextLoader.java | 4 +- .../XStreamOracleOffsetContextLoader.java | 4 +- .../oracle/IncrementalSnapshotIT.java | 128 ++++++++++++++++++ .../logminer/TransactionalBufferTest.java | 16 ++- ...tIncrementalSnapshotChangeEventSource.java | 16 +++ ...dIncrementalSnapshotChangeEventSource.java | 2 +- .../AbstractIncrementalSnapshotTest.java | 6 +- .../modules/ROOT/pages/connectors/oracle.adoc | 2 + 13 files changed, 309 insertions(+), 14 deletions(-) create mode 100644 debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/OracleSignalBasedIncrementalSnapshotChangeEventSource.java create mode 100644 debezium-connector-oracle/src/test/java/io/debezium/connector/oracle/IncrementalSnapshotIT.java diff --git a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/OracleChangeEventSourceFactory.java b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/OracleChangeEventSourceFactory.java index f1fa20e46..16901c512 100644 --- a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/OracleChangeEventSourceFactory.java +++ b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/OracleChangeEventSourceFactory.java @@ -5,14 +5,19 @@ */ package io.debezium.connector.oracle; +import java.util.Optional; + import io.debezium.config.Configuration; import io.debezium.pipeline.ErrorHandler; import io.debezium.pipeline.EventDispatcher; +import io.debezium.pipeline.source.snapshot.incremental.IncrementalSnapshotChangeEventSource; import io.debezium.pipeline.source.spi.ChangeEventSourceFactory; +import io.debezium.pipeline.source.spi.DataChangeEventListener; import io.debezium.pipeline.source.spi.SnapshotChangeEventSource; import io.debezium.pipeline.source.spi.SnapshotProgressListener; import io.debezium.pipeline.source.spi.StreamingChangeEventSource; import io.debezium.relational.TableId; +import io.debezium.schema.DataCollectionId; import io.debezium.util.Clock; public class OracleChangeEventSourceFactory implements ChangeEventSourceFactory { @@ -60,4 +65,26 @@ public StreamingChangeEventSource getStrea jdbcConfig, streamingMetrics); } + + @Override + public Optional> getIncrementalSnapshotChangeEventSource( + OracleOffsetContext offsetContext, + SnapshotProgressListener snapshotProgressListener, + DataChangeEventListener dataChangeEventListener) { + // Incremental snapshots requires a secondary database connection + // This is because Xstream does not allow any work on the connection while the LCR handler may be invoked + // and LogMiner streams results from the CDB$ROOT container but we will need to stream changes from the + // PDB when reading snapshot records. + // + // todo: consider adding a hook so that the connection can be lazily opened & closed when we're done with + // performing any and all incremental snapshot operations. + return Optional.of(new OracleSignalBasedIncrementalSnapshotChangeEventSource( + configuration, + new OracleConnection(jdbcConnection.config(), () -> getClass().getClassLoader()), + dispatcher, + schema, + clock, + snapshotProgressListener, + dataChangeEventListener)); + } } diff --git a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/OracleConnection.java b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/OracleConnection.java index 6d820650b..9cda4da95 100644 --- a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/OracleConnection.java +++ b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/OracleConnection.java @@ -18,6 +18,7 @@ import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.Set; import java.util.function.Supplier; import java.util.regex.Pattern; @@ -406,6 +407,32 @@ public T singleOptionalValue(String query, ResultSetExtractor extractor) return queryAndMap(query, rs -> rs.next() ? extractor.apply(rs) : null); } + @Override + public String buildSelectWithRowLimits(TableId tableId, + int limit, + String projection, + Optional condition, + String orderBy) { + final TableId table = new TableId(null, tableId.schema(), tableId.table()); + final StringBuilder sql = new StringBuilder("SELECT "); + sql + .append(projection) + .append(" FROM "); + sql.append(quotedTableIdString(table)); + if (condition.isPresent()) { + sql + .append(" WHERE ") + .append(condition.get()); + } + sql + .append(" ORDER BY ") + .append(orderBy) + .append(" FETCH NEXT ") + .append(limit) + .append(" ROWS ONLY"); + return sql.toString(); + } + public static String connectionString(Configuration config) { return config.getString(URL) != null ? config.getString(URL) : ConnectorAdapter.parse(config.getString("connection.adapter")).getConnectionUrl(); diff --git a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/OracleOffsetContext.java b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/OracleOffsetContext.java index 6d2319a81..c84ad313b 100644 --- a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/OracleOffsetContext.java +++ b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/OracleOffsetContext.java @@ -15,6 +15,7 @@ import org.apache.kafka.connect.data.Struct; import io.debezium.connector.SnapshotRecord; +import io.debezium.pipeline.source.snapshot.incremental.IncrementalSnapshotContext; import io.debezium.pipeline.spi.OffsetContext; import io.debezium.pipeline.txmetadata.TransactionContext; import io.debezium.relational.TableId; @@ -30,6 +31,7 @@ public class OracleOffsetContext implements OffsetContext { private final SourceInfo sourceInfo; private final TransactionContext transactionContext; + private final IncrementalSnapshotContext incrementalSnapshotContext; /** * Whether a snapshot has been completed or not. @@ -37,13 +39,15 @@ public class OracleOffsetContext implements OffsetContext { private boolean snapshotCompleted; public OracleOffsetContext(OracleConnectorConfig connectorConfig, Scn scn, Scn commitScn, String lcrPosition, - boolean snapshot, boolean snapshotCompleted, TransactionContext transactionContext) { - this(connectorConfig, scn, lcrPosition, snapshot, snapshotCompleted, transactionContext); + boolean snapshot, boolean snapshotCompleted, TransactionContext transactionContext, + IncrementalSnapshotContext incrementalSnapshotContext) { + this(connectorConfig, scn, lcrPosition, snapshot, snapshotCompleted, transactionContext, incrementalSnapshotContext); sourceInfo.setCommitScn(commitScn); } public OracleOffsetContext(OracleConnectorConfig connectorConfig, Scn scn, String lcrPosition, - boolean snapshot, boolean snapshotCompleted, TransactionContext transactionContext) { + boolean snapshot, boolean snapshotCompleted, TransactionContext transactionContext, + IncrementalSnapshotContext incrementalSnapshotContext) { partition = Collections.singletonMap(SERVER_PARTITION_KEY, connectorConfig.getLogicalName()); sourceInfo = new SourceInfo(connectorConfig); @@ -52,6 +56,7 @@ public OracleOffsetContext(OracleConnectorConfig connectorConfig, Scn scn, Strin sourceInfoSchema = sourceInfo.schema(); this.transactionContext = transactionContext; + this.incrementalSnapshotContext = incrementalSnapshotContext; this.snapshotCompleted = snapshotCompleted; if (this.snapshotCompleted) { @@ -70,6 +75,7 @@ public static class Builder { private boolean snapshot; private boolean snapshotCompleted; private TransactionContext transactionContext; + private IncrementalSnapshotContext incrementalSnapshotContext; public Builder logicalName(OracleConnectorConfig connectorConfig) { this.connectorConfig = connectorConfig; @@ -101,8 +107,13 @@ public Builder transactionContext(TransactionContext transactionContext) { return this; } + public Builder incrementalSnapshotContext(IncrementalSnapshotContext incrementalSnapshotContext) { + this.incrementalSnapshotContext = incrementalSnapshotContext; + return this; + } + OracleOffsetContext build() { - return new OracleOffsetContext(connectorConfig, scn, lcrPosition, snapshot, snapshotCompleted, transactionContext); + return new OracleOffsetContext(connectorConfig, scn, lcrPosition, snapshot, snapshotCompleted, transactionContext, incrementalSnapshotContext); } } @@ -138,7 +149,7 @@ public static Builder create() { offset.put(SourceInfo.SCN_KEY, scn != null ? scn.toString() : null); offset.put(SourceInfo.COMMIT_SCN_KEY, commitScn != null ? commitScn.toString() : null); } - return transactionContext.store(offset); + return incrementalSnapshotContext.store(transactionContext.store(offset)); } } @@ -249,6 +260,16 @@ public TransactionContext getTransactionContext() { return transactionContext; } + @Override + public void incrementalSnapshotEvents() { + sourceInfo.setSnapshot(SnapshotRecord.INCREMENTAL); + } + + @Override + public IncrementalSnapshotContext getIncrementalSnapshotContext() { + return incrementalSnapshotContext; + } + /** * Helper method to resolve a {@link Scn} by key from the offset map. * diff --git a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/OracleSignalBasedIncrementalSnapshotChangeEventSource.java b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/OracleSignalBasedIncrementalSnapshotChangeEventSource.java new file mode 100644 index 000000000..b2e117a32 --- /dev/null +++ b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/OracleSignalBasedIncrementalSnapshotChangeEventSource.java @@ -0,0 +1,58 @@ +/* + * Copyright Debezium Authors. + * + * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 + */ +package io.debezium.connector.oracle; + +import io.debezium.config.CommonConnectorConfig; +import io.debezium.jdbc.JdbcConnection; +import io.debezium.pipeline.EventDispatcher; +import io.debezium.pipeline.source.snapshot.incremental.IncrementalSnapshotContext; +import io.debezium.pipeline.source.snapshot.incremental.SignalBasedIncrementalSnapshotChangeEventSource; +import io.debezium.pipeline.source.spi.DataChangeEventListener; +import io.debezium.pipeline.source.spi.SnapshotProgressListener; +import io.debezium.relational.TableId; +import io.debezium.schema.DatabaseSchema; +import io.debezium.util.Clock; + +/** + * @author Chris Cranford + */ +public class OracleSignalBasedIncrementalSnapshotChangeEventSource extends SignalBasedIncrementalSnapshotChangeEventSource { + + private final String pdbName; + private final JdbcConnection connection; + + public OracleSignalBasedIncrementalSnapshotChangeEventSource(CommonConnectorConfig config, + JdbcConnection jdbcConnection, + EventDispatcher dispatcher, + DatabaseSchema databaseSchema, + Clock clock, + SnapshotProgressListener progressListener, + DataChangeEventListener dataChangeEventListener) { + super(config, jdbcConnection, dispatcher, databaseSchema, clock, progressListener, dataChangeEventListener); + this.pdbName = ((OracleConnectorConfig) config).getPdbName(); + this.connection = jdbcConnection; + } + + @Override + protected String getSignalTableName(String dataCollectionId) { + final TableId tableId = TableId.parse(dataCollectionId); + return tableId.schema() + "." + tableId.table(); + } + + @Override + protected void preReadChunk(IncrementalSnapshotContext context) { + if (pdbName != null) { + ((OracleConnection) connection).setSessionToPdb(pdbName); + } + } + + @Override + protected void postReadChunk(IncrementalSnapshotContext context) { + if (pdbName != null) { + ((OracleConnection) connection).resetSessionToCdb(); + } + } +} diff --git a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/OracleSnapshotChangeEventSource.java b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/OracleSnapshotChangeEventSource.java index 5610ead6f..8edece7b0 100644 --- a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/OracleSnapshotChangeEventSource.java +++ b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/OracleSnapshotChangeEventSource.java @@ -17,6 +17,7 @@ import org.slf4j.LoggerFactory; import io.debezium.pipeline.EventDispatcher; +import io.debezium.pipeline.source.snapshot.incremental.SignalBasedIncrementalSnapshotContext; import io.debezium.pipeline.source.spi.SnapshotProgressListener; import io.debezium.pipeline.source.spi.StreamingChangeEventSource; import io.debezium.pipeline.txmetadata.TransactionContext; @@ -126,6 +127,7 @@ protected void determineSnapshotOffset(RelationalSnapshotContext()) .build(); } diff --git a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/LogMinerOracleOffsetContextLoader.java b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/LogMinerOracleOffsetContextLoader.java index 88c94f75d..c8a4ec880 100644 --- a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/LogMinerOracleOffsetContextLoader.java +++ b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/LogMinerOracleOffsetContextLoader.java @@ -11,6 +11,7 @@ import io.debezium.connector.oracle.OracleOffsetContext; import io.debezium.connector.oracle.Scn; import io.debezium.connector.oracle.SourceInfo; +import io.debezium.pipeline.source.snapshot.incremental.SignalBasedIncrementalSnapshotContext; import io.debezium.pipeline.spi.OffsetContext; import io.debezium.pipeline.txmetadata.TransactionContext; @@ -32,7 +33,8 @@ public OracleOffsetContext load(Map offset) { Scn scn = OracleOffsetContext.getScnFromOffsetMapByKey(offset, SourceInfo.SCN_KEY); Scn commitScn = OracleOffsetContext.getScnFromOffsetMapByKey(offset, SourceInfo.COMMIT_SCN_KEY); - return new OracleOffsetContext(connectorConfig, scn, commitScn, null, snapshot, snapshotCompleted, TransactionContext.load(offset)); + return new OracleOffsetContext(connectorConfig, scn, commitScn, null, snapshot, snapshotCompleted, TransactionContext.load(offset), + SignalBasedIncrementalSnapshotContext.load(offset)); } } diff --git a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/xstream/XStreamOracleOffsetContextLoader.java b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/xstream/XStreamOracleOffsetContextLoader.java index 78e012280..3d7ffb7e6 100644 --- a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/xstream/XStreamOracleOffsetContextLoader.java +++ b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/xstream/XStreamOracleOffsetContextLoader.java @@ -11,6 +11,7 @@ import io.debezium.connector.oracle.OracleOffsetContext; import io.debezium.connector.oracle.Scn; import io.debezium.connector.oracle.SourceInfo; +import io.debezium.pipeline.source.snapshot.incremental.SignalBasedIncrementalSnapshotContext; import io.debezium.pipeline.spi.OffsetContext; import io.debezium.pipeline.txmetadata.TransactionContext; @@ -42,6 +43,7 @@ public OracleOffsetContext load(Map offset) { scn = OracleOffsetContext.getScnFromOffsetMapByKey(offset, SourceInfo.SCN_KEY); } - return new OracleOffsetContext(connectorConfig, scn, lcrPosition, snapshot, snapshotCompleted, TransactionContext.load(offset)); + return new OracleOffsetContext(connectorConfig, scn, lcrPosition, snapshot, snapshotCompleted, TransactionContext.load(offset), + SignalBasedIncrementalSnapshotContext.load(offset)); } } diff --git a/debezium-connector-oracle/src/test/java/io/debezium/connector/oracle/IncrementalSnapshotIT.java b/debezium-connector-oracle/src/test/java/io/debezium/connector/oracle/IncrementalSnapshotIT.java new file mode 100644 index 000000000..359c758ed --- /dev/null +++ b/debezium-connector-oracle/src/test/java/io/debezium/connector/oracle/IncrementalSnapshotIT.java @@ -0,0 +1,128 @@ +/* + * Copyright Debezium Authors. + * + * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 + */ +package io.debezium.connector.oracle; + +import java.sql.SQLException; +import java.util.concurrent.TimeUnit; + +import org.junit.After; +import org.junit.Before; +import org.junit.Rule; + +import io.debezium.config.Configuration; +import io.debezium.connector.oracle.util.TestHelper; +import io.debezium.jdbc.JdbcConnection; +import io.debezium.junit.SkipTestRule; +import io.debezium.pipeline.source.snapshot.incremental.AbstractIncrementalSnapshotTest; +import io.debezium.relational.history.DatabaseHistory; +import io.debezium.util.Testing; + +/** + * Incremental Snapshots tests for the Oracle connector. + * + * @author Chris Cranford + */ +public class IncrementalSnapshotIT extends AbstractIncrementalSnapshotTest { + + private OracleConnection connection; + + @Rule + public SkipTestRule skipRule = new SkipTestRule(); + + @Before + public void before() throws Exception { + connection = TestHelper.testConnection(); + + TestHelper.dropTable(connection, "a"); + connection.execute("CREATE TABLE a (pk numeric(9,0) primary key, aa numeric(9,0))"); + + // todo: creates signal table in the PDB, do we want it to be in the CDB? + TestHelper.dropTable(connection, "debezium_signal"); + connection.execute("CREATE TABLE debezium_signal (id varchar2(64), type varchar2(32), data varchar2(2048))"); + connection.execute("GRANT INSERT on debezium_signal to " + TestHelper.getConnectorUserName()); + TestHelper.streamTable(connection, "debezium_signal"); + + setConsumeTimeout(TestHelper.defaultMessageConsumerPollTimeout(), TimeUnit.SECONDS); + initializeConnectorTestFramework(); + Testing.Files.delete(TestHelper.DB_HISTORY_PATH); + } + + @After + public void after() throws Exception { + stopConnector(); + if (connection != null) { + TestHelper.dropTable(connection, "a"); + TestHelper.dropTable(connection, "debezium_signal"); + connection.close(); + } + } + + @Override + protected void waitForConnectorToStart() { + super.waitForConnectorToStart(); + try { + waitForStreamingRunning(TestHelper.CONNECTOR_NAME, TestHelper.SERVER_NAME); + } + catch (InterruptedException e) { + throw new RuntimeException(e); + } + } + + @Override + protected void populateTable() throws SQLException { + super.populateTable(); + TestHelper.streamTable(connection, "a"); + } + + @Override + protected Class connectorClass() { + return OracleConnector.class; + } + + @Override + protected JdbcConnection databaseConnection() { + return connection; + } + + @Override + protected String topicName() { + return "server1.DEBEZIUM.A"; + } + + @Override + protected String tableName() { + return "DEBEZIUM.A"; + } + + @Override + protected String tableDataCollectionId() { + return "ORCLPDB1.DEBEZIUM.A"; + } + + @Override + protected String signalTableName() { + return "DEBEZIUM.DEBEZIUM_SIGNAL"; + } + + @Override + protected Configuration.Builder config() { + return TestHelper.defaultConfig() + .with(OracleConnectorConfig.SNAPSHOT_MODE, OracleConnectorConfig.SnapshotMode.SCHEMA_ONLY) + .with(OracleConnectorConfig.SIGNAL_DATA_COLLECTION, "ORCLPDB1.DEBEZIUM.DEBEZIUM_SIGNAL") + .with(OracleConnectorConfig.TABLE_INCLUDE_LIST, "DEBEZIUM\\.A,DEBEZIUM\\.DEBEZIUM_SIGNAL") + .with(DatabaseHistory.STORE_ONLY_CAPTURED_TABLES_DDL, true); + } + + @Override + protected String valueFieldName() { + return "AA"; + } + + @Override + protected String pkFieldName() { + return "PK"; + } +} diff --git a/debezium-connector-oracle/src/test/java/io/debezium/connector/oracle/logminer/TransactionalBufferTest.java b/debezium-connector-oracle/src/test/java/io/debezium/connector/oracle/logminer/TransactionalBufferTest.java index 9224c248e..4c58bce89 100644 --- a/debezium-connector-oracle/src/test/java/io/debezium/connector/oracle/logminer/TransactionalBufferTest.java +++ b/debezium-connector-oracle/src/test/java/io/debezium/connector/oracle/logminer/TransactionalBufferTest.java @@ -43,6 +43,7 @@ import io.debezium.pipeline.DataChangeEvent; import io.debezium.pipeline.ErrorHandler; import io.debezium.pipeline.EventDispatcher; +import io.debezium.pipeline.source.snapshot.incremental.SignalBasedIncrementalSnapshotContext; import io.debezium.pipeline.txmetadata.TransactionContext; import io.debezium.relational.TableId; import io.debezium.util.Clock; @@ -132,7 +133,7 @@ public void testIsNotEmptyWhenTransactionIsRegistered() { @Test public void testIsEmptyWhenTransactionIsCommitted() throws InterruptedException { registerDmlOperation(TRANSACTION_ID, SCN, ROW_ID); - offsetContext = new OracleOffsetContext(connectorConfig, SCN, SCN, (String) null, false, true, new TransactionContext()); + offsetContext = createOffsetContext(SCN, SCN); transactionalBuffer.commit(TRANSACTION_ID, SCN.add(SCN_ONE), offsetContext, TIMESTAMP, () -> true, MESSAGE, dispatcher); assertThat(transactionalBuffer.isEmpty()).isTrue(); } @@ -167,7 +168,7 @@ public void testNonEmptySecondTransactionIsRolledBack() { @Test public void testCalculateScnWhenTransactionIsCommitted() throws InterruptedException { registerDmlOperation(TRANSACTION_ID, SCN, ROW_ID); - offsetContext = new OracleOffsetContext(connectorConfig, SCN, SCN, null, false, true, new TransactionContext()); + offsetContext = createOffsetContext(SCN, SCN); transactionalBuffer.commit(TRANSACTION_ID, SCN.add(SCN_ONE), offsetContext, TIMESTAMP, () -> true, MESSAGE, dispatcher); assertThat(streamingMetrics.getOldestScn()).isEqualTo(SCN.toString()); assertThat(transactionalBuffer.getRolledBackTransactionIds().isEmpty()).isTrue(); @@ -177,7 +178,7 @@ public void testCalculateScnWhenTransactionIsCommitted() throws InterruptedExcep public void testCalculateScnWhenFirstTransactionIsCommitted() throws InterruptedException { registerDmlOperation(TRANSACTION_ID, SCN, ROW_ID); registerDmlOperation(OTHER_TRANSACTION_ID, OTHER_SCN, OTHER_ROW_ID); - offsetContext = new OracleOffsetContext(connectorConfig, SCN, SCN, null, false, true, new TransactionContext()); + offsetContext = createOffsetContext(SCN, SCN); transactionalBuffer.commit(TRANSACTION_ID, SCN.add(SCN_ONE), offsetContext, TIMESTAMP, () -> true, MESSAGE, dispatcher); // after commit, it stays the same because OTHER_TRANSACTION_ID is not committed yet @@ -192,7 +193,7 @@ public void testCalculateScnWhenFirstTransactionIsCommitted() throws Interrupted public void testCalculateScnWhenSecondTransactionIsCommitted() throws InterruptedException { registerDmlOperation(TRANSACTION_ID, SCN, ROW_ID); registerDmlOperation(OTHER_TRANSACTION_ID, OTHER_SCN, OTHER_ROW_ID); - offsetContext = new OracleOffsetContext(connectorConfig, OTHER_SCN, OTHER_SCN, null, false, true, new TransactionContext()); + offsetContext = createOffsetContext(OTHER_SCN, OTHER_SCN); transactionalBuffer.commit(OTHER_TRANSACTION_ID, OTHER_SCN.add(SCN_ONE), offsetContext, TIMESTAMP, () -> true, MESSAGE, dispatcher); assertThat(streamingMetrics.getOldestScn()).isEqualTo(SCN.toString()); @@ -203,7 +204,7 @@ public void testCalculateScnWhenSecondTransactionIsCommitted() throws Interrupte @Test public void testAbandoningOneTransaction() { registerDmlOperation(TRANSACTION_ID, SCN, ROW_ID); - offsetContext = new OracleOffsetContext(connectorConfig, SCN, SCN, (String) null, false, true, new TransactionContext()); + offsetContext = createOffsetContext(SCN, SCN); transactionalBuffer.abandonLongTransactions(SCN, offsetContext); assertThat(transactionalBuffer.isEmpty()).isTrue(); } @@ -228,4 +229,9 @@ public void testTransactionDump() { private void registerDmlOperation(String txId, Scn scn, String rowId) { transactionalBuffer.registerDmlOperation(RowMapper.INSERT, txId, scn, TABLE_ID, DML_ENTRY, Instant.now(), rowId, null, 0L); } + + private OracleOffsetContext createOffsetContext(Scn scn, Scn commitScn) { + return new OracleOffsetContext(connectorConfig, scn, commitScn, null, false, true, new TransactionContext(), + new SignalBasedIncrementalSnapshotContext()); + } } diff --git a/debezium-core/src/main/java/io/debezium/pipeline/source/snapshot/incremental/AbstractIncrementalSnapshotChangeEventSource.java b/debezium-core/src/main/java/io/debezium/pipeline/source/snapshot/incremental/AbstractIncrementalSnapshotChangeEventSource.java index 35da95c6d..564d08036 100644 --- a/debezium-core/src/main/java/io/debezium/pipeline/source/snapshot/incremental/AbstractIncrementalSnapshotChangeEventSource.java +++ b/debezium-core/src/main/java/io/debezium/pipeline/source/snapshot/incremental/AbstractIncrementalSnapshotChangeEventSource.java @@ -89,6 +89,10 @@ public void closeWindow(String id, OffsetContext offsetContext) throws Interrupt readChunk(); } + protected String getSignalTableName(String dataCollectionId) { + return dataCollectionId; + } + protected void sendWindowEvents(OffsetContext offsetContext) throws InterruptedException { LOGGER.debug("Sending {} events from window buffer", window.size()); offsetContext.incrementalSnapshotEvents(); @@ -200,6 +204,7 @@ protected void readChunk() throws InterruptedException { try { // This commit should be unnecessary and might be removed later jdbcConnection.commit(); + preReadChunk(context); context.startNewChunk(); emitWindowOpen(); while (context.snapshotRunning()) { @@ -252,6 +257,9 @@ protected void readChunk() throws InterruptedException { catch (SQLException e) { throw new DebeziumException(String.format("Database error while executing incremental snapshot for table '%s'", context.currentDataCollectionId()), e); } + finally { + postReadChunk(context); + } } @Override @@ -376,4 +384,12 @@ private Object[] keyFromRow(Object[] row) { protected void setContext(IncrementalSnapshotContext context) { this.context = context; } + + protected void preReadChunk(IncrementalSnapshotContext context) { + // no-op + } + + protected void postReadChunk(IncrementalSnapshotContext context) { + // no-op + } } diff --git a/debezium-core/src/main/java/io/debezium/pipeline/source/snapshot/incremental/SignalBasedIncrementalSnapshotChangeEventSource.java b/debezium-core/src/main/java/io/debezium/pipeline/source/snapshot/incremental/SignalBasedIncrementalSnapshotChangeEventSource.java index 23525787a..b73f9cbf8 100644 --- a/debezium-core/src/main/java/io/debezium/pipeline/source/snapshot/incremental/SignalBasedIncrementalSnapshotChangeEventSource.java +++ b/debezium-core/src/main/java/io/debezium/pipeline/source/snapshot/incremental/SignalBasedIncrementalSnapshotChangeEventSource.java @@ -32,7 +32,7 @@ public SignalBasedIncrementalSnapshotChangeEventSource(CommonConnectorConfig con SnapshotProgressListener progressListener, DataChangeEventListener dataChangeEventListener) { super(config, jdbcConnection, dispatcher, databaseSchema, clock, progressListener, dataChangeEventListener); - signalWindowStatement = "INSERT INTO " + config.getSignalingDataCollectionId() + signalWindowStatement = "INSERT INTO " + getSignalTableName(config.getSignalingDataCollectionId()) + " VALUES (?, ?, null)"; } diff --git a/debezium-embedded/src/test/java/io/debezium/pipeline/source/snapshot/incremental/AbstractIncrementalSnapshotTest.java b/debezium-embedded/src/test/java/io/debezium/pipeline/source/snapshot/incremental/AbstractIncrementalSnapshotTest.java index 0b9d4615c..f15484e43 100644 --- a/debezium-embedded/src/test/java/io/debezium/pipeline/source/snapshot/incremental/AbstractIncrementalSnapshotTest.java +++ b/debezium-embedded/src/test/java/io/debezium/pipeline/source/snapshot/incremental/AbstractIncrementalSnapshotTest.java @@ -51,6 +51,10 @@ public abstract class AbstractIncrementalSnapshotTest protected abstract Configuration.Builder config(); + protected String tableDataCollectionId() { + return tableName(); + } + protected void populateTable(JdbcConnection connection) throws SQLException { connection.setAutoCommit(false); for (int i = 0; i < ROW_COUNT; i++) { @@ -118,7 +122,7 @@ protected void sendAdHocSnapshotSignal() throws SQLException { try (final JdbcConnection connection = databaseConnection()) { String query = String.format( "INSERT INTO %s VALUES('ad-hoc', 'execute-snapshot', '{\"data-collections\": [\"%s\"]}')", - signalTableName(), tableName()); + signalTableName(), tableDataCollectionId()); logger.info("Sending signal with query {}", query); connection.execute(query); } diff --git a/documentation/modules/ROOT/pages/connectors/oracle.adoc b/documentation/modules/ROOT/pages/connectors/oracle.adoc index f22993990..72cb10944 100644 --- a/documentation/modules/ROOT/pages/connectors/oracle.adoc +++ b/documentation/modules/ROOT/pages/connectors/oracle.adoc @@ -58,6 +58,8 @@ If the connector stops again for any reason, upon restart, the connector continu |=== +include::{partialsdir}/modules/all-connectors/ref-connector-incremental-snapshot.adoc[leveloffset=+3] + [[oracle-topic-names]] === Topics Names