DBZ-6566 Support Blocking snapshot for Oracle

This commit is contained in:
mfvitale 2023-07-26 17:02:08 +02:00 committed by Jiri Pechanec
parent 3718d9818e
commit 85d725fde8
8 changed files with 201 additions and 7 deletions

View File

@ -125,6 +125,12 @@
<groupId>io.apicurio</groupId>
<artifactId>apicurio-registry-utils-converter</artifactId>
<scope>test</scope>
<exclusions>
<exclusion>
<artifactId>slf4j-jboss-logmanager</artifactId>
<groupId>org.jboss.slf4j</groupId>
</exclusion>
</exclusions>
</dependency>
<!-- Used for unit testing with Kafka -->

View File

@ -69,7 +69,7 @@ protected SnapshottingTask getSnapshottingTask(OraclePartition partition, Oracle
snapshotData = connectorConfig.getSnapshotMode().includeData();
}
// found a previous offset and the earlier snapshot has completed
else if (previousOffset != null && !previousOffset.isSnapshotRunning()) {
else if (previousOffset != null && !previousOffset.isSnapshotRunning() && false /* TODO check if streaming is pause */) {
LOGGER.info("The previous offset has been found.");
snapshotSchema = databaseSchema.isStorageInitializationExecuted();
snapshotData = false;

View File

@ -244,6 +244,14 @@ public void execute(ChangeEventSourceContext context, OraclePartition partition,
}
pauseBetweenMiningSessions();
}
if (context.isPaused()) {
LOGGER.info("Streaming will now pause");
context.streamingPaused();
context.waitSnapshotCompletion();
LOGGER.info("Streaming resumed");
}
}
}
}

View File

@ -124,6 +124,13 @@ public void execute(ChangeEventSourceContext context, OraclePartition partition,
LOGGER.trace("Receiving LCR");
xsOut.receiveLCRCallback(eventHandler, XStreamOut.DEFAULT_MODE);
dispatcher.dispatchHeartbeatEvent(partition, offsetContext);
if (context.isPaused()) {
LOGGER.info("Streaming will now pause");
context.streamingPaused();
context.waitSnapshotCompletion();
LOGGER.info("Streaming resumed");
}
}
}
finally {

View File

@ -0,0 +1,155 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.connector.oracle;
import java.util.List;
import java.util.concurrent.TimeUnit;
import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
import io.debezium.config.Configuration;
import io.debezium.connector.oracle.util.TestHelper;
import io.debezium.jdbc.JdbcConnection;
import io.debezium.junit.SkipTestRule;
import io.debezium.pipeline.AbstractBlockingSnapshotTest;
import io.debezium.relational.history.SchemaHistory;
import io.debezium.util.Testing;
/**
 * Oracle integration test for the blocking-snapshot feature (DBZ-6566).
 *
 * <p>Provides the Oracle-specific plumbing (connector class, connection, table/topic
 * names, signal table, configuration) required by {@link AbstractBlockingSnapshotTest},
 * which contains the actual blocking-snapshot test scenarios.</p>
 */
public class BlockingSnapshotIT extends AbstractBlockingSnapshotTest {

    // Connection used both to create fixtures and to insert test data.
    private OracleConnection connection;

    @Rule
    public SkipTestRule skipRule = new SkipTestRule();

    /**
     * Creates the captured table {@code a} and the {@code debezium_signal} table used to
     * trigger blocking snapshots, grants the connector user insert rights on both, and
     * resets the test framework state.
     */
    @Before
    public void before() throws Exception {
        connection = TestHelper.testConnection();
        TestHelper.dropTable(connection, "a");
        connection.execute("CREATE TABLE a (pk numeric(9,0) primary key, aa numeric(9,0))");
        connection.execute("GRANT INSERT on a to " + TestHelper.getConnectorUserName());
        TestHelper.streamTable(connection, "a");
        TestHelper.dropTable(connection, "debezium_signal");
        connection.execute("CREATE TABLE debezium_signal (id varchar2(64), type varchar2(32), data varchar2(2048))");
        connection.execute("GRANT INSERT on debezium_signal to " + TestHelper.getConnectorUserName());
        TestHelper.streamTable(connection, "debezium_signal");
        setConsumeTimeout(TestHelper.defaultMessageConsumerPollTimeout(), TimeUnit.SECONDS);
        initializeConnectorTestFramework();
        Testing.Files.delete(TestHelper.SCHEMA_HISTORY_PATH);
    }

    /**
     * Stops the connector and drops the fixture tables so subsequent tests start clean.
     */
    @After
    public void after() throws Exception {
        stopConnector();
        if (connection != null) {
            TestHelper.dropTable(connection, "a");
            TestHelper.dropTable(connection, "debezium_signal");
            connection.close();
        }
    }

    /**
     * Waits for the connector to start and additionally blocks until streaming is
     * running, since the blocking-snapshot scenarios require an active streaming phase.
     */
    @Override
    protected void waitForConnectorToStart() {
        super.waitForConnectorToStart();
        try {
            waitForStreamingRunning(TestHelper.CONNECTOR_NAME, TestHelper.SERVER_NAME);
        }
        catch (InterruptedException e) {
            // Restore the interrupt status before rethrowing so callers (and the test
            // framework) can still observe that this thread was interrupted.
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        }
    }

    @Override
    protected Class<OracleConnector> connectorClass() {
        return OracleConnector.class;
    }

    @Override
    protected JdbcConnection databaseConnection() {
        return connection;
    }

    @Override
    protected String topicName() {
        return "server1.DEBEZIUM.A";
    }

    @Override
    protected List<String> topicNames() {
        return List.of("server1.DEBEZIUM.A");
    }

    @Override
    protected String tableName() {
        return "DEBEZIUM.A";
    }

    @Override
    protected List<String> tableNames() {
        return List.of("DEBEZIUM.A");
    }

    @Override
    protected String tableDataCollectionId() {
        // Oracle data-collection ids are fully qualified with the database name.
        return TestHelper.getDatabaseName() + ".DEBEZIUM.A";
    }

    @Override
    protected List<String> tableDataCollectionIds() {
        return List.of(TestHelper.getDatabaseName() + ".DEBEZIUM.A");
    }

    @Override
    protected String signalTableName() {
        return "DEBEZIUM.DEBEZIUM_SIGNAL";
    }

    @Override
    protected Configuration.Builder config() {
        return TestHelper.defaultConfig()
                .with(OracleConnectorConfig.SNAPSHOT_MODE, OracleConnectorConfig.SnapshotMode.INITIAL)
                .with(OracleConnectorConfig.SIGNAL_DATA_COLLECTION, TestHelper.getDatabaseName() + ".DEBEZIUM.DEBEZIUM_SIGNAL")
                .with(OracleConnectorConfig.TABLE_INCLUDE_LIST, "DEBEZIUM\\.A")
                .with(SchemaHistory.STORE_ONLY_CAPTURED_TABLES_DDL, true);
    }

    @Override
    protected Configuration.Builder mutableConfig(boolean signalTableOnly, boolean storeOnlyCapturedDdl) {
        // The base configuration is already sufficient for every scenario; the flags
        // are intentionally ignored for Oracle.
        return config();
    }

    @Override
    protected String valueFieldName() {
        // Oracle reports unquoted identifiers in upper case.
        return "AA";
    }

    @Override
    protected String pkFieldName() {
        return "PK";
    }

    @Override
    protected String alterTableAddColumnStatement(String tableName) {
        return "ALTER TABLE " + tableName + " ADD col3 INTEGER DEFAULT 0";
    }

    @Override
    protected String connector() {
        return "oracle";
    }

    @Override
    protected String server() {
        return TestHelper.SERVER_NAME;
    }
}

View File

@ -72,4 +72,17 @@
level="warn" additivity="false">
<appender-ref ref="CONSOLE" />
</logger>
<!-- For debug purpose -->
<logger
name="io.debezium.pipeline.ChangeEventSourceCoordinator"
level="trace" additivity="false">
<appender-ref ref="CONSOLE" />
</logger>
<logger
name="io.debezium.pipeline.EventDispatcher"
level="off" additivity="false">
<appender-ref ref="CONSOLE" />
</logger>
</configuration>

View File

@ -77,6 +77,12 @@
<groupId>io.apicurio</groupId>
<artifactId>apicurio-registry-utils-converter</artifactId>
<scope>test</scope>
<exclusions>
<exclusion>
<artifactId>slf4j-jboss-logmanager</artifactId>
<groupId>org.jboss.slf4j</groupId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.awaitility</groupId>

View File

@ -79,12 +79,11 @@ public void executeBlockingSnapshot() throws Exception {
waitForLogMessage("Snapshot completed", AbstractSnapshotChangeEventSource.class);
assertRecordsFromSnapshotAndStreamingArePresent(ROW_COUNT * 2);
signalingRecords = 1;
assertRecordsFromSnapshotAndStreamingArePresent((ROW_COUNT * 2) + signalingRecords);
insertRecords(ROW_COUNT, (ROW_COUNT * 2));
signalingRecords = 1;
assertStreamingRecordsArePresent(ROW_COUNT + signalingRecords);
}
@ -109,7 +108,7 @@ public void executeBlockingSnapshotWhileStreaming() throws Exception {
waitForStreamingRunning(connector(), server(), getStreamingNamespace(), task());
Long totalSnapshotRecords = getTotalSnapshotRecords(tableName(), connector(), server(), task(), database());
Long totalSnapshotRecords = getTotalSnapshotRecords(tableDataCollectionId(), connector(), server(), task(), database());
batchInserts.get(120, TimeUnit.SECONDS);
@ -119,7 +118,7 @@ public void executeBlockingSnapshotWhileStreaming() throws Exception {
1; // from snapshot
assertRecordsWithValuesPresent((int) ((ROW_COUNT * 3) + totalSnapshotRecords + signalingRecords),
AbstractBlockingSnapshotTest.getExpectedValues(totalSnapshotRecords));
getExpectedValues(totalSnapshotRecords));
}
protected int insertMaxSleep() {
@ -194,7 +193,7 @@ private void assertRecordsWithValuesPresent(int expectedRecords, List<Integer> e
SourceRecords snapshotAndStreamingRecords = consumeRecordsByTopic(expectedRecords, 10);
assertThat(snapshotAndStreamingRecords.allRecordsInOrder().size()).isEqualTo(expectedRecords);
List<Integer> actual = snapshotAndStreamingRecords.recordsForTopic(topicName()).stream()
.map(s -> ((Struct) s.value()).getStruct("after").getInt32("aa"))
.map(s -> ((Struct) s.value()).getStruct("after").getInt32(valueFieldName()))
.collect(Collectors.toList());
assertThat(actual).containsAll(expectedValues);
}