From 85e20f2dde572c44b14cb0b34eb2831c5e5906f3 Mon Sep 17 00:00:00 2001 From: Jiri Pechanec Date: Thu, 10 Jan 2019 13:33:35 +0100 Subject: [PATCH] DBZ-1069 Test of offsets --- .../sqlserver/SqlServerConnectorIT.java | 60 +++++++++++++++++++ 1 file changed, 60 insertions(+) diff --git a/debezium-connector-sqlserver/src/test/java/io/debezium/connector/sqlserver/SqlServerConnectorIT.java b/debezium-connector-sqlserver/src/test/java/io/debezium/connector/sqlserver/SqlServerConnectorIT.java index fb1effd2c..da7da82d7 100644 --- a/debezium-connector-sqlserver/src/test/java/io/debezium/connector/sqlserver/SqlServerConnectorIT.java +++ b/debezium-connector-sqlserver/src/test/java/io/debezium/connector/sqlserver/SqlServerConnectorIT.java @@ -10,6 +10,7 @@ import java.sql.SQLException; import java.util.Arrays; +import java.util.Iterator; import java.util.List; import org.apache.kafka.connect.data.Schema; @@ -256,6 +257,65 @@ public void streamChangesWhileStopped() throws Exception { } } + @Test + @FixFor("DBZ-1069") + public void verifyOffsets() throws Exception { + final int RECORDS_PER_TABLE = 5; + final int TABLES = 2; + final int ID_START = 10; + final int ID_RESTART = 100; + final Configuration config = TestHelper.defaultConfig() + .with(SqlServerConnectorConfig.SNAPSHOT_MODE, SnapshotMode.INITIAL) + .build(); + + for (int i = 0; i < RECORDS_PER_TABLE; i++) { + final int id = ID_START + i; + connection.execute( + "INSERT INTO tablea VALUES(" + id + ", 'a')" + ); + connection.execute( + "INSERT INTO tableb VALUES(" + id + ", 'b')" + ); + } + + start(SqlServerConnector.class, config); + assertConnectorIsRunning(); + + List records = consumeRecordsByTopic(1 + RECORDS_PER_TABLE * TABLES).allRecordsInOrder(); + records = records.subList(1, records.size()); + for (Iterator it = records.iterator(); it.hasNext();) { + SourceRecord record = it.next(); + assertThat(record.sourceOffset().get("snapshot")).as("Snapshot phase").isEqualTo(true); + if (it.hasNext()) { + assertThat(record.sourceOffset().get("snapshot_completed")).as("Snapshot in progress").isEqualTo(false); + } + else { + assertThat(record.sourceOffset().get("snapshot_completed")).as("Snapshot completed").isEqualTo(true); + } + } + + stopConnector(); + for (int i = 0; i < RECORDS_PER_TABLE; i++) { + final int id = ID_RESTART + i; + connection.execute( + "INSERT INTO tablea VALUES(" + id + ", 'a')" + ); + connection.execute( + "INSERT INTO tableb VALUES(" + id + ", 'b')" + ); + } + + start(SqlServerConnector.class, config); + assertConnectorIsRunning(); + records = consumeRecordsByTopic(RECORDS_PER_TABLE * TABLES).allRecordsInOrder(); + for (Iterator it = records.iterator(); it.hasNext();) { + SourceRecord record = it.next(); + assertThat(record.sourceOffset().get("snapshot")).as("Streaming phase").isNull(); + assertThat(record.sourceOffset().get("snapshot_completed")).as("Streaming phase").isNull(); + assertThat(record.sourceOffset().get("change_lsn")).as("LSN present").isNotNull(); + } + } + @Test public void whitelistTable() throws Exception { final int RECORDS_PER_TABLE = 5;