DBZ-1069 Store max LSN at start of snapshot

Jiri Pechanec 2019-01-10 15:11:30 +01:00
parent 85e20f2dde
commit d3b4e481b1
3 changed files with 50 additions and 14 deletions
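
In brief: when a snapshot starts, the connector now records the database's current maximum LSN in the source offset as change_lsn, next to the existing snapshot flags, so that a restart after the snapshot can resume streaming from that position rather than from an empty LSN. A minimal sketch of the resulting offset map, assuming a made-up LSN value (only the three keys are taken from the hunks below):

import java.util.LinkedHashMap;
import java.util.Map;

public class SnapshotOffsetSketch {
    public static void main(String[] args) {
        // Example value only; the real LSN is whatever SQL Server reports as the
        // current maximum when the snapshot begins.
        String maxLsnAtSnapshotStart = "00000025:00000448:0001";

        // Shape of the map built by SqlServerOffsetContext.offset() for a
        // snapshot record after this commit.
        Map<String, Object> offset = new LinkedHashMap<>();
        offset.put("snapshot", true);
        offset.put("snapshot_completed", false); // true only for the last snapshot record
        offset.put("change_lsn", maxLsnAtSnapshotStart);

        System.out.println(offset);
        // {snapshot=true, snapshot_completed=false, change_lsn=00000025:00000448:0001}
    }
}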


@@ -51,7 +51,8 @@ public SqlServerOffsetContext(String serverName, Lsn lsn, boolean snapshot, bool
         if (sourceInfo.isSnapshot()) {
             return Collect.hashMapOf(
                     SourceInfo.SNAPSHOT_KEY, true,
-                    SNAPSHOT_COMPLETED_KEY, snapshotCompleted
+                    SNAPSHOT_COMPLETED_KEY, snapshotCompleted,
+                    SourceInfo.CHANGE_LSN_KEY, sourceInfo.getChangeLsn().toString()
             );
         }
         else {
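
Collect.hashMapOf above is Debezium's small fixed-arity map builder; note that the call also relies on sourceInfo.getChangeLsn() being non-null whenever isSnapshot() is true, which is exactly what storing the max LSN at snapshot start guarantees. A rough stand-in for the three-pair overload, reconstructed from this call site rather than from the real io.debezium.util.Collect:

import java.util.HashMap;
import java.util.Map;

public class CollectSketch {
    // Mirrors the hashMapOf(k1, v1, k2, v2, k3, v3) overload used above;
    // sketched from the call site, not copied from Debezium.
    public static <K, V> Map<K, V> hashMapOf(K k1, V v1, K k2, V v2, K k3, V v3) {
        Map<K, V> map = new HashMap<>();
        map.put(k1, v1);
        map.put(k2, v2);
        map.put(k3, v3);
        return map;
    }

    public static void main(String[] args) {
        System.out.println(hashMapOf("snapshot", true, "snapshot_completed", false,
                "change_lsn", "00000025:00000448:0001"));
    }
}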


@@ -13,13 +13,13 @@
 import java.sql.SQLException;
 import java.util.Arrays;
 import java.util.List;
-import java.util.Map;
 
 import org.apache.kafka.connect.data.Decimal;
 import org.apache.kafka.connect.data.Schema;
 import org.apache.kafka.connect.data.Struct;
 import org.apache.kafka.connect.source.SourceRecord;
 import org.fest.assertions.Assertions;
+import org.fest.assertions.MapAssert;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
@@ -34,7 +34,6 @@
 import io.debezium.embedded.AbstractConnectorTest;
 import io.debezium.heartbeat.Heartbeat;
 import io.debezium.time.Timestamp;
-import io.debezium.util.Collect;
 import io.debezium.util.Testing;
 
 /**
@@ -117,14 +116,14 @@ private void takeSnapshot(SnapshotLockingMode lockingMode) throws Exception {
                     new SchemaAndValueField("price", Decimal.builder(2).parameter("connect.decimal.precision", "8").optional().build(), new BigDecimal(i + ".23")),
                     new SchemaAndValueField("ts", Timestamp.builder().optional().schema(), 1_531_920_536_000l)
             );
-            final Map<String, ?> expectedSource1 = Collect.hashMapOf("snapshot", true, "snapshot_completed", i == INITIAL_RECORDS_PER_TABLE - 1);
             final Struct key1 = (Struct)record1.key();
             final Struct value1 = (Struct)record1.value();
             assertRecord(key1, expectedKey1);
             assertRecord((Struct)value1.get("after"), expectedRow1);
-            assertThat(record1.sourceOffset()).isEqualTo(expectedSource1);
+            assertThat(record1.sourceOffset()).includes(
+                    MapAssert.entry("snapshot", true),
+                    MapAssert.entry("snapshot_completed", i == INITIAL_RECORDS_PER_TABLE - 1));
             assertNull(value1.get("before"));
         }
     }
@@ -228,13 +227,14 @@ public void takeSnapshotFromTableWithReservedName() throws Exception {
                     new SchemaAndValueField("id", Schema.INT32_SCHEMA, i),
                     new SchemaAndValueField("name", Schema.OPTIONAL_STRING_SCHEMA, "name" + i)
             );
-            final Map<String, ?> expectedSource1 = Collect.hashMapOf("snapshot", true, "snapshot_completed", i == INITIAL_RECORDS_PER_TABLE - 1);
             final Struct key1 = (Struct)record1.key();
             final Struct value1 = (Struct)record1.value();
             assertRecord(key1, expectedKey1);
             assertRecord((Struct)value1.get("after"), expectedRow1);
-            assertThat(record1.sourceOffset()).isEqualTo(expectedSource1);
+            assertThat(record1.sourceOffset()).includes(
+                    MapAssert.entry("snapshot", true),
+                    MapAssert.entry("snapshot_completed", i == INITIAL_RECORDS_PER_TABLE - 1));
             assertNull(value1.get("before"));
         }
     }
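
Both test hunks above make the same switch: isEqualTo compared the source offset against an exact expected map, which can no longer hold now that the offset also carries a change_lsn whose value depends on the database state at snapshot time. FEST's MapAssert.includes only requires the listed entries to be present. A standalone illustration of the pattern, with made-up map contents:

import java.util.HashMap;
import java.util.Map;

import static org.fest.assertions.Assertions.assertThat;
import static org.fest.assertions.MapAssert.entry;

public class IncludesVsIsEqualTo {
    public static void main(String[] args) {
        Map<String, Object> offset = new HashMap<>();
        offset.put("snapshot", true);
        offset.put("snapshot_completed", false);
        offset.put("change_lsn", "00000025:00000448:0001"); // varies per run

        // Passes as long as the listed entries are present; the unpredictable
        // change_lsn entry no longer breaks the comparison.
        assertThat(offset).includes(
                entry("snapshot", true),
                entry("snapshot_completed", false));
    }
}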


@@ -278,6 +278,13 @@ public void verifyOffsets() throws Exception {
             );
         }
+        for (int i = 0; !connection.getMaxLsn().isAvailable(); i++) {
+            if (i == 30) {
+                org.junit.Assert.fail("Initial changes not written to CDC structures");
+            }
+            Testing.debug("Waiting for initial changes to be propagated to CDC structures");
+            Thread.sleep(1000);
+        }
 
         start(SqlServerConnector.class, config);
         assertConnectorIsRunning();
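
The loop added above guards against a race: SQL Server's CDC capture job picks up the initial inserts asynchronously, so the test polls getMaxLsn() once per second for up to 30 seconds before starting the connector; otherwise the snapshot could begin before any LSN is available and the stored change_lsn would be meaningless. The same poll-until-true pattern as a generic helper (a sketch, not an existing Debezium utility):

import java.time.Duration;
import java.util.function.BooleanSupplier;

public final class Await {
    // Polls the condition at the given interval until it holds or the timeout
    // budget is exhausted.
    public static void until(BooleanSupplier condition, Duration timeout, Duration interval)
            throws InterruptedException {
        long deadline = System.nanoTime() + timeout.toNanos();
        while (!condition.getAsBoolean()) {
            if (System.nanoTime() >= deadline) {
                throw new IllegalStateException("Condition not met within " + timeout);
            }
            Thread.sleep(interval.toMillis());
        }
    }
}

The inline loop is then roughly Await.until(() -> connection.getMaxLsn().isAvailable(), Duration.ofSeconds(30), Duration.ofSeconds(1)), minus the debug logging.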
@@ -307,12 +314,40 @@ public void verifyOffsets() throws Exception {
         start(SqlServerConnector.class, config);
         assertConnectorIsRunning();
-        records = consumeRecordsByTopic(RECORDS_PER_TABLE * TABLES).allRecordsInOrder();
-        for (Iterator<SourceRecord> it = records.iterator(); it.hasNext();) {
-            SourceRecord record = it.next();
-            assertThat(record.sourceOffset().get("snapshot")).as("Streaming phase").isNull();
-            assertThat(record.sourceOffset().get("snapshot_completed")).as("Streaming phase").isNull();
-            assertThat(record.sourceOffset().get("change_lsn")).as("LSN present").isNotNull();
+        final SourceRecords sourceRecords = consumeRecordsByTopic(RECORDS_PER_TABLE * TABLES);
+        final List<SourceRecord> tableA = sourceRecords.recordsForTopic("server1.dbo.tablea");
+        final List<SourceRecord> tableB = sourceRecords.recordsForTopic("server1.dbo.tableb");
+        Assertions.assertThat(tableA).hasSize(RECORDS_PER_TABLE);
+        Assertions.assertThat(tableB).hasSize(RECORDS_PER_TABLE);
+        for (int i = 0; i < RECORDS_PER_TABLE; i++) {
+            final int id = i + ID_RESTART;
+            final SourceRecord recordA = tableA.get(i);
+            final SourceRecord recordB = tableB.get(i);
+            final List<SchemaAndValueField> expectedRowA = Arrays.asList(
+                    new SchemaAndValueField("id", Schema.INT32_SCHEMA, id),
+                    new SchemaAndValueField("cola", Schema.OPTIONAL_STRING_SCHEMA, "a"));
+            final List<SchemaAndValueField> expectedRowB = Arrays.asList(
+                    new SchemaAndValueField("id", Schema.INT32_SCHEMA, id),
+                    new SchemaAndValueField("colb", Schema.OPTIONAL_STRING_SCHEMA, "b"));
+
+            final Struct valueA = (Struct)recordA.value();
+            assertRecord((Struct)valueA.get("after"), expectedRowA);
+            assertNull(valueA.get("before"));
+
+            final Struct valueB = (Struct)recordB.value();
+            assertRecord((Struct)valueB.get("after"), expectedRowB);
+            assertNull(valueB.get("before"));
+
+            assertThat(recordA.sourceOffset().get("snapshot")).as("Streaming phase").isNull();
+            assertThat(recordA.sourceOffset().get("snapshot_completed")).as("Streaming phase").isNull();
+            assertThat(recordA.sourceOffset().get("change_lsn")).as("LSN present").isNotNull();
+
+            assertThat(recordB.sourceOffset().get("snapshot")).as("Streaming phase").isNull();
+            assertThat(recordB.sourceOffset().get("snapshot_completed")).as("Streaming phase").isNull();
+            assertThat(recordB.sourceOffset().get("change_lsn")).as("LSN present").isNotNull();
         }
     }
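
The offset checks at the end of the new loop are the point of the test: once the snapshot has completed and the connector restarts, every record must carry a pure streaming offset. The same three checks distilled into a helper (hypothetical, not part of the commit):

import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.connect.source.SourceRecord;

import static org.fest.assertions.Assertions.assertThat;

public class StreamingOffsetChecks {
    // After the snapshot completes, an offset must have no snapshot flags and
    // must carry a change LSN.
    static void assertStreamingOffset(SourceRecord record) {
        assertThat(record.sourceOffset().get("snapshot")).as("Streaming phase").isNull();
        assertThat(record.sourceOffset().get("snapshot_completed")).as("Streaming phase").isNull();
        assertThat(record.sourceOffset().get("change_lsn")).as("LSN present").isNotNull();
    }

    public static void main(String[] args) {
        // A fabricated record carrying only a streaming-phase offset.
        Map<String, Object> offset = new HashMap<>();
        offset.put("change_lsn", "00000025:00000448:0001");
        SourceRecord record = new SourceRecord(new HashMap<>(), offset, "server1.dbo.tablea", null, null);
        assertStreamingOffset(record);
    }
}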