DBZ-1184 Add test
This commit is contained in:
parent
41734d322e
commit
a892a4eb6f
@ -206,8 +206,8 @@ public GtidSet purgedGtidSet() {
|
||||
AtomicReference<String> gtidSetStr = new AtomicReference<String>();
|
||||
try {
|
||||
jdbc.query("SELECT @@global.gtid_purged", rs -> {
|
||||
if (rs.next() && rs.getMetaData().getColumnCount() > 1) {
|
||||
gtidSetStr.set(rs.getString(2));// GTID set, may be null, blank, or contain a GTID set
|
||||
if (rs.next() && rs.getMetaData().getColumnCount() > 0) {
|
||||
gtidSetStr.set(rs.getString(1));// GTID set, may be null, blank, or contain a GTID set
|
||||
}
|
||||
});
|
||||
}
|
||||
|
@ -16,12 +16,15 @@
|
||||
import java.util.Map;
|
||||
import java.util.Optional;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.regex.Matcher;
|
||||
import java.util.regex.Pattern;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import org.apache.kafka.common.config.Config;
|
||||
import org.apache.kafka.connect.data.Struct;
|
||||
import org.apache.kafka.connect.errors.DataException;
|
||||
import org.apache.kafka.connect.source.SourceRecord;
|
||||
import org.fest.assertions.Assertions;
|
||||
import org.junit.After;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
@ -997,6 +1000,84 @@ private Struct getAfter(SourceRecord record) {
|
||||
return (Struct)((Struct)record.value()).get("after");
|
||||
}
|
||||
|
||||
private boolean isGtidModeEnabled() throws SQLException {
|
||||
try (MySQLConnection db = MySQLConnection.forTestDatabase(DATABASE.getDatabaseName());) {
|
||||
return MySQLConnection.forTestDatabase(DATABASE.getDatabaseName()).queryAndMap(
|
||||
"SHOW GLOBAL VARIABLES LIKE 'GTID_MODE'",
|
||||
rs -> {
|
||||
if (rs.next()) {
|
||||
return !"OFF".equalsIgnoreCase(rs.getString(2));
|
||||
}
|
||||
throw new IllegalStateException("Cannot obtain GTID status");
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
@FixFor("DBZ-1184")
|
||||
public void shouldProcessPurgedGtidSet() throws SQLException, InterruptedException {
|
||||
Testing.Files.delete(DB_HISTORY_PATH);
|
||||
|
||||
if (!isGtidModeEnabled()) {
|
||||
return;
|
||||
}
|
||||
|
||||
// Use the DB configuration to define the connector's configuration ...
|
||||
config = RO_DATABASE.defaultConfig()
|
||||
.with(MySqlConnectorConfig.SNAPSHOT_MODE, SnapshotMode.NEVER)
|
||||
.with(MySqlConnectorConfig.INCLUDE_SCHEMA_CHANGES, true)
|
||||
.build();
|
||||
|
||||
try (MySQLConnection db = MySQLConnection.forTestDatabase(DATABASE.getDatabaseName());) {
|
||||
try (JdbcConnection connection = db.connect()) {
|
||||
connection.execute("FLUSH LOGS");
|
||||
connection.execute("PURGE BINARY LOGS TO 'mysql-bin.000004'");
|
||||
}
|
||||
}
|
||||
|
||||
// Start the connector ...
|
||||
start(MySqlConnector.class, config);
|
||||
|
||||
// Consume the first records due to startup and initialization of the database ...
|
||||
// Testing.Print.enable();
|
||||
SourceRecords records = consumeRecordsByTopic(INITIAL_EVENT_COUNT); // 6 DDL changes
|
||||
assertThat(recordsForTopicForRoProductsTable(records).size()).isEqualTo(9);
|
||||
assertThat(records.recordsForTopic(RO_DATABASE.topicForTable("products_on_hand")).size()).isEqualTo(9);
|
||||
assertThat(records.recordsForTopic(RO_DATABASE.topicForTable("customers")).size()).isEqualTo(4);
|
||||
assertThat(records.recordsForTopic(RO_DATABASE.topicForTable("orders")).size()).isEqualTo(5);
|
||||
assertThat(records.recordsForTopic(RO_DATABASE.topicForTable("Products")).size()).isEqualTo(9);
|
||||
assertThat(records.topics().size()).isEqualTo(4 + 1);
|
||||
assertThat(records.ddlRecordsForDatabase(RO_DATABASE.getDatabaseName()).size()).isEqualTo(6);
|
||||
|
||||
// check float value
|
||||
Optional<SourceRecord> recordWithScientfic = records.recordsForTopic(RO_DATABASE.topicForTable("Products")).stream().filter(x -> "hammer2".equals(getAfter(x).get("name"))).findFirst();
|
||||
assertThat(recordWithScientfic.isPresent());
|
||||
assertThat(getAfter(recordWithScientfic.get()).get("weight")).isEqualTo(0.875);
|
||||
|
||||
// Check that all records are valid, can be serialized and deserialized ...
|
||||
records.forEach(this::validate);
|
||||
|
||||
// Check that records have GTID that does not contain purged interval
|
||||
records.recordsForTopic(RO_DATABASE.topicForTable("customers")).forEach(record -> {
|
||||
final String gtids = (String)record.sourceOffset().get("gtids");
|
||||
final Pattern p = Pattern.compile(".*(.*):(.*)-(.*).*");
|
||||
final Matcher m = p.matcher(gtids);
|
||||
m.matches();
|
||||
Assertions.assertThat(m.group(2)).isNotEqualTo("1");
|
||||
});
|
||||
|
||||
// More records may have been written (if this method were run after the others), but we don't care ...
|
||||
stopConnector();
|
||||
|
||||
records.recordsForTopic(RO_DATABASE.topicForTable("orders")).forEach(record -> {
|
||||
print(record);
|
||||
});
|
||||
|
||||
records.recordsForTopic(RO_DATABASE.topicForTable("customers")).forEach(record -> {
|
||||
print(record);
|
||||
});
|
||||
}
|
||||
|
||||
@Test
|
||||
public void shouldConsumeEventsWithNoSnapshot() throws SQLException, InterruptedException {
|
||||
Testing.Files.delete(DB_HISTORY_PATH);
|
||||
|
Loading…
Reference in New Issue
Block a user