DBZ-44 Generate a tombstone for the old key when a row's key is changed

When a row is updated in the database and the primary/unique key for that table is changed, the MySQL connector continues to generate an update event with the new key and new value, but now also generates a tombstone event for the old key. This ensures that when a Kafka topic is compacted, all prior events with the old key will (eventually) be removed. It also ensures that consumers see that the row represented by the old key has been removed.
This commit is contained in:
Randall Hauch 2016-05-13 17:43:29 -05:00
parent 7c296b83d5
commit e6710a5300
3 changed files with 65 additions and 8 deletions

View File

@ -307,6 +307,17 @@ public void handleUpdate(Event event, SourceInfo source, Consumer<SourceRecord>
keySchema, key, valueSchema, value);
recorder.accept(record);
}
// Check whether the key for this record changed in the update ...
Object oldKey = converter.createKey(before, includedColumns);
if ( key != null && !Objects.equals(key, oldKey)) {
// The key has indeed changed, so also send a delete/tombstone event for the old key ...
value = converter.deleted(before, includedColumnsBefore);
if ( value == null ) valueSchema = null;
SourceRecord record = new SourceRecord(source.partition(), source.offset(row), topic, partition,
keySchema, oldKey, valueSchema, value);
recorder.accept(record);
}
}
} else if (logger.isDebugEnabled()) {
logger.debug("Skipping update row event: {}", event);

View File

@ -9,10 +9,13 @@
import java.nio.file.Path;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.errors.DataException;
import org.apache.kafka.connect.source.SourceRecord;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
@ -115,20 +118,35 @@ public void shouldStartAndPollShouldReturnSourceRecordsFromDatabase() throws SQL
Testing.Print.disable();
try (MySQLConnection db = MySQLConnection.forTestDatabase("connector_test");) {
try (JdbcConnection connection = db.connect()) {
connection.execute("INSERT INTO products VALUES (default,'roy','old robot',1234.56);");
connection.execute("INSERT INTO products VALUES (1001,'roy','old robot',1234.56);");
connection.query("SELECT * FROM products", rs->{if (Testing.Print.isEnabled()) connection.print(rs);});
}
}
// Restart the connector and wait for a few seconds (at most) for the new record ...
Testing.Print.enable();
//Testing.Print.enable();
start(MySqlConnector.class, config);
waitForAvailableRecords(5, TimeUnit.SECONDS);
totalConsumed += consumeAvailableRecords(this::print);
try (MySQLConnection db = MySQLConnection.forTestDatabase("connector_test");) {
try (JdbcConnection connection = db.connect()) {
connection.execute("UPDATE products SET id=2001, description='really old robot' WHERE id=1001");
connection.query("SELECT * FROM products", rs->{if (Testing.Print.isEnabled()) connection.print(rs);});
}
}
waitForAvailableRecords(5, TimeUnit.SECONDS);
List<SourceRecord> deletes = new ArrayList<>();
totalConsumed += consumeAvailableRecords(deletes::add);
stopConnector();
// We should have seen a total of 30 events, though when they appear may vary ...
assertThat(totalConsumed).isEqualTo(30);
// Verify that the update of a record where the pk changes results in an update and a delete event ...
assertThat(deletes.size()).isEqualTo(2);
assertInsert(deletes.get(0),"id",2001);
assertTombstone(deletes.get(1),"id",1001);
// We should have seen a total of 32 events, though when they appear may vary ...
assertThat(totalConsumed).isEqualTo(32);
}
@Test
@ -153,7 +171,7 @@ public void shouldConsumeEventsWithMaskedAndBlacklistedColumns() throws SQLExcep
start(MySqlConnector.class, config);
// Wait for records to become available ...
Testing.Print.enable();
//Testing.Print.enable();
waitForAvailableRecords(15, TimeUnit.SECONDS);
// Now consume the records ...

View File

@ -318,6 +318,35 @@ protected void assertNoRecordsToConsume() {
assertThat(consumedLines.isEmpty()).isTrue();
}
/**
 * Asserts that the record's key is a {@link Struct} whose {@code pkField} entry
 * equals the expected primary key value.
 *
 * @param record the record whose key is checked; may not be null
 * @param pkField the name of the primary-key field within the key struct
 * @param pk the expected primary key value
 */
protected void assertKey(SourceRecord record, String pkField, int pk) {
    Struct keyStruct = (Struct) record.key();
    Object actualPk = keyStruct.get(pkField);
    assertThat(actualPk).isEqualTo(pk);
}
/**
 * Asserts that the given record is a tombstone: it carries a key and key schema
 * but a null value and null value schema. Log compaction in Kafka uses such
 * records to (eventually) remove earlier records with the same key.
 *
 * @param record the record to check; may not be null
 */
protected void assertTombstone(SourceRecord record) {
    // A tombstone has a keyed envelope but no payload at all.
    Object key = record.key();
    Object value = record.value();
    assertThat(key).isNotNull();
    assertThat(record.keySchema()).isNotNull();
    assertThat(value).isNull();
    assertThat(record.valueSchema()).isNull();
}
// Asserts that the record is a tombstone (null value and value schema) whose key
// contains the expected primary key value. Key content is checked first so a
// wrong key is reported before the tombstone-shape assertions run.
protected void assertTombstone(SourceRecord record, String pkField, int pk) {
    assertKey(record,pkField,pk);
    assertTombstone(record);
}
/**
 * Asserts that the record represents an insert event: key, key schema, value, and
 * value schema are all present, and the key contains the expected primary key value.
 *
 * @param record the record to check; may not be null
 * @param pkField the name of the primary-key field within the key struct
 * @param pk the expected primary key value
 */
protected void assertInsert(SourceRecord record, String pkField, int pk) {
    // Verify presence before dereferencing the key, so a null key produces a
    // clear assertion failure rather than a NullPointerException inside assertKey.
    assertThat(record.key()).isNotNull();
    assertThat(record.keySchema()).isNotNull();
    assertThat(record.value()).isNotNull();
    assertThat(record.valueSchema()).isNotNull();
    assertKey(record, pkField, pk);
}
/**
 * Asserts that the record represents an update event. Update events currently have
 * the same shape as insert events, so this simply delegates to
 * {@link #assertInsert(SourceRecord, String, int)}.
 *
 * @param record the record to check; may not be null
 * @param pkField the name of the primary-key field within the key struct
 * @param pk the expected primary key value
 */
protected void assertUpdate(SourceRecord record, String pkField, int pk) {
    assertInsert(record, pkField, pk);
}
protected void print(SourceRecord record) {
StringBuilder sb = new StringBuilder("SourceRecord{");
sb.append("sourcePartition=").append(record.sourcePartition());
@ -399,8 +428,7 @@ protected void append(Object obj, StringBuilder sb) {
append(field.schema(), sb);
}
sb.append('}');
}
if (obj instanceof Struct) {
} else if (obj instanceof Struct) {
Struct s = (Struct) obj;
sb.append('{');
boolean first = true;