DBZ-720 Reworking SnapshotDatatypesIT to test types captured during initial snapshotting

This commit is contained in:
Gunnar Morling 2018-07-17 10:36:06 +02:00 committed by Jiri Pechanec
parent 8ad452d60f
commit 55bfcdda27
3 changed files with 112 additions and 33 deletions

View File

@ -151,16 +151,17 @@ protected static void createTables() throws SQLException {
} }
} }
List<String> getAllTables() { protected List<String> getAllTables() {
return Arrays.asList(ALL_TABLES); return Arrays.asList(ALL_TABLES);
} }
protected abstract boolean insertRecordsDuringTest();
private static void streamTable(String table) throws SQLException { private static void streamTable(String table) throws SQLException {
connection.execute("GRANT SELECT ON " + table + " to " + TestHelper.CONNECTOR_USER); connection.execute("GRANT SELECT ON " + table + " to " + TestHelper.CONNECTOR_USER);
connection.execute("ALTER TABLE " + table + " ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS"); connection.execute("ALTER TABLE " + table + " ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS");
} }
@AfterClass @AfterClass
public static void closeConnection() throws SQLException { public static void closeConnection() throws SQLException {
if (connection != null) { if (connection != null) {
@ -171,8 +172,10 @@ public static void closeConnection() throws SQLException {
@Test @Test
public void stringTypes() throws Exception { public void stringTypes() throws Exception {
int expectedRecordCount = 0; int expectedRecordCount = 0;
connection.execute("INSERT INTO debezium.type_string VALUES (1, 'v\u010d2', 'nv\u010d2', 'c', 'n\u010d')");
connection.execute("COMMIT"); if (insertRecordsDuringTest()) {
insertStringTypes();
}
Testing.debug("Inserted"); Testing.debug("Inserted");
expectedRecordCount++; expectedRecordCount++;
@ -183,7 +186,13 @@ public void stringTypes() throws Exception {
assertThat(testTableRecords).hasSize(expectedRecordCount); assertThat(testTableRecords).hasSize(expectedRecordCount);
// insert // insert
if (insertRecordsDuringTest()) {
VerifyRecord.isValidInsert(testTableRecords.get(0)); VerifyRecord.isValidInsert(testTableRecords.get(0));
}
else {
VerifyRecord.isValidRead(testTableRecords.get(0));
}
Struct after = (Struct) ((Struct)testTableRecords.get(0).value()).get("after"); Struct after = (Struct) ((Struct)testTableRecords.get(0).value()).get("after");
assertRecord(after, EXPECTED_STRING); assertRecord(after, EXPECTED_STRING);
} }
@ -191,8 +200,10 @@ public void stringTypes() throws Exception {
@Test @Test
public void fpTypes() throws Exception { public void fpTypes() throws Exception {
int expectedRecordCount = 0; int expectedRecordCount = 0;
connection.execute("INSERT INTO debezium.type_fp VALUES (1, 1.1, 2.22, 3.33, 4.4444, 5.555, 6.66, 77.323)");
connection.execute("COMMIT"); if (insertRecordsDuringTest()) {
insertFpTypes();
}
Testing.debug("Inserted"); Testing.debug("Inserted");
expectedRecordCount++; expectedRecordCount++;
@ -203,7 +214,13 @@ public void fpTypes() throws Exception {
assertThat(testTableRecords).hasSize(expectedRecordCount); assertThat(testTableRecords).hasSize(expectedRecordCount);
// insert // insert
if (insertRecordsDuringTest()) {
VerifyRecord.isValidInsert(testTableRecords.get(0)); VerifyRecord.isValidInsert(testTableRecords.get(0));
}
else {
VerifyRecord.isValidRead(testTableRecords.get(0));
}
Struct after = (Struct) ((Struct)testTableRecords.get(0).value()).get("after"); Struct after = (Struct) ((Struct)testTableRecords.get(0).value()).get("after");
assertRecord(after, EXPECTED_FP); assertRecord(after, EXPECTED_FP);
} }
@ -211,8 +228,10 @@ public void fpTypes() throws Exception {
@Test @Test
public void intTypes() throws Exception { public void intTypes() throws Exception {
int expectedRecordCount = 0; int expectedRecordCount = 0;
connection.execute("INSERT INTO debezium.type_int VALUES (1, 1, 22, 333)");
connection.execute("COMMIT"); if (insertRecordsDuringTest()) {
insertIntTypes();
}
Testing.debug("Inserted"); Testing.debug("Inserted");
expectedRecordCount++; expectedRecordCount++;
@ -223,7 +242,13 @@ public void intTypes() throws Exception {
assertThat(testTableRecords).hasSize(expectedRecordCount); assertThat(testTableRecords).hasSize(expectedRecordCount);
// insert // insert
if (insertRecordsDuringTest()) {
VerifyRecord.isValidInsert(testTableRecords.get(0)); VerifyRecord.isValidInsert(testTableRecords.get(0));
}
else {
VerifyRecord.isValidRead(testTableRecords.get(0));
}
Struct after = (Struct) ((Struct)testTableRecords.get(0).value()).get("after"); Struct after = (Struct) ((Struct)testTableRecords.get(0).value()).get("after");
assertRecord(after, EXPECTED_INT); assertRecord(after, EXPECTED_INT);
} }
@ -231,16 +256,10 @@ public void intTypes() throws Exception {
@Test @Test
public void timeTypes() throws Exception { public void timeTypes() throws Exception {
int expectedRecordCount = 0; int expectedRecordCount = 0;
connection.execute("INSERT INTO debezium.type_time VALUES ("
+ "1" if (insertRecordsDuringTest()) {
+ ", TO_DATE('27-MAR-2018', 'dd-MON-yyyy')" insertTimeTypes();
+ ", TO_TIMESTAMP('27-MAR-2018 12:34:56.00789', 'dd-MON-yyyy HH24:MI:SS.FF5')" }
+ ", TO_TIMESTAMP_TZ('27-MAR-2018 01:34:56.00789 -11:00', 'dd-MON-yyyy HH24:MI:SS.FF5 TZH:TZM')"
+ ", TO_TIMESTAMP_TZ('27-MAR-2018 01:34:56.00789', 'dd-MON-yyyy HH24:MI:SS.FF5')"
+ ", INTERVAL '-3-6' YEAR TO MONTH"
+ ", INTERVAL '-1 2:3:4.56' DAY TO SECOND"
+ ")");
connection.execute("COMMIT");
Testing.debug("Inserted"); Testing.debug("Inserted");
expectedRecordCount++; expectedRecordCount++;
@ -251,11 +270,45 @@ public void timeTypes() throws Exception {
assertThat(testTableRecords).hasSize(expectedRecordCount); assertThat(testTableRecords).hasSize(expectedRecordCount);
// insert // insert
if (insertRecordsDuringTest()) {
VerifyRecord.isValidInsert(testTableRecords.get(0)); VerifyRecord.isValidInsert(testTableRecords.get(0));
}
else {
VerifyRecord.isValidRead(testTableRecords.get(0));
}
Struct after = (Struct) ((Struct)testTableRecords.get(0).value()).get("after"); Struct after = (Struct) ((Struct)testTableRecords.get(0).value()).get("after");
assertRecord(after, EXPECTED_TIME); assertRecord(after, EXPECTED_TIME);
} }
protected static void insertStringTypes() throws SQLException {
connection.execute("INSERT INTO debezium.type_string VALUES (1, 'v\u010d2', 'nv\u010d2', 'c', 'n\u010d')");
connection.execute("COMMIT");
}
protected static void insertFpTypes() throws SQLException {
connection.execute("INSERT INTO debezium.type_fp VALUES (1, 1.1, 2.22, 3.33, 4.4444, 5.555, 6.66, 77.323)");
connection.execute("COMMIT");
}
protected static void insertIntTypes() throws SQLException {
connection.execute("INSERT INTO debezium.type_int VALUES (1, 1, 22, 333)");
connection.execute("COMMIT");
}
protected static void insertTimeTypes() throws SQLException {
connection.execute("INSERT INTO debezium.type_time VALUES ("
+ "1"
+ ", TO_DATE('27-MAR-2018', 'dd-MON-yyyy')"
+ ", TO_TIMESTAMP('27-MAR-2018 12:34:56.00789', 'dd-MON-yyyy HH24:MI:SS.FF5')"
+ ", TO_TIMESTAMP_TZ('27-MAR-2018 01:34:56.00789 -11:00', 'dd-MON-yyyy HH24:MI:SS.FF5 TZH:TZM')"
+ ", TO_TIMESTAMP_TZ('27-MAR-2018 01:34:56.00789', 'dd-MON-yyyy HH24:MI:SS.FF5')"
+ ", INTERVAL '-3-6' YEAR TO MONTH"
+ ", INTERVAL '-1 2:3:4.56' DAY TO SECOND"
+ ")");
connection.execute("COMMIT");
}
private void assertRecord(Struct record, List<SchemaAndValueField> expected) { private void assertRecord(Struct record, List<SchemaAndValueField> expected) {
expected.forEach(schemaAndValueField -> schemaAndValueField.assertFor(record)); expected.forEach(schemaAndValueField -> schemaAndValueField.assertFor(record));
} }

View File

@ -7,26 +7,33 @@
import java.sql.SQLException; import java.sql.SQLException;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.junit.Before; import org.junit.Before;
import org.junit.BeforeClass; import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.rules.TestName;
import io.debezium.config.Configuration; import io.debezium.config.Configuration;
import io.debezium.connector.oracle.OracleConnectorConfig.SnapshotMode;
import io.debezium.connector.oracle.util.TestHelper; import io.debezium.connector.oracle.util.TestHelper;
import io.debezium.util.Testing; import io.debezium.util.Testing;
/** /**
* Integration test to verify different Oracle datatypes. * Integration test to verify different Oracle datatypes as captured during initial snapshotting.
* *
* @author Jiri Pechanec * @author Jiri Pechanec
*/ */
public class SnapshotDatatypesIT extends AbstractOracleDatatypesTest { public class SnapshotDatatypesIT extends AbstractOracleDatatypesTest {
@Rule public TestName name = new TestName();
@BeforeClass @BeforeClass
public static void beforeClass() throws SQLException { public static void beforeClass() throws SQLException {
createTables(); createTables();
insertStringTypes();
insertFpTypes();
insertIntTypes();
insertTimeTypes();
} }
@Before @Before
@ -36,14 +43,8 @@ public void before() throws Exception {
Testing.Debug.enable(); Testing.Debug.enable();
Testing.Files.delete(TestHelper.DB_HISTORY_PATH); Testing.Files.delete(TestHelper.DB_HISTORY_PATH);
String whitelistedTables = getAllTables().stream()
.map(t -> "ORCLPDB1." + t)
.map(t -> t.replaceAll("\\.", "\\\\."))
.collect(Collectors.joining(","));
Configuration config = TestHelper.defaultConfig() Configuration config = TestHelper.defaultConfig()
.with(OracleConnectorConfig.TABLE_WHITELIST, whitelistedTables) .with(OracleConnectorConfig.TABLE_WHITELIST, getTableWhitelist())
.with(OracleConnectorConfig.SNAPSHOT_MODE, SnapshotMode.INITIAL_SCHEMA_ONLY)
.build(); .build();
start(OracleConnector.class, config); start(OracleConnector.class, config);
@ -51,4 +52,24 @@ public void before() throws Exception {
Thread.sleep(2000); Thread.sleep(2000);
} }
private String getTableWhitelist() {
switch(name.getMethodName()) {
case "stringTypes":
return "ORCLPDB1.debezium.type_string";
case "fpTypes":
return "ORCLPDB1.debezium.type_fp";
case "intTypes":
return "ORCLPDB1.debezium.type_int";
case "timeTypes":
return "ORCLPDB1.debezium.type_time";
default:
throw new IllegalArgumentException("Unexpected test method: " + name.getMethodName());
}
}
@Override
protected boolean insertRecordsDuringTest() {
return false;
}
} }

View File

@ -16,7 +16,7 @@
import io.debezium.util.Testing; import io.debezium.util.Testing;
/** /**
* Integration test to verify different Oracle datatypes. * Integration test to verify different Oracle datatypes as captured during streaming.
* *
* @author Jiri Pechanec * @author Jiri Pechanec
*/ */
@ -47,4 +47,9 @@ public void before() throws Exception {
Thread.sleep(2000); Thread.sleep(2000);
createTables(); createTables();
} }
@Override
protected boolean insertRecordsDuringTest() {
return true;
}
} }