diff --git a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/TableConverters.java b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/TableConverters.java index cd0853f7f..707c2527c 100644 --- a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/TableConverters.java +++ b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/TableConverters.java @@ -307,6 +307,17 @@ public void handleUpdate(Event event, SourceInfo source, Consumer keySchema, key, valueSchema, value); recorder.accept(record); } + + // Check whether the key for this record changed in the update ... + Object oldKey = converter.createKey(before, includedColumns); + if ( key != null && !Objects.equals(key, oldKey)) { + // The key has indeed changed, so also send a delete/tombstone event for the old key ... + value = converter.deleted(before, includedColumnsBefore); + if ( value == null ) valueSchema = null; + SourceRecord record = new SourceRecord(source.partition(), source.offset(row), topic, partition, + keySchema, oldKey, valueSchema, value); + recorder.accept(record); + } } } else if (logger.isDebugEnabled()) { logger.debug("Skipping update row event: {}", event); diff --git a/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/MySqlConnectorIT.java b/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/MySqlConnectorIT.java index bc4ed2b91..5370e9e45 100644 --- a/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/MySqlConnectorIT.java +++ b/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/MySqlConnectorIT.java @@ -9,10 +9,13 @@ import java.nio.file.Path; import java.sql.SQLException; +import java.util.ArrayList; +import java.util.List; import java.util.concurrent.TimeUnit; import org.apache.kafka.connect.data.Struct; import org.apache.kafka.connect.errors.DataException; +import org.apache.kafka.connect.source.SourceRecord; import org.junit.After; import org.junit.Before; import org.junit.Test; @@ -115,22 +118,37 @@ public void shouldStartAndPollShouldReturnSourceRecordsFromDatabase() throws SQL Testing.Print.disable(); try (MySQLConnection db = MySQLConnection.forTestDatabase("connector_test");) { try (JdbcConnection connection = db.connect()) { - connection.execute("INSERT INTO products VALUES (default,'roy','old robot',1234.56);"); + connection.execute("INSERT INTO products VALUES (1001,'roy','old robot',1234.56);"); connection.query("SELECT * FROM products", rs->{if (Testing.Print.isEnabled()) connection.print(rs);}); } } // Restart the connector and wait for a few seconds (at most) for the new record ... - Testing.Print.enable(); + //Testing.Print.enable(); start(MySqlConnector.class, config); waitForAvailableRecords(5, TimeUnit.SECONDS); totalConsumed += consumeAvailableRecords(this::print); + + try (MySQLConnection db = MySQLConnection.forTestDatabase("connector_test");) { + try (JdbcConnection connection = db.connect()) { + connection.execute("UPDATE products SET id=2001, description='really old robot' WHERE id=1001"); + connection.query("SELECT * FROM products", rs->{if (Testing.Print.isEnabled()) connection.print(rs);}); + } + } + waitForAvailableRecords(5, TimeUnit.SECONDS); + List deletes = new ArrayList<>(); + totalConsumed += consumeAvailableRecords(deletes::add); stopConnector(); + + // Verify that the update of a record where the pk changes results in an update and a delete event ... + assertThat(deletes.size()).isEqualTo(2); + assertInsert(deletes.get(0),"id",2001); + assertTombstone(deletes.get(1),"id",1001); - // We should have seen a total of 30 events, though when they appear may vary ... - assertThat(totalConsumed).isEqualTo(30); + // We should have seen a total of 32 events, though when they appear may vary ... + assertThat(totalConsumed).isEqualTo(32); } - + @Test public void shouldConsumeEventsWithMaskedAndBlacklistedColumns() throws SQLException { Testing.Files.delete(DB_HISTORY_PATH); @@ -153,7 +171,7 @@ public void shouldConsumeEventsWithMaskedAndBlacklistedColumns() throws SQLExcep start(MySqlConnector.class, config); // Wait for records to become available ... - Testing.Print.enable(); + //Testing.Print.enable(); waitForAvailableRecords(15, TimeUnit.SECONDS); // Now consume the records ... diff --git a/debezium-embedded/src/test/java/io/debezium/embedded/AbstractConnectorTest.java b/debezium-embedded/src/test/java/io/debezium/embedded/AbstractConnectorTest.java index e3f744108..030c580a1 100644 --- a/debezium-embedded/src/test/java/io/debezium/embedded/AbstractConnectorTest.java +++ b/debezium-embedded/src/test/java/io/debezium/embedded/AbstractConnectorTest.java @@ -318,6 +318,35 @@ protected void assertNoRecordsToConsume() { assertThat(consumedLines.isEmpty()).isTrue(); } + protected void assertKey(SourceRecord record, String pkField, int pk) { + Struct key = (Struct) record.key(); + assertThat(key.get(pkField)).isEqualTo(pk); + } + + protected void assertTombstone(SourceRecord record) { + assertThat(record.key()).isNotNull(); + assertThat(record.keySchema()).isNotNull(); + assertThat(record.value()).isNull(); + assertThat(record.valueSchema()).isNull(); + } + + protected void assertTombstone(SourceRecord record, String pkField, int pk) { + assertKey(record,pkField,pk); + assertTombstone(record); + } + + protected void assertInsert(SourceRecord record, String pkField, int pk) { + assertKey(record,pkField,pk); + assertThat(record.key()).isNotNull(); + assertThat(record.keySchema()).isNotNull(); + assertThat(record.value()).isNotNull(); + assertThat(record.valueSchema()).isNotNull(); + } + + protected void assertUpdate(SourceRecord record, String pkField, int pk) { + assertInsert(record,pkField,pk); // currently the same as an insert + } + protected void print(SourceRecord record) { StringBuilder sb = new StringBuilder("SourceRecord{"); sb.append("sourcePartition=").append(record.sourcePartition()); @@ -399,8 +428,7 @@ protected void append(Object obj, StringBuilder sb) { append(field.schema(), sb); } sb.append('}'); - } - if (obj instanceof Struct) { + } else if (obj instanceof Struct) { Struct s = (Struct) obj; sb.append('{'); boolean first = true;