DBZ-3237 Cleanup test method artifacts

This commit is contained in:
Chris Cranford 2021-03-04 21:35:08 -05:00 committed by Gunnar Morling
parent be80834838
commit 8854a5c1a6
5 changed files with 142 additions and 115 deletions

View File

@ -17,6 +17,7 @@
import io.debezium.spi.converter.CustomConverter;
import io.debezium.spi.converter.RelationalColumn;
import io.debezium.util.Strings;
import oracle.sql.NUMBER;
/**

View File

@ -207,10 +207,17 @@ public abstract class AbstractOracleDatatypesTest extends AbstractConnectorTest
private static OracleConnection connection;
@BeforeClass
public static void dropTables() throws SQLException {
public static void beforeClass() throws SQLException {
connection = TestHelper.testConnection();
for (String table : ALL_TABLES) {
TestHelper.dropTable(connection, table);
dropTables();
}
@AfterClass
public static void dropTables() throws SQLException {
if (connection != null) {
for (String table : ALL_TABLES) {
TestHelper.dropTable(connection, table);
}
}
}

View File

@ -61,6 +61,8 @@ public static void closeConnection() throws SQLException {
adminConnection.close();
}
if (connection != null) {
TestHelper.dropTable(connection, "debezium.table1");
TestHelper.dropTable(connection, "debezium.table2");
connection.close();
}
}

View File

@ -132,6 +132,10 @@ public static void beforeClass() throws SQLException {
@AfterClass
public static void closeConnection() throws SQLException {
if (connection != null) {
TestHelper.dropTable(connection, "customer");
TestHelper.dropTable(connection, "masked_hashed_column_table");
TestHelper.dropTable(connection, "truncated_column_table");
TestHelper.dropTable(connection, "dt_table");
connection.close();
}
}
@ -151,52 +155,56 @@ public void before() throws SQLException {
@FixFor("DBZ-2452")
public void shouldSnapshotAndStreamWithHyphenedTableName() throws Exception {
TestHelper.dropTable(connection, "debezium.\"my-table\"");
try {
String ddl = "create table \"my-table\" (" +
" id numeric(9,0) not null, " +
" c1 int, " +
" c2 varchar(128), " +
" primary key (id))";
String ddl = "create table \"my-table\" (" +
" id numeric(9,0) not null, " +
" c1 int, " +
" c2 varchar(128), " +
" primary key (id))";
connection.execute(ddl);
connection.execute("GRANT SELECT ON debezium.\"my-table\" to " + TestHelper.getConnectorUserName());
connection.execute("ALTER TABLE debezium.\"my-table\" ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS");
connection.execute("INSERT INTO debezium.\"my-table\" VALUES (1, 25, 'Test')");
connection.execute("COMMIT");
connection.execute(ddl);
connection.execute("GRANT SELECT ON debezium.\"my-table\" to " + TestHelper.getConnectorUserName());
connection.execute("ALTER TABLE debezium.\"my-table\" ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS");
connection.execute("INSERT INTO debezium.\"my-table\" VALUES (1, 25, 'Test')");
connection.execute("COMMIT");
Configuration config = TestHelper.defaultConfig()
.with(OracleConnectorConfig.TABLE_INCLUDE_LIST, "DEBEZIUM\\.MY-TABLE")
.build();
Configuration config = TestHelper.defaultConfig()
.with(OracleConnectorConfig.TABLE_INCLUDE_LIST, "DEBEZIUM\\.MY-TABLE")
.build();
start(OracleConnector.class, config);
assertConnectorIsRunning();
start(OracleConnector.class, config);
assertConnectorIsRunning();
waitForStreamingRunning(TestHelper.CONNECTOR_NAME, TestHelper.SERVER_NAME);
waitForStreamingRunning(TestHelper.CONNECTOR_NAME, TestHelper.SERVER_NAME);
connection.execute("INSERT INTO debezium.\"my-table\" VALUES (2, 50, 'Test2')");
connection.execute("COMMIT");
connection.execute("INSERT INTO debezium.\"my-table\" VALUES (2, 50, 'Test2')");
connection.execute("COMMIT");
SourceRecords records = consumeRecordsByTopic(2);
List<SourceRecord> hyphenatedTableRecords = records.recordsForTopic("server1.DEBEZIUM.my-table");
assertThat(hyphenatedTableRecords).hasSize(2);
SourceRecords records = consumeRecordsByTopic(2);
List<SourceRecord> hyphenatedTableRecords = records.recordsForTopic("server1.DEBEZIUM.my-table");
assertThat(hyphenatedTableRecords).hasSize(2);
// read
SourceRecord record1 = hyphenatedTableRecords.get(0);
VerifyRecord.isValidRead(record1, "ID", 1);
Struct after1 = (Struct) ((Struct) record1.value()).get(AFTER);
assertThat(after1.get("ID")).isEqualTo(1);
assertThat(after1.get("C1")).isEqualTo(BigDecimal.valueOf(25L));
assertThat(after1.get("C2")).isEqualTo("Test");
assertThat(record1.sourceOffset().get(SourceInfo.SNAPSHOT_KEY)).isEqualTo(true);
assertThat(record1.sourceOffset().get(SNAPSHOT_COMPLETED_KEY)).isEqualTo(true);
// read
SourceRecord record1 = hyphenatedTableRecords.get(0);
VerifyRecord.isValidRead(record1, "ID", 1);
Struct after1 = (Struct) ((Struct) record1.value()).get(AFTER);
assertThat(after1.get("ID")).isEqualTo(1);
assertThat(after1.get("C1")).isEqualTo(BigDecimal.valueOf(25L));
assertThat(after1.get("C2")).isEqualTo("Test");
assertThat(record1.sourceOffset().get(SourceInfo.SNAPSHOT_KEY)).isEqualTo(true);
assertThat(record1.sourceOffset().get(SNAPSHOT_COMPLETED_KEY)).isEqualTo(true);
// insert
SourceRecord record2 = hyphenatedTableRecords.get(1);
VerifyRecord.isValidInsert(record2, "ID", 2);
Struct after2 = (Struct) ((Struct) record2.value()).get(AFTER);
assertThat(after2.get("ID")).isEqualTo(2);
assertThat(after2.get("C1")).isEqualTo(BigDecimal.valueOf(50L));
assertThat(after2.get("C2")).isEqualTo("Test2");
// insert
SourceRecord record2 = hyphenatedTableRecords.get(1);
VerifyRecord.isValidInsert(record2, "ID", 2);
Struct after2 = (Struct) ((Struct) record2.value()).get(AFTER);
assertThat(after2.get("ID")).isEqualTo(2);
assertThat(after2.get("C1")).isEqualTo(BigDecimal.valueOf(50L));
assertThat(after2.get("C2")).isEqualTo("Test2");
}
finally {
TestHelper.dropTable(connection, "debezium.\"my-table\"");
}
}
@Test
@ -943,9 +951,8 @@ public void shouldSnapshotAndStreamChangesFromTableWithNumericDefaultValues() th
final SourceRecords streamingRecords = consumeRecordsByTopic(1);
assertThat(streamingRecords.recordsForTopic("server1.DEBEZIUM.COMPLEX_DDL").size()).isEqualTo(1);
}
catch (Exception e) {
finally {
TestHelper.dropTable(connection, "debezium.complex_ddl");
throw e;
}
}
@ -954,45 +961,49 @@ public void shouldSnapshotAndStreamChangesFromTableWithNumericDefaultValues() th
@RequireDatabaseOption("Partitioning")
public void shouldSnapshotAndStreamChangesFromPartitionedTable() throws Exception {
TestHelper.dropTable(connection, "players");
try {
final String ddl = "CREATE TABLE players (" +
"id NUMERIC(6), " +
"name VARCHAR(100), " +
"birth_date DATE," +
"primary key(id)) " +
"PARTITION BY RANGE (birth_date) (" +
"PARTITION p2019 VALUES LESS THAN (TO_DATE('01-JAN-2020', 'dd-MON-yyyy')), " +
"PARTITION p2020 VALUES LESS THAN (TO_DATE('01-JAN-2021', 'dd-MON-yyyy'))" +
")";
connection.execute(ddl);
connection.execute("GRANT SELECT ON debezium.players to " + TestHelper.getConnectorUserName());
connection.execute("ALTER TABLE debezium.players ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS");
final String ddl = "CREATE TABLE players (" +
"id NUMERIC(6), " +
"name VARCHAR(100), " +
"birth_date DATE," +
"primary key(id)) " +
"PARTITION BY RANGE (birth_date) (" +
"PARTITION p2019 VALUES LESS THAN (TO_DATE('01-JAN-2020', 'dd-MON-yyyy')), " +
"PARTITION p2020 VALUES LESS THAN (TO_DATE('01-JAN-2021', 'dd-MON-yyyy'))" +
")";
connection.execute(ddl);
connection.execute("GRANT SELECT ON debezium.players to " + TestHelper.getConnectorUserName());
connection.execute("ALTER TABLE debezium.players ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS");
// Insert a record to be captured by snapshot
connection.execute("INSERT INTO debezium.players (id, name, birth_date) VALUES (1, 'Roger Rabbit', '01-MAY-2019')");
connection.commit();
// Insert a record to be captured by snapshot
connection.execute("INSERT INTO debezium.players (id, name, birth_date) VALUES (1, 'Roger Rabbit', '01-MAY-2019')");
connection.commit();
// Start connector
final Configuration config = TestHelper.defaultConfig()
.with(OracleConnectorConfig.TABLE_INCLUDE_LIST, "DEBEZIUM.PLAYERS")
.build();
start(OracleConnector.class, config);
assertConnectorIsRunning();
// Start connector
final Configuration config = TestHelper.defaultConfig()
.with(OracleConnectorConfig.TABLE_INCLUDE_LIST, "DEBEZIUM.PLAYERS")
.build();
start(OracleConnector.class, config);
assertConnectorIsRunning();
waitForSnapshotToBeCompleted(TestHelper.CONNECTOR_NAME, TestHelper.SERVER_NAME);
waitForSnapshotToBeCompleted(TestHelper.CONNECTOR_NAME, TestHelper.SERVER_NAME);
final SourceRecords snapshotRecords = consumeRecordsByTopic(1);
assertThat(snapshotRecords.recordsForTopic("server1.DEBEZIUM.PLAYERS").size()).isEqualTo(1);
final SourceRecords snapshotRecords = consumeRecordsByTopic(1);
assertThat(snapshotRecords.recordsForTopic("server1.DEBEZIUM.PLAYERS").size()).isEqualTo(1);
waitForStreamingRunning(TestHelper.CONNECTOR_NAME, TestHelper.SERVER_NAME);
waitForStreamingRunning(TestHelper.CONNECTOR_NAME, TestHelper.SERVER_NAME);
// Insert a record to be captured during streaming
connection.execute("INSERT INTO debezium.players (id, name, birth_date) VALUES (2, 'Bugs Bunny', '26-JUN-2019')");
connection.execute("INSERT INTO debezium.players (id, name, birth_date) VALUES (3, 'Elmer Fud', '01-NOV-2020')");
connection.commit();
// Insert a record to be captured during streaming
connection.execute("INSERT INTO debezium.players (id, name, birth_date) VALUES (2, 'Bugs Bunny', '26-JUN-2019')");
connection.execute("INSERT INTO debezium.players (id, name, birth_date) VALUES (3, 'Elmer Fud', '01-NOV-2020')");
connection.commit();
final SourceRecords streamRecords = consumeRecordsByTopic(2);
assertThat(streamRecords.recordsForTopic("server1.DEBEZIUM.PLAYERS").size()).isEqualTo(2);
final SourceRecords streamRecords = consumeRecordsByTopic(2);
assertThat(streamRecords.recordsForTopic("server1.DEBEZIUM.PLAYERS").size()).isEqualTo(2);
}
finally {
TestHelper.dropTable(connection, "players");
}
}
@Test
@ -1056,55 +1067,59 @@ public void shouldAvroSerializeColumnsWithSpecialCharacters() throws Exception {
public void testArchiveLogScnBoundariesAreIncluded() throws Exception {
// Drop table if it exists
TestHelper.dropTable(connection, "alog_test");
try {
final String ddl = "CREATE TABLE alog_test (id numeric, name varchar2(50), primary key(id))";
connection.execute(ddl);
connection.execute("GRANT SELECT ON debezium.alog_test TO " + TestHelper.getConnectorUserName());
connection.execute("ALTER TABLE debezium.alog_test ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS");
connection.commit();
final String ddl = "CREATE TABLE alog_test (id numeric, name varchar2(50), primary key(id))";
connection.execute(ddl);
connection.execute("GRANT SELECT ON debezium.alog_test TO " + TestHelper.getConnectorUserName());
connection.execute("ALTER TABLE debezium.alog_test ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS");
connection.commit();
// Insert a snapshot record
connection.execute("INSERT INTO debezium.alog_test (id, name) VALUES (1, 'Test')");
connection.commit();
// Insert a snapshot record
connection.execute("INSERT INTO debezium.alog_test (id, name) VALUES (1, 'Test')");
connection.commit();
// start connector and take snapshot
final Configuration config = TestHelper.defaultConfig()
.with(OracleConnectorConfig.TABLE_INCLUDE_LIST, "DEBEZIUM.ALOG_TEST")
.build();
start(OracleConnector.class, config);
assertConnectorIsRunning();
// start connector and take snapshot
final Configuration config = TestHelper.defaultConfig()
.with(OracleConnectorConfig.TABLE_INCLUDE_LIST, "DEBEZIUM.ALOG_TEST")
.build();
start(OracleConnector.class, config);
assertConnectorIsRunning();
waitForStreamingRunning(TestHelper.CONNECTOR_NAME, TestHelper.SERVER_NAME);
waitForStreamingRunning(TestHelper.CONNECTOR_NAME, TestHelper.SERVER_NAME);
// Validate snapshot record
final SourceRecords snapshotRecords = consumeRecordsByTopic(1);
assertThat(snapshotRecords.recordsForTopic("server1.DEBEZIUM.ALOG_TEST").size()).isEqualTo(1);
SourceRecord record = snapshotRecords.recordsForTopic("server1.DEBEZIUM.ALOG_TEST").get(0);
Struct after = (Struct) ((Struct) record.value()).get(AFTER);
assertThat(after.get("ID")).isEqualTo(BigDecimal.valueOf(1));
assertThat(after.get("NAME")).isEqualTo("Test");
// Validate snapshot record
final SourceRecords snapshotRecords = consumeRecordsByTopic(1);
assertThat(snapshotRecords.recordsForTopic("server1.DEBEZIUM.ALOG_TEST").size()).isEqualTo(1);
SourceRecord record = snapshotRecords.recordsForTopic("server1.DEBEZIUM.ALOG_TEST").get(0);
Struct after = (Struct) ((Struct) record.value()).get(AFTER);
assertThat(after.get("ID")).isEqualTo(BigDecimal.valueOf(1));
assertThat(after.get("NAME")).isEqualTo("Test");
// stop the connector
stopConnector();
// stop the connector
stopConnector();
// Force flush of all redo logs to archive logs
TestHelper.forceFlushOfRedoLogsToArchiveLogs();
// Force flush of all redo logs to archive logs
TestHelper.forceFlushOfRedoLogsToArchiveLogs();
// Start connector and wait for streaming
start(OracleConnector.class, config);
waitForStreamingRunning(TestHelper.CONNECTOR_NAME, TestHelper.SERVER_NAME);
// Start connector and wait for streaming
start(OracleConnector.class, config);
waitForStreamingRunning(TestHelper.CONNECTOR_NAME, TestHelper.SERVER_NAME);
// Insert record for streaming
connection.execute("INSERT INTO debezium.alog_test (id, name) values (2, 'Home')");
connection.execute("COMMIT");
// Insert record for streaming
connection.execute("INSERT INTO debezium.alog_test (id, name) values (2, 'Home')");
connection.execute("COMMIT");
// Validate streaming record
final SourceRecords records = consumeRecordsByTopic(1);
assertThat(records.recordsForTopic("server1.DEBEZIUM.ALOG_TEST").size()).isEqualTo(1);
record = records.recordsForTopic("server1.DEBEZIUM.ALOG_TEST").get(0);
after = (Struct) ((Struct) record.value()).get(AFTER);
assertThat(after.get("ID")).isEqualTo(BigDecimal.valueOf(2));
assertThat(after.get("NAME")).isEqualTo("Home");
// Validate streaming record
final SourceRecords records = consumeRecordsByTopic(1);
assertThat(records.recordsForTopic("server1.DEBEZIUM.ALOG_TEST").size()).isEqualTo(1);
record = records.recordsForTopic("server1.DEBEZIUM.ALOG_TEST").get(0);
after = (Struct) ((Struct) record.value()).get(AFTER);
assertThat(after.get("ID")).isEqualTo(BigDecimal.valueOf(2));
assertThat(after.get("NAME")).isEqualTo("Home");
}
finally {
TestHelper.dropTable(connection, "alog_test");
}
}
@Test

View File

@ -73,6 +73,8 @@ public static void beforeClass() throws SQLException {
@AfterClass
public static void closeConnection() throws SQLException {
if (connection != null) {
TestHelper.dropTable(connection, "debezium.orders");
TestHelper.dropTable(connection, "debezium.customer");
connection.close();
}
}