DBZ-1069 Test of offsets
This commit is contained in:
parent
6b99569674
commit
85e20f2dde
@ -10,6 +10,7 @@
|
||||
|
||||
import java.sql.SQLException;
|
||||
import java.util.Arrays;
|
||||
import java.util.Iterator;
|
||||
import java.util.List;
|
||||
|
||||
import org.apache.kafka.connect.data.Schema;
|
||||
@ -256,6 +257,65 @@ public void streamChangesWhileStopped() throws Exception {
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
@FixFor("DBZ-1069")
|
||||
public void verifyOffsets() throws Exception {
|
||||
final int RECORDS_PER_TABLE = 5;
|
||||
final int TABLES = 2;
|
||||
final int ID_START = 10;
|
||||
final int ID_RESTART = 100;
|
||||
final Configuration config = TestHelper.defaultConfig()
|
||||
.with(SqlServerConnectorConfig.SNAPSHOT_MODE, SnapshotMode.INITIAL)
|
||||
.build();
|
||||
|
||||
for (int i = 0; i < RECORDS_PER_TABLE; i++) {
|
||||
final int id = ID_START + i;
|
||||
connection.execute(
|
||||
"INSERT INTO tablea VALUES(" + id + ", 'a')"
|
||||
);
|
||||
connection.execute(
|
||||
"INSERT INTO tableb VALUES(" + id + ", 'b')"
|
||||
);
|
||||
}
|
||||
|
||||
start(SqlServerConnector.class, config);
|
||||
assertConnectorIsRunning();
|
||||
|
||||
List<SourceRecord> records = consumeRecordsByTopic(1 + RECORDS_PER_TABLE * TABLES).allRecordsInOrder();
|
||||
records = records.subList(1, records.size());
|
||||
for (Iterator<SourceRecord> it = records.iterator(); it.hasNext();) {
|
||||
SourceRecord record = it.next();
|
||||
assertThat(record.sourceOffset().get("snapshot")).as("Snapshot phase").isEqualTo(true);
|
||||
if (it.hasNext()) {
|
||||
assertThat(record.sourceOffset().get("snapshot_completed")).as("Snapshot in progress").isEqualTo(false);
|
||||
}
|
||||
else {
|
||||
assertThat(record.sourceOffset().get("snapshot_completed")).as("Snapshot completed").isEqualTo(true);
|
||||
}
|
||||
}
|
||||
|
||||
stopConnector();
|
||||
for (int i = 0; i < RECORDS_PER_TABLE; i++) {
|
||||
final int id = ID_RESTART + i;
|
||||
connection.execute(
|
||||
"INSERT INTO tablea VALUES(" + id + ", 'a')"
|
||||
);
|
||||
connection.execute(
|
||||
"INSERT INTO tableb VALUES(" + id + ", 'b')"
|
||||
);
|
||||
}
|
||||
|
||||
start(SqlServerConnector.class, config);
|
||||
assertConnectorIsRunning();
|
||||
records = consumeRecordsByTopic(RECORDS_PER_TABLE * TABLES).allRecordsInOrder();
|
||||
for (Iterator<SourceRecord> it = records.iterator(); it.hasNext();) {
|
||||
SourceRecord record = it.next();
|
||||
assertThat(record.sourceOffset().get("snapshot")).as("Streaming phase").isNull();
|
||||
assertThat(record.sourceOffset().get("snapshot_completed")).as("Streaming phase").isNull();
|
||||
assertThat(record.sourceOffset().get("change_lsn")).as("LSN present").isNotNull();
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void whitelistTable() throws Exception {
|
||||
final int RECORDS_PER_TABLE = 5;
|
||||
|
Loading…
Reference in New Issue
Block a user