DBZ-1024 Emit correct last_snapshot_record in source

This commit is contained in:
Jiri Pechanec 2018-12-06 15:03:02 +01:00 committed by Gunnar Morling
parent 4ba3213338
commit 0b78f0c31a
4 changed files with 36 additions and 13 deletions

View File

@ -203,6 +203,7 @@ private void takeSnapshot(BlockingConsumer<ChangeEvent> consumer) {
// process and send the last record after marking it as such
logger.info("Step 5: sending the last snapshot record");
sourceInfo.markLastSnapshotRecord();
changeSourceToLastSnapshotRecord(currentRecord);
this.currentRecord.set(new SourceRecord(currentRecord.sourcePartition(), sourceInfo.offset(),
currentRecord.topic(), currentRecord.kafkaPartition(),
currentRecord.keySchema(), currentRecord.key(),
@ -240,6 +241,14 @@ private void takeSnapshot(BlockingConsumer<ChangeEvent> consumer) {
}
}
private void changeSourceToLastSnapshotRecord(SourceRecord currentRecord) {
final Struct envelope = (Struct)currentRecord.value();
final Struct source = (Struct)envelope.get("source");
if (source.getBoolean(SourceInfo.LAST_SNAPSHOT_RECORD_KEY) != null) {
source.put(SourceInfo.LAST_SNAPSHOT_RECORD_KEY, true);
}
}
private void rollbackTransaction(Connection jdbcConnection) {
try {
if (jdbcConnection != null) {

View File

@ -610,13 +610,14 @@ protected void assertRecordSchemaAndValues(List<SchemaAndValueField> expectedSch
);
}
protected void assertRecordOffset(SourceRecord record, boolean expectSnapshot, boolean expectedLastSnapshotRecord) {
protected void assertRecordOffsetAndSnapshotSource(SourceRecord record, boolean expectSnapshot, boolean expectedLastSnapshotRecord) {
Map<String, ?> offset = record.sourceOffset();
assertNotNull(offset.get(SourceInfo.TXID_KEY));
assertNotNull(offset.get(SourceInfo.TIMESTAMP_KEY));
assertNotNull(offset.get(SourceInfo.LSN_KEY));
Object snapshot = offset.get(SourceInfo.SNAPSHOT_KEY);
Object lastSnapshotRecord = offset.get(SourceInfo.LAST_SNAPSHOT_RECORD_KEY);
if (expectSnapshot) {
Assert.assertTrue("Snapshot marker expected but not found", (Boolean) snapshot);
assertEquals("Last snapshot record marker mismatch", expectedLastSnapshotRecord, lastSnapshotRecord);
@ -625,6 +626,19 @@ protected void assertRecordOffset(SourceRecord record, boolean expectSnapshot, b
assertNull("Snapshot marker not expected, but found", snapshot);
assertNull("Last snapshot marker not expected, but found", lastSnapshotRecord);
}
final Struct envelope = (Struct)record.value();
if (envelope != null) {
final Struct source = (Struct)envelope.get("source");
final Boolean sourceSnapshot = source.getBoolean(SourceInfo.SNAPSHOT_KEY);
final Boolean sourceLastSnapshotRecord = source.getBoolean(SourceInfo.LAST_SNAPSHOT_RECORD_KEY);
if (expectSnapshot) {
Assert.assertTrue("Snapshot marker expected in source but not found", sourceSnapshot);
}
else {
assertNull("Source snapshot marker not expected, but found", sourceSnapshot);
assertNull("Source last snapshot marker not expected, but found", sourceLastSnapshotRecord);
}
}
}
protected void assertSourceInfo(SourceRecord record, String db, String schema, String table) {

View File

@ -87,7 +87,7 @@ public void shouldGenerateSnapshotsForDefaultDatatypes() throws Exception {
// check the offset information for each record
while (!consumer.isEmpty()) {
SourceRecord record = consumer.remove();
assertRecordOffset(record, true, consumer.isEmpty());
assertRecordOffsetAndSnapshotSource(record, true, consumer.isEmpty());
assertSourceInfo(record);
}
}
@ -159,13 +159,13 @@ public void shouldGenerateSnapshotAndContinueStreaming() throws Exception {
SourceRecord first = consumer.remove();
VerifyRecord.isValidInsert(first, PK_FIELD, 2);
assertEquals(topicName("s1.a"), first.topic());
assertRecordOffset(first, false, false);
assertRecordOffsetAndSnapshotSource(first, false, false);
assertSourceInfo(first, "test_database", "s1", "a");
SourceRecord second = consumer.remove();
VerifyRecord.isValidInsert(second, PK_FIELD, 2);
assertEquals(topicName("s2.a"), second.topic());
assertRecordOffset(second, false, false);
assertRecordOffsetAndSnapshotSource(second, false, false);
assertSourceInfo(second, "test_database", "s2", "a");
// now shut down the producers and insert some more records
@ -184,7 +184,7 @@ public void shouldGenerateSnapshotAndContinueStreaming() throws Exception {
int counterVal = counter.getAndIncrement();
int expectedPk = (counterVal % 3) + 1; //each table has 3 entries keyed 1-3
VerifyRecord.isValidRead(record, PK_FIELD, expectedPk);
assertRecordOffset(record, true, counterVal == (expectedRecordsCount - 1));
assertRecordOffsetAndSnapshotSource(record, true, counterVal == (expectedRecordsCount - 1));
assertSourceInfo(record);
});
consumer.clear();
@ -196,12 +196,12 @@ public void shouldGenerateSnapshotAndContinueStreaming() throws Exception {
consumer.await(TestHelper.waitTimeForRecords(), TimeUnit.SECONDS);
first = consumer.remove();
VerifyRecord.isValidInsert(first, PK_FIELD, 4);
assertRecordOffset(first, false, false);
assertRecordOffsetAndSnapshotSource(first, false, false);
assertSourceInfo(first, "test_database", "s1", "a");
second = consumer.remove();
VerifyRecord.isValidInsert(second, PK_FIELD, 4);
assertRecordOffset(second, false, false);
assertRecordOffsetAndSnapshotSource(second, false, false);
assertSourceInfo(second, "test_database", "s2", "a");
}
@ -235,11 +235,11 @@ public void shouldGenerateSnapshotAndSendHeartBeat() throws Exception {
final SourceRecord first = consumer.remove();
VerifyRecord.isValidRead(first, PK_FIELD, 1);
assertRecordOffset(first, true, true);
assertRecordOffsetAndSnapshotSource(first, true, true);
System.out.println(first);
final SourceRecord second = consumer.remove();
assertThat(second.topic()).startsWith("__debezium-heartbeat");
assertRecordOffset(second, false, false);
assertRecordOffsetAndSnapshotSource(second, false, false);
// now shut down the producers and insert some more records
snapshotProducer.stop();
@ -286,7 +286,7 @@ public void shouldGenerateSnapshotsForDefaultDatatypesAdaptiveMicroseconds() thr
// check the offset information for each record
while (!consumer.isEmpty()) {
SourceRecord record = consumer.remove();
assertRecordOffset(record, true, consumer.isEmpty());
assertRecordOffsetAndSnapshotSource(record, true, consumer.isEmpty());
assertSourceInfo(record);
}
}
@ -327,7 +327,7 @@ public void shouldGenerateSnapshotsForDecimalDatatypesUsingStringEncoding() thro
// check the offset information for each record
while (!consumer.isEmpty()) {
SourceRecord record = consumer.remove();
assertRecordOffset(record, true, consumer.isEmpty());
assertRecordOffsetAndSnapshotSource(record, true, consumer.isEmpty());
assertSourceInfo(record);
}
}

View File

@ -908,7 +908,7 @@ private void assertInsert(String statement, int pk, List<SchemaAndValueField> ex
try {
executeAndWait(statement);
SourceRecord record = assertRecordInserted(expectedTopicName, PK_FIELD, pk);
assertRecordOffset(record, false, false);
assertRecordOffsetAndSnapshotSource(record, false, false);
assertSourceInfo(record, "postgres", table.schema(), table.table());
assertRecordSchemaAndValues(expectedSchemaAndValuesByColumn, record, Envelope.FieldName.AFTER);
}