DBZ-1184 Simplify the test

This commit is contained in:
Jiri Pechanec 2019-03-15 16:31:24 +01:00 committed by Gunnar Morling
parent a892a4eb6f
commit 08bc66f8ef

View File

@ -1019,46 +1019,44 @@ public void shouldProcessPurgedGtidSet() throws SQLException, InterruptedExcepti
Testing.Files.delete(DB_HISTORY_PATH);
if (!isGtidModeEnabled()) {
logger.warn("GTID is not enabled, skipping shouldProcessPurgedGtidSet");
return;
}
// Use the DB configuration to define the connector's configuration ...
config = RO_DATABASE.defaultConfig()
.with(MySqlConnectorConfig.SNAPSHOT_MODE, SnapshotMode.NEVER)
.with(MySqlConnectorConfig.INCLUDE_SCHEMA_CHANGES, true)
.build();
try (MySQLConnection db = MySQLConnection.forTestDatabase(DATABASE.getDatabaseName());) {
try (JdbcConnection connection = db.connect()) {
connection.execute("FLUSH LOGS");
connection.execute("PURGE BINARY LOGS TO 'mysql-bin.000004'");
}
}
final UniqueDatabase database = new UniqueDatabase("myServer1", "connector_test")
.withDbHistoryPath(DB_HISTORY_PATH);
final UniqueDatabase ro_database = new UniqueDatabase("myServer2", "connector_test_ro", database)
.withDbHistoryPath(DB_HISTORY_PATH);
ro_database.createAndInitialize();
// Use the DB configuration to define the connector's configuration ...
config = ro_database.defaultConfig()
.with(MySqlConnectorConfig.SNAPSHOT_MODE, SnapshotMode.NEVER)
.with(MySqlConnectorConfig.INCLUDE_SCHEMA_CHANGES, true)
.with(MySqlConnectorConfig.TABLE_WHITELIST, ro_database.qualifiedTableName("customers"))
.build();
// Start the connector ...
start(MySqlConnector.class, config);
// Consume the first records due to startup and initialization of the database ...
// Testing.Print.enable();
SourceRecords records = consumeRecordsByTopic(INITIAL_EVENT_COUNT); // 6 DDL changes
assertThat(recordsForTopicForRoProductsTable(records).size()).isEqualTo(9);
assertThat(records.recordsForTopic(RO_DATABASE.topicForTable("products_on_hand")).size()).isEqualTo(9);
assertThat(records.recordsForTopic(RO_DATABASE.topicForTable("customers")).size()).isEqualTo(4);
assertThat(records.recordsForTopic(RO_DATABASE.topicForTable("orders")).size()).isEqualTo(5);
assertThat(records.recordsForTopic(RO_DATABASE.topicForTable("Products")).size()).isEqualTo(9);
assertThat(records.topics().size()).isEqualTo(4 + 1);
assertThat(records.ddlRecordsForDatabase(RO_DATABASE.getDatabaseName()).size()).isEqualTo(6);
// check float value
Optional<SourceRecord> recordWithScientfic = records.recordsForTopic(RO_DATABASE.topicForTable("Products")).stream().filter(x -> "hammer2".equals(getAfter(x).get("name"))).findFirst();
assertThat(recordWithScientfic.isPresent());
assertThat(getAfter(recordWithScientfic.get()).get("weight")).isEqualTo(0.875);
SourceRecords records = consumeRecordsByTopic(6 + 4); // 6 DDL changes
assertThat(records.recordsForTopic(ro_database.topicForTable("customers")).size()).isEqualTo(4);
assertThat(records.topics().size()).isEqualTo(1 + 1);
assertThat(records.ddlRecordsForDatabase(ro_database.getDatabaseName()).size()).isEqualTo(6);
// Check that all records are valid, can be serialized and deserialized ...
records.forEach(this::validate);
// Check that records have GTID that does not contain purged interval
records.recordsForTopic(RO_DATABASE.topicForTable("customers")).forEach(record -> {
records.recordsForTopic(ro_database.topicForTable("customers")).forEach(record -> {
final String gtids = (String)record.sourceOffset().get("gtids");
final Pattern p = Pattern.compile(".*(.*):(.*)-(.*).*");
final Matcher m = p.matcher(gtids);
@ -1066,16 +1064,7 @@ public void shouldProcessPurgedGtidSet() throws SQLException, InterruptedExcepti
Assertions.assertThat(m.group(2)).isNotEqualTo("1");
});
// More records may have been written (if this method were run after the others), but we don't care ...
stopConnector();
records.recordsForTopic(RO_DATABASE.topicForTable("orders")).forEach(record -> {
print(record);
});
records.recordsForTopic(RO_DATABASE.topicForTable("customers")).forEach(record -> {
print(record);
});
}
@Test