DBZ-835 Emit tombstone optionally

This commit is contained in:
Jiri Pechanec 2019-06-12 16:09:46 +02:00 committed by Gunnar Morling
parent 5fdb0ca81b
commit 9b5640ddde
2 changed files with 52 additions and 3 deletions

View File

@ -145,6 +145,56 @@ public void createAndDelete() throws Exception {
stopConnector(); stopConnector();
} }
@Test
public void deleteWithoutTombstone() throws Exception {
final int RECORDS_PER_TABLE = 5;
final int TABLES = 2;
final int ID_START = 10;
final Configuration config = TestHelper.defaultConfig()
.with(SqlServerConnectorConfig.SNAPSHOT_MODE, SnapshotMode.INITIAL)
.with(SqlServerConnectorConfig.TOMBSTONES_ON_DELETE, false)
.build();
start(SqlServerConnector.class, config);
assertConnectorIsRunning();
// Wait for snapshot completion
consumeRecordsByTopic(1);
for (int i = 0; i < RECORDS_PER_TABLE; i++) {
final int id = ID_START + i;
connection.execute(
"INSERT INTO tablea VALUES(" + id + ", 'a')"
);
connection.execute(
"INSERT INTO tableb VALUES(" + id + ", 'b')"
);
}
final SourceRecords records = consumeRecordsByTopic(RECORDS_PER_TABLE * TABLES);
connection.execute("DELETE FROM tableB");
final SourceRecords deleteRecords = consumeRecordsByTopic(RECORDS_PER_TABLE);
final List<SourceRecord> deleteTableA = deleteRecords.recordsForTopic("server1.dbo.tablea");
final List<SourceRecord> deleteTableB = deleteRecords.recordsForTopic("server1.dbo.tableb");
Assertions.assertThat(deleteTableA).isNullOrEmpty();
Assertions.assertThat(deleteTableB).hasSize(RECORDS_PER_TABLE);
for (int i = 0; i < RECORDS_PER_TABLE; i++) {
final SourceRecord deleteRecord = deleteTableB.get(i);
final List<SchemaAndValueField> expectedDeleteRow = Arrays.asList(
new SchemaAndValueField("id", Schema.INT32_SCHEMA, i + ID_START),
new SchemaAndValueField("colb", Schema.OPTIONAL_STRING_SCHEMA, "b"));
final Struct deleteKey = (Struct) deleteRecord.key();
final Struct deleteValue = (Struct) deleteRecord.value();
assertRecord((Struct) deleteValue.get("before"), expectedDeleteRow);
assertNull(deleteValue.get("after"));
}
stopConnector();
}
@Test @Test
public void update() throws Exception { public void update() throws Exception {
final int RECORDS_PER_TABLE = 5; final int RECORDS_PER_TABLE = 5;

View File

@ -53,6 +53,7 @@ public class EventDispatcher<T extends DataCollectionId> {
private final ChangeEventCreator changeEventCreator; private final ChangeEventCreator changeEventCreator;
private final Heartbeat heartbeat; private final Heartbeat heartbeat;
private DataChangeEventListener eventListener = DataChangeEventListener.NO_OP; private DataChangeEventListener eventListener = DataChangeEventListener.NO_OP;
private final boolean emitTombstonesOnDelete;
/** /**
* Change event receiver for events dispatched from a streaming change event source. * Change event receiver for events dispatched from a streaming change event source.
@ -71,6 +72,7 @@ public EventDispatcher(CommonConnectorConfig connectorConfig, TopicSelector<T> t
this.filter = filter; this.filter = filter;
this.changeEventCreator = changeEventCreator; this.changeEventCreator = changeEventCreator;
this.streamingReceiver = new StreamingChangeRecordReceiver(); this.streamingReceiver = new StreamingChangeRecordReceiver();
this.emitTombstonesOnDelete = connectorConfig.isEmitTombstoneOnDelete();
heartbeat = Heartbeat.create(connectorConfig.getConfig(), topicSelector.getHeartbeatTopic(), heartbeat = Heartbeat.create(connectorConfig.getConfig(), topicSelector.getHeartbeatTopic(),
connectorConfig.getLogicalName()); connectorConfig.getLogicalName());
@ -187,9 +189,6 @@ public void changeRecord(DataCollectionSchema dataCollectionSchema, Operation op
queue.enqueue(changeEventCreator.createDataChangeEvent(record)); queue.enqueue(changeEventCreator.createDataChangeEvent(record));
// TODO handle option
boolean emitTombstonesOnDelete = true;
if (emitTombstonesOnDelete && operation == Operation.DELETE) { if (emitTombstonesOnDelete && operation == Operation.DELETE) {
SourceRecord tombStone = record.newRecord( SourceRecord tombStone = record.newRecord(
record.topic(), record.topic(),