diff --git a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/converters/NumberOneToBooleanConverter.java b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/converters/NumberOneToBooleanConverter.java index 211963e62..8e76114c3 100644 --- a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/converters/NumberOneToBooleanConverter.java +++ b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/converters/NumberOneToBooleanConverter.java @@ -17,6 +17,7 @@ import io.debezium.spi.converter.CustomConverter; import io.debezium.spi.converter.RelationalColumn; import io.debezium.util.Strings; + import oracle.sql.NUMBER; /** diff --git a/debezium-connector-oracle/src/test/java/io/debezium/connector/oracle/AbstractOracleDatatypesTest.java b/debezium-connector-oracle/src/test/java/io/debezium/connector/oracle/AbstractOracleDatatypesTest.java index 63a47d9d2..120b77c17 100644 --- a/debezium-connector-oracle/src/test/java/io/debezium/connector/oracle/AbstractOracleDatatypesTest.java +++ b/debezium-connector-oracle/src/test/java/io/debezium/connector/oracle/AbstractOracleDatatypesTest.java @@ -207,10 +207,17 @@ public abstract class AbstractOracleDatatypesTest extends AbstractConnectorTest private static OracleConnection connection; @BeforeClass - public static void dropTables() throws SQLException { + public static void beforeClass() throws SQLException { connection = TestHelper.testConnection(); - for (String table : ALL_TABLES) { - TestHelper.dropTable(connection, table); + dropTables(); + } + + @AfterClass + public static void dropTables() throws SQLException { + if (connection != null) { + for (String table : ALL_TABLES) { + TestHelper.dropTable(connection, table); + } } } diff --git a/debezium-connector-oracle/src/test/java/io/debezium/connector/oracle/OracleConnectorFilterIT.java b/debezium-connector-oracle/src/test/java/io/debezium/connector/oracle/OracleConnectorFilterIT.java index ae27eb028..d56f2031d 100644 --- a/debezium-connector-oracle/src/test/java/io/debezium/connector/oracle/OracleConnectorFilterIT.java +++ b/debezium-connector-oracle/src/test/java/io/debezium/connector/oracle/OracleConnectorFilterIT.java @@ -61,6 +61,8 @@ public static void closeConnection() throws SQLException { adminConnection.close(); } if (connection != null) { + TestHelper.dropTable(connection, "debezium.table1"); + TestHelper.dropTable(connection, "debezium.table2"); connection.close(); } } diff --git a/debezium-connector-oracle/src/test/java/io/debezium/connector/oracle/OracleConnectorIT.java b/debezium-connector-oracle/src/test/java/io/debezium/connector/oracle/OracleConnectorIT.java index 315e2b886..4e8c1db65 100644 --- a/debezium-connector-oracle/src/test/java/io/debezium/connector/oracle/OracleConnectorIT.java +++ b/debezium-connector-oracle/src/test/java/io/debezium/connector/oracle/OracleConnectorIT.java @@ -132,6 +132,10 @@ public static void beforeClass() throws SQLException { @AfterClass public static void closeConnection() throws SQLException { if (connection != null) { + TestHelper.dropTable(connection, "customer"); + TestHelper.dropTable(connection, "masked_hashed_column_table"); + TestHelper.dropTable(connection, "truncated_column_table"); + TestHelper.dropTable(connection, "dt_table"); connection.close(); } } @@ -151,52 +155,56 @@ public void before() throws SQLException { @FixFor("DBZ-2452") public void shouldSnapshotAndStreamWithHyphenedTableName() throws Exception { TestHelper.dropTable(connection, "debezium.\"my-table\""); + try { + String ddl = "create table \"my-table\" (" + + " id numeric(9,0) not null, " + + " c1 int, " + + " c2 varchar(128), " + + " primary key (id))"; - String ddl = "create table \"my-table\" (" + - " id numeric(9,0) not null, " + - " c1 int, " + - " c2 varchar(128), " + - " primary key (id))"; + connection.execute(ddl); + connection.execute("GRANT SELECT ON debezium.\"my-table\" to " + TestHelper.getConnectorUserName()); + connection.execute("ALTER TABLE debezium.\"my-table\" ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS"); + connection.execute("INSERT INTO debezium.\"my-table\" VALUES (1, 25, 'Test')"); + connection.execute("COMMIT"); - connection.execute(ddl); - connection.execute("GRANT SELECT ON debezium.\"my-table\" to " + TestHelper.getConnectorUserName()); - connection.execute("ALTER TABLE debezium.\"my-table\" ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS"); - connection.execute("INSERT INTO debezium.\"my-table\" VALUES (1, 25, 'Test')"); - connection.execute("COMMIT"); + Configuration config = TestHelper.defaultConfig() + .with(OracleConnectorConfig.TABLE_INCLUDE_LIST, "DEBEZIUM\\.MY-TABLE") + .build(); - Configuration config = TestHelper.defaultConfig() - .with(OracleConnectorConfig.TABLE_INCLUDE_LIST, "DEBEZIUM\\.MY-TABLE") - .build(); + start(OracleConnector.class, config); + assertConnectorIsRunning(); - start(OracleConnector.class, config); - assertConnectorIsRunning(); + waitForStreamingRunning(TestHelper.CONNECTOR_NAME, TestHelper.SERVER_NAME); - waitForStreamingRunning(TestHelper.CONNECTOR_NAME, TestHelper.SERVER_NAME); + connection.execute("INSERT INTO debezium.\"my-table\" VALUES (2, 50, 'Test2')"); + connection.execute("COMMIT"); - connection.execute("INSERT INTO debezium.\"my-table\" VALUES (2, 50, 'Test2')"); - connection.execute("COMMIT"); + SourceRecords records = consumeRecordsByTopic(2); + List hyphenatedTableRecords = records.recordsForTopic("server1.DEBEZIUM.my-table"); + assertThat(hyphenatedTableRecords).hasSize(2); - SourceRecords records = consumeRecordsByTopic(2); - List hyphenatedTableRecords = records.recordsForTopic("server1.DEBEZIUM.my-table"); - assertThat(hyphenatedTableRecords).hasSize(2); + // read + SourceRecord record1 = hyphenatedTableRecords.get(0); + VerifyRecord.isValidRead(record1, "ID", 1); + Struct after1 = (Struct) ((Struct) record1.value()).get(AFTER); + assertThat(after1.get("ID")).isEqualTo(1); + assertThat(after1.get("C1")).isEqualTo(BigDecimal.valueOf(25L)); + assertThat(after1.get("C2")).isEqualTo("Test"); + assertThat(record1.sourceOffset().get(SourceInfo.SNAPSHOT_KEY)).isEqualTo(true); + assertThat(record1.sourceOffset().get(SNAPSHOT_COMPLETED_KEY)).isEqualTo(true); - // read - SourceRecord record1 = hyphenatedTableRecords.get(0); - VerifyRecord.isValidRead(record1, "ID", 1); - Struct after1 = (Struct) ((Struct) record1.value()).get(AFTER); - assertThat(after1.get("ID")).isEqualTo(1); - assertThat(after1.get("C1")).isEqualTo(BigDecimal.valueOf(25L)); - assertThat(after1.get("C2")).isEqualTo("Test"); - assertThat(record1.sourceOffset().get(SourceInfo.SNAPSHOT_KEY)).isEqualTo(true); - assertThat(record1.sourceOffset().get(SNAPSHOT_COMPLETED_KEY)).isEqualTo(true); - - // insert - SourceRecord record2 = hyphenatedTableRecords.get(1); - VerifyRecord.isValidInsert(record2, "ID", 2); - Struct after2 = (Struct) ((Struct) record2.value()).get(AFTER); - assertThat(after2.get("ID")).isEqualTo(2); - assertThat(after2.get("C1")).isEqualTo(BigDecimal.valueOf(50L)); - assertThat(after2.get("C2")).isEqualTo("Test2"); + // insert + SourceRecord record2 = hyphenatedTableRecords.get(1); + VerifyRecord.isValidInsert(record2, "ID", 2); + Struct after2 = (Struct) ((Struct) record2.value()).get(AFTER); + assertThat(after2.get("ID")).isEqualTo(2); + assertThat(after2.get("C1")).isEqualTo(BigDecimal.valueOf(50L)); + assertThat(after2.get("C2")).isEqualTo("Test2"); + } + finally { + TestHelper.dropTable(connection, "debezium.\"my-table\""); + } } @Test @@ -943,9 +951,8 @@ public void shouldSnapshotAndStreamChangesFromTableWithNumericDefaultValues() th final SourceRecords streamingRecords = consumeRecordsByTopic(1); assertThat(streamingRecords.recordsForTopic("server1.DEBEZIUM.COMPLEX_DDL").size()).isEqualTo(1); } - catch (Exception e) { + finally { TestHelper.dropTable(connection, "debezium.complex_ddl"); - throw e; } } @@ -954,45 +961,49 @@ public void shouldSnapshotAndStreamChangesFromTableWithNumericDefaultValues() th @RequireDatabaseOption("Partitioning") public void shouldSnapshotAndStreamChangesFromPartitionedTable() throws Exception { TestHelper.dropTable(connection, "players"); + try { + final String ddl = "CREATE TABLE players (" + + "id NUMERIC(6), " + + "name VARCHAR(100), " + + "birth_date DATE," + + "primary key(id)) " + + "PARTITION BY RANGE (birth_date) (" + + "PARTITION p2019 VALUES LESS THAN (TO_DATE('01-JAN-2020', 'dd-MON-yyyy')), " + + "PARTITION p2020 VALUES LESS THAN (TO_DATE('01-JAN-2021', 'dd-MON-yyyy'))" + + ")"; + connection.execute(ddl); + connection.execute("GRANT SELECT ON debezium.players to " + TestHelper.getConnectorUserName()); + connection.execute("ALTER TABLE debezium.players ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS"); - final String ddl = "CREATE TABLE players (" + - "id NUMERIC(6), " + - "name VARCHAR(100), " + - "birth_date DATE," + - "primary key(id)) " + - "PARTITION BY RANGE (birth_date) (" + - "PARTITION p2019 VALUES LESS THAN (TO_DATE('01-JAN-2020', 'dd-MON-yyyy')), " + - "PARTITION p2020 VALUES LESS THAN (TO_DATE('01-JAN-2021', 'dd-MON-yyyy'))" + - ")"; - connection.execute(ddl); - connection.execute("GRANT SELECT ON debezium.players to " + TestHelper.getConnectorUserName()); - connection.execute("ALTER TABLE debezium.players ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS"); + // Insert a record to be captured by snapshot + connection.execute("INSERT INTO debezium.players (id, name, birth_date) VALUES (1, 'Roger Rabbit', '01-MAY-2019')"); + connection.commit(); - // Insert a record to be captured by snapshot - connection.execute("INSERT INTO debezium.players (id, name, birth_date) VALUES (1, 'Roger Rabbit', '01-MAY-2019')"); - connection.commit(); + // Start connector + final Configuration config = TestHelper.defaultConfig() + .with(OracleConnectorConfig.TABLE_INCLUDE_LIST, "DEBEZIUM.PLAYERS") + .build(); + start(OracleConnector.class, config); + assertConnectorIsRunning(); - // Start connector - final Configuration config = TestHelper.defaultConfig() - .with(OracleConnectorConfig.TABLE_INCLUDE_LIST, "DEBEZIUM.PLAYERS") - .build(); - start(OracleConnector.class, config); - assertConnectorIsRunning(); + waitForSnapshotToBeCompleted(TestHelper.CONNECTOR_NAME, TestHelper.SERVER_NAME); - waitForSnapshotToBeCompleted(TestHelper.CONNECTOR_NAME, TestHelper.SERVER_NAME); + final SourceRecords snapshotRecords = consumeRecordsByTopic(1); + assertThat(snapshotRecords.recordsForTopic("server1.DEBEZIUM.PLAYERS").size()).isEqualTo(1); - final SourceRecords snapshotRecords = consumeRecordsByTopic(1); - assertThat(snapshotRecords.recordsForTopic("server1.DEBEZIUM.PLAYERS").size()).isEqualTo(1); + waitForStreamingRunning(TestHelper.CONNECTOR_NAME, TestHelper.SERVER_NAME); - waitForStreamingRunning(TestHelper.CONNECTOR_NAME, TestHelper.SERVER_NAME); + // Insert a record to be captured during streaming + connection.execute("INSERT INTO debezium.players (id, name, birth_date) VALUES (2, 'Bugs Bunny', '26-JUN-2019')"); + connection.execute("INSERT INTO debezium.players (id, name, birth_date) VALUES (3, 'Elmer Fud', '01-NOV-2020')"); + connection.commit(); - // Insert a record to be captured during streaming - connection.execute("INSERT INTO debezium.players (id, name, birth_date) VALUES (2, 'Bugs Bunny', '26-JUN-2019')"); - connection.execute("INSERT INTO debezium.players (id, name, birth_date) VALUES (3, 'Elmer Fud', '01-NOV-2020')"); - connection.commit(); - - final SourceRecords streamRecords = consumeRecordsByTopic(2); - assertThat(streamRecords.recordsForTopic("server1.DEBEZIUM.PLAYERS").size()).isEqualTo(2); + final SourceRecords streamRecords = consumeRecordsByTopic(2); + assertThat(streamRecords.recordsForTopic("server1.DEBEZIUM.PLAYERS").size()).isEqualTo(2); + } + finally { + TestHelper.dropTable(connection, "players"); + } } @Test @@ -1056,55 +1067,59 @@ public void shouldAvroSerializeColumnsWithSpecialCharacters() throws Exception { public void testArchiveLogScnBoundariesAreIncluded() throws Exception { // Drop table if it exists TestHelper.dropTable(connection, "alog_test"); + try { + final String ddl = "CREATE TABLE alog_test (id numeric, name varchar2(50), primary key(id))"; + connection.execute(ddl); + connection.execute("GRANT SELECT ON debezium.alog_test TO " + TestHelper.getConnectorUserName()); + connection.execute("ALTER TABLE debezium.alog_test ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS"); + connection.commit(); - final String ddl = "CREATE TABLE alog_test (id numeric, name varchar2(50), primary key(id))"; - connection.execute(ddl); - connection.execute("GRANT SELECT ON debezium.alog_test TO " + TestHelper.getConnectorUserName()); - connection.execute("ALTER TABLE debezium.alog_test ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS"); - connection.commit(); + // Insert a snapshot record + connection.execute("INSERT INTO debezium.alog_test (id, name) VALUES (1, 'Test')"); + connection.commit(); - // Insert a snapshot record - connection.execute("INSERT INTO debezium.alog_test (id, name) VALUES (1, 'Test')"); - connection.commit(); + // start connector and take snapshot + final Configuration config = TestHelper.defaultConfig() + .with(OracleConnectorConfig.TABLE_INCLUDE_LIST, "DEBEZIUM.ALOG_TEST") + .build(); + start(OracleConnector.class, config); + assertConnectorIsRunning(); - // start connector and take snapshot - final Configuration config = TestHelper.defaultConfig() - .with(OracleConnectorConfig.TABLE_INCLUDE_LIST, "DEBEZIUM.ALOG_TEST") - .build(); - start(OracleConnector.class, config); - assertConnectorIsRunning(); + waitForStreamingRunning(TestHelper.CONNECTOR_NAME, TestHelper.SERVER_NAME); - waitForStreamingRunning(TestHelper.CONNECTOR_NAME, TestHelper.SERVER_NAME); + // Validate snapshot record + final SourceRecords snapshotRecords = consumeRecordsByTopic(1); + assertThat(snapshotRecords.recordsForTopic("server1.DEBEZIUM.ALOG_TEST").size()).isEqualTo(1); + SourceRecord record = snapshotRecords.recordsForTopic("server1.DEBEZIUM.ALOG_TEST").get(0); + Struct after = (Struct) ((Struct) record.value()).get(AFTER); + assertThat(after.get("ID")).isEqualTo(BigDecimal.valueOf(1)); + assertThat(after.get("NAME")).isEqualTo("Test"); - // Validate snapshot record - final SourceRecords snapshotRecords = consumeRecordsByTopic(1); - assertThat(snapshotRecords.recordsForTopic("server1.DEBEZIUM.ALOG_TEST").size()).isEqualTo(1); - SourceRecord record = snapshotRecords.recordsForTopic("server1.DEBEZIUM.ALOG_TEST").get(0); - Struct after = (Struct) ((Struct) record.value()).get(AFTER); - assertThat(after.get("ID")).isEqualTo(BigDecimal.valueOf(1)); - assertThat(after.get("NAME")).isEqualTo("Test"); + // stop the connector + stopConnector(); - // stop the connector - stopConnector(); + // Force flush of all redo logs to archive logs + TestHelper.forceFlushOfRedoLogsToArchiveLogs(); - // Force flush of all redo logs to archive logs - TestHelper.forceFlushOfRedoLogsToArchiveLogs(); + // Start connector and wait for streaming + start(OracleConnector.class, config); + waitForStreamingRunning(TestHelper.CONNECTOR_NAME, TestHelper.SERVER_NAME); - // Start connector and wait for streaming - start(OracleConnector.class, config); - waitForStreamingRunning(TestHelper.CONNECTOR_NAME, TestHelper.SERVER_NAME); + // Insert record for streaming + connection.execute("INSERT INTO debezium.alog_test (id, name) values (2, 'Home')"); + connection.execute("COMMIT"); - // Insert record for streaming - connection.execute("INSERT INTO debezium.alog_test (id, name) values (2, 'Home')"); - connection.execute("COMMIT"); - - // Validate streaming record - final SourceRecords records = consumeRecordsByTopic(1); - assertThat(records.recordsForTopic("server1.DEBEZIUM.ALOG_TEST").size()).isEqualTo(1); - record = records.recordsForTopic("server1.DEBEZIUM.ALOG_TEST").get(0); - after = (Struct) ((Struct) record.value()).get(AFTER); - assertThat(after.get("ID")).isEqualTo(BigDecimal.valueOf(2)); - assertThat(after.get("NAME")).isEqualTo("Home"); + // Validate streaming record + final SourceRecords records = consumeRecordsByTopic(1); + assertThat(records.recordsForTopic("server1.DEBEZIUM.ALOG_TEST").size()).isEqualTo(1); + record = records.recordsForTopic("server1.DEBEZIUM.ALOG_TEST").get(0); + after = (Struct) ((Struct) record.value()).get(AFTER); + assertThat(after.get("ID")).isEqualTo(BigDecimal.valueOf(2)); + assertThat(after.get("NAME")).isEqualTo("Home"); + } + finally { + TestHelper.dropTable(connection, "alog_test"); + } } @Test diff --git a/debezium-connector-oracle/src/test/java/io/debezium/connector/oracle/TransactionMetadataIT.java b/debezium-connector-oracle/src/test/java/io/debezium/connector/oracle/TransactionMetadataIT.java index 223afe9d8..c8aad77fd 100644 --- a/debezium-connector-oracle/src/test/java/io/debezium/connector/oracle/TransactionMetadataIT.java +++ b/debezium-connector-oracle/src/test/java/io/debezium/connector/oracle/TransactionMetadataIT.java @@ -73,6 +73,8 @@ public static void beforeClass() throws SQLException { @AfterClass public static void closeConnection() throws SQLException { if (connection != null) { + TestHelper.dropTable(connection, "debezium.orders"); + TestHelper.dropTable(connection, "debezium.customer"); connection.close(); } }