DBZ-4584 Support case-sensitive Oracle incremental snapshots

This commit is contained in:
Chris Cranford 2022-01-28 10:16:27 -05:00 committed by Jiri Pechanec
parent bfd2ce97e8
commit dcd8607108
3 changed files with 153 additions and 10 deletions

View File

@ -0,0 +1,133 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.connector.oracle;
import java.sql.SQLException;
import java.util.concurrent.TimeUnit;
import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
import io.debezium.config.Configuration;
import io.debezium.connector.oracle.util.TestHelper;
import io.debezium.jdbc.JdbcConnection;
import io.debezium.junit.SkipTestRule;
import io.debezium.pipeline.source.snapshot.incremental.AbstractIncrementalSnapshotTest;
import io.debezium.relational.history.DatabaseHistory;
import io.debezium.util.Testing;
/**
* Incremental Snapshots tests for the Oracle connector.
*
* @author Chris Cranford
*/
public class IncrementalSnapshotCaseSensitiveIT extends AbstractIncrementalSnapshotTest<OracleConnector> {
private OracleConnection connection;
@Rule
public SkipTestRule skipRule = new SkipTestRule();
@Before
public void before() throws Exception {
connection = TestHelper.testConnection();
TestHelper.dropTable(connection, "a");
connection.execute("CREATE TABLE a (\"Pk\" numeric(9,0) primary key, aa numeric(9,0))");
// todo: creates signal table in the PDB, do we want it to be in the CDB?
TestHelper.dropTable(connection, "debezium_signal");
connection.execute("CREATE TABLE debezium_signal (id varchar2(64), type varchar2(32), data varchar2(2048))");
connection.execute("GRANT INSERT on debezium_signal to " + TestHelper.getConnectorUserName());
TestHelper.streamTable(connection, "debezium_signal");
setConsumeTimeout(TestHelper.defaultMessageConsumerPollTimeout(), TimeUnit.SECONDS);
initializeConnectorTestFramework();
Testing.Files.delete(TestHelper.DB_HISTORY_PATH);
}
@After
public void after() throws Exception {
stopConnector();
if (connection != null) {
TestHelper.dropTable(connection, "a");
TestHelper.dropTable(connection, "debezium_signal");
connection.close();
}
}
@Override
protected void waitForConnectorToStart() {
super.waitForConnectorToStart();
try {
waitForStreamingRunning(TestHelper.CONNECTOR_NAME, TestHelper.SERVER_NAME);
}
catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
@Override
protected void populateTable() throws SQLException {
super.populateTable();
TestHelper.streamTable(connection, "a");
}
@Override
protected Class<OracleConnector> connectorClass() {
return OracleConnector.class;
}
@Override
protected JdbcConnection databaseConnection() {
return connection;
}
@Override
protected String topicName() {
return "server1.DEBEZIUM.A";
}
@Override
protected String tableName() {
return "DEBEZIUM.A";
}
@Override
protected String tableDataCollectionId() {
return "ORCLPDB1.DEBEZIUM.A";
}
@Override
protected String signalTableName() {
return "DEBEZIUM.DEBEZIUM_SIGNAL";
}
@Override
protected Configuration.Builder config() {
return TestHelper.defaultConfig()
.with(OracleConnectorConfig.SNAPSHOT_MODE, OracleConnectorConfig.SnapshotMode.SCHEMA_ONLY)
.with(OracleConnectorConfig.SIGNAL_DATA_COLLECTION, "ORCLPDB1.DEBEZIUM.DEBEZIUM_SIGNAL")
.with(OracleConnectorConfig.TABLE_INCLUDE_LIST, "DEBEZIUM\\.A,DEBEZIUM\\.DEBEZIUM_SIGNAL")
.with(DatabaseHistory.STORE_ONLY_CAPTURED_TABLES_DDL, true);
}
@Override
protected String valueFieldName() {
return "AA";
}
@Override
protected String pkFieldName() {
return "Pk";
}
@Override
protected String alterTableAddColumnStatement(String tableName) {
return "ALTER TABLE " + tableName + " ADD col3 INTEGER DEFAULT 0";
}
}

View File

@ -171,7 +171,7 @@ protected String buildChunkQuery(Table table, int limit) {
condition = sql.toString();
}
final String orderBy = getKeyMapper().getKeyKolumns(table).stream()
.map(Column::name)
.map(c -> jdbcConnection.quotedColumnIdString(c.name()))
.collect(Collectors.joining(", "));
return jdbcConnection.buildSelectWithRowLimits(table.id(),
limit,
@ -199,7 +199,7 @@ private void addLowerBound(Table table, StringBuilder sql) {
sql.append('(');
for (int j = 0; j < i + 1; j++) {
final boolean isLastIterationForJ = (i == j);
sql.append(pkColumns.get(j).name());
sql.append(jdbcConnection.quotedColumnIdString(pkColumns.get(j).name()));
sql.append(isLastIterationForJ ? " > ?" : " = ?");
if (!isLastIterationForJ) {
sql.append(" AND ");
@ -217,7 +217,7 @@ private void addLowerBound(Table table, StringBuilder sql) {
protected String buildMaxPrimaryKeyQuery(Table table) {
final String orderBy = getKeyMapper().getKeyKolumns(table).stream()
.map(Column::name)
.map(c -> jdbcConnection.quotedColumnIdString(c.name()))
.collect(Collectors.joining(" DESC, ")) + " DESC";
return jdbcConnection.buildSelectWithRowLimits(table.id(), 1, "*", Optional.empty(), orderBy);
}
@ -402,7 +402,7 @@ public void addDataCollectionNamesToSnapshot(List<String> dataCollectionIds, Off
protected void addKeyColumnsToCondition(Table table, StringBuilder sql, String predicate) {
for (Iterator<Column> i = getKeyMapper().getKeyKolumns(table).iterator(); i.hasNext();) {
final Column key = i.next();
sql.append(key.name()).append(predicate);
sql.append(jdbcConnection.quotedColumnIdString(key.name())).append(predicate);
if (i.hasNext()) {
sql.append(" AND ");
}

View File

@ -72,7 +72,8 @@ protected String tableDataCollectionId() {
protected void populateTable(JdbcConnection connection, String tableName) throws SQLException {
connection.setAutoCommit(false);
for (int i = 0; i < ROW_COUNT; i++) {
connection.executeWithoutCommitting(String.format("INSERT INTO %s (pk, aa) VALUES (%s, %s)", tableName, i + 1, i));
connection.executeWithoutCommitting(String.format("INSERT INTO %s (%s, aa) VALUES (%s, %s)",
tableName, connection.quotedColumnIdString(pkFieldName()), i + 1, i));
}
connection.commit();
}
@ -273,8 +274,9 @@ public void inserts() throws Exception {
try (JdbcConnection connection = databaseConnection()) {
connection.setAutoCommit(false);
for (int i = 0; i < ROW_COUNT; i++) {
connection.executeWithoutCommitting(String.format("INSERT INTO %s (pk, aa) VALUES (%s, %s)",
connection.executeWithoutCommitting(String.format("INSERT INTO %s (%s, aa) VALUES (%s, %s)",
tableName(),
connection.quotedColumnIdString(pkFieldName()),
i + ROW_COUNT + 1,
i + ROW_COUNT));
}
@ -302,8 +304,12 @@ public void updates() throws Exception {
connection.setAutoCommit(false);
for (int i = 0; i < ROW_COUNT; i++) {
connection.executeWithoutCommitting(
String.format("UPDATE %s SET aa = aa + 2000 WHERE pk > %s AND pk <= %s", tableName(),
i * batchSize, (i + 1) * batchSize));
String.format("UPDATE %s SET aa = aa + 2000 WHERE %s > %s AND %s <= %s",
tableName(),
connection.quotedColumnIdString(pkFieldName()),
i * batchSize,
connection.quotedColumnIdString(pkFieldName()),
(i + 1) * batchSize));
connection.commit();
}
}
@ -336,8 +342,12 @@ public void updatesWithRestart() throws Exception {
connection.setAutoCommit(false);
for (int i = 0; i < ROW_COUNT; i++) {
connection.executeWithoutCommitting(
String.format("UPDATE %s SET aa = aa + 2000 WHERE pk > %s AND pk <= %s", tableName(),
i * batchSize, (i + 1) * batchSize));
String.format("UPDATE %s SET aa = aa + 2000 WHERE %s > %s AND %s <= %s",
tableName(),
connection.quotedColumnIdString(pkFieldName()),
i * batchSize,
connection.quotedColumnIdString(pkFieldName()),
(i + 1) * batchSize));
connection.commit();
}
}