DBZ-20 Oracle precision supports negative numbers

This commit is contained in:
Gunnar Morling 2018-06-13 10:41:11 +02:00 committed by Jiri Pechanec
parent f4e3668ebf
commit dce1aa29f1
15 changed files with 82 additions and 53 deletions

View File

@ -6,11 +6,13 @@
package io.debezium.connector.mysql;
import java.sql.SQLException;
import java.sql.Types;
import org.junit.Test;
import static org.fest.assertions.Assertions.assertThat;
import static org.junit.Assert.assertFalse;
import io.debezium.relational.Column;
import io.debezium.relational.Table;
@ -57,7 +59,7 @@ public void shouldLoadMetadataViaJdbc() throws SQLException {
assertThat(person.columnWithName("name").typeName()).isEqualTo("VARCHAR");
assertThat(person.columnWithName("name").jdbcType()).isEqualTo(Types.VARCHAR);
assertThat(person.columnWithName("name").length()).isEqualTo(255);
assertThat(person.columnWithName("name").scale()).isEqualTo(0);
assertFalse(person.columnWithName("name").scale().isPresent());
assertThat(person.columnWithName("name").position()).isEqualTo(1);
assertThat(person.columnWithName("name").isAutoIncremented()).isFalse();
assertThat(person.columnWithName("name").isGenerated()).isFalse();
@ -66,7 +68,7 @@ public void shouldLoadMetadataViaJdbc() throws SQLException {
assertThat(person.columnWithName("birthdate").typeName()).isEqualTo("DATE");
assertThat(person.columnWithName("birthdate").jdbcType()).isEqualTo(Types.DATE);
assertThat(person.columnWithName("birthdate").length()).isEqualTo(10);
assertThat(person.columnWithName("birthdate").scale()).isEqualTo(0);
assertFalse(person.columnWithName("birthdate").scale().isPresent());
assertThat(person.columnWithName("birthdate").position()).isEqualTo(2);
assertThat(person.columnWithName("birthdate").isAutoIncremented()).isFalse();
assertThat(person.columnWithName("birthdate").isGenerated()).isFalse();
@ -75,7 +77,7 @@ public void shouldLoadMetadataViaJdbc() throws SQLException {
assertThat(person.columnWithName("age").typeName()).isEqualTo("INT");
assertThat(person.columnWithName("age").jdbcType()).isEqualTo(Types.INTEGER);
assertThat(person.columnWithName("age").length()).isEqualTo(10);
assertThat(person.columnWithName("age").scale()).isEqualTo(0);
assertThat(person.columnWithName("age").scale().get()).isEqualTo(0);
assertThat(person.columnWithName("age").position()).isEqualTo(3);
assertThat(person.columnWithName("age").isAutoIncremented()).isFalse();
assertThat(person.columnWithName("age").isGenerated()).isFalse();
@ -84,7 +86,7 @@ public void shouldLoadMetadataViaJdbc() throws SQLException {
assertThat(person.columnWithName("salary").typeName()).isEqualTo("DECIMAL");
assertThat(person.columnWithName("salary").jdbcType()).isEqualTo(Types.DECIMAL);
assertThat(person.columnWithName("salary").length()).isEqualTo(5);
assertThat(person.columnWithName("salary").scale()).isEqualTo(2);
assertThat(person.columnWithName("salary").scale().get()).isEqualTo(2);
assertThat(person.columnWithName("salary").position()).isEqualTo(4);
assertThat(person.columnWithName("salary").isAutoIncremented()).isFalse();
assertThat(person.columnWithName("salary").isGenerated()).isFalse();
@ -93,7 +95,7 @@ public void shouldLoadMetadataViaJdbc() throws SQLException {
assertThat(person.columnWithName("bitStr").typeName()).isEqualTo("BIT");
assertThat(person.columnWithName("bitStr").jdbcType()).isEqualTo(Types.BIT);
assertThat(person.columnWithName("bitStr").length()).isEqualTo(18);
assertThat(person.columnWithName("bitStr").scale()).isEqualTo(0);
assertFalse(person.columnWithName("bitStr").scale().isPresent());
assertThat(person.columnWithName("bitStr").position()).isEqualTo(5);
assertThat(person.columnWithName("bitStr").isAutoIncremented()).isFalse();
assertThat(person.columnWithName("bitStr").isGenerated()).isFalse();
@ -119,7 +121,7 @@ public void shouldLoadMetadataViaJdbc() throws SQLException {
assertThat(product.columnWithName("id").typeName()).isEqualTo("INT");
assertThat(product.columnWithName("id").jdbcType()).isEqualTo(Types.INTEGER);
assertThat(product.columnWithName("id").length()).isEqualTo(10);
assertThat(product.columnWithName("id").scale()).isEqualTo(0);
assertThat(product.columnWithName("id").scale().get()).isEqualTo(0);
assertThat(product.columnWithName("id").position()).isEqualTo(1);
assertThat(product.columnWithName("id").isAutoIncremented()).isTrue();
assertThat(product.columnWithName("id").isGenerated()).isFalse();
@ -128,7 +130,7 @@ public void shouldLoadMetadataViaJdbc() throws SQLException {
assertThat(product.columnWithName("createdByDate").typeName()).isEqualTo("DATETIME");
assertThat(product.columnWithName("createdByDate").jdbcType()).isEqualTo(Types.TIMESTAMP);
assertThat(product.columnWithName("createdByDate").length()).isEqualTo(19);
assertThat(product.columnWithName("createdByDate").scale()).isEqualTo(0);
assertFalse(product.columnWithName("createdByDate").scale().isPresent());
assertThat(product.columnWithName("createdByDate").position()).isEqualTo(2);
assertThat(product.columnWithName("createdByDate").isAutoIncremented()).isFalse();
assertThat(product.columnWithName("createdByDate").isGenerated()).isFalse();
@ -137,7 +139,7 @@ public void shouldLoadMetadataViaJdbc() throws SQLException {
assertThat(product.columnWithName("modifiedDate").typeName()).isEqualTo("DATETIME");
assertThat(product.columnWithName("modifiedDate").jdbcType()).isEqualTo(Types.TIMESTAMP);
assertThat(product.columnWithName("modifiedDate").length()).isEqualTo(19);
assertThat(product.columnWithName("modifiedDate").scale()).isEqualTo(0);
assertFalse(product.columnWithName("modifiedDate").scale().isPresent());
assertThat(product.columnWithName("modifiedDate").position()).isEqualTo(3);
assertThat(product.columnWithName("modifiedDate").isAutoIncremented()).isFalse();
assertThat(product.columnWithName("modifiedDate").isGenerated()).isFalse();
@ -163,7 +165,7 @@ public void shouldLoadMetadataViaJdbc() throws SQLException {
assertThat(purchased.columnWithName("purchaser").typeName()).isEqualTo("VARCHAR");
assertThat(purchased.columnWithName("purchaser").jdbcType()).isEqualTo(Types.VARCHAR);
assertThat(purchased.columnWithName("purchaser").length()).isEqualTo(255);
assertThat(purchased.columnWithName("purchaser").scale()).isEqualTo(0);
assertFalse(purchased.columnWithName("purchaser").scale().isPresent());
assertThat(purchased.columnWithName("purchaser").position()).isEqualTo(1);
assertThat(purchased.columnWithName("purchaser").isAutoIncremented()).isFalse();
assertThat(purchased.columnWithName("purchaser").isGenerated()).isFalse();
@ -172,7 +174,7 @@ public void shouldLoadMetadataViaJdbc() throws SQLException {
assertThat(purchased.columnWithName("productId").typeName()).isEqualTo("INT");
assertThat(purchased.columnWithName("productId").jdbcType()).isEqualTo(Types.INTEGER);
assertThat(purchased.columnWithName("productId").length()).isEqualTo(10);
assertThat(purchased.columnWithName("productId").scale()).isEqualTo(0);
assertThat(purchased.columnWithName("productId").scale().isPresent());
assertThat(purchased.columnWithName("productId").position()).isEqualTo(2);
assertThat(purchased.columnWithName("productId").isAutoIncremented()).isFalse();
assertThat(purchased.columnWithName("productId").isGenerated()).isFalse();
@ -181,7 +183,7 @@ public void shouldLoadMetadataViaJdbc() throws SQLException {
assertThat(purchased.columnWithName("purchaseDate").typeName()).isEqualTo("DATETIME");
assertThat(purchased.columnWithName("purchaseDate").jdbcType()).isEqualTo(Types.TIMESTAMP);
assertThat(purchased.columnWithName("purchaseDate").length()).isEqualTo(19);
assertThat(purchased.columnWithName("purchaseDate").scale()).isEqualTo(0);
assertFalse(purchased.columnWithName("purchaseDate").scale().isPresent());
assertThat(purchased.columnWithName("purchaseDate").position()).isEqualTo(3);
assertThat(purchased.columnWithName("purchaseDate").isAutoIncremented()).isFalse();
assertThat(purchased.columnWithName("purchaseDate").isGenerated()).isFalse();

View File

@ -6,6 +6,7 @@
package io.debezium.connector.mysql;
import static org.fest.assertions.Assertions.assertThat;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.fail;
import java.io.IOException;
@ -1619,7 +1620,7 @@ protected void assertColumn(Table table, String name, String typeName, int jdbcT
assertThat(column.jdbcType()).isEqualTo(jdbcType);
assertThat(column.length()).isEqualTo(length);
assertThat(column.charsetName()).isEqualTo(charsetName);
assertThat(column.scale()).isEqualTo(-1);
assertFalse(column.scale().isPresent());
assertThat(column.isOptional()).isEqualTo(optional);
assertThat(column.isGenerated()).isFalse();
assertThat(column.isAutoIncremented()).isFalse();
@ -1632,7 +1633,12 @@ protected void assertColumn(Table table, String name, String typeName, int jdbcT
assertThat(column.typeName()).isEqualTo(typeName);
assertThat(column.jdbcType()).isEqualTo(jdbcType);
assertThat(column.length()).isEqualTo(length);
assertThat(column.scale()).isEqualTo(scale);
if (scale == Column.UNSET_INT_VALUE) {
assertFalse(column.scale().isPresent());
}
else {
assertThat(column.scale().get()).isEqualTo(scale);
}
assertThat(column.isOptional()).isEqualTo(optional);
assertThat(column.isGenerated()).isEqualTo(generated);
assertThat(column.isAutoIncremented()).isEqualTo(autoIncremented);

View File

@ -11,6 +11,7 @@
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
@ -107,8 +108,8 @@ public void readSchema(Tables tables, String databaseCatalog, String schemaNameP
if (column.jdbcType() == Types.TIMESTAMP) {
editor.addColumn(
column.edit()
.length(column.scale())
.scale(-1)
.length(column.scale().orElse(Column.UNSET_INT_VALUE))
.scale(Optional.empty())
.create()
);
}

View File

@ -61,7 +61,7 @@ public void shouldParseCreateTable() {
assertThat(score.jdbcType()).isEqualTo(Types.NUMERIC);
assertThat(score.typeName()).isEqualTo("NUMBER");
assertThat(score.length()).isEqualTo(6);
assertThat(score.scale()).isEqualTo(2);
assertThat(score.scale().get()).isEqualTo(2);
assertThat(table.columns()).hasSize(4);
assertThat(table.isPrimaryKeyColumn("ID"));

View File

@ -131,7 +131,7 @@ public SchemaBuilder schemaBuilder(Column column) {
case PgOid.POINT:
return Point.builder();
case PgOid.MONEY:
return Decimal.builder(column.scale());
return Decimal.builder(column.scale().get());
case PgOid.NUMERIC:
return numericSchema(column);
case PgOid.BYTEA:
@ -207,7 +207,7 @@ private SchemaBuilder numericSchema(Column column) {
if (decimalMode == DecimalMode.PRECISE && isVariableScaleDecimal(column)) {
return VariableScaleDecimal.builder();
}
return SpecialValueDecimal.builder(decimalMode, column.scale());
return SpecialValueDecimal.builder(decimalMode, column.scale().get());
}
@Override
@ -341,8 +341,8 @@ protected Object convertDecimal(Column column, Field fieldDefn, Object data, Dec
}
newDecimal = value.getDecimalValue().get();
if (column.scale() > newDecimal.scale()) {
newDecimal = newDecimal.setScale(column.scale());
if (column.scale().get() > newDecimal.scale()) {
newDecimal = newDecimal.setScale(column.scale().get());
}
if (isVariableScaleDecimal(column) && mode == DecimalMode.PRECISE) {
@ -610,8 +610,8 @@ protected Object convertArray(Column column, Field fieldDefn, ValueConverter ele
}
private boolean isVariableScaleDecimal(final Column column) {
return (column.scale() == 0 && column.length() == VARIABLE_SCALE_DECIMAL_LENGTH)
|| (column.scale() == -1 && column.length() == -1);
return (column.scale().isPresent() && column.scale().get() == 0 && column.length() == VARIABLE_SCALE_DECIMAL_LENGTH)
|| (!column.scale().isPresent() && column.length() == -1);
}
public static Optional<SpecialValueDecimal> toSpecialValue(String value) {
@ -643,6 +643,6 @@ protected Object convertTimestampToUTC(Column column, Field fieldDefn, Object da
@Override
protected int getTimePrecision(Column column) {
return column.scale();
return column.scale().orElse(-1);
}
}

View File

@ -465,7 +465,7 @@ private boolean schemaChanged(List<ReplicationMessage.Column> columns, Table tab
incomingLength);
return true;
}
final int localScale = column.scale();
final int localScale = column.scale().get();
final int incomingScale = message.getTypeMetadata().getScale();
if (localScale != incomingScale) {
logger.info("detected new scale for column '{}', old scale was {}, new scale is {}; refreshing table schema", columnName, localScale,

View File

@ -840,7 +840,9 @@ public void readSchema(Tables tables, String databaseCatalog, String schemaNameP
column.jdbcType(rs.getInt(5));
column.type(rs.getString(6));
column.length(rs.getInt(7));
column.scale(rs.getInt(9));
if (rs.getObject(9) != null) {
column.scale(rs.getInt(9));
}
column.optional(isNullable(rs.getInt(11)));
column.position(rs.getInt(17));
column.autoIncremented("YES".equalsIgnoreCase(rs.getString(23)));

View File

@ -162,7 +162,7 @@ public SchemaBuilder schemaBuilder(Column column) {
return SchemaBuilder.float64();
case Types.NUMERIC:
case Types.DECIMAL:
return SpecialValueDecimal.builder(decimalMode, column.scale());
return SpecialValueDecimal.builder(decimalMode, column.scale().get());
// Fixed-length string values
case Types.CHAR:

View File

@ -6,6 +6,7 @@
package io.debezium.relational;
import java.sql.Types;
import java.util.Optional;
import io.debezium.annotation.Immutable;
@ -90,9 +91,9 @@ public static ColumnEditor editor() {
/**
* Get the scale of the column.
*
* @return the scale, or -1 if the scale does not apply to this type
* @return the scale if it applies to this type
*/
int scale();
Optional<Integer> scale();
/**
* Determine whether this column is optional.

View File

@ -6,6 +6,7 @@
package io.debezium.relational;
import java.sql.Types;
import java.util.Optional;
import io.debezium.annotation.NotThreadSafe;
@ -87,9 +88,9 @@ public interface ColumnEditor extends Comparable<Column> {
/**
* Get the scale of the column.
*
* @return the scale, or -1 if the scale does not apply to this type
* @return the scale if present
*/
int scale();
Optional<Integer> scale();
/**
* Determine whether this column is optional.
@ -195,11 +196,19 @@ public interface ColumnEditor extends Comparable<Column> {
/**
* Set the scale of the column.
*
* @param scale the scale, or -1 if the scale does not apply to this type
* @param scale the scale
* @return this editor so callers can chain methods together
*/
ColumnEditor scale(int scale);
/**
* Set the scale of the column.
*
* @param scale the scale, or empty() if the scale does not apply to this type
* @return this editor so callers can chain methods together
*/
ColumnEditor scale(Optional<Integer> scale);
/**
* Set whether the column's values are optional (e.g., can contain nulls).
*

View File

@ -6,6 +6,7 @@
package io.debezium.relational;
import java.sql.Types;
import java.util.Optional;
final class ColumnEditorImpl implements ColumnEditor {
@ -17,7 +18,7 @@ final class ColumnEditorImpl implements ColumnEditor {
private String charsetName;
private String tableCharsetName;
private int length = Column.UNSET_INT_VALUE;
private int scale = Column.UNSET_INT_VALUE;
private Optional<Integer> scale = Optional.empty();
private int position = 1;
private boolean optional = true;
private boolean autoIncremented = false;
@ -69,7 +70,7 @@ public int length() {
}
@Override
public int scale() {
public Optional<Integer> scale() {
return scale;
}
@ -156,7 +157,11 @@ public ColumnEditorImpl length(int length) {
@Override
public ColumnEditorImpl scale(int scale) {
assert scale >= -1;
return scale(Optional.of(scale));
}
@Override
public ColumnEditorImpl scale(Optional<Integer> scale) {
this.scale = scale;
return this;
}

View File

@ -5,9 +5,10 @@
*/
package io.debezium.relational;
import io.debezium.util.Strings;
import java.util.Objects;
import java.util.Optional;
import io.debezium.util.Strings;
final class ColumnImpl implements Column, Comparable<Column> {
private final String name;
@ -18,7 +19,7 @@ final class ColumnImpl implements Column, Comparable<Column> {
private final String typeExpression;
private final String charsetName;
private final int length;
private final int scale;
private final Optional<Integer> scale;
private final boolean optional;
private final boolean autoIncremented;
private final boolean generated;
@ -26,14 +27,14 @@ final class ColumnImpl implements Column, Comparable<Column> {
private final boolean hasDefaultValue;
protected ColumnImpl(String columnName, int position, int jdbcType, int componentType, String typeName, String typeExpression,
String charsetName, String defaultCharsetName, int columnLength, int columnScale,
String charsetName, String defaultCharsetName, int columnLength, Optional<Integer> columnScale,
boolean optional, boolean autoIncremented, boolean generated) {
this(columnName, position, jdbcType, componentType, typeName, typeExpression, charsetName,
defaultCharsetName, columnLength, columnScale, optional, autoIncremented, generated, null, false);
}
protected ColumnImpl(String columnName, int position, int jdbcType, int nativeType, String typeName, String typeExpression,
String charsetName, String defaultCharsetName, int columnLength, int columnScale,
String charsetName, String defaultCharsetName, int columnLength, Optional<Integer> columnScale,
boolean optional, boolean autoIncremented, boolean generated, Object defaultValue, boolean hasDefaultValue) {
this.name = columnName;
this.position = position;
@ -54,7 +55,6 @@ protected ColumnImpl(String columnName, int position, int jdbcType, int nativeTy
this.generated = generated;
this.defaultValue = defaultValue;
this.hasDefaultValue = hasDefaultValue;
assert this.scale >= -1;
assert this.length >= -1;
}
@ -92,14 +92,14 @@ public String typeExpression() {
public String charsetName() {
return charsetName;
}
@Override
public int length() {
return length;
}
@Override
public int scale() {
public Optional<Integer> scale() {
return scale;
}
@ -161,9 +161,7 @@ public String toString() {
sb.append(" ").append(typeName);
if (length >= 0) {
sb.append('(').append(length);
if (scale >= 0) {
sb.append(',').append(scale);
}
scale.ifPresent(s -> sb.append(',').append(s));
sb.append(')');
}
if (charsetName != null && !charsetName.isEmpty()) {

View File

@ -203,9 +203,7 @@ private Document toDocument(Column column) {
document.setNumber("length", column.length());
}
if (column.scale() != Column.UNSET_INT_VALUE) {
document.setNumber("scale", column.scale());
}
column.scale().ifPresent(s -> document.setNumber("scale", s));
document.setNumber("position", column.position());
document.setBoolean("optional", column.isOptional());

View File

@ -5,13 +5,14 @@
*/
package io.debezium.relational;
import static org.fest.assertions.Assertions.assertThat;
import java.sql.Types;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import static org.fest.assertions.Assertions.assertThat;
public class ColumnEditorTest {
private ColumnEditor editor;
@ -43,7 +44,7 @@ public void shouldCreateColumnWithAllFieldsSetToNonDefaults() {
assertThat(column.typeName()).isEqualTo("NUMBER");
assertThat(column.jdbcType()).isEqualTo(Types.DOUBLE);
assertThat(column.length()).isEqualTo(5);
assertThat(column.scale()).isEqualTo(2);
assertThat(column.scale().get()).isEqualTo(2);
assertThat(column.position()).isEqualTo(4);
assertThat(column.isOptional()).isTrue();
assertThat(column.isAutoIncremented()).isTrue();
@ -57,7 +58,7 @@ public void shouldCreateColumnWithAllFieldsSetToDefaults() {
assertThat(column.typeName()).isNull();
assertThat(column.jdbcType()).isEqualTo(Types.INTEGER);
assertThat(column.length()).isEqualTo(-1);
assertThat(column.scale()).isEqualTo(-1);
Assert.assertFalse(column.scale().isPresent());
assertThat(column.position()).isEqualTo(1);
assertThat(column.isOptional()).isTrue();
assertThat(column.isAutoIncremented()).isFalse();

View File

@ -11,6 +11,7 @@
import org.junit.Test;
import static org.fest.assertions.Assertions.assertThat;
import static org.junit.Assert.assertFalse;
import io.debezium.relational.Column;
import io.debezium.relational.Table;
@ -83,7 +84,12 @@ protected void assertColumn(Table table, String name, String typeName, int jdbcT
assertThat(column.typeName()).isEqualTo(typeName);
assertThat(column.jdbcType()).isEqualTo(jdbcType);
assertThat(column.length()).isEqualTo(length);
assertThat(column.scale()).isEqualTo(scale);
if (scale == Column.UNSET_INT_VALUE) {
assertFalse(column.scale().isPresent());
}
else {
assertThat(column.scale().get()).isEqualTo(scale);
}
assertThat(column.isOptional()).isEqualTo(optional);
assertThat(column.isGenerated()).isEqualTo(generated);
assertThat(column.isAutoIncremented()).isEqualTo(autoIncremented);