From 10ddb69deaf36ff3cd45f2c27728d7b84e4b2574 Mon Sep 17 00:00:00 2001 From: Jiri Pechanec Date: Tue, 27 Mar 2018 13:17:14 +0200 Subject: [PATCH] DBZ-20 A subset of string, time and number datatypes --- .../connector/oracle/OracleConnectorTask.java | 2 +- .../oracle/OracleDatabaseSchema.java | 4 +- .../connector/oracle/OracleDdlParser.java | 69 ++++- .../OracleSchemaChangeEventEmitter.java | 2 +- .../oracle/OracleValueConverters.java | 155 +++++++++++- .../oracle/AbstractOracleDatatypesTest.java | 239 ++++++++++++++++++ .../connector/oracle/OracleDdlParserTest.java | 13 + .../connector/oracle/SnapshotDatatypesIT.java | 40 +++ .../oracle/StreamingDatatypesIT.java | 34 +++ .../java/io/debezium/time/Conversions.java | 12 +- .../io/debezium/data/SchemaAndValueField.java | 107 ++++++++ .../embedded/AbstractConnectorTest.java | 2 +- 12 files changed, 652 insertions(+), 27 deletions(-) create mode 100644 debezium-connector-oracle/src/test/java/io/debezium/connector/oracle/AbstractOracleDatatypesTest.java create mode 100644 debezium-connector-oracle/src/test/java/io/debezium/connector/oracle/SnapshotDatatypesIT.java create mode 100644 debezium-connector-oracle/src/test/java/io/debezium/connector/oracle/StreamingDatatypesIT.java create mode 100644 debezium-core/src/test/java/io/debezium/data/SchemaAndValueField.java diff --git a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/OracleConnectorTask.java b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/OracleConnectorTask.java index e5b19cdac..2f553a070 100644 --- a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/OracleConnectorTask.java +++ b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/OracleConnectorTask.java @@ -80,7 +80,7 @@ public void start(Configuration config) { jdbcConnection = new OracleConnection(jdbcConfig, new OracleConnectionFactory()); SchemaNameAdjuster schemaNameAdjuster = SchemaNameAdjuster.create(LOGGER); - this.schema = new OracleDatabaseSchema(connectorConfig, schemaNameAdjuster, topicSelector); + this.schema = new OracleDatabaseSchema(connectorConfig, schemaNameAdjuster, topicSelector, jdbcConnection); OracleOffsetContext previousOffset = getPreviousOffset(connectorConfig); if (previousOffset != null) { diff --git a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/OracleDatabaseSchema.java b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/OracleDatabaseSchema.java index da533feea..faa0220af 100644 --- a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/OracleDatabaseSchema.java +++ b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/OracleDatabaseSchema.java @@ -40,12 +40,12 @@ public class OracleDatabaseSchema implements RelationalDatabaseSchema { private final TableSchemaBuilder tableSchemaBuilder; private final DatabaseHistory databaseHistory; - public OracleDatabaseSchema(OracleConnectorConfig connectorConfig, SchemaNameAdjuster schemaNameAdjuster, TopicSelector topicSelector) { + public OracleDatabaseSchema(OracleConnectorConfig connectorConfig, SchemaNameAdjuster schemaNameAdjuster, TopicSelector topicSelector, OracleConnection connection) { this.topicSelector = topicSelector; this.tables = new Tables(); this.schemas = new HashMap<>(); - this.tableSchemaBuilder = new TableSchemaBuilder(new OracleValueConverters(), schemaNameAdjuster, SourceInfo.SCHEMA); + this.tableSchemaBuilder = new TableSchemaBuilder(new OracleValueConverters(connection), schemaNameAdjuster, SourceInfo.SCHEMA); this.databaseHistory = connectorConfig.getDatabaseHistory(); this.databaseHistory.start(); diff --git a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/OracleDdlParser.java b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/OracleDdlParser.java index a75c486d6..908500fee 100644 --- a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/OracleDdlParser.java +++ b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/OracleDdlParser.java @@ -33,6 +33,7 @@ import io.debezium.relational.Tables; import io.debezium.relational.ddl.DdlChanges; import io.debezium.relational.ddl.DdlParser; +import oracle.jdbc.OracleTypes; public class OracleDdlParser implements DdlParser { @@ -127,29 +128,76 @@ public void exitColumn_definition(Column_definitionContext ctx) { ColumnEditor columnEditor = Column.editor(); columnEditor.name(getColumnName(ctx.column_name())); - if (ctx.datatype().native_datatype_element().INT() != null || ctx.datatype().native_datatype_element().INTEGER() != null) { + if (ctx.datatype().native_datatype_element().INT() != null + || ctx.datatype().native_datatype_element().INTEGER() != null + || ctx.datatype().native_datatype_element().SMALLINT() != null) { columnEditor.jdbcType(Types.NUMERIC); columnEditor.type("NUMBER"); columnEditor.length(38); columnEditor.scale(0); } else if (ctx.datatype().native_datatype_element().DATE() != null) { - columnEditor.jdbcType(Types.DATE); + // JDBC driver reports type as timestamp but name DATE + columnEditor.jdbcType(Types.TIMESTAMP); columnEditor.type("DATE"); } else if (ctx.datatype().native_datatype_element().TIMESTAMP() != null) { - columnEditor.jdbcType(Types.TIMESTAMP); - columnEditor.type("TIMESTAMP"); + if (ctx.datatype().WITH() != null + && ctx.datatype().TIME() != null + && ctx.datatype().ZONE() != null) { + if (ctx.datatype().LOCAL() != null) { + columnEditor.jdbcType(OracleTypes.TIMESTAMPLTZ); + columnEditor.type("TIMESTAMP WITH LOCAL TIME ZONE"); + } + else { + columnEditor.jdbcType(OracleTypes.TIMESTAMPTZ); + columnEditor.type("TIMESTAMP WITH TIME ZONE"); + } + } + else { + columnEditor.jdbcType(Types.TIMESTAMP); + columnEditor.type("TIMESTAMP"); + } columnEditor.length(6); - columnEditor.scale(0); } else if (ctx.datatype().native_datatype_element().VARCHAR2() != null) { columnEditor.jdbcType(Types.VARCHAR); columnEditor.type("VARCHAR2"); + columnEditor.length(getVarCharDefaultLength()); } - else if (ctx.datatype().native_datatype_element().DECIMAL() != null) { - columnEditor.jdbcType(Types.DECIMAL); - columnEditor.type("DECIMAL"); + else if (ctx.datatype().native_datatype_element().NVARCHAR2() != null) { + columnEditor.jdbcType(Types.NVARCHAR); + columnEditor.type("NVARCHAR2"); + columnEditor.length(getVarCharDefaultLength()); + } + else if (ctx.datatype().native_datatype_element().CHAR() != null) { + columnEditor.jdbcType(Types.CHAR); + columnEditor.type("CHAR"); + columnEditor.length(1); + } + else if (ctx.datatype().native_datatype_element().NCHAR() != null) { + columnEditor.jdbcType(Types.NCHAR); + columnEditor.type("NCHAR"); + columnEditor.length(1); + } + else if (ctx.datatype().native_datatype_element().BINARY_FLOAT() != null) { + columnEditor.jdbcType(OracleTypes.BINARY_FLOAT); + columnEditor.type("BINARY_FLOAT"); + } + else if (ctx.datatype().native_datatype_element().BINARY_DOUBLE() != null) { + columnEditor.jdbcType(OracleTypes.BINARY_DOUBLE); + columnEditor.type("BINARY_DOUBLE"); + } + else if (ctx.datatype().native_datatype_element().FLOAT() != null) { + columnEditor.jdbcType(Types.FLOAT); + columnEditor.type("FLOAT"); + columnEditor.length(126); + } + else if (ctx.datatype().native_datatype_element().NUMERIC() != null + || ctx.datatype().native_datatype_element().NUMBER() != null + || ctx.datatype().native_datatype_element().DECIMAL() != null) { + columnEditor.jdbcType(Types.NUMERIC); + columnEditor.type("NUMBER"); } else { throw new IllegalArgumentException("Unsupported column type: " + ctx.datatype().native_datatype_element().getText()); @@ -176,6 +224,11 @@ else if (ctx.datatype().native_datatype_element().DECIMAL() != null) { super.exitColumn_definition(ctx); } + private int getVarCharDefaultLength() { + // TODO replace with falue from select name, value from v$parameter where name='max_string_size'; + return 4000; + } + @Override public void exitOut_of_line_constraint(Out_of_line_constraintContext ctx) { if(ctx.PRIMARY() != null) { diff --git a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/OracleSchemaChangeEventEmitter.java b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/OracleSchemaChangeEventEmitter.java index 87daf5961..1178869a9 100644 --- a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/OracleSchemaChangeEventEmitter.java +++ b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/OracleSchemaChangeEventEmitter.java @@ -64,7 +64,7 @@ public void emitSchemaChangeEvent(Receiver receiver) throws InterruptedException private SchemaChangeEventType getSchemaChangeEventType() { switch(ddlLcr.getCommandType()) { case "CREATE TABLE": return SchemaChangeEventType.CREATE; - case "ALTER TABLE": throw new UnsupportedOperationException("ALTER TABLE not yet implemented"); +// case "ALTER TABLE": throw new UnsupportedOperationException("ALTER TABLE not yet implemented"); case "DROP TABLE": throw new UnsupportedOperationException("DROP TABLE not yet implemented"); default: LOGGER.debug("Ignoring DDL event of type {}", ddlLcr.getCommandType()); diff --git a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/OracleValueConverters.java b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/OracleValueConverters.java index 19f334b37..1e6100af7 100644 --- a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/OracleValueConverters.java +++ b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/OracleValueConverters.java @@ -5,29 +5,90 @@ */ package io.debezium.connector.oracle; +import java.math.BigDecimal; import java.sql.SQLException; import java.sql.Types; +import java.time.ZonedDateTime; import org.apache.kafka.connect.data.Field; +import org.apache.kafka.connect.data.SchemaBuilder; +import io.debezium.data.SpecialValueDecimal; +import io.debezium.data.VariableScaleDecimal; import io.debezium.jdbc.JdbcValueConverters; import io.debezium.relational.Column; import io.debezium.relational.ValueConverter; +import io.debezium.time.ZonedTimestamp; +import oracle.jdbc.OracleTypes; +import oracle.sql.BINARY_DOUBLE; +import oracle.sql.BINARY_FLOAT; +import oracle.sql.CHAR; +import oracle.sql.DATE; import oracle.sql.NUMBER; import oracle.sql.TIMESTAMP; +import oracle.sql.TIMESTAMPLTZ; +import oracle.sql.TIMESTAMPTZ; public class OracleValueConverters extends JdbcValueConverters { + private final OracleConnection connection; + + public OracleValueConverters(OracleConnection connection) { + this.connection = connection; + } + + @Override + public SchemaBuilder schemaBuilder(Column column) { + logger.debug("Building schema for column {} of type {} named {}", column.name(), column.jdbcType(), column.typeName()); + + switch (column.jdbcType()) { + // Oracle's float is not float as in Java but a NUMERIC without scale + case Types.FLOAT: + return VariableScaleDecimal.builder(); + case OracleTypes.BINARY_FLOAT: + return SchemaBuilder.float32(); + case OracleTypes.BINARY_DOUBLE: + return SchemaBuilder.float64(); + case OracleTypes.TIMESTAMPTZ: + case OracleTypes.TIMESTAMPLTZ: + return ZonedTimestamp.builder(); + default: + return super.schemaBuilder(column); + } + } @Override public ValueConverter converter(Column column, Field fieldDefn) { switch(column.jdbcType()) { + case Types.CHAR: + case Types.VARCHAR: + case Types.NCHAR: + case Types.NVARCHAR: + return data -> convertString(column, fieldDefn, data); + case OracleTypes.BINARY_FLOAT: + return data -> convertFloat(column, fieldDefn, data); + case OracleTypes.BINARY_DOUBLE: + return data -> convertDouble(column, fieldDefn, data); case Types.NUMERIC: return data -> convertNumeric(column, fieldDefn, data); + case Types.FLOAT: + return data -> convertOracleFloat(column, fieldDefn, data); + case OracleTypes.TIMESTAMPTZ: + case OracleTypes.TIMESTAMPLTZ: + return (data) -> convertTimestampWithZone(column, fieldDefn, data); } return super.converter(column, fieldDefn); } + @Override + protected Object convertString(Column column, Field fieldDefn, Object data) { + if (data instanceof CHAR) { + return ((CHAR)data).stringValue(); + } + + return super.convertString(column, fieldDefn, data); + } + @Override protected Object convertInteger(Column column, Field fieldDefn, Object data) { if (data instanceof NUMBER) { @@ -42,6 +103,37 @@ protected Object convertInteger(Column column, Field fieldDefn, Object data) { return super.convertInteger(column, fieldDefn, data); } + @Override + protected Object convertFloat(Column column, Field fieldDefn, Object data) { + if (data instanceof NUMBER) { + return ((NUMBER)data).floatValue(); + } + else if (data instanceof BINARY_FLOAT) { + try { + return ((BINARY_FLOAT)data).floatValue(); + } + catch (SQLException e) { + throw new RuntimeException("Couldn't convert value for column " + column.name(), e); + } + } + + return super.convertFloat(column, fieldDefn, data); + } + + @Override + protected Object convertDouble(Column column, Field fieldDefn, Object data) { + if (data instanceof BINARY_DOUBLE) { + try { + return ((BINARY_DOUBLE)data).doubleValue(); + } + catch (SQLException e) { + throw new RuntimeException("Couldn't convert value for column " + column.name(), e); + } + } + + return super.convertDouble(column, fieldDefn, data); + } + @Override protected Object convertDecimal(Column column, Field fieldDefn, Object data) { if (data instanceof NUMBER) { @@ -70,17 +162,64 @@ protected Object convertNumeric(Column column, Field fieldDefn, Object data) { return super.convertNumeric(column, fieldDefn, data); } - @Override - protected Object convertTimestampToEpochMicros(Column column, Field fieldDefn, Object data) { - if (data instanceof TIMESTAMP) { - try { - data = ((TIMESTAMP)data).timestampValue(); + protected Object convertOracleFloat(Column column, Field fieldDefn, Object data) { + data = convertNumeric(column, fieldDefn, data); + + if (data == null) { + return null; + } + // TODO Need to handle special values, it is not supported in variable scale decimal + else if (data instanceof SpecialValueDecimal) { + return VariableScaleDecimal.fromLogical(fieldDefn.schema(), (SpecialValueDecimal)data); + } + else if (data instanceof BigDecimal) { + return VariableScaleDecimal.fromLogical(fieldDefn.schema(), new SpecialValueDecimal((BigDecimal)data)); + } + return handleUnknownData(column, fieldDefn, data); + } + + protected Object fromOracleTimeClasses(Column column, Object data) { + try { + if (data instanceof TIMESTAMP) { + data = ((TIMESTAMP) data).timestampValue(); } - catch (SQLException e) { - throw new RuntimeException("Couldn't convert value for column " + column.name(), e); + else if (data instanceof DATE) { + data = ((DATE) data).timestampValue(); + } + else if (data instanceof TIMESTAMPTZ) { + final TIMESTAMPTZ ts = (TIMESTAMPTZ)data; + data = ZonedDateTime.ofInstant(ts.timestampValue(connection.connection()).toInstant(), ts.getTimeZone().toZoneId()); + } + else if (data instanceof TIMESTAMPLTZ) { + // JDBC driver throws an exception +// final TIMESTAMPLTZ ts = (TIMESTAMPLTZ)data; +// data = ts.offsetDateTimeValue(connection.connection()); + return null; } } + catch (SQLException e) { + throw new RuntimeException("Couldn't convert value for column " + column.name(), e); + } + return data; + } - return super.convertTimestampToEpochMicros(column, fieldDefn, data); + @Override + protected Object convertTimestampToEpochMicros(Column column, Field fieldDefn, Object data) { + return super.convertTimestampToEpochMicros(column, fieldDefn, fromOracleTimeClasses(column, data)); + } + + @Override + protected Object convertTimestampToEpochMillis(Column column, Field fieldDefn, Object data) { + return super.convertTimestampToEpochMillis(column, fieldDefn, fromOracleTimeClasses(column, data)); + } + + @Override + protected Object convertTimestampToEpochNanos(Column column, Field fieldDefn, Object data) { + return super.convertTimestampToEpochNanos(column, fieldDefn, fromOracleTimeClasses(column, data)); + } + + @Override + protected Object convertTimestampWithZone(Column column, Field fieldDefn, Object data) { + return super.convertTimestampWithZone(column, fieldDefn, fromOracleTimeClasses(column, data)); } } diff --git a/debezium-connector-oracle/src/test/java/io/debezium/connector/oracle/AbstractOracleDatatypesTest.java b/debezium-connector-oracle/src/test/java/io/debezium/connector/oracle/AbstractOracleDatatypesTest.java new file mode 100644 index 000000000..8d4c738d5 --- /dev/null +++ b/debezium-connector-oracle/src/test/java/io/debezium/connector/oracle/AbstractOracleDatatypesTest.java @@ -0,0 +1,239 @@ +/* + * Copyright Debezium Authors. + * + * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 + */ +package io.debezium.connector.oracle; + +import static org.fest.assertions.Assertions.assertThat; + +import java.math.BigDecimal; +import java.sql.SQLException; +import java.time.LocalDateTime; +import java.time.ZoneOffset; +import java.util.Arrays; +import java.util.List; + +import org.apache.kafka.connect.data.Decimal; +import org.apache.kafka.connect.data.Schema; +import org.apache.kafka.connect.data.Struct; +import org.apache.kafka.connect.source.SourceRecord; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; + +import io.debezium.connector.oracle.util.TestHelper; +import io.debezium.data.SchemaAndValueField; +import io.debezium.data.SpecialValueDecimal; +import io.debezium.data.VariableScaleDecimal; +import io.debezium.data.VerifyRecord; +import io.debezium.embedded.AbstractConnectorTest; +import io.debezium.time.MicroTimestamp; +import io.debezium.time.Timestamp; +import io.debezium.time.ZonedTimestamp; +import io.debezium.util.Testing; + +/** + * Integration test to verify different Oracle datatypes. + * + * @author Jiri Pechanec + */ +public abstract class AbstractOracleDatatypesTest extends AbstractConnectorTest { + + private static final Schema INTEGER_SCHEMA = Decimal.builder(0).optional().schema(); + + private static final String DDL_STRING = "create table debezium.type_string (" + + " id int not null, " + + " val_varchar2 varchar2(1000), " + + " val_nvarchar2 nvarchar2(1000), " + + " val_char char(3), " + + " val_nchar nchar(3), " + +// " val_character_varying character varying(1000), " + +// " val_national_char national char(4), " + + " primary key (id)" + + ")"; + + private static final String DDL_FP = "create table debezium.type_fp (" + + " id int not null, " + + " val_bf binary_float, " + + " val_bd binary_double, " + + " val_f float, " + + " val_num number(10,6), " + + " primary key (id)" + + ")"; + + private static final String DDL_INT = "create table debezium.type_int (" + + " id int not null, " + + " val_int int, " + + " val_integer integer, " + + " val_smallint smallint, " + + " primary key (id)" + + ")"; + + private static final String DDL_TIME = "create table debezium.type_time (" + + " id int not null, " + + " val_date date, " + + " val_ts timestamp, " + + " val_tstz timestamp with time zone, " + + " val_tsltz timestamp with local time zone, " + + " primary key (id)" + + ")"; + + private static final List EXPECTED_STRING = Arrays.asList( + new SchemaAndValueField("VAL_VARCHAR2", Schema.OPTIONAL_STRING_SCHEMA, "v\u010d2"), + new SchemaAndValueField("VAL_NVARCHAR2", Schema.OPTIONAL_STRING_SCHEMA, "nv\u010d2"), + new SchemaAndValueField("VAL_CHAR", Schema.OPTIONAL_STRING_SCHEMA, "c "), + new SchemaAndValueField("VAL_NCHAR", Schema.OPTIONAL_STRING_SCHEMA, "n\u010d ") +// new SchemaAndValueField("VAL_CHARACTER_VARYING", Schema.OPTIONAL_STRING_SCHEMA, "av\u010d2"), +// new SchemaAndValueField("VAL_NATIONAL_CHAR", Schema.OPTIONAL_STRING_SCHEMA, "an\u010d ") + ); + + private static final List EXPECTED_FP = Arrays.asList( + new SchemaAndValueField("VAL_BF", Schema.OPTIONAL_FLOAT32_SCHEMA, 1.1f), + new SchemaAndValueField("VAL_BD", Schema.OPTIONAL_FLOAT64_SCHEMA, 2.22), + new SchemaAndValueField("VAL_F", VariableScaleDecimal.builder().optional().schema(), VariableScaleDecimal.fromLogical(VariableScaleDecimal.builder().optional().schema(), new SpecialValueDecimal(new BigDecimal("3.33")))), + new SchemaAndValueField("VAL_NUM", Decimal.builder(6).optional().schema(), new BigDecimal("4.4444")) + ); + + private static final List EXPECTED_INT = Arrays.asList( + new SchemaAndValueField("VAL_INT", INTEGER_SCHEMA, new BigDecimal("1")), + new SchemaAndValueField("VAL_INTEGER", INTEGER_SCHEMA, new BigDecimal("22")), + new SchemaAndValueField("VAL_SMALLINT", INTEGER_SCHEMA, new BigDecimal("333")) + ); + + private static final List EXPECTED_TIME = Arrays.asList( + new SchemaAndValueField("VAL_DATE", Timestamp.builder().optional().build(), 1522108800_000l), + new SchemaAndValueField("VAL_TS", MicroTimestamp.builder().optional().build(), LocalDateTime.of(2018, 3, 27, 12, 34, 56).toEpochSecond(ZoneOffset.UTC) * 1_000_000 + 7890), + new SchemaAndValueField("VAL_TSTZ", ZonedTimestamp.builder().optional().build(), "2018-03-27T01:34:56.00789-11:00") +// new SchemaAndValueField("VAL_TSLTZ", ZonedTimestamp.builder().optional().build(), "2018-03-27T01:34:56.00789-11:00") + ); + + private static final String[] ALL_TABLES = { + "debezium.type_string", + "debezium.type_fp", + "debezium.type_int", + "debezium.type_time" + }; + + private static final String[] ALL_DDLS = { + DDL_STRING, + DDL_FP, + DDL_INT, + DDL_TIME + }; + + private static OracleConnection connection; + + @BeforeClass + public static void dropTables() throws SQLException { + connection = TestHelper.testConnection(); + for (String table: ALL_TABLES) { + TestHelper.dropTable(connection, table); + } + } + + protected static void createTables() throws SQLException { + connection.execute(ALL_DDLS); + for (String table: ALL_TABLES) { + streamTable(table); + } + } + + private static void streamTable(String table) throws SQLException { + connection.execute("GRANT SELECT ON " + table + " to c##xstrmadmin"); + connection.execute("ALTER TABLE " + table + " ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS"); + } + + + @AfterClass + public static void closeConnection() throws SQLException { + if (connection != null) { + connection.close(); + } + } + + @Test + public void stringTypes() throws Exception { + int expectedRecordCount = 0; +// connection.execute("INSERT INTO debezium.type_string VALUES (1, 'v\u010d2', 'nv\u010d2', 'c', 'n\u010d', 'av\u010d2', 'an\u010d')"); + connection.execute("INSERT INTO debezium.type_string VALUES (1, 'v\u010d2', 'nv\u010d2', 'c', 'n\u010d')"); + connection.execute("COMMIT"); + + Testing.debug("Inserted"); + expectedRecordCount++; + + final SourceRecords records = consumeRecordsByTopic(expectedRecordCount); + + List testTableRecords = records.recordsForTopic("server1.ORCLPDB1.DEBEZIUM.TYPE_STRING"); + assertThat(testTableRecords).hasSize(expectedRecordCount); + + // insert + VerifyRecord.isValidInsert(testTableRecords.get(0)); + Struct after = (Struct) ((Struct)testTableRecords.get(0).value()).get("after"); + assertRecord(after, EXPECTED_STRING); + } + + @Test + public void fpTypes() throws Exception { + int expectedRecordCount = 0; + connection.execute("INSERT INTO debezium.type_fp VALUES (1, 1.1, 2.22, 3.33, 4.4444)"); + connection.execute("COMMIT"); + + Testing.debug("Inserted"); + expectedRecordCount++; + + final SourceRecords records = consumeRecordsByTopic(expectedRecordCount); + + List testTableRecords = records.recordsForTopic("server1.ORCLPDB1.DEBEZIUM.TYPE_FP"); + assertThat(testTableRecords).hasSize(expectedRecordCount); + + // insert + VerifyRecord.isValidInsert(testTableRecords.get(0)); + Struct after = (Struct) ((Struct)testTableRecords.get(0).value()).get("after"); + assertRecord(after, EXPECTED_FP); + } + + @Test + public void intTypes() throws Exception { + int expectedRecordCount = 0; + connection.execute("INSERT INTO debezium.type_int VALUES (1, 1, 22, 333)"); + connection.execute("COMMIT"); + + Testing.debug("Inserted"); + expectedRecordCount++; + + final SourceRecords records = consumeRecordsByTopic(expectedRecordCount); + + List testTableRecords = records.recordsForTopic("server1.ORCLPDB1.DEBEZIUM.TYPE_INT"); + assertThat(testTableRecords).hasSize(expectedRecordCount); + + // insert + VerifyRecord.isValidInsert(testTableRecords.get(0)); + Struct after = (Struct) ((Struct)testTableRecords.get(0).value()).get("after"); + assertRecord(after, EXPECTED_INT); + } + + @Test + public void timeTypes() throws Exception { + int expectedRecordCount = 0; + connection.execute("INSERT INTO debezium.type_time VALUES (1, '27-MAR-2018', '27-MAR-2018 12:34:56.00789', '27-MAR-2018 01:34:56.00789 am -11:00', '27-MAR-2018 01:34:56.00789')"); + connection.execute("COMMIT"); + + Testing.debug("Inserted"); + expectedRecordCount++; + + final SourceRecords records = consumeRecordsByTopic(expectedRecordCount); + + List testTableRecords = records.recordsForTopic("server1.ORCLPDB1.DEBEZIUM.TYPE_TIME"); + assertThat(testTableRecords).hasSize(expectedRecordCount); + + // insert + VerifyRecord.isValidInsert(testTableRecords.get(0)); + Struct after = (Struct) ((Struct)testTableRecords.get(0).value()).get("after"); + assertRecord(after, EXPECTED_TIME); + } + + private void assertRecord(Struct record, List expected) { + expected.forEach(schemaAndValueField -> schemaAndValueField.assertFor(record)); + } +} diff --git a/debezium-connector-oracle/src/test/java/io/debezium/connector/oracle/OracleDdlParserTest.java b/debezium-connector-oracle/src/test/java/io/debezium/connector/oracle/OracleDdlParserTest.java index 1550de97a..c781d7170 100644 --- a/debezium-connector-oracle/src/test/java/io/debezium/connector/oracle/OracleDdlParserTest.java +++ b/debezium-connector-oracle/src/test/java/io/debezium/connector/oracle/OracleDdlParserTest.java @@ -50,6 +50,19 @@ public void shouldParseCreateTable() { assertThat(id.jdbcType()).isEqualTo(Types.NUMERIC); assertThat(id.typeName()).isEqualTo("NUMBER"); + final Column name = table.columnWithName("NAME"); + assertThat(name.isOptional()).isTrue(); + assertThat(name.jdbcType()).isEqualTo(Types.VARCHAR); + assertThat(name.typeName()).isEqualTo("VARCHAR2"); + assertThat(name.length()).isEqualTo(1000); + + final Column score = table.columnWithName("SCORE"); + assertThat(score.isOptional()).isTrue(); + assertThat(score.jdbcType()).isEqualTo(Types.DECIMAL); + assertThat(score.typeName()).isEqualTo("DECIMAL"); + assertThat(score.length()).isEqualTo(6); + assertThat(score.scale()).isEqualTo(2); + assertThat(table.columns()).hasSize(4); assertThat(table.isPrimaryKeyColumn("ID")); } diff --git a/debezium-connector-oracle/src/test/java/io/debezium/connector/oracle/SnapshotDatatypesIT.java b/debezium-connector-oracle/src/test/java/io/debezium/connector/oracle/SnapshotDatatypesIT.java new file mode 100644 index 000000000..306e3504c --- /dev/null +++ b/debezium-connector-oracle/src/test/java/io/debezium/connector/oracle/SnapshotDatatypesIT.java @@ -0,0 +1,40 @@ +/* + * Copyright Debezium Authors. + * + * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 + */ +package io.debezium.connector.oracle; + +import java.sql.SQLException; + +import org.junit.Before; +import org.junit.BeforeClass; + +import io.debezium.config.Configuration; +import io.debezium.connector.oracle.util.TestHelper; +import io.debezium.util.Testing; + +/** + * Integration test to verify different Oracle datatypes. + * + * @author Jiri Pechanec + */ +public class SnapshotDatatypesIT extends AbstractOracleDatatypesTest { + + @BeforeClass + public static void beforeClass() throws SQLException { + createTables(); + } + + @Before + public void before() throws Exception { + initializeConnectorTestFramework(); + Testing.Debug.enable(); + Testing.Files.delete(TestHelper.DB_HISTORY_PATH); + + Configuration config = TestHelper.defaultConfig().build(); + start(OracleConnector.class, config); + assertConnectorIsRunning(); + Thread.sleep(1000); + } +} diff --git a/debezium-connector-oracle/src/test/java/io/debezium/connector/oracle/StreamingDatatypesIT.java b/debezium-connector-oracle/src/test/java/io/debezium/connector/oracle/StreamingDatatypesIT.java new file mode 100644 index 000000000..705b76d77 --- /dev/null +++ b/debezium-connector-oracle/src/test/java/io/debezium/connector/oracle/StreamingDatatypesIT.java @@ -0,0 +1,34 @@ +/* + * Copyright Debezium Authors. + * + * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 + */ +package io.debezium.connector.oracle; + +import org.junit.Before; + +import io.debezium.config.Configuration; +import io.debezium.connector.oracle.util.TestHelper; +import io.debezium.util.Testing; + +/** + * Integration test to verify different Oracle datatypes. + * + * @author Jiri Pechanec + */ +public class StreamingDatatypesIT extends AbstractOracleDatatypesTest { + + @Before + public void before() throws Exception { + dropTables(); + initializeConnectorTestFramework(); + Testing.Files.delete(TestHelper.DB_HISTORY_PATH); + + Configuration config = TestHelper.defaultConfig().build(); + start(OracleConnector.class, config); + assertConnectorIsRunning(); + + Thread.sleep(1000); + createTables(); + } +} diff --git a/debezium-core/src/main/java/io/debezium/time/Conversions.java b/debezium-core/src/main/java/io/debezium/time/Conversions.java index 4e6621be9..fabfac9ea 100644 --- a/debezium-core/src/main/java/io/debezium/time/Conversions.java +++ b/debezium-core/src/main/java/io/debezium/time/Conversions.java @@ -15,7 +15,7 @@ /** * Temporal conversion constants. - * + * * @author Randall Hauch */ public final class Conversions { @@ -33,7 +33,7 @@ public final class Conversions { private Conversions() { } - + @SuppressWarnings("deprecation") protected static LocalDate toLocalDate(Object obj) { if ( obj == null ) { @@ -167,10 +167,10 @@ protected static LocalDateTime toLocalDateTime(Object obj) { } throw new IllegalArgumentException("Unable to convert to LocalTime from unexpected value '" + obj + "' of type " + obj.getClass().getName()); } - + /** * Get the number of nanoseconds past epoch of the given {@link LocalDateTime}. - * + * * @param timestamp the Java timestamp value * @return the epoch nanoseconds */ @@ -182,7 +182,7 @@ static long toEpochNanos(LocalDateTime timestamp) { /** * Get the number of nanoseconds past epoch of the given {@link LocalDate}. - * + * * @param date the Java date value * @return the epoch nanoseconds */ @@ -227,7 +227,7 @@ public static LocalDateTime fromNanosToLocalDateTimeUTC(long nanoseconds) { /** * Get the number of nanoseconds past epoch of the given {@link Instant}. - * + * * @param instant the Java instant value * @return the epoch nanoseconds */ diff --git a/debezium-core/src/test/java/io/debezium/data/SchemaAndValueField.java b/debezium-core/src/test/java/io/debezium/data/SchemaAndValueField.java new file mode 100644 index 000000000..55feae551 --- /dev/null +++ b/debezium-core/src/test/java/io/debezium/data/SchemaAndValueField.java @@ -0,0 +1,107 @@ +/* + * Copyright Debezium Authors. + * + * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 + */ +package io.debezium.data; + +import java.util.List; +import java.util.function.Supplier; + +import org.apache.kafka.connect.data.Field; +import org.apache.kafka.connect.data.Schema; +import org.apache.kafka.connect.data.Struct; +import org.fest.assertions.Assertions; + +public class SchemaAndValueField { + private final Object schema; + private final Object value; + private final String fieldName; + private Supplier assertValueOnlyIf = null; + + public SchemaAndValueField(String fieldName, Object schema, Object value) { + this.schema = schema; + this.value = value; + this.fieldName = fieldName; + } + + public SchemaAndValueField assertValueOnlyIf(final Supplier predicate) { + assertValueOnlyIf = predicate; + return this; + } + + public void assertFor(Struct content) { + assertSchema(content); + assertValue(content); + } + + private void assertValue(Struct content) { + if (assertValueOnlyIf != null && !assertValueOnlyIf.get()) { + return; + } + + if (value == null) { + Assertions.assertThat(content.get(fieldName)).as(fieldName + " is present in the actual content").isNull(); + return; + } + Object actualValue = content.get(fieldName); + Assertions.assertThat(actualValue).as(fieldName + " is not present in the actual content").isNotNull(); + + // assert the value type; for List all implementation types (e.g. immutable ones) are acceptable + if(actualValue instanceof List) { + Assertions.assertThat(value).as("Incorrect value type for " + fieldName).isInstanceOf(List.class); + final List actualValueList = (List)actualValue; + final List valueList = (List)value; + Assertions.assertThat(actualValueList).as("List size don't match for " + fieldName).hasSize(valueList.size()); + if (!valueList.isEmpty() && valueList.iterator().next() instanceof Struct) { + for (int i = 0; i < valueList.size(); i++) { + assertStruct((Struct)valueList.get(i), (Struct)actualValueList.get(i)); + } + return; + } + } + else { + Assertions.assertThat(actualValue.getClass()).as("Incorrect value type for " + fieldName).isEqualTo(value.getClass()); + } + + if (actualValue instanceof byte[]) { + Assertions.assertThat((byte[]) actualValue).as("Values don't match for " + fieldName).isEqualTo((byte[]) value); + } else if (actualValue instanceof Struct) { + assertStruct((Struct)value, (Struct)actualValue); + } else { + Assertions.assertThat(actualValue).as("Values don't match for " + fieldName).isEqualTo(value); + } + } + + private void assertStruct(final Struct expectedStruct, final Struct actualStruct) { + expectedStruct.schema().fields().stream().forEach(field -> { + final Object expectedValue = expectedStruct.get(field); + if (expectedValue == null) { + Assertions.assertThat(actualStruct.get(field.name())).as(fieldName + " is present in the actual content").isNull(); + return; + } + final Object actualValue = actualStruct.get(field.name()); + Assertions.assertThat(actualValue).as("No value found for " + fieldName).isNotNull(); + Assertions.assertThat(actualValue.getClass()).as("Incorrect value type for " + fieldName).isEqualTo(expectedValue.getClass()); + if (actualValue instanceof byte[]) { + Assertions.assertThat(expectedValue).as("Array is not expected for " + fieldName).isInstanceOf(byte[].class); + Assertions.assertThat((byte[]) actualValue).as("Values don't match for " + fieldName).isEqualTo((byte[]) expectedValue); + } + else if (actualValue instanceof Struct) { + assertStruct((Struct)expectedValue, (Struct)actualValue); + }else { + Assertions.assertThat(actualValue).as("Values don't match for " + fieldName).isEqualTo(expectedValue); + } + }); + } + + private void assertSchema(Struct content) { + if (schema == null) { + return; + } + Schema schema = content.schema(); + Field field = schema.field(fieldName); + Assertions.assertThat(field).as(fieldName + " not found in schema " + schema).isNotNull(); + Assertions.assertThat(field.schema()).as("Schema for " + field.name() + " does not match the actual value").isEqualTo(this.schema); + } +} diff --git a/debezium-embedded/src/test/java/io/debezium/embedded/AbstractConnectorTest.java b/debezium-embedded/src/test/java/io/debezium/embedded/AbstractConnectorTest.java index a5d46c970..e7c5a2896 100644 --- a/debezium-embedded/src/test/java/io/debezium/embedded/AbstractConnectorTest.java +++ b/debezium-embedded/src/test/java/io/debezium/embedded/AbstractConnectorTest.java @@ -82,7 +82,7 @@ public abstract class AbstractConnectorTest implements Testing { private ExecutorService executor; protected EmbeddedEngine engine; private BlockingQueue consumedLines; - protected long pollTimeoutInMs = TimeUnit.SECONDS.toMillis(60); + protected long pollTimeoutInMs = TimeUnit.SECONDS.toMillis(120); protected final Logger logger = LoggerFactory.getLogger(getClass()); private CountDownLatch latch; private JsonConverter keyJsonConverter = new JsonConverter();