DBZ-1031 Some clean-up
This commit is contained in:
parent
2ec234d386
commit
ef77fb3c42
@ -6,6 +6,7 @@
|
||||
package io.debezium.connector.sqlserver;
|
||||
|
||||
import static io.debezium.connector.sqlserver.SqlServerConnectorConfig.SNAPSHOT_LOCKING_MODE;
|
||||
import static org.fest.assertions.Assertions.assertThat;
|
||||
import static org.junit.Assert.assertNull;
|
||||
|
||||
import java.math.BigDecimal;
|
||||
@ -14,12 +15,10 @@
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
import io.debezium.doc.FixFor;
|
||||
import org.apache.kafka.connect.data.Decimal;
|
||||
import org.apache.kafka.connect.data.Schema;
|
||||
import org.apache.kafka.connect.data.Struct;
|
||||
import org.apache.kafka.connect.source.SourceRecord;
|
||||
import org.fest.assertions.Assertions;
|
||||
import org.junit.After;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Before;
|
||||
@ -30,6 +29,7 @@
|
||||
import io.debezium.connector.sqlserver.SqlServerConnectorConfig.SnapshotMode;
|
||||
import io.debezium.connector.sqlserver.util.TestHelper;
|
||||
import io.debezium.data.SchemaAndValueField;
|
||||
import io.debezium.doc.FixFor;
|
||||
import io.debezium.embedded.AbstractConnectorTest;
|
||||
import io.debezium.time.Timestamp;
|
||||
import io.debezium.util.Collect;
|
||||
@ -41,6 +41,7 @@
|
||||
* @author Jiri Pechanec
|
||||
*/
|
||||
public class SnapshotIT extends AbstractConnectorTest {
|
||||
|
||||
private static final int INITIAL_RECORDS_PER_TABLE = 500;
|
||||
private static final int STREAMING_RECORDS_PER_TABLE = 500;
|
||||
|
||||
@ -101,7 +102,7 @@ private void takeSnapshot(SnapshotLockingMode lockingMode) throws Exception {
|
||||
final SourceRecords records = consumeRecordsByTopic(INITIAL_RECORDS_PER_TABLE);
|
||||
final List<SourceRecord> table1 = records.recordsForTopic("server1.dbo.table1");
|
||||
|
||||
Assertions.assertThat(table1).hasSize(INITIAL_RECORDS_PER_TABLE);
|
||||
assertThat(table1).hasSize(INITIAL_RECORDS_PER_TABLE);
|
||||
|
||||
for (int i = 0; i < INITIAL_RECORDS_PER_TABLE; i++) {
|
||||
final SourceRecord record1 = table1.get(i);
|
||||
@ -121,7 +122,7 @@ private void takeSnapshot(SnapshotLockingMode lockingMode) throws Exception {
|
||||
final Struct value1 = (Struct)record1.value();
|
||||
assertRecord(key1, expectedKey1);
|
||||
assertRecord((Struct)value1.get("after"), expectedRow1);
|
||||
Assertions.assertThat(record1.sourceOffset()).isEqualTo(expectedSource1);
|
||||
assertThat(record1.sourceOffset()).isEqualTo(expectedSource1);
|
||||
assertNull(value1.get("before"));
|
||||
}
|
||||
}
|
||||
@ -150,7 +151,7 @@ private void testStreaming() throws SQLException, InterruptedException {
|
||||
final SourceRecords records = consumeRecordsByTopic(STREAMING_RECORDS_PER_TABLE);
|
||||
final List<SourceRecord> table1 = records.recordsForTopic("server1.dbo.table1");
|
||||
|
||||
Assertions.assertThat(table1).hasSize(INITIAL_RECORDS_PER_TABLE);
|
||||
assertThat(table1).hasSize(INITIAL_RECORDS_PER_TABLE);
|
||||
|
||||
for (int i = 0; i < INITIAL_RECORDS_PER_TABLE; i++) {
|
||||
final int id = i + INITIAL_RECORDS_PER_TABLE;
|
||||
@ -169,7 +170,7 @@ private void testStreaming() throws SQLException, InterruptedException {
|
||||
final Struct value1 = (Struct)record1.value();
|
||||
assertRecord(key1, expectedKey1);
|
||||
assertRecord((Struct)value1.get("after"), expectedRow1);
|
||||
Assertions.assertThat(record1.sourceOffset()).hasSize(1);
|
||||
assertThat(record1.sourceOffset()).hasSize(1);
|
||||
|
||||
Assert.assertTrue(record1.sourceOffset().containsKey("change_lsn"));
|
||||
assertNull(value1.get("before"));
|
||||
@ -190,7 +191,7 @@ public void takeSchemaOnlySnapshotAndStartStreaming() throws Exception {
|
||||
|
||||
@Test
|
||||
@FixFor("DBZ-1031")
|
||||
public void takeSnapshotFromTableReservedName() throws Exception {
|
||||
public void takeSnapshotFromTableWithReservedName() throws Exception {
|
||||
connection.execute(
|
||||
"CREATE TABLE [User] (id int, name varchar(30), primary key(id))"
|
||||
);
|
||||
@ -214,7 +215,7 @@ public void takeSnapshotFromTableReservedName() throws Exception {
|
||||
final SourceRecords records = consumeRecordsByTopic(INITIAL_RECORDS_PER_TABLE);
|
||||
final List<SourceRecord> user = records.recordsForTopic("server1.dbo.User");
|
||||
|
||||
Assertions.assertThat(user).hasSize(INITIAL_RECORDS_PER_TABLE);
|
||||
assertThat(user).hasSize(INITIAL_RECORDS_PER_TABLE);
|
||||
|
||||
for (int i = 0; i < INITIAL_RECORDS_PER_TABLE; i++) {
|
||||
final SourceRecord record1 = user.get(i);
|
||||
@ -227,12 +228,11 @@ public void takeSnapshotFromTableReservedName() throws Exception {
|
||||
);
|
||||
final Map<String, ?> expectedSource1 = Collect.hashMapOf("snapshot", true, "snapshot_completed", i == INITIAL_RECORDS_PER_TABLE - 1);
|
||||
|
||||
|
||||
final Struct key1 = (Struct)record1.key();
|
||||
final Struct value1 = (Struct)record1.value();
|
||||
assertRecord(key1, expectedKey1);
|
||||
assertRecord((Struct)value1.get("after"), expectedRow1);
|
||||
Assertions.assertThat(record1.sourceOffset()).isEqualTo(expectedSource1);
|
||||
assertThat(record1.sourceOffset()).isEqualTo(expectedSource1);
|
||||
assertNull(value1.get("before"));
|
||||
}
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user