DBZ-1531 added headers for primary key update events to reference the original key

This commit is contained in:
rk3rn3r 2020-03-16 15:01:38 +01:00 committed by Gunnar Morling
parent fbd8b94e94
commit 6310e00675
4 changed files with 48 additions and 25 deletions

View File

@ -269,9 +269,9 @@ public int update(SourceInfo source, Object[] before, Object[] after, int rowNum
throws InterruptedException {
int count = 0;
validateColumnCount(tableSchema, after);
Object key = tableSchema.keyFromColumnData(after);
Object newkey = tableSchema.keyFromColumnData(after);
Struct valueAfter = tableSchema.valueFromColumnData(after);
if (valueAfter != null || key != null) {
if (valueAfter != null || newkey != null) {
Object oldKey = tableSchema.keyFromColumnData(before);
Struct valueBefore = tableSchema.valueFromColumnData(before);
Schema keySchema = tableSchema.keySchema();
@ -279,13 +279,13 @@ public int update(SourceInfo source, Object[] before, Object[] after, int rowNum
Map<String, Object> offset = source.offsetForRow(rowNumber, numberOfRows);
source.tableEvent(id);
Struct origin = source.struct();
if (key != null && !Objects.equals(key, oldKey)) {
if (newkey != null && !Objects.equals(newkey, oldKey)) {
// The key has changed, so we need to deal with both the new key and old key.
// Consumers may push the events into a system that won't allow both records to exist at the same time,
// so we first want to send the delete event for the old key...
ConnectHeaders headers = new ConnectHeaders();
headers.add(RelationalChangeRecordEmitter.PK_UPDATE_OLDKEY_FIELD, oldKey, keySchema);
headers.add(RelationalChangeRecordEmitter.PK_UPDATE_NEWKEY_FIELD, newkey, keySchema);
SourceRecord record = new SourceRecord(partition, getSourceRecordOffset(offset), topicName, partitionNum,
keySchema, oldKey, envelope.schema(), envelope.delete(valueBefore, origin, ts), null, headers);
@ -299,16 +299,19 @@ record = new SourceRecord(partition, getSourceRecordOffset(offset), topicName, p
++count;
}
headers = new ConnectHeaders();
headers.add(RelationalChangeRecordEmitter.PK_UPDATE_OLDKEY_FIELD, oldKey, keySchema);
// And finally send the create event ...
record = new SourceRecord(partition, getSourceRecordOffset(offset), topicName, partitionNum,
keySchema, key, envelope.schema(), envelope.create(valueAfter, origin, ts), null, headers);
keySchema, newkey, envelope.schema(), envelope.create(valueAfter, origin, ts), null, headers);
consumer.accept(record);
++count;
}
else {
// The key has not changed, so a simple update is fine ...
SourceRecord record = new SourceRecord(partition, getSourceRecordOffset(offset), topicName, partitionNum,
keySchema, key, envelope.schema(), envelope.update(valueBefore, valueAfter, origin, ts));
keySchema, newkey, envelope.schema(), envelope.update(valueBefore, valueAfter, origin, ts));
consumer.accept(record);
++count;
}

View File

@ -293,9 +293,17 @@ public void shouldValidateLockingModeNoneWithValidSnapshotModeConfiguration() {
}
}
private Header getPKUpdateNewKeyHeader(SourceRecord record) {
return this.getHeaderField(record, RelationalChangeRecordEmitter.PK_UPDATE_NEWKEY_FIELD);
}
private Header getPKUpdateOldKeyHeader(SourceRecord record) {
return this.getHeaderField(record, RelationalChangeRecordEmitter.PK_UPDATE_OLDKEY_FIELD);
}
private Header getHeaderField(SourceRecord record, String fieldName) {
return StreamSupport.stream(record.headers().spliterator(), false)
.filter(header -> RelationalChangeRecordEmitter.PK_UPDATE_OLDKEY_FIELD.equals(header.key()))
.filter(header -> fieldName.equals(header.key()))
.collect(Collectors.toList()).get(0);
}
@ -449,8 +457,8 @@ public void shouldConsumeAllEventsFromDatabaseUsingSnapshot() throws SQLExceptio
assertDelete(deleteRecord, "id", 1001);
assertEquals(1, deleteRecord.headers().size()); // to be removed/updated once we set additional headers
Header oldkeyPKUpdateHeader = getPKUpdateOldKeyHeader(deleteRecord);
assertEquals(Integer.valueOf(1001), ((Struct) oldkeyPKUpdateHeader.value()).getInt32("id"));
Header keyPKUpdateHeader = getPKUpdateNewKeyHeader(deleteRecord);
assertEquals(Integer.valueOf(2001), ((Struct) keyPKUpdateHeader.value()).getInt32("id"));
assertTombstone(updates.get(1), "id", 1001);
@ -458,8 +466,8 @@ public void shouldConsumeAllEventsFromDatabaseUsingSnapshot() throws SQLExceptio
assertInsert(insertRecord, "id", 2001);
assertEquals(1, insertRecord.headers().size()); // to be removed/updated once we set additional headers
oldkeyPKUpdateHeader = getPKUpdateOldKeyHeader(insertRecord);
assertEquals(Integer.valueOf(1001), ((Struct) oldkeyPKUpdateHeader.value()).getInt32("id"));
keyPKUpdateHeader = getPKUpdateOldKeyHeader(insertRecord);
assertEquals(Integer.valueOf(1001), ((Struct) keyPKUpdateHeader.value()).getInt32("id"));
Testing.print("*** Done with PK change");
@ -2036,13 +2044,13 @@ public void shouldEmitOldkeyHeaderOnPrimaryKeyUpdate() throws Exception {
SourceRecord deleteRecord = updates.get(0);
assertEquals(1, deleteRecord.headers().size()); // to be removed/updated once we set additional headers
Header oldkeyPKUpdateHeader = getPKUpdateOldKeyHeader(deleteRecord);
assertEquals(Integer.valueOf(10003), ((Struct) oldkeyPKUpdateHeader.value()).getInt32("order_number"));
Header keyPKUpdateHeader = getPKUpdateNewKeyHeader(deleteRecord);
assertEquals(Integer.valueOf(10303), ((Struct) keyPKUpdateHeader.value()).getInt32("order_number"));
SourceRecord insertRecord = updates.get(2);
assertEquals(1, insertRecord.headers().size()); // to be removed/updated once we set additional headers
oldkeyPKUpdateHeader = getPKUpdateOldKeyHeader(insertRecord);
assertEquals(Integer.valueOf(10003), ((Struct) oldkeyPKUpdateHeader.value()).getInt32("order_number"));
keyPKUpdateHeader = getPKUpdateOldKeyHeader(insertRecord);
assertEquals(Integer.valueOf(10003), ((Struct) keyPKUpdateHeader.value()).getInt32("order_number"));
try (MySQLConnection db = MySQLConnection.forTestDatabase(DATABASE.getDatabaseName());) {
try (JdbcConnection connection = db.connect()) {

View File

@ -689,9 +689,17 @@ public void shouldReceiveChangesForUpdatesWithColumnChanges() throws Exception {
Collections.singletonList(new SchemaAndValueField("modtype", SchemaBuilder.OPTIONAL_INT16_SCHEMA, (short) 2)), updatedRecord, Envelope.FieldName.AFTER);
}
private Header getPKUpdateNewKeyHeader(SourceRecord record) {
return this.getHeaderField(record, RelationalChangeRecordEmitter.PK_UPDATE_NEWKEY_FIELD);
}
private Header getPKUpdateOldKeyHeader(SourceRecord record) {
return this.getHeaderField(record, RelationalChangeRecordEmitter.PK_UPDATE_OLDKEY_FIELD);
}
private Header getHeaderField(SourceRecord record, String fieldName) {
return StreamSupport.stream(record.headers().spliterator(), false)
.filter(header -> RelationalChangeRecordEmitter.PK_UPDATE_OLDKEY_FIELD.equals(header.key()))
.filter(header -> fieldName.equals(header.key()))
.collect(Collectors.toList()).get(0);
}
@ -710,8 +718,8 @@ public void shouldReceiveChangesForUpdatesWithPKChanges() throws Exception {
assertEquals(1, deleteRecord.headers().size()); // to be removed/updated once we set additional headers
Header oldkeyPKUpdateHeader = getPKUpdateOldKeyHeader(deleteRecord);
assertEquals(Integer.valueOf(1), ((Struct) oldkeyPKUpdateHeader.value()).getInt32("pk"));
Header keyPKUpdateHeader = getPKUpdateNewKeyHeader(deleteRecord);
assertEquals(Integer.valueOf(2), ((Struct) keyPKUpdateHeader.value()).getInt32("pk"));
// followed by a tombstone of the old pk
SourceRecord tombstoneRecord = consumer.remove();
@ -724,8 +732,8 @@ public void shouldReceiveChangesForUpdatesWithPKChanges() throws Exception {
VerifyRecord.isValidInsert(insertRecord, PK_FIELD, 2);
assertEquals(1, insertRecord.headers().size()); // to be removed/updated once we set additional headers
oldkeyPKUpdateHeader = getPKUpdateOldKeyHeader(insertRecord);
assertEquals(Integer.valueOf(1), ((Struct) oldkeyPKUpdateHeader.value()).getInt32("pk"));
keyPKUpdateHeader = getPKUpdateOldKeyHeader(insertRecord);
assertEquals(Integer.valueOf(1), ((Struct) keyPKUpdateHeader.value()).getInt32("pk"));
}
@Test
@ -746,8 +754,8 @@ public void shouldReceiveChangesForUpdatesWithPKChangesWithoutTombstone() throws
VerifyRecord.isValidDelete(deleteRecord, PK_FIELD, 1);
assertEquals(1, deleteRecord.headers().size()); // to be removed/updated once we set additional headers
Header oldkeyPKUpdateHeader = getPKUpdateOldKeyHeader(deleteRecord);
assertEquals(Integer.valueOf(1), ((Struct) oldkeyPKUpdateHeader.value()).getInt32("pk"));
Header keyPKUpdateHeader = getPKUpdateNewKeyHeader(deleteRecord);
assertEquals(Integer.valueOf(2), ((Struct) keyPKUpdateHeader.value()).getInt32("pk"));
// followed by insert of the new value
SourceRecord insertRecord = consumer.remove();
@ -755,8 +763,8 @@ public void shouldReceiveChangesForUpdatesWithPKChangesWithoutTombstone() throws
VerifyRecord.isValidInsert(insertRecord, PK_FIELD, 2);
assertEquals(1, insertRecord.headers().size()); // to be removed/updated once we set additional headers
oldkeyPKUpdateHeader = getPKUpdateOldKeyHeader(insertRecord);
assertEquals(Integer.valueOf(1), ((Struct) oldkeyPKUpdateHeader.value()).getInt32("pk"));
keyPKUpdateHeader = getPKUpdateOldKeyHeader(insertRecord);
assertEquals(Integer.valueOf(1), ((Struct) keyPKUpdateHeader.value()).getInt32("pk"));
}
@Test

View File

@ -27,6 +27,7 @@
public abstract class RelationalChangeRecordEmitter extends AbstractChangeRecordEmitter<TableSchema> {
public static final String PK_UPDATE_OLDKEY_FIELD = "oldkey";
public static final String PK_UPDATE_NEWKEY_FIELD = "newkey";
protected final Logger logger = LoggerFactory.getLogger(getClass());
@ -108,11 +109,14 @@ protected void emitUpdateRecord(Receiver receiver, TableSchema tableSchema)
// PK update -> emit as delete and re-insert with new key
else {
ConnectHeaders headers = new ConnectHeaders();
headers.add(PK_UPDATE_OLDKEY_FIELD, oldKey, tableSchema.keySchema());
headers.add(PK_UPDATE_NEWKEY_FIELD, newKey, tableSchema.keySchema());
Struct envelope = tableSchema.getEnvelopeSchema().delete(oldValue, getOffset().getSourceInfo(), getClock().currentTimeAsInstant());
receiver.changeRecord(tableSchema, Operation.DELETE, oldKey, envelope, getOffset(), headers);
headers = new ConnectHeaders();
headers.add(PK_UPDATE_OLDKEY_FIELD, oldKey, tableSchema.keySchema());
envelope = tableSchema.getEnvelopeSchema().create(newValue, getOffset().getSourceInfo(), getClock().currentTimeAsInstant());
receiver.changeRecord(tableSchema, Operation.CREATE, newKey, envelope, getOffset(), headers);
}