DBZ-1895 Completing implementation;

* Restoring original derserializers; excluding skipped events in event handlers
* Adding test
This commit is contained in:
Gunnar Morling 2020-04-21 15:22:20 +02:00 committed by Jiri Pechanec
parent 9ddd893074
commit 0998fc533a
7 changed files with 110 additions and 85 deletions

View File

@ -70,6 +70,7 @@ Jan Hendrik Dolling
Hans-Peter Grahsl
Henryk Konsek
Horia Chiorean
Hossein Torabi
Ian Axelrod
Igor Gabaydulin
Ilia Bogdanov

View File

@ -58,7 +58,6 @@
import com.github.shyiko.mysql.binlog.event.deserialization.EventDataDeserializationException;
import com.github.shyiko.mysql.binlog.event.deserialization.EventDeserializer;
import com.github.shyiko.mysql.binlog.event.deserialization.GtidEventDataDeserializer;
import com.github.shyiko.mysql.binlog.event.deserialization.NullEventDataDeserializer;
import com.github.shyiko.mysql.binlog.io.ByteArrayInputStream;
import com.github.shyiko.mysql.binlog.network.AuthenticationException;
import com.github.shyiko.mysql.binlog.network.DefaultSSLSocketFactory;
@ -270,44 +269,21 @@ public Event nextEvent(ByteArrayInputStream inputStream) throws IOException {
// Add our custom deserializers ...
eventDeserializer.setEventDataDeserializer(EventType.STOP, new StopEventDataDeserializer());
eventDeserializer.setEventDataDeserializer(EventType.GTID, new GtidEventDataDeserializer());
Set<Operation> skippedOperations = context.getConnectorConfig().getSkippedOps();
if (skippedOperations.contains(Operation.CREATE.code())) {
eventDeserializer.setEventDataDeserializer(EventType.WRITE_ROWS, new NullEventDataDeserializer());
eventDeserializer.setEventDataDeserializer(EventType.EXT_WRITE_ROWS, new NullEventDataDeserializer());
}
else {
eventDeserializer.setEventDataDeserializer(EventType.WRITE_ROWS,
new RowDeserializers.WriteRowsDeserializer(tableMapEventByTableId));
eventDeserializer.setEventDataDeserializer(EventType.EXT_WRITE_ROWS,
new RowDeserializers.WriteRowsDeserializer(
tableMapEventByTableId).setMayContainExtraInformation(true));
}
if (skippedOperations.contains(Operation.UPDATE.code())) {
eventDeserializer.setEventDataDeserializer(EventType.UPDATE_ROWS, new NullEventDataDeserializer());
eventDeserializer.setEventDataDeserializer(EventType.EXT_UPDATE_ROWS, new NullEventDataDeserializer());
}
else {
eventDeserializer.setEventDataDeserializer(EventType.UPDATE_ROWS,
new RowDeserializers.UpdateRowsDeserializer(tableMapEventByTableId));
eventDeserializer.setEventDataDeserializer(EventType.EXT_UPDATE_ROWS,
new RowDeserializers.UpdateRowsDeserializer(
tableMapEventByTableId).setMayContainExtraInformation(true));
}
if (skippedOperations.contains(Operation.DELETE.code())) {
eventDeserializer.setEventDataDeserializer(EventType.DELETE_ROWS, new NullEventDataDeserializer());
eventDeserializer.setEventDataDeserializer(EventType.EXT_DELETE_ROWS, new NullEventDataDeserializer());
}
else {
eventDeserializer.setEventDataDeserializer(EventType.DELETE_ROWS,
new RowDeserializers.DeleteRowsDeserializer(tableMapEventByTableId));
eventDeserializer.setEventDataDeserializer(EventType.EXT_DELETE_ROWS,
new RowDeserializers.DeleteRowsDeserializer(
tableMapEventByTableId).setMayContainExtraInformation(true));
}
eventDeserializer.setEventDataDeserializer(EventType.WRITE_ROWS,
new RowDeserializers.WriteRowsDeserializer(tableMapEventByTableId));
eventDeserializer.setEventDataDeserializer(EventType.UPDATE_ROWS,
new RowDeserializers.UpdateRowsDeserializer(tableMapEventByTableId));
eventDeserializer.setEventDataDeserializer(EventType.DELETE_ROWS,
new RowDeserializers.DeleteRowsDeserializer(tableMapEventByTableId));
eventDeserializer.setEventDataDeserializer(EventType.EXT_WRITE_ROWS,
new RowDeserializers.WriteRowsDeserializer(
tableMapEventByTableId).setMayContainExtraInformation(true));
eventDeserializer.setEventDataDeserializer(EventType.EXT_UPDATE_ROWS,
new RowDeserializers.UpdateRowsDeserializer(
tableMapEventByTableId).setMayContainExtraInformation(true));
eventDeserializer.setEventDataDeserializer(EventType.EXT_DELETE_ROWS,
new RowDeserializers.DeleteRowsDeserializer(
tableMapEventByTableId).setMayContainExtraInformation(true));
client.setEventDeserializer(eventDeserializer);
// Set up for JMX ...
@ -329,6 +305,7 @@ public void doDestroy() {
@Override
protected void doStart() {
context.dbSchema().assureNonEmptySchema();
Set<Operation> skippedOperations = context.getConnectorConfig().getSkippedOps();
// Register our event handlers ...
eventHandlers.put(EventType.STOP, this::handleServerStop);
@ -337,12 +314,22 @@ protected void doStart() {
eventHandlers.put(EventType.ROTATE, this::handleRotateLogsEvent);
eventHandlers.put(EventType.TABLE_MAP, this::handleUpdateTableMetadata);
eventHandlers.put(EventType.QUERY, this::handleQueryEvent);
eventHandlers.put(EventType.WRITE_ROWS, this::handleInsert);
eventHandlers.put(EventType.UPDATE_ROWS, this::handleUpdate);
eventHandlers.put(EventType.DELETE_ROWS, this::handleDelete);
eventHandlers.put(EventType.EXT_WRITE_ROWS, this::handleInsert);
eventHandlers.put(EventType.EXT_UPDATE_ROWS, this::handleUpdate);
eventHandlers.put(EventType.EXT_DELETE_ROWS, this::handleDelete);
if (!skippedOperations.contains(Operation.CREATE)) {
eventHandlers.put(EventType.WRITE_ROWS, this::handleInsert);
eventHandlers.put(EventType.EXT_WRITE_ROWS, this::handleInsert);
}
if (!skippedOperations.contains(Operation.UPDATE)) {
eventHandlers.put(EventType.UPDATE_ROWS, this::handleUpdate);
eventHandlers.put(EventType.EXT_UPDATE_ROWS, this::handleUpdate);
}
if (!skippedOperations.contains(Operation.DELETE)) {
eventHandlers.put(EventType.DELETE_ROWS, this::handleDelete);
eventHandlers.put(EventType.EXT_DELETE_ROWS, this::handleDelete);
}
eventHandlers.put(EventType.VIEW_CHANGE, this::viewChange);
eventHandlers.put(EventType.XA_PREPARE, this::prepareTransaction);
eventHandlers.put(EventType.XID, this::handleTransactionCompletion);

View File

@ -836,6 +836,7 @@ public static GtidNewChannelPosition parse(String value, String defaultValue) {
.withDescription("The criteria for running a snapshot upon startup of the connector. "
+ "Options include: "
+ "'when_needed' to specify that the connector run a snapshot upon startup whenever it deems it necessary; "
+ "'schema_only' to only take a snapshot of the schema (table structures) but no actual data; "
+ "'initial' (the default) to specify the connector can run a snapshot only when no offsets are available for the logical server name; "
+ "'initial_only' same as 'initial' except the connector should stop after completing the snapshot and before it would normally read the binlog; and"
+ "'never' to specify the connector should never run a snapshot and that upon first startup the connector should read from the beginning of the binlog. "

View File

@ -440,45 +440,6 @@ public void shouldHandleMySQLTimeCorrectly() throws Exception {
assertThat(c5Time).isEqualTo(Duration.ofHours(-838).minusMinutes(59).minusSeconds(58).minusNanos(999999000));
}
@Test
public void shouldNotConsumeAllEventsFromDatabaseWithSkippedOperations() throws Exception {
// Define configuration that will ignore all events from MySQL source.
config = simpleConfig()
.with(MySqlConnectorConfig.SKIPPED_OPERATIONS, "c")
.with(MySqlConnectorConfig.SNAPSHOT_MODE, "never")
.build();
Filters filters = new Filters.Builder(config).build();
context = new MySqlTaskContext(config, filters);
context.start();
context.initializeHistory();
reader = new BinlogReader("binlog", context, new AcceptAllPredicate());
// Start reading the binlog ...
reader.start();
try (MySQLConnection db = MySQLConnection.forTestDatabase(DATABASE.getDatabaseName());) {
try (JdbcConnection connection = db.connect()) {
final Connection jdbc = connection.connection();
connection.setAutoCommit(false);
final Statement statement = jdbc.createStatement();
statement.executeUpdate("INSERT INTO customers VALUES(default, 'first', 'first', 'first')");
jdbc.setSavepoint();
statement.executeUpdate("INSERT INTO customers VALUES(default, 'second', 'second', 'second')");
jdbc.commit();
connection.query("SELECT * FROM customers", rs -> {
if (Testing.Print.isEnabled()) {
connection.print(rs);
}
});
connection.setAutoCommit(true);
}
}
Collection customers = store.collection(DATABASE.getDatabaseName(), "customers");
assertThat(customers.numberOfTombstones()).isEqualTo(2);
}
@Test(expected = ConnectException.class)
public void shouldFailOnSchemaInconsistency() throws Exception {
inconsistentSchema(null);

View File

@ -2114,4 +2114,79 @@ public void shouldEmitHeadersOnPrimaryKeyUpdate() throws Exception {
stopConnector();
}
@Test
@FixFor("DBZ-1895")
public void shouldEmitNoEventsForSkippedCreateOperations() throws Exception {
config = DATABASE.defaultConfig()
.with(MySqlConnectorConfig.INCLUDE_SCHEMA_CHANGES, false)
.with(MySqlConnectorConfig.SNAPSHOT_MODE, MySqlConnectorConfig.SnapshotMode.SCHEMA_ONLY)
.with(MySqlConnectorConfig.SKIPPED_OPERATIONS, "c")
.build();
// Start the connector ...
start(MySqlConnector.class, config);
waitForSnapshotToBeCompleted("mysql", DATABASE.getServerName());
try (MySQLConnection db = MySQLConnection.forTestDatabase(DATABASE.getDatabaseName());) {
try (JdbcConnection connection = db.connect()) {
connection.execute("INSERT INTO products VALUES (201,'rubberduck','Rubber Duck',2.12);");
connection.execute("UPDATE products SET weight=3.13 WHERE name = 'rubberduck'");
connection.execute("INSERT INTO products VALUES (202,'rubbercrocodile','Rubber Crocodile',4.14);");
connection.execute("DELETE FROM products WHERE name = 'rubberduck'");
connection.execute("INSERT INTO products VALUES (203,'rubberfish','Rubber Fish',5.15);");
connection.execute("DELETE FROM products WHERE name = 'rubbercrocodile'");
connection.execute("DELETE FROM products WHERE name = 'rubberfish'");
}
}
SourceRecords records = consumeRecordsByTopic(7);
List<SourceRecord> changeEvents = records.recordsForTopic(DATABASE.topicForTable("products"));
assertUpdate(changeEvents.get(0), "id", 201);
assertDelete(changeEvents.get(1), "id", 201);
assertTombstone(changeEvents.get(2), "id", 201);
assertDelete(changeEvents.get(3), "id", 202);
assertTombstone(changeEvents.get(4), "id", 202);
assertDelete(changeEvents.get(5), "id", 203);
assertTombstone(changeEvents.get(6), "id", 203);
assertThat(changeEvents.size()).isEqualTo(7);
stopConnector();
}
@Test
@FixFor("DBZ-1895")
public void shouldEmitNoEventsForSkippedUpdateAndDeleteOperations() throws Exception {
config = DATABASE.defaultConfig()
.with(MySqlConnectorConfig.INCLUDE_SCHEMA_CHANGES, false)
.with(MySqlConnectorConfig.SNAPSHOT_MODE, MySqlConnectorConfig.SnapshotMode.SCHEMA_ONLY)
.with(MySqlConnectorConfig.TOMBSTONES_ON_DELETE, false)
.with(MySqlConnectorConfig.SKIPPED_OPERATIONS, "u,d")
.build();
// Start the connector ...
start(MySqlConnector.class, config);
waitForSnapshotToBeCompleted("mysql", DATABASE.getServerName());
try (MySQLConnection db = MySQLConnection.forTestDatabase(DATABASE.getDatabaseName());) {
try (JdbcConnection connection = db.connect()) {
connection.execute("INSERT INTO products VALUES (204,'rubberduck','Rubber Duck',2.12);");
connection.execute("UPDATE products SET weight=3.13 WHERE name = 'rubberduck'");
connection.execute("INSERT INTO products VALUES (205,'rubbercrocodile','Rubber Crocodile',4.14);");
connection.execute("DELETE FROM products WHERE name = 'rubberduck'");
connection.execute("INSERT INTO products VALUES (206,'rubberfish','Rubber Fish',5.15);");
}
}
SourceRecords records = consumeRecordsByTopic(3);
List<SourceRecord> changeEvents = records.recordsForTopic(DATABASE.topicForTable("products"));
assertInsert(changeEvents.get(0), "id", 204);
assertInsert(changeEvents.get(1), "id", 205);
assertInsert(changeEvents.get(2), "id", 206);
assertThat(changeEvents.size()).isEqualTo(3);
stopConnector();
}
}

View File

@ -420,7 +420,7 @@ private static int validateSkippedOperation(Configuration config, Field field, V
for (String operation : operations.split(",")) {
switch (operation.trim()) {
case "r":
case "i":
case "c":
case "u":
case "d":
continue;