DBZ-20 Added ANSI double precision, real and variable scale number datatypes

This commit is contained in:
Jiri Pechanec 2018-03-28 05:59:09 +02:00
parent 10ddb69dea
commit 28b844bf99
4 changed files with 55 additions and 20 deletions

View File

@ -125,12 +125,17 @@ private String getTableName(Tableview_nameContext tableview_name) {
@Override @Override
public void exitColumn_definition(Column_definitionContext ctx) { public void exitColumn_definition(Column_definitionContext ctx) {
Precision_partContext precisionPart = ctx.datatype().precision_part();
ColumnEditor columnEditor = Column.editor(); ColumnEditor columnEditor = Column.editor();
columnEditor.name(getColumnName(ctx.column_name())); columnEditor.name(getColumnName(ctx.column_name()));
if (ctx.datatype().native_datatype_element().INT() != null if (ctx.datatype().native_datatype_element().INT() != null
|| ctx.datatype().native_datatype_element().INTEGER() != null || ctx.datatype().native_datatype_element().INTEGER() != null
|| ctx.datatype().native_datatype_element().SMALLINT() != null) { || ctx.datatype().native_datatype_element().SMALLINT() != null
|| ctx.datatype().native_datatype_element().NUMERIC() != null
|| ctx.datatype().native_datatype_element().DECIMAL() != null) {
// UMERIC and DECIMAl types have by default zero scale
columnEditor.jdbcType(Types.NUMERIC); columnEditor.jdbcType(Types.NUMERIC);
columnEditor.type("NUMBER"); columnEditor.type("NUMBER");
columnEditor.length(38); columnEditor.length(38);
@ -188,22 +193,34 @@ else if (ctx.datatype().native_datatype_element().BINARY_DOUBLE() != null) {
columnEditor.jdbcType(OracleTypes.BINARY_DOUBLE); columnEditor.jdbcType(OracleTypes.BINARY_DOUBLE);
columnEditor.type("BINARY_DOUBLE"); columnEditor.type("BINARY_DOUBLE");
} }
else if (ctx.datatype().native_datatype_element().FLOAT() != null) { // PRECISION keyword is mandatory
else if (ctx.datatype().native_datatype_element().FLOAT() != null ||
(ctx.datatype().native_datatype_element().DOUBLE() != null && ctx.datatype().native_datatype_element().PRECISION() != null)) {
columnEditor.jdbcType(Types.FLOAT); columnEditor.jdbcType(Types.FLOAT);
columnEditor.type("FLOAT"); columnEditor.type("FLOAT");
columnEditor.length(126); columnEditor.length(126);
} }
else if (ctx.datatype().native_datatype_element().NUMERIC() != null else if (ctx.datatype().native_datatype_element().REAL() != null) {
|| ctx.datatype().native_datatype_element().NUMBER() != null columnEditor.jdbcType(Types.FLOAT);
|| ctx.datatype().native_datatype_element().DECIMAL() != null) { columnEditor.type("FLOAT");
columnEditor.jdbcType(Types.NUMERIC); columnEditor.length(63);
columnEditor.type("NUMBER"); }
else if (ctx.datatype().native_datatype_element().NUMBER() != null) {
columnEditor
.jdbcType(Types.NUMERIC)
.type("NUMBER")
.length(38)
.scale(0);
if (precisionPart == null) {
columnEditor
.length(0)
.scale(-127);
}
} }
else { else {
throw new IllegalArgumentException("Unsupported column type: " + ctx.datatype().native_datatype_element().getText()); throw new IllegalArgumentException("Unsupported column type: " + ctx.datatype().native_datatype_element().getText());
} }
Precision_partContext precisionPart = ctx.datatype().precision_part();
if (precisionPart != null) { if (precisionPart != null) {
columnEditor.length(Integer.valueOf(precisionPart.numeric(0).getText())); columnEditor.length(Integer.valueOf(precisionPart.numeric(0).getText()));

View File

@ -30,6 +30,8 @@
import oracle.sql.TIMESTAMPTZ; import oracle.sql.TIMESTAMPTZ;
public class OracleValueConverters extends JdbcValueConverters { public class OracleValueConverters extends JdbcValueConverters {
private static int NUMBER_VARIABLE_SCALE_LENGTH = 0;
private final OracleConnection connection; private final OracleConnection connection;
public OracleValueConverters(OracleConnection connection) { public OracleValueConverters(OracleConnection connection) {
@ -38,12 +40,22 @@ public OracleValueConverters(OracleConnection connection) {
@Override @Override
public SchemaBuilder schemaBuilder(Column column) { public SchemaBuilder schemaBuilder(Column column) {
logger.debug("Building schema for column {} of type {} named {}", column.name(), column.jdbcType(), column.typeName()); logger.debug("Building schema for column {} of type {} named {} with constraints ({},{})",
column.name(),
column.jdbcType(),
column.typeName(),
column.length(),
column.scale()
);
switch (column.jdbcType()) { switch (column.jdbcType()) {
// Oracle's float is not float as in Java but a NUMERIC without scale // Oracle's float is not float as in Java but a NUMERIC without scale
case Types.FLOAT: case Types.FLOAT:
return VariableScaleDecimal.builder(); return VariableScaleDecimal.builder();
case Types.NUMERIC:
return column.length() == NUMBER_VARIABLE_SCALE_LENGTH ?
VariableScaleDecimal.builder() :
super.schemaBuilder(column);
case OracleTypes.BINARY_FLOAT: case OracleTypes.BINARY_FLOAT:
return SchemaBuilder.float32(); return SchemaBuilder.float32();
case OracleTypes.BINARY_DOUBLE: case OracleTypes.BINARY_DOUBLE:
@ -69,9 +81,11 @@ public ValueConverter converter(Column column, Field fieldDefn) {
case OracleTypes.BINARY_DOUBLE: case OracleTypes.BINARY_DOUBLE:
return data -> convertDouble(column, fieldDefn, data); return data -> convertDouble(column, fieldDefn, data);
case Types.NUMERIC: case Types.NUMERIC:
return data -> convertNumeric(column, fieldDefn, data); return column.length() == NUMBER_VARIABLE_SCALE_LENGTH ?
data -> convertVariableScale(column, fieldDefn, data) :
data -> convertNumeric(column, fieldDefn, data);
case Types.FLOAT: case Types.FLOAT:
return data -> convertOracleFloat(column, fieldDefn, data); return data -> convertVariableScale(column, fieldDefn, data);
case OracleTypes.TIMESTAMPTZ: case OracleTypes.TIMESTAMPTZ:
case OracleTypes.TIMESTAMPLTZ: case OracleTypes.TIMESTAMPLTZ:
return (data) -> convertTimestampWithZone(column, fieldDefn, data); return (data) -> convertTimestampWithZone(column, fieldDefn, data);
@ -162,7 +176,7 @@ protected Object convertNumeric(Column column, Field fieldDefn, Object data) {
return super.convertNumeric(column, fieldDefn, data); return super.convertNumeric(column, fieldDefn, data);
} }
protected Object convertOracleFloat(Column column, Field fieldDefn, Object data) { protected Object convertVariableScale(Column column, Field fieldDefn, Object data) {
data = convertNumeric(column, fieldDefn, data); data = convertNumeric(column, fieldDefn, data);
if (data == null) { if (data == null) {

View File

@ -48,8 +48,6 @@ public abstract class AbstractOracleDatatypesTest extends AbstractConnectorTest
" val_nvarchar2 nvarchar2(1000), " + " val_nvarchar2 nvarchar2(1000), " +
" val_char char(3), " + " val_char char(3), " +
" val_nchar nchar(3), " + " val_nchar nchar(3), " +
// " val_character_varying character varying(1000), " +
// " val_national_char national char(4), " +
" primary key (id)" + " primary key (id)" +
")"; ")";
@ -59,6 +57,9 @@ public abstract class AbstractOracleDatatypesTest extends AbstractConnectorTest
" val_bd binary_double, " + " val_bd binary_double, " +
" val_f float, " + " val_f float, " +
" val_num number(10,6), " + " val_num number(10,6), " +
" val_dp double precision, " +
" val_r real, " +
" val_num_vs number, " +
" primary key (id)" + " primary key (id)" +
")"; ")";
@ -84,15 +85,16 @@ public abstract class AbstractOracleDatatypesTest extends AbstractConnectorTest
new SchemaAndValueField("VAL_NVARCHAR2", Schema.OPTIONAL_STRING_SCHEMA, "nv\u010d2"), new SchemaAndValueField("VAL_NVARCHAR2", Schema.OPTIONAL_STRING_SCHEMA, "nv\u010d2"),
new SchemaAndValueField("VAL_CHAR", Schema.OPTIONAL_STRING_SCHEMA, "c "), new SchemaAndValueField("VAL_CHAR", Schema.OPTIONAL_STRING_SCHEMA, "c "),
new SchemaAndValueField("VAL_NCHAR", Schema.OPTIONAL_STRING_SCHEMA, "n\u010d ") new SchemaAndValueField("VAL_NCHAR", Schema.OPTIONAL_STRING_SCHEMA, "n\u010d ")
// new SchemaAndValueField("VAL_CHARACTER_VARYING", Schema.OPTIONAL_STRING_SCHEMA, "av\u010d2"),
// new SchemaAndValueField("VAL_NATIONAL_CHAR", Schema.OPTIONAL_STRING_SCHEMA, "an\u010d ")
); );
private static final List<SchemaAndValueField> EXPECTED_FP = Arrays.asList( private static final List<SchemaAndValueField> EXPECTED_FP = Arrays.asList(
new SchemaAndValueField("VAL_BF", Schema.OPTIONAL_FLOAT32_SCHEMA, 1.1f), new SchemaAndValueField("VAL_BF", Schema.OPTIONAL_FLOAT32_SCHEMA, 1.1f),
new SchemaAndValueField("VAL_BD", Schema.OPTIONAL_FLOAT64_SCHEMA, 2.22), new SchemaAndValueField("VAL_BD", Schema.OPTIONAL_FLOAT64_SCHEMA, 2.22),
new SchemaAndValueField("VAL_F", VariableScaleDecimal.builder().optional().schema(), VariableScaleDecimal.fromLogical(VariableScaleDecimal.builder().optional().schema(), new SpecialValueDecimal(new BigDecimal("3.33")))), new SchemaAndValueField("VAL_F", VariableScaleDecimal.builder().optional().schema(), VariableScaleDecimal.fromLogical(VariableScaleDecimal.builder().optional().schema(), new SpecialValueDecimal(new BigDecimal("3.33")))),
new SchemaAndValueField("VAL_NUM", Decimal.builder(6).optional().schema(), new BigDecimal("4.4444")) new SchemaAndValueField("VAL_NUM", Decimal.builder(6).optional().schema(), new BigDecimal("4.4444")),
new SchemaAndValueField("VAL_DP", VariableScaleDecimal.builder().optional().schema(), VariableScaleDecimal.fromLogical(VariableScaleDecimal.builder().optional().schema(), new SpecialValueDecimal(new BigDecimal("5.555")))),
new SchemaAndValueField("VAL_R", VariableScaleDecimal.builder().optional().schema(), VariableScaleDecimal.fromLogical(VariableScaleDecimal.builder().optional().schema(), new SpecialValueDecimal(new BigDecimal("6.66")))),
new SchemaAndValueField("VAL_NUM_VS", VariableScaleDecimal.builder().optional().schema(), VariableScaleDecimal.fromLogical(VariableScaleDecimal.builder().optional().schema(), new SpecialValueDecimal(new BigDecimal("77.323"))))
); );
private static final List<SchemaAndValueField> EXPECTED_INT = Arrays.asList( private static final List<SchemaAndValueField> EXPECTED_INT = Arrays.asList(
@ -176,7 +178,7 @@ public void stringTypes() throws Exception {
@Test @Test
public void fpTypes() throws Exception { public void fpTypes() throws Exception {
int expectedRecordCount = 0; int expectedRecordCount = 0;
connection.execute("INSERT INTO debezium.type_fp VALUES (1, 1.1, 2.22, 3.33, 4.4444)"); connection.execute("INSERT INTO debezium.type_fp VALUES (1, 1.1, 2.22, 3.33, 4.4444, 5.555, 6.66, 77.323)");
connection.execute("COMMIT"); connection.execute("COMMIT");
Testing.debug("Inserted"); Testing.debug("Inserted");

View File

@ -66,9 +66,11 @@ private void assertValue(Struct content) {
if (actualValue instanceof byte[]) { if (actualValue instanceof byte[]) {
Assertions.assertThat((byte[]) actualValue).as("Values don't match for " + fieldName).isEqualTo((byte[]) value); Assertions.assertThat((byte[]) actualValue).as("Values don't match for " + fieldName).isEqualTo((byte[]) value);
} else if (actualValue instanceof Struct) { }
else if (actualValue instanceof Struct) {
assertStruct((Struct)value, (Struct)actualValue); assertStruct((Struct)value, (Struct)actualValue);
} else { }
else {
Assertions.assertThat(actualValue).as("Values don't match for " + fieldName).isEqualTo(value); Assertions.assertThat(actualValue).as("Values don't match for " + fieldName).isEqualTo(value);
} }
} }