DBZ-751 Propagating DECIMAL column precision to Avro schemas

This commit is contained in:
Gunnar Morling 2018-06-22 12:16:29 +02:00 committed by Jiri Pechanec
parent d61db8930d
commit a292b05a96
10 changed files with 175 additions and 37 deletions

View File

@ -0,0 +1,103 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.connector.mysql;
import static org.fest.assertions.Assertions.assertThat;
import static org.fest.assertions.MapAssert.entry;
import java.nio.file.Path;
import java.sql.SQLException;
import java.util.List;
import java.util.Map;
import org.apache.kafka.connect.source.SourceRecord;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import io.debezium.config.Configuration;
import io.debezium.doc.FixFor;
import io.debezium.embedded.AbstractConnectorTest;
import io.debezium.util.Testing;
/**
* @author Gunnar Morling
*/
public class MySqlDecimalColumnIT extends AbstractConnectorTest {
private static final Path DB_HISTORY_PATH = Testing.Files.createTestingPath("file-db-history-decimal-column.txt")
.toAbsolutePath();
private final UniqueDatabase DATABASE = new UniqueDatabase("decimalcolumnit", "decimal_column_test")
.withDbHistoryPath(DB_HISTORY_PATH);
private Configuration config;
@Before
public void beforeEach() {
stopConnector();
DATABASE.createAndInitialize();
initializeConnectorTestFramework();
Testing.Files.delete(DB_HISTORY_PATH);
}
@After
public void afterEach() {
try {
stopConnector();
} finally {
Testing.Files.delete(DB_HISTORY_PATH);
}
}
@Test
@FixFor("DBZ-751")
public void shouldSetPrecisionSchemaParameter() throws SQLException, InterruptedException {
// Use the DB configuration to define the connector's configuration ...
config = DATABASE.defaultConfig()
.with(MySqlConnectorConfig.SNAPSHOT_MODE, MySqlConnectorConfig.SnapshotMode.NEVER)
.build();
// Start the connector ...
start(MySqlConnector.class, config);
// ---------------------------------------------------------------------------------------------------------------
// Consume all of the events due to startup and initialization of the database
// ---------------------------------------------------------------------------------------------------------------
//Testing.Debug.enable();
int numCreateDatabase = 1;
int numCreateTables = 1;
int numInserts = 1;
SourceRecords records = consumeRecordsByTopic(numCreateDatabase + numCreateTables + numInserts);
stopConnector();
assertThat(records).isNotNull();
records.forEach(this::validate);
List<SourceRecord> dmls = records.recordsForTopic(DATABASE.topicForTable("dbz_751_decimal_column_test"));
assertThat(dmls).hasSize(1);
SourceRecord insert = dmls.get(0);
Map<String, String> rating1SchemaParameters = insert.valueSchema()
.field("before")
.schema()
.field("rating1")
.schema()
.parameters();
assertThat(rating1SchemaParameters).includes(
entry("scale", "0"), entry("connect.decimal.precision", "10"));
Map<String, String> rating2SchemaParameters = insert.valueSchema()
.field("before")
.schema()
.field("rating2")
.schema()
.parameters();
assertThat(rating2SchemaParameters).includes(
entry("scale", "4"), entry("connect.decimal.precision", "8"));
}
}

View File

@ -0,0 +1,12 @@
-- ----------------------------------------------------------------------------------------------------------------
-- DATABASE: decimal_column_test
-- ----------------------------------------------------------------------------------------------------------------
CREATE TABLE dbz_751_decimal_column_test (
id INT AUTO_INCREMENT NOT NULL,
rating1 DECIMAL,
rating2 DECIMAL(8,4),
PRIMARY KEY (id)
) DEFAULT CHARSET=utf8;
INSERT INTO dbz_751_decimal_column_test VALUES (default, 123, 123.4567);

View File

@ -207,7 +207,7 @@ private SchemaBuilder numericSchema(Column column) {
if (decimalMode == DecimalMode.PRECISE && isVariableScaleDecimal(column)) {
return VariableScaleDecimal.builder();
}
return SpecialValueDecimal.builder(decimalMode, column.scale().get());
return SpecialValueDecimal.builder(decimalMode, column.length(), column.scale().get());
}
@Override

View File

@ -182,16 +182,16 @@ protected List<SchemaAndValueField> schemasAndValuesForBigDecimalEncodedNumericT
final Struct nvs_int = new Struct(VariableScaleDecimal.schema());
nvs_int.put("scale", 0).put("value", new BigDecimal("22").unscaledValue().toByteArray());
final List<SchemaAndValueField> fields = new ArrayList<SchemaAndValueField>(Arrays.asList(
new SchemaAndValueField("d", Decimal.builder(2).optional().build(), new BigDecimal("1.10")),
new SchemaAndValueField("dzs", Decimal.builder(0).optional().build(), new BigDecimal("10")),
new SchemaAndValueField("d", Decimal.builder(2).parameter("connect.decimal.precision", "3").optional().build(), new BigDecimal("1.10")),
new SchemaAndValueField("dzs", Decimal.builder(0).parameter("connect.decimal.precision", "4").optional().build(), new BigDecimal("10")),
new SchemaAndValueField("dvs", VariableScaleDecimal.optionalSchema(), dvs),
new SchemaAndValueField("d_nn", Decimal.builder(2).build(), new BigDecimal("3.30")),
new SchemaAndValueField("n", Decimal.builder(4).optional().build(), new BigDecimal("22.2200")),
new SchemaAndValueField("nzs", Decimal.builder(0).optional().build(), new BigDecimal("22")),
new SchemaAndValueField("d_nn", Decimal.builder(2).parameter("connect.decimal.precision", "3").build(), new BigDecimal("3.30")),
new SchemaAndValueField("n", Decimal.builder(4).parameter("connect.decimal.precision", "6").optional().build(), new BigDecimal("22.2200")),
new SchemaAndValueField("nzs", Decimal.builder(0).parameter("connect.decimal.precision", "4").optional().build(), new BigDecimal("22")),
new SchemaAndValueField("nvs", VariableScaleDecimal.optionalSchema(), nvs),
new SchemaAndValueField("d_int", Decimal.builder(2).optional().build(), new BigDecimal("1.00")),
new SchemaAndValueField("d_int", Decimal.builder(2).parameter("connect.decimal.precision", "3").optional().build(), new BigDecimal("1.00")),
new SchemaAndValueField("dvs_int", VariableScaleDecimal.optionalSchema(), dvs_int),
new SchemaAndValueField("n_int", Decimal.builder(4).optional().build(), new BigDecimal("22.0000")),
new SchemaAndValueField("n_int", Decimal.builder(4).parameter("connect.decimal.precision", "6").optional().build(), new BigDecimal("22.0000")),
new SchemaAndValueField("nvs_int", VariableScaleDecimal.optionalSchema(), nvs_int)
));
return fields;
@ -376,7 +376,7 @@ protected List<SchemaAndValueField> schemasAndValuesForArrayTypes() {
(int)LocalDate.of(2016, Month.NOVEMBER, 5).toEpochDay(),
(int)LocalDate.of(2016, Month.NOVEMBER, 6).toEpochDay()
)),
new SchemaAndValueField("numeric_array", SchemaBuilder.array(Decimal.builder(2).optional().build()).optional().build(),
new SchemaAndValueField("numeric_array", SchemaBuilder.array(Decimal.builder(2).parameter("connect.decimal.precision", "10").optional().build()).optional().build(),
Arrays.asList(
new BigDecimal("1.20"),
new BigDecimal("3.40"),
@ -397,7 +397,7 @@ protected List<SchemaAndValueField> schemasAndValuesForArrayTypesWithNullValues(
new SchemaAndValueField("char_array", SchemaBuilder.array(Schema.OPTIONAL_STRING_SCHEMA).optional().build(), null),
new SchemaAndValueField("varchar_array", SchemaBuilder.array(Schema.OPTIONAL_STRING_SCHEMA).optional().build(), null),
new SchemaAndValueField("date_array", SchemaBuilder.array(Date.builder().optional().schema()).optional().build(), null),
new SchemaAndValueField("numeric_array", SchemaBuilder.array(Decimal.builder(2).optional().build()).optional().build(), null),
new SchemaAndValueField("numeric_array", SchemaBuilder.array(Decimal.builder(2).parameter("connect.decimal.precision", "10").optional().build()).optional().build(),null),
new SchemaAndValueField("citext_array", SchemaBuilder.array(Schema.OPTIONAL_STRING_SCHEMA).optional().build(), null)
);
}
@ -637,7 +637,7 @@ private void assertSchema(Struct content) {
Schema schema = content.schema();
Field field = schema.field(fieldName);
assertNotNull(fieldName + " not found in schema " + schema, field);
VerifyRecord.assertConnectSchemasAreEqual(this.schema, field.schema());
VerifyRecord.assertConnectSchemasAreEqual(field.name(), this.schema, field.schema());
}
}

View File

@ -8,7 +8,6 @@
import static io.debezium.connector.postgresql.PostgresConnectorConfig.SCHEMA_BLACKLIST;
import static org.fest.assertions.Assertions.assertThat;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
@ -28,6 +27,7 @@
import io.debezium.data.Json;
import io.debezium.data.Uuid;
import io.debezium.data.VariableScaleDecimal;
import io.debezium.data.VerifyRecord;
import io.debezium.data.Xml;
import io.debezium.data.geometry.Geography;
import io.debezium.data.geometry.Geometry;
@ -80,11 +80,11 @@ public void shouldLoadSchemaForBuiltinPostgresTypes() throws Exception {
Schema.OPTIONAL_INT16_SCHEMA, Schema.OPTIONAL_INT32_SCHEMA, Schema.OPTIONAL_INT64_SCHEMA, Schema.OPTIONAL_FLOAT32_SCHEMA,
Schema.OPTIONAL_FLOAT64_SCHEMA, Schema.INT16_SCHEMA, Schema.INT64_SCHEMA, Schema.OPTIONAL_BOOLEAN_SCHEMA);
assertTableSchema("public.numeric_decimal_table", "d, dzs, dvs, n, nzs, nvs",
Decimal.builder(2).optional().build(),
Decimal.builder(0).optional().build(),
Decimal.builder(2).parameter("connect.decimal.precision", "3").optional().build(),
Decimal.builder(0).parameter("connect.decimal.precision", "4").optional().build(),
VariableScaleDecimal.builder().optional().build(),
Decimal.builder(4).optional().build(),
Decimal.builder(0).optional().build(),
Decimal.builder(4).parameter("connect.decimal.precision", "6").optional().build(),
Decimal.builder(0).parameter("connect.decimal.precision", "4").optional().build(),
VariableScaleDecimal.builder().optional().build()
);
assertTableSchema("public.string_table", "vc, vcv, ch, c, t",
@ -263,7 +263,7 @@ private void assertSchemaContent(String[] fields, Schema[] types, Schema keySche
String fieldName = fields[i].trim();
Field field = keySchema.field(Strings.unquoteIdentifierPart(fieldName));
assertNotNull(fieldName + " not found in schema", field);
assertEquals("'" + fieldName + "' has incorrect schema.", types[i], field.schema());
VerifyRecord.assertConnectSchemasAreEqual(fieldName, types[i], field.schema());
});
}

View File

@ -432,7 +432,7 @@ public void shouldReceiveChangesForTypeConstraints() throws Exception {
List<SchemaAndValueField> expectedBefore = Collections.singletonList(new SchemaAndValueField("num_val", null, null));
assertRecordSchemaAndValues(expectedBefore, updatedRecord, Envelope.FieldName.BEFORE);
List<SchemaAndValueField> expectedAfter = Collections.singletonList(new SchemaAndValueField("num_val", Decimal.builder(2).optional().build(), new BigDecimal("123.45")));
List<SchemaAndValueField> expectedAfter = Collections.singletonList(new SchemaAndValueField("num_val", Decimal.builder(2).parameter("connect.decimal.precision", "5").optional().build(), new BigDecimal("123.45")));
assertRecordSchemaAndValues(expectedAfter, updatedRecord, Envelope.FieldName.AFTER);
// change a constraint
@ -445,7 +445,7 @@ public void shouldReceiveChangesForTypeConstraints() throws Exception {
VerifyRecord.isValidInsert(updatedRecord, PK_FIELD, 2);
assertRecordSchemaAndValues(
Collections.singletonList(new SchemaAndValueField("num_val", Decimal.builder(1).optional().build(), new BigDecimal("123.4"))), updatedRecord, Envelope.FieldName.AFTER);
Collections.singletonList(new SchemaAndValueField("num_val", Decimal.builder(1).parameter("connect.decimal.precision", "6").optional().build(), new BigDecimal("123.4"))), updatedRecord, Envelope.FieldName.AFTER);
statements = "ALTER TABLE test_table ALTER COLUMN num_val TYPE NUMERIC;" +
"INSERT INTO test_table (pk,num_val) VALUES (3,123.4567);";
@ -469,7 +469,7 @@ public void shouldReceiveChangesForTypeConstraints() throws Exception {
VerifyRecord.isValidInsert(updatedRecord, PK_FIELD, 4);
assertRecordSchemaAndValues(
Collections.singletonList(new SchemaAndValueField("num_val", Decimal.builder(4).optional().build(), new BigDecimal("2.4800"))), updatedRecord, Envelope.FieldName.AFTER);
Collections.singletonList(new SchemaAndValueField("num_val", Decimal.builder(4).parameter("connect.decimal.precision", "12").optional().build(), new BigDecimal("2.4800"))), updatedRecord, Envelope.FieldName.AFTER);
statements = "ALTER TABLE test_table ALTER COLUMN num_val TYPE DECIMAL(12);" +
"INSERT INTO test_table (pk,num_val) VALUES (5,1238);";
@ -480,7 +480,7 @@ public void shouldReceiveChangesForTypeConstraints() throws Exception {
VerifyRecord.isValidInsert(updatedRecord, PK_FIELD, 5);
assertRecordSchemaAndValues(
Collections.singletonList(new SchemaAndValueField("num_val", Decimal.builder(0).optional().build(), new BigDecimal("1238"))), updatedRecord, Envelope.FieldName.AFTER);
Collections.singletonList(new SchemaAndValueField("num_val", Decimal.builder(0).parameter("connect.decimal.precision", "12").optional().build(), new BigDecimal("1238"))), updatedRecord, Envelope.FieldName.AFTER);
statements = "ALTER TABLE test_table ALTER COLUMN num_val TYPE DECIMAL;" +
"INSERT INTO test_table (pk,num_val) VALUES (6,1225.1);";

View File

@ -27,6 +27,14 @@ public class SpecialValueDecimal implements Serializable {
private static final long serialVersionUID = 1L;
/**
* Used as a schema parameter by the Avro serializer for creating a corresponding Avro schema with the correct
* precision.
*
* @see {@code AvroData#CONNECT_AVRO_DECIMAL_PRECISION_PROP}.
*/
private static final String PRECISION_PARAMETER_KEY = "connect.decimal.precision";
/**
* Special values for floating-point and numeric types
*/
@ -135,17 +143,19 @@ else if (!decimalValue.equals(other.decimalValue))
* and documentation.
*
* @param mode the mode in which the number should be encoded
* @param precision the precision of the decimal
* @param scale scale of the decimal
* @return the schema builder
*/
public static SchemaBuilder builder(DecimalMode mode, int scale) {
public static SchemaBuilder builder(DecimalMode mode, int precision, int scale) {
switch (mode) {
case DOUBLE:
return SchemaBuilder.float64();
case PRECISE:
return Decimal.builder(scale);
case STRING:
return SchemaBuilder.string();
case DOUBLE:
return SchemaBuilder.float64();
case PRECISE:
return Decimal.builder(scale)
.parameter(PRECISION_PARAMETER_KEY, String.valueOf(precision));
case STRING:
return SchemaBuilder.string();
}
throw new IllegalArgumentException("Unknown decimalMode");
}

View File

@ -162,7 +162,7 @@ public SchemaBuilder schemaBuilder(Column column) {
return SchemaBuilder.float64();
case Types.NUMERIC:
case Types.DECIMAL:
return SpecialValueDecimal.builder(decimalMode, column.scale().get());
return SpecialValueDecimal.builder(decimalMode, column.length(), column.scale().get());
// Fixed-length string values
case Types.CHAR:

View File

@ -430,13 +430,26 @@ public static void assertEquals(SourceRecord actual, SourceRecord expected, Pred
assertEquals(actualValueSchema, actual.value(), expected.value(), "value", "", ignoreFields, comparatorsByName, comparatorsBySchemaName);
}
public static void assertConnectSchemasAreEqual(Schema schema1, Schema schema2) {
if (!areConnectSchemasEqual(schema1, schema2)) {
// failing with an assertion message that shows the difference
assertThat(SchemaUtil.asString(schema1)).isEqualTo(SchemaUtil.asString(schema2));
/**
* Asserts that the two given schemas are equal.
*
* @param fieldName
* name of the field owning that schema, if it's not a top-level schema
* @param actual
* the actual schema
* @param expected
* the expected schema
*/
public static void assertConnectSchemasAreEqual(String fieldName, Schema actual, Schema expected) {
if (!areConnectSchemasEqual(actual, expected)) {
// first try failing with an assertion message that shows the actual difference
assertThat(SchemaUtil.asString(actual)).describedAs("field name: " + fieldName).isEqualTo(SchemaUtil.asString(expected));
// fall-back just in case
fail(SchemaUtil.asString(schema1) + " was not equal to " + SchemaUtil.asString(schema2));
// compare schema parameters
assertThat(actual.parameters()).describedAs("field '" + fieldName + "' parameters").isEqualTo(expected.parameters());
// fall-back just in case (e.g. differences of element schemas of arrays)
fail("field '" + fieldName + "': " + SchemaUtil.asString(actual) + " was not equal to " + SchemaUtil.asString(expected));
}
}
@ -828,7 +841,7 @@ protected static void assertEquals(Object o1, Object o2) {
// assertThat(o1).isEqualTo(o2);
if (o1 instanceof Schema && o2 instanceof Schema) {
assertConnectSchemasAreEqual((Schema) o1, (Schema) o2);
assertConnectSchemasAreEqual(null, (Schema) o1, (Schema) o2);
}
else if (!equals(o1, o2)) {
fail(SchemaUtil.asString(o1) + " was not equal to " + SchemaUtil.asString(o2));

View File

@ -123,7 +123,7 @@ public void shouldBuildTableSchemaFromTableWithoutPrimaryKey() {
assertThat(values.field("C1").schema()).isEqualTo(SchemaBuilder.string().build());
assertThat(values.field("C2").name()).isEqualTo("C2");
assertThat(values.field("C2").index()).isEqualTo(1);
assertThat(values.field("C2").schema()).isEqualTo(Decimal.builder(3).optional().build()); // scale of 3
assertThat(values.field("C2").schema()).isEqualTo(Decimal.builder(3).parameter("connect.decimal.precision", "5").optional().build()); // scale of 3
assertThat(values.field("C3").name()).isEqualTo("C3");
assertThat(values.field("C3").index()).isEqualTo(2);
assertThat(values.field("C3").schema()).isEqualTo(Date.builder().optional().build()); // optional date