DBZ-1531 added headers for primary key update events to reference the original key

rk3rn3r 2020-03-14 00:29:57 +01:00 committed by Gunnar Morling
parent 06b11775fc
commit fbd8b94e94
6 changed files with 175 additions and 13 deletions
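For downstream consumers, the relevant addition is a Kafka Connect header named "oldkey" (exposed as RelationalChangeRecordEmitter.PK_UPDATE_OLDKEY_FIELD in the hunks below) that is attached to the delete and create records emitted for a primary-key change; its value is the previous key Struct, serialized with the table's key schema. A minimal consumer-side sketch of reading that header — the OldKeyHeaderReader class is illustrative and not part of this commit, but the header name and the Kafka Connect Headers API are real:

import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.header.Header;
import org.apache.kafka.connect.source.SourceRecord;

// Illustrative helper, not part of this commit: extract the previous key that
// the connector attaches to the delete and create records of a PK update.
public class OldKeyHeaderReader {

    // Mirrors RelationalChangeRecordEmitter.PK_UPDATE_OLDKEY_FIELD introduced below.
    private static final String OLD_KEY_HEADER = "oldkey";

    /**
     * Returns the old primary key of a re-keyed row, or null if the record
     * was not produced by a primary-key update.
     */
    public static Struct oldKeyOf(SourceRecord record) {
        Header header = record.headers().lastWithName(OLD_KEY_HEADER);
        return header != null ? (Struct) header.value() : null;
    }
}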


@@ -18,12 +18,14 @@
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.header.ConnectHeaders;
import org.apache.kafka.connect.source.SourceRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import io.debezium.data.Envelope;
import io.debezium.function.BlockingConsumer;
import io.debezium.relational.RelationalChangeRecordEmitter;
import io.debezium.relational.Table;
import io.debezium.relational.TableId;
import io.debezium.relational.TableSchema;
@@ -281,8 +283,12 @@ public int update(SourceInfo source, Object[] before, Object[] after, int rowNum
// The key has changed, so we need to deal with both the new key and old key.
// Consumers may push the events into a system that won't allow both records to exist at the same time,
// so we first want to send the delete event for the old key...
ConnectHeaders headers = new ConnectHeaders();
headers.add(RelationalChangeRecordEmitter.PK_UPDATE_OLDKEY_FIELD, oldKey, keySchema);
SourceRecord record = new SourceRecord(partition, getSourceRecordOffset(offset), topicName, partitionNum,
keySchema, oldKey, envelope.schema(), envelope.delete(valueBefore, origin, ts));
keySchema, oldKey, envelope.schema(), envelope.delete(valueBefore, origin, ts), null, headers);
consumer.accept(record);
++count;
@@ -295,7 +301,7 @@ record = new SourceRecord(partition, getSourceRecordOffset(offset), topicName, p
// And finally send the create event ...
record = new SourceRecord(partition, getSourceRecordOffset(offset), topicName, partitionNum,
keySchema, key, envelope.schema(), envelope.create(valueAfter, origin, ts));
keySchema, key, envelope.schema(), envelope.create(valueAfter, origin, ts), null, headers);
consumer.accept(record);
++count;
}
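The comment in the hunk above spells out the motivation: a key change is emitted as a delete for the old key followed by a create for the new key, because some downstream systems cannot hold both rows at once. With the new header on both records, a sink can also remove the row stored under the previous key explicitly. A hedged sketch under that assumption — SinkTable, deleteByKey and upsert are made-up placeholders for whatever the target system offers, not Debezium or Kafka Connect API:

import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.header.Header;
import org.apache.kafka.connect.source.SourceRecord;

// Hedged sketch of sink-side handling of the re-keyed records.
class PkAwareSinkHandler {

    private final SinkTable table; // hypothetical sink abstraction

    PkAwareSinkHandler(SinkTable table) {
        this.table = table;
    }

    void apply(SourceRecord record) {
        Header oldKey = record.headers().lastWithName("oldkey");
        if (oldKey != null) {
            // The record stems from a primary-key update: remove the row kept
            // under the previous key before writing anything else.
            table.deleteByKey((Struct) oldKey.value());
        }
        if (record.value() != null) {
            table.upsert((Struct) record.key(), (Struct) record.value());
        }
        else {
            // Delete or tombstone for the current key.
            table.deleteByKey((Struct) record.key());
        }
    }

    // Minimal stand-in for the target system, so the sketch is self-contained.
    interface SinkTable {
        void deleteByKey(Struct key);
        void upsert(Struct key, Struct value);
    }
}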


@@ -5,6 +5,7 @@
*/
package io.debezium.connector.mysql;
import static junit.framework.TestCase.assertEquals;
import static org.fest.assertions.Assertions.assertThat;
import static org.junit.Assert.fail;
@@ -19,10 +20,12 @@
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
import org.apache.kafka.common.config.Config;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.errors.DataException;
import org.apache.kafka.connect.header.Header;
import org.apache.kafka.connect.source.SourceRecord;
import org.fest.assertions.Assertions;
import org.junit.After;
@@ -42,6 +45,7 @@
import io.debezium.jdbc.JdbcConnection;
import io.debezium.jdbc.TemporalPrecisionMode;
import io.debezium.junit.logging.LogInterceptor;
import io.debezium.relational.RelationalChangeRecordEmitter;
import io.debezium.relational.history.DatabaseHistory;
import io.debezium.relational.history.FileDatabaseHistory;
import io.debezium.relational.history.KafkaDatabaseHistory;
@@ -289,6 +293,12 @@ public void shouldValidateLockingModeNoneWithValidSnapshotModeConfiguration() {
}
}
private Header getPKUpdateOldKeyHeader(SourceRecord record) {
return StreamSupport.stream(record.headers().spliterator(), false)
.filter(header -> RelationalChangeRecordEmitter.PK_UPDATE_OLDKEY_FIELD.equals(header.key()))
.collect(Collectors.toList()).get(0);
}
@Test
public void shouldConsumeAllEventsFromDatabaseUsingSnapshot() throws SQLException, InterruptedException {
String masterPort = System.getProperty("database.port", "3306");
@@ -434,9 +444,22 @@ public void shouldConsumeAllEventsFromDatabaseUsingSnapshot() throws SQLExceptio
records = consumeRecordsByTopic(3);
List<SourceRecord> updates = records.recordsForTopic(DATABASE.topicForTable("products"));
assertThat(updates.size()).isEqualTo(3);
assertDelete(updates.get(0), "id", 1001);
SourceRecord deleteRecord = updates.get(0);
assertDelete(deleteRecord, "id", 1001);
assertEquals(1, deleteRecord.headers().size()); // to be removed/updated once we set additional headers
Header oldkeyPKUpdateHeader = getPKUpdateOldKeyHeader(deleteRecord);
assertEquals(Integer.valueOf(1001), ((Struct) oldkeyPKUpdateHeader.value()).getInt32("id"));
assertTombstone(updates.get(1), "id", 1001);
assertInsert(updates.get(2), "id", 2001);
SourceRecord insertRecord = updates.get(2);
assertInsert(insertRecord, "id", 2001);
assertEquals(1, insertRecord.headers().size()); // to be removed/updated once we set additional headers
oldkeyPKUpdateHeader = getPKUpdateOldKeyHeader(insertRecord);
assertEquals(Integer.valueOf(1001), ((Struct) oldkeyPKUpdateHeader.value()).getInt32("id"));
Testing.print("*** Done with PK change");
@@ -1983,4 +2006,57 @@ private List<SourceRecord> recordsForTopicForRoProductsTable(SourceRecords recor
final List<SourceRecord> uc = records.recordsForTopic(RO_DATABASE.topicForTable("Products"));
return uc != null ? uc : records.recordsForTopic(RO_DATABASE.topicForTable("products"));
}
@Test
@FixFor("DBZ-1531")
public void shouldEmitOldkeyHeaderOnPrimaryKeyUpdate() throws Exception {
config = DATABASE.defaultConfig()
.with(MySqlConnectorConfig.SNAPSHOT_MODE, MySqlConnectorConfig.SnapshotMode.NEVER)
.build();
// Start the connector ...
start(MySqlConnector.class, config);
// ---------------------------------------------------------------------------------------------------------------
// Consume all of the events due to startup and initialization of the database
// ---------------------------------------------------------------------------------------------------------------
SourceRecords records = consumeRecordsByTopic(INITIAL_EVENT_COUNT); // 6 DDL changes
assertThat(records.recordsForTopic(DATABASE.topicForTable("orders")).size()).isEqualTo(5);
try (MySQLConnection db = MySQLConnection.forTestDatabase(DATABASE.getDatabaseName());) {
try (JdbcConnection connection = db.connect()) {
connection.execute("UPDATE orders SET order_number=10303 WHERE order_number=10003");
}
}
// Consume the update of the PK, which is one insert followed by a delete followed by a tombstone ...
records = consumeRecordsByTopic(3);
List<SourceRecord> updates = records.recordsForTopic(DATABASE.topicForTable("orders"));
assertThat(updates.size()).isEqualTo(3);
SourceRecord deleteRecord = updates.get(0);
assertEquals(1, deleteRecord.headers().size()); // to be removed/updated once we set additional headers
Header oldkeyPKUpdateHeader = getPKUpdateOldKeyHeader(deleteRecord);
assertEquals(Integer.valueOf(10003), ((Struct) oldkeyPKUpdateHeader.value()).getInt32("order_number"));
SourceRecord insertRecord = updates.get(2);
assertEquals(1, insertRecord.headers().size()); // to be removed/updated once we set additional headers
oldkeyPKUpdateHeader = getPKUpdateOldKeyHeader(insertRecord);
assertEquals(Integer.valueOf(10003), ((Struct) oldkeyPKUpdateHeader.value()).getInt32("order_number"));
try (MySQLConnection db = MySQLConnection.forTestDatabase(DATABASE.getDatabaseName());) {
try (JdbcConnection connection = db.connect()) {
connection.execute("UPDATE orders SET quantity=5 WHERE order_number=10004");
}
}
records = consumeRecordsByTopic(5);
updates = records.recordsForTopic(DATABASE.topicForTable("orders"));
assertThat(updates.size()).isEqualTo(1);
SourceRecord updateRecord = updates.get(0);
assertEquals(0, updateRecord.headers().size()); // to be removed/updated once we set additional headers
stopConnector();
}
}


@@ -32,12 +32,14 @@
import java.util.function.Predicate;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.StreamSupport;
import org.apache.commons.lang3.RandomStringUtils;
import org.apache.kafka.connect.data.Decimal;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.header.Header;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.storage.MemoryOffsetBackingStore;
import org.awaitility.Awaitility;
@@ -75,6 +77,7 @@
import io.debezium.junit.ConditionalFail;
import io.debezium.junit.ShouldFailWhen;
import io.debezium.junit.logging.LogInterceptor;
import io.debezium.relational.RelationalChangeRecordEmitter;
import io.debezium.relational.RelationalDatabaseConnectorConfig.DecimalHandlingMode;
import io.debezium.relational.Table;
import io.debezium.relational.TableId;
@@ -686,6 +689,12 @@ public void shouldReceiveChangesForUpdatesWithColumnChanges() throws Exception {
Collections.singletonList(new SchemaAndValueField("modtype", SchemaBuilder.OPTIONAL_INT16_SCHEMA, (short) 2)), updatedRecord, Envelope.FieldName.AFTER);
}
private Header getPKUpdateOldKeyHeader(SourceRecord record) {
return StreamSupport.stream(record.headers().spliterator(), false)
.filter(header -> RelationalChangeRecordEmitter.PK_UPDATE_OLDKEY_FIELD.equals(header.key()))
.collect(Collectors.toList()).get(0);
}
@Test
public void shouldReceiveChangesForUpdatesWithPKChanges() throws Exception {
startConnector();
@@ -699,6 +708,11 @@ public void shouldReceiveChangesForUpdatesWithPKChanges() throws Exception {
assertEquals(topicName, deleteRecord.topic());
VerifyRecord.isValidDelete(deleteRecord, PK_FIELD, 1);
assertEquals(1, deleteRecord.headers().size()); // to be removed/updated once we set additional headers
Header oldkeyPKUpdateHeader = getPKUpdateOldKeyHeader(deleteRecord);
assertEquals(Integer.valueOf(1), ((Struct) oldkeyPKUpdateHeader.value()).getInt32("pk"));
// followed by a tombstone of the old pk
SourceRecord tombstoneRecord = consumer.remove();
assertEquals(topicName, tombstoneRecord.topic());
@@ -708,6 +722,10 @@ public void shouldReceiveChangesForUpdatesWithPKChanges() throws Exception {
SourceRecord insertRecord = consumer.remove();
assertEquals(topicName, insertRecord.topic());
VerifyRecord.isValidInsert(insertRecord, PK_FIELD, 2);
assertEquals(1, insertRecord.headers().size()); // to be removed/updated once we set additional headers
oldkeyPKUpdateHeader = getPKUpdateOldKeyHeader(insertRecord);
assertEquals(Integer.valueOf(1), ((Struct) oldkeyPKUpdateHeader.value()).getInt32("pk"));
}
@Test
@@ -727,10 +745,18 @@ public void shouldReceiveChangesForUpdatesWithPKChangesWithoutTombstone() throws
assertEquals(topicName, deleteRecord.topic());
VerifyRecord.isValidDelete(deleteRecord, PK_FIELD, 1);
assertEquals(1, deleteRecord.headers().size()); // to be removed/updated once we set additional headers
Header oldkeyPKUpdateHeader = getPKUpdateOldKeyHeader(deleteRecord);
assertEquals(Integer.valueOf(1), ((Struct) oldkeyPKUpdateHeader.value()).getInt32("pk"));
// followed by insert of the new value
SourceRecord insertRecord = consumer.remove();
assertEquals(topicName, insertRecord.topic());
VerifyRecord.isValidInsert(insertRecord, PK_FIELD, 2);
assertEquals(1, insertRecord.headers().size()); // to be removed/updated once we set additional headers
oldkeyPKUpdateHeader = getPKUpdateOldKeyHeader(insertRecord);
assertEquals(Integer.valueOf(1), ((Struct) oldkeyPKUpdateHeader.value()).getInt32("pk"));
}
@Test


@@ -12,6 +12,7 @@
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.header.ConnectHeaders;
import org.apache.kafka.connect.source.SourceRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -167,12 +168,24 @@ public boolean dispatchDataChangeEvent(T dataCollectionId, ChangeRecordEmitter c
changeRecordEmitter.emitChangeRecords(dataCollectionSchema, new Receiver() {
@Override
public void changeRecord(DataCollectionSchema schema, Operation operation, Object key, Struct value,
public void changeRecord(DataCollectionSchema schema,
Operation operation,
Object key, Struct value,
OffsetContext offset)
throws InterruptedException {
this.changeRecord(schema, operation, key, value, offset, null);
}
@Override
public void changeRecord(DataCollectionSchema schema,
Operation operation,
Object key, Struct value,
OffsetContext offset,
ConnectHeaders headers)
throws InterruptedException {
transactionMonitor.dataEvent(dataCollectionId, offset, key, value);
eventListener.onEvent(dataCollectionId, offset, key, value);
streamingReceiver.changeRecord(schema, operation, key, value, offset);
streamingReceiver.changeRecord(schema, operation, key, value, offset, headers);
}
});
handled = true;
@@ -268,8 +281,22 @@ public interface SnapshotReceiver extends ChangeRecordEmitter.Receiver {
private final class StreamingChangeRecordReceiver implements ChangeRecordEmitter.Receiver {
@Override
public void changeRecord(DataCollectionSchema dataCollectionSchema, Operation operation, Object key, Struct value, OffsetContext offsetContext)
public void changeRecord(DataCollectionSchema dataCollectionSchema,
Operation operation,
Object key, Struct value,
OffsetContext offsetContext)
throws InterruptedException {
this.changeRecord(dataCollectionSchema, operation, key, value, offsetContext, null);
}
@Override
public void changeRecord(DataCollectionSchema dataCollectionSchema,
Operation operation,
Object key, Struct value,
OffsetContext offsetContext,
ConnectHeaders headers)
throws InterruptedException {
Objects.requireNonNull(value, "value must not be null");
LOGGER.trace("Received change record for {} operation on key {}", operation, key);
@@ -277,8 +304,13 @@ public void changeRecord(DataCollectionSchema dataCollectionSchema, Operation op
Schema keySchema = dataCollectionSchema.keySchema();
String topicName = topicSelector.topicNameFor((T) dataCollectionSchema.id());
SourceRecord record = new SourceRecord(offsetContext.getPartition(), offsetContext.getOffset(),
topicName, null, keySchema, key, dataCollectionSchema.getEnvelopeSchema().schema(), value);
SourceRecord record = new SourceRecord(offsetContext.getPartition(),
offsetContext.getOffset(), topicName, null,
keySchema, key,
dataCollectionSchema.getEnvelopeSchema().schema(),
value,
null,
headers);
queue.enqueue(changeEventCreator.createDataChangeEvent(record));
@@ -290,7 +322,8 @@ public void changeRecord(DataCollectionSchema dataCollectionSchema, Operation op
record.key(),
null, // value schema
null, // value
record.timestamp());
record.timestamp(),
record.headers());
queue.enqueue(changeEventCreator.createDataChangeEvent(tombStone));
}


@@ -6,6 +6,7 @@
package io.debezium.pipeline.spi;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.header.ConnectHeaders;
import io.debezium.data.Envelope.Operation;
import io.debezium.schema.DataCollectionSchema;
@@ -30,6 +31,20 @@ public interface ChangeRecordEmitter {
OffsetContext getOffset();
public interface Receiver {
void changeRecord(DataCollectionSchema schema, Operation operation, Object key, Struct value, OffsetContext offset) throws InterruptedException;
void changeRecord(DataCollectionSchema schema,
Operation operation,
Object key, Struct value,
OffsetContext offset)
throws InterruptedException;
default void changeRecord(DataCollectionSchema schema,
Operation operation,
Object key, Struct value,
OffsetContext offset,
ConnectHeaders headers)
throws InterruptedException {
throw new RuntimeException("Not implemented yet in " + this.getClass().getName());
}
}
}
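The header-aware changeRecord overload is added as a default method so existing Receiver implementations keep compiling, while the receivers in EventDispatcher override it and funnel the original five-argument form into it. A sketch of a custom Receiver following the same pattern — the LoggingReceiver name and its println body are invented for illustration:

import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.header.ConnectHeaders;
import io.debezium.data.Envelope.Operation;
import io.debezium.pipeline.spi.ChangeRecordEmitter;
import io.debezium.pipeline.spi.OffsetContext;
import io.debezium.schema.DataCollectionSchema;

// Illustrative Receiver implementation, not part of this commit.
final class LoggingReceiver implements ChangeRecordEmitter.Receiver {

    @Override
    public void changeRecord(DataCollectionSchema schema, Operation operation, Object key,
                             Struct value, OffsetContext offset)
            throws InterruptedException {
        // Funnel the original signature into the header-aware one, as EventDispatcher does.
        changeRecord(schema, operation, key, value, offset, null);
    }

    @Override
    public void changeRecord(DataCollectionSchema schema, Operation operation, Object key,
                             Struct value, OffsetContext offset, ConnectHeaders headers)
            throws InterruptedException {
        int headerCount = headers != null ? headers.size() : 0;
        System.out.println(operation + " on key " + key + " with " + headerCount + " header(s)");
    }
}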


@@ -8,6 +8,7 @@
import java.util.Objects;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.header.ConnectHeaders;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -25,6 +26,8 @@
*/
public abstract class RelationalChangeRecordEmitter extends AbstractChangeRecordEmitter<TableSchema> {
public static final String PK_UPDATE_OLDKEY_FIELD = "oldkey";
protected final Logger logger = LoggerFactory.getLogger(getClass());
public RelationalChangeRecordEmitter(OffsetContext offsetContext, Clock clock) {
@@ -104,11 +107,14 @@ protected void emitUpdateRecord(Receiver receiver, TableSchema tableSchema)
}
// PK update -> emit as delete and re-insert with new key
else {
ConnectHeaders headers = new ConnectHeaders();
headers.add(PK_UPDATE_OLDKEY_FIELD, oldKey, tableSchema.keySchema());
Struct envelope = tableSchema.getEnvelopeSchema().delete(oldValue, getOffset().getSourceInfo(), getClock().currentTimeAsInstant());
receiver.changeRecord(tableSchema, Operation.DELETE, oldKey, envelope, getOffset());
receiver.changeRecord(tableSchema, Operation.DELETE, oldKey, envelope, getOffset(), headers);
envelope = tableSchema.getEnvelopeSchema().create(newValue, getOffset().getSourceInfo(), getClock().currentTimeAsInstant());
receiver.changeRecord(tableSchema, Operation.CREATE, newKey, envelope, getOffset());
receiver.changeRecord(tableSchema, Operation.CREATE, newKey, envelope, getOffset(), headers);
}
}
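For reference, the value carried by the header in the PK-update branch above is the complete old key Struct built against the table's key schema, and the same ConnectHeaders instance is attached to both the delete record (still keyed by the old key) and the create record (keyed by the new key). A small stand-alone illustration with a made-up single-column key, mirroring what the integration tests assert via getPKUpdateOldKeyHeader(...):

import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.header.ConnectHeaders;
import io.debezium.relational.RelationalChangeRecordEmitter;

// Illustration only: the shape of the "oldkey" header for a single-column key.
public class OldKeyHeaderShape {
    public static void main(String[] args) {
        // Hypothetical key schema, loosely modeled on the tests' orders table.
        Schema keySchema = SchemaBuilder.struct().name("example.orders.Key")
                .field("order_number", Schema.INT32_SCHEMA)
                .build();
        Struct oldKey = new Struct(keySchema).put("order_number", 10003);

        ConnectHeaders headers = new ConnectHeaders();
        headers.add(RelationalChangeRecordEmitter.PK_UPDATE_OLDKEY_FIELD, oldKey, keySchema);

        // The same headers object travels on the DELETE (old key) and CREATE
        // (new key) records produced for one primary-key update.
        System.out.println(headers.lastWithName(RelationalChangeRecordEmitter.PK_UPDATE_OLDKEY_FIELD).value());
    }
}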