diff --git a/debezium-connector-sqlserver/src/test/java/io/debezium/connector/sqlserver/SpecialCharsInNamesIT.java b/debezium-connector-sqlserver/src/test/java/io/debezium/connector/sqlserver/SpecialCharsInNamesIT.java index ed160a85e..5502c4dba 100644 --- a/debezium-connector-sqlserver/src/test/java/io/debezium/connector/sqlserver/SpecialCharsInNamesIT.java +++ b/debezium-connector-sqlserver/src/test/java/io/debezium/connector/sqlserver/SpecialCharsInNamesIT.java @@ -5,6 +5,8 @@ */ package io.debezium.connector.sqlserver; +import static org.fest.assertions.Assertions.assertThat; + import java.sql.SQLException; import java.util.List; @@ -12,7 +14,6 @@ import org.apache.kafka.connect.data.SchemaBuilder; import org.apache.kafka.connect.data.Struct; import org.apache.kafka.connect.source.SourceRecord; -import org.fest.assertions.Assertions; import org.junit.After; import org.junit.Test; @@ -65,11 +66,11 @@ public void shouldParseWhitespaceChars() throws Exception { assertConnectorIsRunning(); SourceRecords actualRecords = consumeRecordsByTopic(2, false); - Assertions.assertThat(actualRecords.recordsForTopic("server1.dbo.UAT_WAG_CZ_Fixed_Asset")).hasSize(1); - Assertions.assertThat(actualRecords.recordsForTopic("server1.dbo.UAT_WAG_CZ_Fixed_Prop")).hasSize(1); + assertThat(actualRecords.recordsForTopic("server1.dbo.UAT_WAG_CZ_Fixed_Asset")).hasSize(1); + assertThat(actualRecords.recordsForTopic("server1.dbo.UAT_WAG_CZ_Fixed_Prop")).hasSize(1); List carRecords = actualRecords.recordsForTopic("server1.dbo.UAT_WAG_CZ_Fixed_Asset"); - Assertions.assertThat(carRecords.size()).isEqualTo(1); + assertThat(carRecords.size()).isEqualTo(1); SourceRecord carRecord = carRecords.get(0); assertSchemaMatchesStruct( @@ -86,10 +87,10 @@ public void shouldParseWhitespaceChars() throws Exception { .name("server1.dbo.UAT_WAG_CZ_Fixed_Asset.Key") .field("id", Schema.INT32_SCHEMA) .build()); - Assertions.assertThat(((Struct) carRecord.value()).getStruct("after").getString("my col$a")).isEqualTo("asset"); + assertThat(((Struct) carRecord.value()).getStruct("after").getString("my col$a")).isEqualTo("asset"); List personRecords = actualRecords.recordsForTopic("server1.dbo.UAT_WAG_CZ_Fixed_Prop"); - Assertions.assertThat(personRecords.size()).isEqualTo(1); + assertThat(personRecords.size()).isEqualTo(1); SourceRecord personRecord = personRecords.get(0); assertSchemaMatchesStruct( @@ -106,7 +107,7 @@ public void shouldParseWhitespaceChars() throws Exception { .name("server1.dbo.UAT_WAG_CZ_Fixed_Prop.Key") .field("id", Schema.INT32_SCHEMA) .build()); - Assertions.assertThat(((Struct) personRecord.value()).getStruct("after").getString("my col$a")).isEqualTo("prop"); + assertThat(((Struct) personRecord.value()).getStruct("after").getString("my col$a")).isEqualTo("prop"); } @Test @@ -132,7 +133,7 @@ public void shouldParseSpecialChars() throws Exception { assertConnectorIsRunning(); SourceRecords records = consumeRecordsByTopic(1); - Assertions.assertThat(records.recordsForTopic("server1.dbo.UAT_WAG_CZ_Fixed_Asset")).hasSize(1); + assertThat(records.recordsForTopic("server1.dbo.UAT_WAG_CZ_Fixed_Asset")).hasSize(1); SourceRecord record = records.recordsForTopic("server1.dbo.UAT_WAG_CZ_Fixed_Asset").get(0); assertSchemaMatchesStruct( @@ -149,11 +150,11 @@ public void shouldParseSpecialChars() throws Exception { .name("server1.dbo.UAT_WAG_CZ_Fixed_Asset.Key") .field("id", Schema.INT32_SCHEMA) .build()); - Assertions.assertThat(((Struct) record.value()).getStruct("after").getInt32("id")).isEqualTo(1); + assertThat(((Struct) record.value()).getStruct("after").getInt32("id")).isEqualTo(1); connection.execute("INSERT INTO [UAT WAG CZ$Fixed Asset] VALUES(2, 'b')"); records = consumeRecordsByTopic(1); - Assertions.assertThat(records.recordsForTopic("server1.dbo.UAT_WAG_CZ_Fixed_Asset")).hasSize(1); + assertThat(records.recordsForTopic("server1.dbo.UAT_WAG_CZ_Fixed_Asset")).hasSize(1); record = records.recordsForTopic("server1.dbo.UAT_WAG_CZ_Fixed_Asset").get(0); assertSchemaMatchesStruct( (Struct) ((Struct) record.value()).get("after"), @@ -169,14 +170,14 @@ record = records.recordsForTopic("server1.dbo.UAT_WAG_CZ_Fixed_Asset").get(0); .name("server1.dbo.UAT_WAG_CZ_Fixed_Asset.Key") .field("id", Schema.INT32_SCHEMA) .build()); - Assertions.assertThat(((Struct) record.value()).getStruct("after").getInt32("id")).isEqualTo(2); + assertThat(((Struct) record.value()).getStruct("after").getInt32("id")).isEqualTo(2); connection.execute( "CREATE TABLE [UAT WAG CZ$Fixed Asset Two] (id int primary key, [my col$] varchar(30), Description varchar(30) NOT NULL)"); TestHelper.enableTableCdc(connection, "UAT WAG CZ$Fixed Asset Two"); connection.execute("INSERT INTO [UAT WAG CZ$Fixed Asset Two] VALUES(3, 'b', 'empty')"); records = consumeRecordsByTopic(1); - Assertions.assertThat(records.recordsForTopic("server1.dbo.UAT_WAG_CZ_Fixed_Asset_Two")).hasSize(1); + assertThat(records.recordsForTopic("server1.dbo.UAT_WAG_CZ_Fixed_Asset_Two")).hasSize(1); record = records.recordsForTopic("server1.dbo.UAT_WAG_CZ_Fixed_Asset_Two").get(0); assertSchemaMatchesStruct( (Struct) ((Struct) record.value()).get("after"), @@ -193,11 +194,11 @@ record = records.recordsForTopic("server1.dbo.UAT_WAG_CZ_Fixed_Asset_Two").get(0 .name("server1.dbo.UAT_WAG_CZ_Fixed_Asset_Two.Key") .field("id", Schema.INT32_SCHEMA) .build()); - Assertions.assertThat(((Struct) record.value()).getStruct("after").getInt32("id")).isEqualTo(3); + assertThat(((Struct) record.value()).getStruct("after").getInt32("id")).isEqualTo(3); connection.execute("UPDATE [UAT WAG CZ$Fixed Asset Two] SET Description='c1' WHERE id=3"); records = consumeRecordsByTopic(1); - Assertions.assertThat(records.recordsForTopic("server1.dbo.UAT_WAG_CZ_Fixed_Asset_Two")).hasSize(1); + assertThat(records.recordsForTopic("server1.dbo.UAT_WAG_CZ_Fixed_Asset_Two")).hasSize(1); record = records.recordsForTopic("server1.dbo.UAT_WAG_CZ_Fixed_Asset_Two").get(0); assertSchemaMatchesStruct( (Struct) ((Struct) record.value()).get("after"), @@ -217,8 +218,8 @@ record = records.recordsForTopic("server1.dbo.UAT_WAG_CZ_Fixed_Asset_Two").get(0 .field("my_col_", Schema.OPTIONAL_STRING_SCHEMA) .field("Description", Schema.STRING_SCHEMA) .build()); - Assertions.assertThat(((Struct) record.value()).getStruct("after").getString("Description")).isEqualTo("c1"); - Assertions.assertThat(((Struct) record.value()).getStruct("before").getString("Description")).isEqualTo("empty"); + assertThat(((Struct) record.value()).getStruct("after").getString("Description")).isEqualTo("c1"); + assertThat(((Struct) record.value()).getStruct("before").getString("Description")).isEqualTo("empty"); stopConnector(); @@ -227,7 +228,7 @@ record = records.recordsForTopic("server1.dbo.UAT_WAG_CZ_Fixed_Asset_Two").get(0 connection.execute("INSERT INTO [UAT WAG CZ$Fixed Asset] VALUES(4, 'b')"); records = consumeRecordsByTopic(1); - Assertions.assertThat(records.recordsForTopic("server1.dbo.UAT_WAG_CZ_Fixed_Asset")).hasSize(1); + assertThat(records.recordsForTopic("server1.dbo.UAT_WAG_CZ_Fixed_Asset")).hasSize(1); record = records.recordsForTopic("server1.dbo.UAT_WAG_CZ_Fixed_Asset").get(0); assertSchemaMatchesStruct( (Struct) ((Struct) record.value()).get("after"), @@ -243,7 +244,7 @@ record = records.recordsForTopic("server1.dbo.UAT_WAG_CZ_Fixed_Asset").get(0); .name("server1.dbo.UAT_WAG_CZ_Fixed_Asset.Key") .field("id", Schema.INT32_SCHEMA) .build()); - Assertions.assertThat(((Struct) record.value()).getStruct("after").getInt32("id")).isEqualTo(4); + assertThat(((Struct) record.value()).getStruct("after").getInt32("id")).isEqualTo(4); } @Test @@ -269,7 +270,11 @@ public void shouldHandleSpecialCharactersInDatabaseNames() throws Exception { assertConnectorIsRunning(); // Wait for snapshot completion - consumeRecordsByTopic(1); + SourceRecords recordsByTopic = consumeRecordsByTopic(1); + List records = recordsByTopic.recordsForTopic("server1.dbo.tablea"); + assertThat(records).hasSize(1); + Struct source = (Struct) ((Struct) records.get(0).value()).get("source"); + assertThat(source.get("db")).isEqualTo("test-db"); TestHelper.waitForMaxLsnAvailable(connection, databaseName); }