DBZ-4125 Asserting database name in change events;

Using assertThat() statically imported.
This commit is contained in:
Gunnar Morling 2021-10-19 13:03:12 +02:00
parent e5ceddca2f
commit adba67a2c5

View File

@ -5,6 +5,8 @@
*/
package io.debezium.connector.sqlserver;
import static org.fest.assertions.Assertions.assertThat;
import java.sql.SQLException;
import java.util.List;
@ -12,7 +14,6 @@
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;
import org.fest.assertions.Assertions;
import org.junit.After;
import org.junit.Test;
@ -65,11 +66,11 @@ public void shouldParseWhitespaceChars() throws Exception {
assertConnectorIsRunning();
SourceRecords actualRecords = consumeRecordsByTopic(2, false);
Assertions.assertThat(actualRecords.recordsForTopic("server1.dbo.UAT_WAG_CZ_Fixed_Asset")).hasSize(1);
Assertions.assertThat(actualRecords.recordsForTopic("server1.dbo.UAT_WAG_CZ_Fixed_Prop")).hasSize(1);
assertThat(actualRecords.recordsForTopic("server1.dbo.UAT_WAG_CZ_Fixed_Asset")).hasSize(1);
assertThat(actualRecords.recordsForTopic("server1.dbo.UAT_WAG_CZ_Fixed_Prop")).hasSize(1);
List<SourceRecord> carRecords = actualRecords.recordsForTopic("server1.dbo.UAT_WAG_CZ_Fixed_Asset");
Assertions.assertThat(carRecords.size()).isEqualTo(1);
assertThat(carRecords.size()).isEqualTo(1);
SourceRecord carRecord = carRecords.get(0);
assertSchemaMatchesStruct(
@ -86,10 +87,10 @@ public void shouldParseWhitespaceChars() throws Exception {
.name("server1.dbo.UAT_WAG_CZ_Fixed_Asset.Key")
.field("id", Schema.INT32_SCHEMA)
.build());
Assertions.assertThat(((Struct) carRecord.value()).getStruct("after").getString("my col$a")).isEqualTo("asset");
assertThat(((Struct) carRecord.value()).getStruct("after").getString("my col$a")).isEqualTo("asset");
List<SourceRecord> personRecords = actualRecords.recordsForTopic("server1.dbo.UAT_WAG_CZ_Fixed_Prop");
Assertions.assertThat(personRecords.size()).isEqualTo(1);
assertThat(personRecords.size()).isEqualTo(1);
SourceRecord personRecord = personRecords.get(0);
assertSchemaMatchesStruct(
@ -106,7 +107,7 @@ public void shouldParseWhitespaceChars() throws Exception {
.name("server1.dbo.UAT_WAG_CZ_Fixed_Prop.Key")
.field("id", Schema.INT32_SCHEMA)
.build());
Assertions.assertThat(((Struct) personRecord.value()).getStruct("after").getString("my col$a")).isEqualTo("prop");
assertThat(((Struct) personRecord.value()).getStruct("after").getString("my col$a")).isEqualTo("prop");
}
@Test
@ -132,7 +133,7 @@ public void shouldParseSpecialChars() throws Exception {
assertConnectorIsRunning();
SourceRecords records = consumeRecordsByTopic(1);
Assertions.assertThat(records.recordsForTopic("server1.dbo.UAT_WAG_CZ_Fixed_Asset")).hasSize(1);
assertThat(records.recordsForTopic("server1.dbo.UAT_WAG_CZ_Fixed_Asset")).hasSize(1);
SourceRecord record = records.recordsForTopic("server1.dbo.UAT_WAG_CZ_Fixed_Asset").get(0);
assertSchemaMatchesStruct(
@ -149,11 +150,11 @@ public void shouldParseSpecialChars() throws Exception {
.name("server1.dbo.UAT_WAG_CZ_Fixed_Asset.Key")
.field("id", Schema.INT32_SCHEMA)
.build());
Assertions.assertThat(((Struct) record.value()).getStruct("after").getInt32("id")).isEqualTo(1);
assertThat(((Struct) record.value()).getStruct("after").getInt32("id")).isEqualTo(1);
connection.execute("INSERT INTO [UAT WAG CZ$Fixed Asset] VALUES(2, 'b')");
records = consumeRecordsByTopic(1);
Assertions.assertThat(records.recordsForTopic("server1.dbo.UAT_WAG_CZ_Fixed_Asset")).hasSize(1);
assertThat(records.recordsForTopic("server1.dbo.UAT_WAG_CZ_Fixed_Asset")).hasSize(1);
record = records.recordsForTopic("server1.dbo.UAT_WAG_CZ_Fixed_Asset").get(0);
assertSchemaMatchesStruct(
(Struct) ((Struct) record.value()).get("after"),
@ -169,14 +170,14 @@ record = records.recordsForTopic("server1.dbo.UAT_WAG_CZ_Fixed_Asset").get(0);
.name("server1.dbo.UAT_WAG_CZ_Fixed_Asset.Key")
.field("id", Schema.INT32_SCHEMA)
.build());
Assertions.assertThat(((Struct) record.value()).getStruct("after").getInt32("id")).isEqualTo(2);
assertThat(((Struct) record.value()).getStruct("after").getInt32("id")).isEqualTo(2);
connection.execute(
"CREATE TABLE [UAT WAG CZ$Fixed Asset Two] (id int primary key, [my col$] varchar(30), Description varchar(30) NOT NULL)");
TestHelper.enableTableCdc(connection, "UAT WAG CZ$Fixed Asset Two");
connection.execute("INSERT INTO [UAT WAG CZ$Fixed Asset Two] VALUES(3, 'b', 'empty')");
records = consumeRecordsByTopic(1);
Assertions.assertThat(records.recordsForTopic("server1.dbo.UAT_WAG_CZ_Fixed_Asset_Two")).hasSize(1);
assertThat(records.recordsForTopic("server1.dbo.UAT_WAG_CZ_Fixed_Asset_Two")).hasSize(1);
record = records.recordsForTopic("server1.dbo.UAT_WAG_CZ_Fixed_Asset_Two").get(0);
assertSchemaMatchesStruct(
(Struct) ((Struct) record.value()).get("after"),
@ -193,11 +194,11 @@ record = records.recordsForTopic("server1.dbo.UAT_WAG_CZ_Fixed_Asset_Two").get(0
.name("server1.dbo.UAT_WAG_CZ_Fixed_Asset_Two.Key")
.field("id", Schema.INT32_SCHEMA)
.build());
Assertions.assertThat(((Struct) record.value()).getStruct("after").getInt32("id")).isEqualTo(3);
assertThat(((Struct) record.value()).getStruct("after").getInt32("id")).isEqualTo(3);
connection.execute("UPDATE [UAT WAG CZ$Fixed Asset Two] SET Description='c1' WHERE id=3");
records = consumeRecordsByTopic(1);
Assertions.assertThat(records.recordsForTopic("server1.dbo.UAT_WAG_CZ_Fixed_Asset_Two")).hasSize(1);
assertThat(records.recordsForTopic("server1.dbo.UAT_WAG_CZ_Fixed_Asset_Two")).hasSize(1);
record = records.recordsForTopic("server1.dbo.UAT_WAG_CZ_Fixed_Asset_Two").get(0);
assertSchemaMatchesStruct(
(Struct) ((Struct) record.value()).get("after"),
@ -217,8 +218,8 @@ record = records.recordsForTopic("server1.dbo.UAT_WAG_CZ_Fixed_Asset_Two").get(0
.field("my_col_", Schema.OPTIONAL_STRING_SCHEMA)
.field("Description", Schema.STRING_SCHEMA)
.build());
Assertions.assertThat(((Struct) record.value()).getStruct("after").getString("Description")).isEqualTo("c1");
Assertions.assertThat(((Struct) record.value()).getStruct("before").getString("Description")).isEqualTo("empty");
assertThat(((Struct) record.value()).getStruct("after").getString("Description")).isEqualTo("c1");
assertThat(((Struct) record.value()).getStruct("before").getString("Description")).isEqualTo("empty");
stopConnector();
@ -227,7 +228,7 @@ record = records.recordsForTopic("server1.dbo.UAT_WAG_CZ_Fixed_Asset_Two").get(0
connection.execute("INSERT INTO [UAT WAG CZ$Fixed Asset] VALUES(4, 'b')");
records = consumeRecordsByTopic(1);
Assertions.assertThat(records.recordsForTopic("server1.dbo.UAT_WAG_CZ_Fixed_Asset")).hasSize(1);
assertThat(records.recordsForTopic("server1.dbo.UAT_WAG_CZ_Fixed_Asset")).hasSize(1);
record = records.recordsForTopic("server1.dbo.UAT_WAG_CZ_Fixed_Asset").get(0);
assertSchemaMatchesStruct(
(Struct) ((Struct) record.value()).get("after"),
@ -243,7 +244,7 @@ record = records.recordsForTopic("server1.dbo.UAT_WAG_CZ_Fixed_Asset").get(0);
.name("server1.dbo.UAT_WAG_CZ_Fixed_Asset.Key")
.field("id", Schema.INT32_SCHEMA)
.build());
Assertions.assertThat(((Struct) record.value()).getStruct("after").getInt32("id")).isEqualTo(4);
assertThat(((Struct) record.value()).getStruct("after").getInt32("id")).isEqualTo(4);
}
@Test
@ -269,7 +270,11 @@ public void shouldHandleSpecialCharactersInDatabaseNames() throws Exception {
assertConnectorIsRunning();
// Wait for snapshot completion
consumeRecordsByTopic(1);
SourceRecords recordsByTopic = consumeRecordsByTopic(1);
List<SourceRecord> records = recordsByTopic.recordsForTopic("server1.dbo.tablea");
assertThat(records).hasSize(1);
Struct source = (Struct) ((Struct) records.get(0).value()).get("source");
assertThat(source.get("db")).isEqualTo("test-db");
TestHelper.waitForMaxLsnAvailable(connection, databaseName);
}