diff --git a/COPYRIGHT.txt b/COPYRIGHT.txt index 064e14543..6f903bde5 100644 --- a/COPYRIGHT.txt +++ b/COPYRIGHT.txt @@ -70,6 +70,7 @@ Jan Hendrik Dolling Hans-Peter Grahsl Henryk Konsek Horia Chiorean +Hossein Torabi Ian Axelrod Igor Gabaydulin Ilia Bogdanov diff --git a/debezium-connector-mysql/.attach_pid364481 b/debezium-connector-mysql/.attach_pid364481 deleted file mode 100644 index e69de29bb..000000000 diff --git a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/BinlogReader.java b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/BinlogReader.java index aaa9e486a..f10b53460 100644 --- a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/BinlogReader.java +++ b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/BinlogReader.java @@ -58,7 +58,6 @@ import com.github.shyiko.mysql.binlog.event.deserialization.EventDataDeserializationException; import com.github.shyiko.mysql.binlog.event.deserialization.EventDeserializer; import com.github.shyiko.mysql.binlog.event.deserialization.GtidEventDataDeserializer; -import com.github.shyiko.mysql.binlog.event.deserialization.NullEventDataDeserializer; import com.github.shyiko.mysql.binlog.io.ByteArrayInputStream; import com.github.shyiko.mysql.binlog.network.AuthenticationException; import com.github.shyiko.mysql.binlog.network.DefaultSSLSocketFactory; @@ -270,44 +269,21 @@ public Event nextEvent(ByteArrayInputStream inputStream) throws IOException { // Add our custom deserializers ... eventDeserializer.setEventDataDeserializer(EventType.STOP, new StopEventDataDeserializer()); eventDeserializer.setEventDataDeserializer(EventType.GTID, new GtidEventDataDeserializer()); - - Set skippedOperations = context.getConnectorConfig().getSkippedOps(); - if (skippedOperations.contains(Operation.CREATE.code())) { - eventDeserializer.setEventDataDeserializer(EventType.WRITE_ROWS, new NullEventDataDeserializer()); - eventDeserializer.setEventDataDeserializer(EventType.EXT_WRITE_ROWS, new NullEventDataDeserializer()); - } - else { - eventDeserializer.setEventDataDeserializer(EventType.WRITE_ROWS, - new RowDeserializers.WriteRowsDeserializer(tableMapEventByTableId)); - eventDeserializer.setEventDataDeserializer(EventType.EXT_WRITE_ROWS, - new RowDeserializers.WriteRowsDeserializer( - tableMapEventByTableId).setMayContainExtraInformation(true)); - } - - if (skippedOperations.contains(Operation.UPDATE.code())) { - eventDeserializer.setEventDataDeserializer(EventType.UPDATE_ROWS, new NullEventDataDeserializer()); - eventDeserializer.setEventDataDeserializer(EventType.EXT_UPDATE_ROWS, new NullEventDataDeserializer()); - } - else { - eventDeserializer.setEventDataDeserializer(EventType.UPDATE_ROWS, - new RowDeserializers.UpdateRowsDeserializer(tableMapEventByTableId)); - eventDeserializer.setEventDataDeserializer(EventType.EXT_UPDATE_ROWS, - new RowDeserializers.UpdateRowsDeserializer( - tableMapEventByTableId).setMayContainExtraInformation(true)); - } - - if (skippedOperations.contains(Operation.DELETE.code())) { - eventDeserializer.setEventDataDeserializer(EventType.DELETE_ROWS, new NullEventDataDeserializer()); - eventDeserializer.setEventDataDeserializer(EventType.EXT_DELETE_ROWS, new NullEventDataDeserializer()); - } - else { - eventDeserializer.setEventDataDeserializer(EventType.DELETE_ROWS, - new RowDeserializers.DeleteRowsDeserializer(tableMapEventByTableId)); - eventDeserializer.setEventDataDeserializer(EventType.EXT_DELETE_ROWS, - new RowDeserializers.DeleteRowsDeserializer( - tableMapEventByTableId).setMayContainExtraInformation(true)); - } - + eventDeserializer.setEventDataDeserializer(EventType.WRITE_ROWS, + new RowDeserializers.WriteRowsDeserializer(tableMapEventByTableId)); + eventDeserializer.setEventDataDeserializer(EventType.UPDATE_ROWS, + new RowDeserializers.UpdateRowsDeserializer(tableMapEventByTableId)); + eventDeserializer.setEventDataDeserializer(EventType.DELETE_ROWS, + new RowDeserializers.DeleteRowsDeserializer(tableMapEventByTableId)); + eventDeserializer.setEventDataDeserializer(EventType.EXT_WRITE_ROWS, + new RowDeserializers.WriteRowsDeserializer( + tableMapEventByTableId).setMayContainExtraInformation(true)); + eventDeserializer.setEventDataDeserializer(EventType.EXT_UPDATE_ROWS, + new RowDeserializers.UpdateRowsDeserializer( + tableMapEventByTableId).setMayContainExtraInformation(true)); + eventDeserializer.setEventDataDeserializer(EventType.EXT_DELETE_ROWS, + new RowDeserializers.DeleteRowsDeserializer( + tableMapEventByTableId).setMayContainExtraInformation(true)); client.setEventDeserializer(eventDeserializer); // Set up for JMX ... @@ -329,6 +305,7 @@ public void doDestroy() { @Override protected void doStart() { context.dbSchema().assureNonEmptySchema(); + Set skippedOperations = context.getConnectorConfig().getSkippedOps(); // Register our event handlers ... eventHandlers.put(EventType.STOP, this::handleServerStop); @@ -337,12 +314,22 @@ protected void doStart() { eventHandlers.put(EventType.ROTATE, this::handleRotateLogsEvent); eventHandlers.put(EventType.TABLE_MAP, this::handleUpdateTableMetadata); eventHandlers.put(EventType.QUERY, this::handleQueryEvent); - eventHandlers.put(EventType.WRITE_ROWS, this::handleInsert); - eventHandlers.put(EventType.UPDATE_ROWS, this::handleUpdate); - eventHandlers.put(EventType.DELETE_ROWS, this::handleDelete); - eventHandlers.put(EventType.EXT_WRITE_ROWS, this::handleInsert); - eventHandlers.put(EventType.EXT_UPDATE_ROWS, this::handleUpdate); - eventHandlers.put(EventType.EXT_DELETE_ROWS, this::handleDelete); + + if (!skippedOperations.contains(Operation.CREATE)) { + eventHandlers.put(EventType.WRITE_ROWS, this::handleInsert); + eventHandlers.put(EventType.EXT_WRITE_ROWS, this::handleInsert); + } + + if (!skippedOperations.contains(Operation.UPDATE)) { + eventHandlers.put(EventType.UPDATE_ROWS, this::handleUpdate); + eventHandlers.put(EventType.EXT_UPDATE_ROWS, this::handleUpdate); + } + + if (!skippedOperations.contains(Operation.DELETE)) { + eventHandlers.put(EventType.DELETE_ROWS, this::handleDelete); + eventHandlers.put(EventType.EXT_DELETE_ROWS, this::handleDelete); + } + eventHandlers.put(EventType.VIEW_CHANGE, this::viewChange); eventHandlers.put(EventType.XA_PREPARE, this::prepareTransaction); eventHandlers.put(EventType.XID, this::handleTransactionCompletion); diff --git a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlConnectorConfig.java b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlConnectorConfig.java index da8a81435..ad4e87f59 100644 --- a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlConnectorConfig.java +++ b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlConnectorConfig.java @@ -836,6 +836,7 @@ public static GtidNewChannelPosition parse(String value, String defaultValue) { .withDescription("The criteria for running a snapshot upon startup of the connector. " + "Options include: " + "'when_needed' to specify that the connector run a snapshot upon startup whenever it deems it necessary; " + + "'schema_only' to only take a snapshot of the schema (table structures) but no actual data; " + "'initial' (the default) to specify the connector can run a snapshot only when no offsets are available for the logical server name; " + "'initial_only' same as 'initial' except the connector should stop after completing the snapshot and before it would normally read the binlog; and" + "'never' to specify the connector should never run a snapshot and that upon first startup the connector should read from the beginning of the binlog. " diff --git a/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/BinlogReaderIT.java b/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/BinlogReaderIT.java index cb3e64679..852591fec 100644 --- a/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/BinlogReaderIT.java +++ b/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/BinlogReaderIT.java @@ -440,45 +440,6 @@ public void shouldHandleMySQLTimeCorrectly() throws Exception { assertThat(c5Time).isEqualTo(Duration.ofHours(-838).minusMinutes(59).minusSeconds(58).minusNanos(999999000)); } - @Test - public void shouldNotConsumeAllEventsFromDatabaseWithSkippedOperations() throws Exception { - // Define configuration that will ignore all events from MySQL source. - config = simpleConfig() - .with(MySqlConnectorConfig.SKIPPED_OPERATIONS, "c") - .with(MySqlConnectorConfig.SNAPSHOT_MODE, "never") - .build(); - Filters filters = new Filters.Builder(config).build(); - context = new MySqlTaskContext(config, filters); - context.start(); - context.initializeHistory(); - reader = new BinlogReader("binlog", context, new AcceptAllPredicate()); - - // Start reading the binlog ... - reader.start(); - - try (MySQLConnection db = MySQLConnection.forTestDatabase(DATABASE.getDatabaseName());) { - try (JdbcConnection connection = db.connect()) { - final Connection jdbc = connection.connection(); - connection.setAutoCommit(false); - final Statement statement = jdbc.createStatement(); - statement.executeUpdate("INSERT INTO customers VALUES(default, 'first', 'first', 'first')"); - jdbc.setSavepoint(); - statement.executeUpdate("INSERT INTO customers VALUES(default, 'second', 'second', 'second')"); - jdbc.commit(); - connection.query("SELECT * FROM customers", rs -> { - if (Testing.Print.isEnabled()) { - connection.print(rs); - } - }); - connection.setAutoCommit(true); - } - } - - Collection customers = store.collection(DATABASE.getDatabaseName(), "customers"); - assertThat(customers.numberOfTombstones()).isEqualTo(2); - - } - @Test(expected = ConnectException.class) public void shouldFailOnSchemaInconsistency() throws Exception { inconsistentSchema(null); diff --git a/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/MySqlConnectorIT.java b/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/MySqlConnectorIT.java index 6273144a9..7432590c8 100644 --- a/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/MySqlConnectorIT.java +++ b/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/MySqlConnectorIT.java @@ -2114,4 +2114,79 @@ public void shouldEmitHeadersOnPrimaryKeyUpdate() throws Exception { stopConnector(); } + + @Test + @FixFor("DBZ-1895") + public void shouldEmitNoEventsForSkippedCreateOperations() throws Exception { + config = DATABASE.defaultConfig() + .with(MySqlConnectorConfig.INCLUDE_SCHEMA_CHANGES, false) + .with(MySqlConnectorConfig.SNAPSHOT_MODE, MySqlConnectorConfig.SnapshotMode.SCHEMA_ONLY) + .with(MySqlConnectorConfig.SKIPPED_OPERATIONS, "c") + .build(); + + // Start the connector ... + start(MySqlConnector.class, config); + waitForSnapshotToBeCompleted("mysql", DATABASE.getServerName()); + + try (MySQLConnection db = MySQLConnection.forTestDatabase(DATABASE.getDatabaseName());) { + try (JdbcConnection connection = db.connect()) { + connection.execute("INSERT INTO products VALUES (201,'rubberduck','Rubber Duck',2.12);"); + connection.execute("UPDATE products SET weight=3.13 WHERE name = 'rubberduck'"); + connection.execute("INSERT INTO products VALUES (202,'rubbercrocodile','Rubber Crocodile',4.14);"); + connection.execute("DELETE FROM products WHERE name = 'rubberduck'"); + connection.execute("INSERT INTO products VALUES (203,'rubberfish','Rubber Fish',5.15);"); + connection.execute("DELETE FROM products WHERE name = 'rubbercrocodile'"); + connection.execute("DELETE FROM products WHERE name = 'rubberfish'"); + } + } + + SourceRecords records = consumeRecordsByTopic(7); + List changeEvents = records.recordsForTopic(DATABASE.topicForTable("products")); + + assertUpdate(changeEvents.get(0), "id", 201); + assertDelete(changeEvents.get(1), "id", 201); + assertTombstone(changeEvents.get(2), "id", 201); + assertDelete(changeEvents.get(3), "id", 202); + assertTombstone(changeEvents.get(4), "id", 202); + assertDelete(changeEvents.get(5), "id", 203); + assertTombstone(changeEvents.get(6), "id", 203); + assertThat(changeEvents.size()).isEqualTo(7); + + stopConnector(); + } + + @Test + @FixFor("DBZ-1895") + public void shouldEmitNoEventsForSkippedUpdateAndDeleteOperations() throws Exception { + config = DATABASE.defaultConfig() + .with(MySqlConnectorConfig.INCLUDE_SCHEMA_CHANGES, false) + .with(MySqlConnectorConfig.SNAPSHOT_MODE, MySqlConnectorConfig.SnapshotMode.SCHEMA_ONLY) + .with(MySqlConnectorConfig.TOMBSTONES_ON_DELETE, false) + .with(MySqlConnectorConfig.SKIPPED_OPERATIONS, "u,d") + .build(); + + // Start the connector ... + start(MySqlConnector.class, config); + waitForSnapshotToBeCompleted("mysql", DATABASE.getServerName()); + + try (MySQLConnection db = MySQLConnection.forTestDatabase(DATABASE.getDatabaseName());) { + try (JdbcConnection connection = db.connect()) { + connection.execute("INSERT INTO products VALUES (204,'rubberduck','Rubber Duck',2.12);"); + connection.execute("UPDATE products SET weight=3.13 WHERE name = 'rubberduck'"); + connection.execute("INSERT INTO products VALUES (205,'rubbercrocodile','Rubber Crocodile',4.14);"); + connection.execute("DELETE FROM products WHERE name = 'rubberduck'"); + connection.execute("INSERT INTO products VALUES (206,'rubberfish','Rubber Fish',5.15);"); + } + } + + SourceRecords records = consumeRecordsByTopic(3); + List changeEvents = records.recordsForTopic(DATABASE.topicForTable("products")); + + assertInsert(changeEvents.get(0), "id", 204); + assertInsert(changeEvents.get(1), "id", 205); + assertInsert(changeEvents.get(2), "id", 206); + assertThat(changeEvents.size()).isEqualTo(3); + + stopConnector(); + } } diff --git a/debezium-core/src/main/java/io/debezium/config/CommonConnectorConfig.java b/debezium-core/src/main/java/io/debezium/config/CommonConnectorConfig.java index 261fee6da..a99a30ace 100644 --- a/debezium-core/src/main/java/io/debezium/config/CommonConnectorConfig.java +++ b/debezium-core/src/main/java/io/debezium/config/CommonConnectorConfig.java @@ -420,7 +420,7 @@ private static int validateSkippedOperation(Configuration config, Field field, V for (String operation : operations.split(",")) { switch (operation.trim()) { case "r": - case "i": + case "c": case "u": case "d": continue;