DBZ-150 Changed the order of events when a row's key is changed.

This commit is contained in:
Randall Hauch 2016-11-09 14:32:46 -06:00
parent b0ded5f383
commit 8a52cda0dc
2 changed files with 13 additions and 11 deletions

View File

@ -238,22 +238,24 @@ public int update(SourceInfo source, Object[] before, Object[] after, int rowNum
Map<String, ?> offset = source.offsetForRow(rowNumber, numberOfRows);
Struct origin = source.struct();
if (key != null && !Objects.equals(key, oldKey)) {
// The key has indeed changed, so first send a create event ...
// The key has changed, so we need to deal with both the new key and old key.
// Consumers may push the events into a system that won't allow both records to exist at the same time,
// so we first want to send the delete event for the old key...
SourceRecord record = new SourceRecord(partition, offset, topicName, partitionNum,
keySchema, key, envelope.schema(), envelope.create(valueAfter, origin, ts));
consumer.accept(record);
++count;
// then send a delete event for the old key ...
record = new SourceRecord(partition, offset, topicName, partitionNum,
keySchema, oldKey, envelope.schema(), envelope.delete(valueBefore, origin, ts));
consumer.accept(record);
++count;
// Send a tombstone event for the old key ...
// Next send a tombstone event for the old key ...
record = new SourceRecord(partition, offset, topicName, partitionNum, keySchema, oldKey, null, null);
consumer.accept(record);
++count;
// And finally send the create event ...
record = new SourceRecord(partition, offset, topicName, partitionNum,
keySchema, key, envelope.schema(), envelope.create(valueAfter, origin, ts));
consumer.accept(record);
++count;
} else {
// The key has not changed, so a simple update is fine ...
SourceRecord record = new SourceRecord(partition, offset, topicName, partitionNum,

View File

@ -416,9 +416,9 @@ public void shouldConsumeAllEventsFromDatabaseUsingSnapshot() throws SQLExceptio
records = consumeRecordsByTopic(3);
List<SourceRecord> updates = records.recordsForTopic("myServer.connector_test.products");
assertThat(updates.size()).isEqualTo(3);
assertInsert(updates.get(0), "id", 2001);
assertDelete(updates.get(1), "id", 1001);
assertTombstone(updates.get(2), "id", 1001);
assertDelete(updates.get(0), "id", 1001);
assertTombstone(updates.get(1), "id", 1001);
assertInsert(updates.get(2), "id", 2001);
Testing.print("*** Done with PK change");