diff --git a/COPYRIGHT.txt b/COPYRIGHT.txt index 5aff1e78d..1c0c1bfd7 100644 --- a/COPYRIGHT.txt +++ b/COPYRIGHT.txt @@ -1,6 +1,7 @@ Aaron Rosenberg Akshath Patkar Barry LaFond +Ben Williams Brandon Maguire Chris Riccomini Christian Posta diff --git a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlValueConverters.java b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlValueConverters.java index 2b8574247..f7514269f 100644 --- a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlValueConverters.java +++ b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlValueConverters.java @@ -5,25 +5,6 @@ */ package io.debezium.connector.mysql; -import com.github.shyiko.mysql.binlog.event.deserialization.AbstractRowsEventDataDeserializer; -import com.github.shyiko.mysql.binlog.event.deserialization.json.JsonBinary; -import com.mysql.jdbc.CharsetMapping; -import io.debezium.annotation.Immutable; -import io.debezium.data.Json; -import io.debezium.jdbc.JdbcValueConverters; -import io.debezium.relational.Column; -import io.debezium.relational.ValueConverter; -import io.debezium.time.Year; -import io.debezium.util.Strings; -import mil.nga.wkb.geom.Point; -import mil.nga.wkb.util.WkbException; -import org.apache.kafka.connect.data.Decimal; -import org.apache.kafka.connect.data.Field; -import org.apache.kafka.connect.data.Schema; -import org.apache.kafka.connect.data.SchemaBuilder; -import org.apache.kafka.connect.errors.ConnectException; -import org.apache.kafka.connect.source.SourceRecord; - import java.io.IOException; import java.math.BigDecimal; import java.nio.ByteBuffer; @@ -40,6 +21,27 @@ import java.util.Arrays; import java.util.List; +import org.apache.kafka.connect.data.Decimal; +import org.apache.kafka.connect.data.Field; +import org.apache.kafka.connect.data.Schema; +import org.apache.kafka.connect.data.SchemaBuilder; +import org.apache.kafka.connect.errors.ConnectException; +import org.apache.kafka.connect.source.SourceRecord; + +import com.github.shyiko.mysql.binlog.event.deserialization.AbstractRowsEventDataDeserializer; +import com.github.shyiko.mysql.binlog.event.deserialization.json.JsonBinary; +import com.mysql.jdbc.CharsetMapping; + +import io.debezium.annotation.Immutable; +import io.debezium.data.Json; +import io.debezium.jdbc.JdbcValueConverters; +import io.debezium.relational.Column; +import io.debezium.relational.ValueConverter; +import io.debezium.time.Year; +import io.debezium.util.Strings; +import mil.nga.wkb.geom.Point; +import mil.nga.wkb.util.WkbException; + /** * MySQL-specific customization of the conversions from JDBC values obtained from the MySQL binlog client library. *

@@ -177,12 +179,12 @@ public SchemaBuilder schemaBuilder(Column column) { } if (matches(typeName, "BIGINT UNSIGNED") || matches(typeName, "BIGINT UNSIGNED ZEROFILL")) { switch (super.bigIntUnsignedMode) { - case LONG: - return SchemaBuilder.int64(); - case PRECISE: - // In order to capture unsigned INT 64-bit data source, org.apache.kafka.connect.data.Decimal:Byte will be required to safely capture all valid values with scale of 0 - // Source: https://kafka.apache.org/0102/javadoc/org/apache/kafka/connect/data/Schema.Type.html - return Decimal.builder(0); + case LONG: + return SchemaBuilder.int64(); + case PRECISE: + // In order to capture unsigned INT 64-bit data source, org.apache.kafka.connect.data.Decimal:Byte will be required to safely capture all valid values with scale of 0 + // Source: https://kafka.apache.org/0102/javadoc/org/apache/kafka/connect/data/Schema.Type.html + return Decimal.builder(0); } } // Otherwise, let the base class handle it ... @@ -230,11 +232,11 @@ public ValueConverter converter(Column column, Field fieldDefn) { } if (matches(typeName, "BIGINT UNSIGNED") || matches(typeName, "BIGINT UNSIGNED ZEROFILL")) { switch (super.bigIntUnsignedMode) { - case LONG: - return (data) -> convertBigInt(column, fieldDefn, data); - case PRECISE: - // Convert BIGINT UNSIGNED internally from SIGNED to UNSIGNED based on the boundary settings - return (data) -> convertUnsignedBigint(column, fieldDefn, data); + case LONG: + return (data) -> convertBigInt(column, fieldDefn, data); + case PRECISE: + // Convert BIGINT UNSIGNED internally from SIGNED to UNSIGNED based on the boundary settings + return (data) -> convertUnsignedBigint(column, fieldDefn, data); } } diff --git a/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/MySqlUnsignedIntegerIT.java b/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/MySqlUnsignedIntegerIT.java index 1e531e8de..3e1012835 100644 --- a/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/MySqlUnsignedIntegerIT.java +++ b/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/MySqlUnsignedIntegerIT.java @@ -11,7 +11,6 @@ import java.nio.file.Path; import java.sql.SQLException; -import io.debezium.doc.FixFor; import org.apache.kafka.connect.data.Decimal; import org.apache.kafka.connect.data.Schema; import org.apache.kafka.connect.data.Struct; @@ -21,6 +20,7 @@ import io.debezium.config.Configuration; import io.debezium.data.Envelope; +import io.debezium.doc.FixFor; import io.debezium.embedded.AbstractConnectorTest; import io.debezium.util.Testing; @@ -397,23 +397,23 @@ private void assertBigintUnsignedLong(Struct value) { //So Signed BIGINT would be an INT64 type assertThat(after.schema().field("c3").schema()).isEqualTo(Schema.INT64_SCHEMA); - //Validate candidates values, note the loss in precision which is expected since BIGINT UNSIGNED cannot always be repsented by + //Validate candidates values, note the loss in precision which is expected since BIGINT UNSIGNED cannot always be represented by //a long datatype. switch (i) { - case 1: - assertThat(after.getInt64("c1")).isEqualTo(-1L); - assertThat(after.getInt64("c2")).isEqualTo(-1L); - assertThat(after.getInt64("c3")).isEqualTo(9223372036854775807L); - break; - case 2: - assertThat(after.getInt64("c1")).isEqualTo(-4000000000000000001L); - assertThat(after.getInt64("c2")).isEqualTo(-4000000000000000001L); - assertThat(after.getInt64("c3")).isEqualTo(-1223372036854775807L); - break; - case 3: - assertThat(after.getInt64("c1")).isEqualTo(0L); - assertThat(after.getInt64("c2")).isEqualTo(0L); - assertThat(after.getInt64("c3")).isEqualTo(-9223372036854775808L); + case 1: + assertThat(after.getInt64("c1")).isEqualTo(-1L); + assertThat(after.getInt64("c2")).isEqualTo(-1L); + assertThat(after.getInt64("c3")).isEqualTo(9223372036854775807L); + break; + case 2: + assertThat(after.getInt64("c1")).isEqualTo(-4000000000000000001L); + assertThat(after.getInt64("c2")).isEqualTo(-4000000000000000001L); + assertThat(after.getInt64("c3")).isEqualTo(-1223372036854775807L); + break; + case 3: + assertThat(after.getInt64("c1")).isEqualTo(0L); + assertThat(after.getInt64("c2")).isEqualTo(0L); + assertThat(after.getInt64("c3")).isEqualTo(-9223372036854775808L); } }