diff --git a/debezium-connector-sqlserver/src/test/java/io/debezium/connector/sqlserver/SQLServerNumericColumnIT.java b/debezium-connector-sqlserver/src/test/java/io/debezium/connector/sqlserver/SQLServerNumericColumnIT.java index dc3fbae5d..df719ad6b 100644 --- a/debezium-connector-sqlserver/src/test/java/io/debezium/connector/sqlserver/SQLServerNumericColumnIT.java +++ b/debezium-connector-sqlserver/src/test/java/io/debezium/connector/sqlserver/SQLServerNumericColumnIT.java @@ -9,6 +9,8 @@ import java.sql.SQLException; import java.util.List; +import org.apache.kafka.connect.data.Decimal; +import org.apache.kafka.connect.data.Schema; import org.apache.kafka.connect.data.Struct; import org.apache.kafka.connect.source.SourceRecord; import org.fest.assertions.Assertions; @@ -88,6 +90,7 @@ public void decimalModeConfigString() throws Exception { final List tableA = records.recordsForTopic("server1.dbo.tablenuma"); Assertions.assertThat(tableA).hasSize(1); final Struct valueA = (Struct) tableA.get(0).value(); + assertSchema(valueA, Schema.OPTIONAL_STRING_SCHEMA); Assertions.assertThat(((Struct) valueA.get("after")).get("cola")).isEqualTo("111.1111"); Assertions.assertThat(((Struct) valueA.get("after")).get("colb")).isEqualTo("1111111"); Assertions.assertThat(((Struct) valueA.get("after")).get("colc")).isEqualTo("1111111.1"); @@ -118,6 +121,7 @@ public void decimalModeConfigDouble() throws Exception { final List results = records.recordsForTopic("server1.dbo.tablenumb"); Assertions.assertThat(results).hasSize(1); final Struct valueA = (Struct) results.get(0).value(); + assertSchema(valueA, Schema.OPTIONAL_FLOAT64_SCHEMA); Assertions.assertThat(((Struct) valueA.get("after")).get("cola")).isEqualTo(222.2222d); Assertions.assertThat(((Struct) valueA.get("after")).get("colb")).isEqualTo(22222d); Assertions.assertThat(((Struct) valueA.get("after")).get("colc")).isEqualTo(22222.2d); @@ -147,10 +151,21 @@ public void decimalModeConfigPrecise() throws Exception { final List results = records.recordsForTopic("server1.dbo.tablenumc"); Assertions.assertThat(results).hasSize(1); final Struct valueA = (Struct) results.get(0).value(); + Assertions.assertThat(valueA.schema().field("after").schema().field("cola").schema()).isEqualTo(Decimal.builder(4).parameter("connect.decimal.precision", "8").optional().schema()); + Assertions.assertThat(valueA.schema().field("after").schema().field("colb").schema()).isEqualTo(Decimal.builder(0).parameter("connect.decimal.precision", "18").optional().schema()); + Assertions.assertThat(valueA.schema().field("after").schema().field("colc").schema()).isEqualTo(Decimal.builder(1).parameter("connect.decimal.precision", "8").optional().schema()); + Assertions.assertThat(valueA.schema().field("after").schema().field("cold").schema()).isEqualTo(Decimal.builder(0).parameter("connect.decimal.precision", "18").optional().schema()); Assertions.assertThat(((Struct) valueA.get("after")).get("cola")).isEqualTo(BigDecimal.valueOf(333.3333)); Assertions.assertThat(((Struct) valueA.get("after")).get("colb")).isEqualTo(BigDecimal.valueOf(3333)); Assertions.assertThat(((Struct) valueA.get("after")).get("colc")).isEqualTo(BigDecimal.valueOf(3333.3)); Assertions.assertThat(((Struct) valueA.get("after")).get("cold")).isEqualTo(BigDecimal.valueOf(33333333)); stopConnector(); } + + private void assertSchema(Struct valueA, Schema expected) { + Assertions.assertThat(valueA.schema().field("after").schema().field("cola").schema()).isEqualTo(expected); + Assertions.assertThat(valueA.schema().field("after").schema().field("colb").schema()).isEqualTo(expected); + Assertions.assertThat(valueA.schema().field("after").schema().field("colc").schema()).isEqualTo(expected); + Assertions.assertThat(valueA.schema().field("after").schema().field("cold").schema()).isEqualTo(expected); + } }