DBZ-20 A subset of string, time and number datatypes

This commit is contained in:
Jiri Pechanec 2018-03-27 13:17:14 +02:00
parent 603f02e70d
commit 10ddb69dea
12 changed files with 652 additions and 27 deletions

View File

@ -80,7 +80,7 @@ public void start(Configuration config) {
jdbcConnection = new OracleConnection(jdbcConfig, new OracleConnectionFactory());
SchemaNameAdjuster schemaNameAdjuster = SchemaNameAdjuster.create(LOGGER);
this.schema = new OracleDatabaseSchema(connectorConfig, schemaNameAdjuster, topicSelector);
this.schema = new OracleDatabaseSchema(connectorConfig, schemaNameAdjuster, topicSelector, jdbcConnection);
OracleOffsetContext previousOffset = getPreviousOffset(connectorConfig);
if (previousOffset != null) {

View File

@ -40,12 +40,12 @@ public class OracleDatabaseSchema implements RelationalDatabaseSchema {
private final TableSchemaBuilder tableSchemaBuilder;
private final DatabaseHistory databaseHistory;
public OracleDatabaseSchema(OracleConnectorConfig connectorConfig, SchemaNameAdjuster schemaNameAdjuster, TopicSelector topicSelector) {
public OracleDatabaseSchema(OracleConnectorConfig connectorConfig, SchemaNameAdjuster schemaNameAdjuster, TopicSelector topicSelector, OracleConnection connection) {
this.topicSelector = topicSelector;
this.tables = new Tables();
this.schemas = new HashMap<>();
this.tableSchemaBuilder = new TableSchemaBuilder(new OracleValueConverters(), schemaNameAdjuster, SourceInfo.SCHEMA);
this.tableSchemaBuilder = new TableSchemaBuilder(new OracleValueConverters(connection), schemaNameAdjuster, SourceInfo.SCHEMA);
this.databaseHistory = connectorConfig.getDatabaseHistory();
this.databaseHistory.start();

View File

@ -33,6 +33,7 @@
import io.debezium.relational.Tables;
import io.debezium.relational.ddl.DdlChanges;
import io.debezium.relational.ddl.DdlParser;
import oracle.jdbc.OracleTypes;
public class OracleDdlParser implements DdlParser {
@ -127,29 +128,76 @@ public void exitColumn_definition(Column_definitionContext ctx) {
ColumnEditor columnEditor = Column.editor();
columnEditor.name(getColumnName(ctx.column_name()));
if (ctx.datatype().native_datatype_element().INT() != null || ctx.datatype().native_datatype_element().INTEGER() != null) {
if (ctx.datatype().native_datatype_element().INT() != null
|| ctx.datatype().native_datatype_element().INTEGER() != null
|| ctx.datatype().native_datatype_element().SMALLINT() != null) {
columnEditor.jdbcType(Types.NUMERIC);
columnEditor.type("NUMBER");
columnEditor.length(38);
columnEditor.scale(0);
}
else if (ctx.datatype().native_datatype_element().DATE() != null) {
columnEditor.jdbcType(Types.DATE);
// JDBC driver reports type as timestamp but name DATE
columnEditor.jdbcType(Types.TIMESTAMP);
columnEditor.type("DATE");
}
else if (ctx.datatype().native_datatype_element().TIMESTAMP() != null) {
columnEditor.jdbcType(Types.TIMESTAMP);
columnEditor.type("TIMESTAMP");
if (ctx.datatype().WITH() != null
&& ctx.datatype().TIME() != null
&& ctx.datatype().ZONE() != null) {
if (ctx.datatype().LOCAL() != null) {
columnEditor.jdbcType(OracleTypes.TIMESTAMPLTZ);
columnEditor.type("TIMESTAMP WITH LOCAL TIME ZONE");
}
else {
columnEditor.jdbcType(OracleTypes.TIMESTAMPTZ);
columnEditor.type("TIMESTAMP WITH TIME ZONE");
}
}
else {
columnEditor.jdbcType(Types.TIMESTAMP);
columnEditor.type("TIMESTAMP");
}
columnEditor.length(6);
columnEditor.scale(0);
}
else if (ctx.datatype().native_datatype_element().VARCHAR2() != null) {
columnEditor.jdbcType(Types.VARCHAR);
columnEditor.type("VARCHAR2");
columnEditor.length(getVarCharDefaultLength());
}
else if (ctx.datatype().native_datatype_element().DECIMAL() != null) {
columnEditor.jdbcType(Types.DECIMAL);
columnEditor.type("DECIMAL");
else if (ctx.datatype().native_datatype_element().NVARCHAR2() != null) {
columnEditor.jdbcType(Types.NVARCHAR);
columnEditor.type("NVARCHAR2");
columnEditor.length(getVarCharDefaultLength());
}
else if (ctx.datatype().native_datatype_element().CHAR() != null) {
columnEditor.jdbcType(Types.CHAR);
columnEditor.type("CHAR");
columnEditor.length(1);
}
else if (ctx.datatype().native_datatype_element().NCHAR() != null) {
columnEditor.jdbcType(Types.NCHAR);
columnEditor.type("NCHAR");
columnEditor.length(1);
}
else if (ctx.datatype().native_datatype_element().BINARY_FLOAT() != null) {
columnEditor.jdbcType(OracleTypes.BINARY_FLOAT);
columnEditor.type("BINARY_FLOAT");
}
else if (ctx.datatype().native_datatype_element().BINARY_DOUBLE() != null) {
columnEditor.jdbcType(OracleTypes.BINARY_DOUBLE);
columnEditor.type("BINARY_DOUBLE");
}
else if (ctx.datatype().native_datatype_element().FLOAT() != null) {
columnEditor.jdbcType(Types.FLOAT);
columnEditor.type("FLOAT");
columnEditor.length(126);
}
else if (ctx.datatype().native_datatype_element().NUMERIC() != null
|| ctx.datatype().native_datatype_element().NUMBER() != null
|| ctx.datatype().native_datatype_element().DECIMAL() != null) {
columnEditor.jdbcType(Types.NUMERIC);
columnEditor.type("NUMBER");
}
else {
throw new IllegalArgumentException("Unsupported column type: " + ctx.datatype().native_datatype_element().getText());
@ -176,6 +224,11 @@ else if (ctx.datatype().native_datatype_element().DECIMAL() != null) {
super.exitColumn_definition(ctx);
}
private int getVarCharDefaultLength() {
// TODO replace with falue from select name, value from v$parameter where name='max_string_size';
return 4000;
}
@Override
public void exitOut_of_line_constraint(Out_of_line_constraintContext ctx) {
if(ctx.PRIMARY() != null) {

View File

@ -64,7 +64,7 @@ public void emitSchemaChangeEvent(Receiver receiver) throws InterruptedException
private SchemaChangeEventType getSchemaChangeEventType() {
switch(ddlLcr.getCommandType()) {
case "CREATE TABLE": return SchemaChangeEventType.CREATE;
case "ALTER TABLE": throw new UnsupportedOperationException("ALTER TABLE not yet implemented");
// case "ALTER TABLE": throw new UnsupportedOperationException("ALTER TABLE not yet implemented");
case "DROP TABLE": throw new UnsupportedOperationException("DROP TABLE not yet implemented");
default:
LOGGER.debug("Ignoring DDL event of type {}", ddlLcr.getCommandType());

View File

@ -5,29 +5,90 @@
*/
package io.debezium.connector.oracle;
import java.math.BigDecimal;
import java.sql.SQLException;
import java.sql.Types;
import java.time.ZonedDateTime;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.SchemaBuilder;
import io.debezium.data.SpecialValueDecimal;
import io.debezium.data.VariableScaleDecimal;
import io.debezium.jdbc.JdbcValueConverters;
import io.debezium.relational.Column;
import io.debezium.relational.ValueConverter;
import io.debezium.time.ZonedTimestamp;
import oracle.jdbc.OracleTypes;
import oracle.sql.BINARY_DOUBLE;
import oracle.sql.BINARY_FLOAT;
import oracle.sql.CHAR;
import oracle.sql.DATE;
import oracle.sql.NUMBER;
import oracle.sql.TIMESTAMP;
import oracle.sql.TIMESTAMPLTZ;
import oracle.sql.TIMESTAMPTZ;
public class OracleValueConverters extends JdbcValueConverters {
private final OracleConnection connection;
public OracleValueConverters(OracleConnection connection) {
this.connection = connection;
}
@Override
public SchemaBuilder schemaBuilder(Column column) {
logger.debug("Building schema for column {} of type {} named {}", column.name(), column.jdbcType(), column.typeName());
switch (column.jdbcType()) {
// Oracle's float is not float as in Java but a NUMERIC without scale
case Types.FLOAT:
return VariableScaleDecimal.builder();
case OracleTypes.BINARY_FLOAT:
return SchemaBuilder.float32();
case OracleTypes.BINARY_DOUBLE:
return SchemaBuilder.float64();
case OracleTypes.TIMESTAMPTZ:
case OracleTypes.TIMESTAMPLTZ:
return ZonedTimestamp.builder();
default:
return super.schemaBuilder(column);
}
}
@Override
public ValueConverter converter(Column column, Field fieldDefn) {
switch(column.jdbcType()) {
case Types.CHAR:
case Types.VARCHAR:
case Types.NCHAR:
case Types.NVARCHAR:
return data -> convertString(column, fieldDefn, data);
case OracleTypes.BINARY_FLOAT:
return data -> convertFloat(column, fieldDefn, data);
case OracleTypes.BINARY_DOUBLE:
return data -> convertDouble(column, fieldDefn, data);
case Types.NUMERIC:
return data -> convertNumeric(column, fieldDefn, data);
case Types.FLOAT:
return data -> convertOracleFloat(column, fieldDefn, data);
case OracleTypes.TIMESTAMPTZ:
case OracleTypes.TIMESTAMPLTZ:
return (data) -> convertTimestampWithZone(column, fieldDefn, data);
}
return super.converter(column, fieldDefn);
}
@Override
protected Object convertString(Column column, Field fieldDefn, Object data) {
if (data instanceof CHAR) {
return ((CHAR)data).stringValue();
}
return super.convertString(column, fieldDefn, data);
}
@Override
protected Object convertInteger(Column column, Field fieldDefn, Object data) {
if (data instanceof NUMBER) {
@ -42,6 +103,37 @@ protected Object convertInteger(Column column, Field fieldDefn, Object data) {
return super.convertInteger(column, fieldDefn, data);
}
@Override
protected Object convertFloat(Column column, Field fieldDefn, Object data) {
if (data instanceof NUMBER) {
return ((NUMBER)data).floatValue();
}
else if (data instanceof BINARY_FLOAT) {
try {
return ((BINARY_FLOAT)data).floatValue();
}
catch (SQLException e) {
throw new RuntimeException("Couldn't convert value for column " + column.name(), e);
}
}
return super.convertFloat(column, fieldDefn, data);
}
@Override
protected Object convertDouble(Column column, Field fieldDefn, Object data) {
if (data instanceof BINARY_DOUBLE) {
try {
return ((BINARY_DOUBLE)data).doubleValue();
}
catch (SQLException e) {
throw new RuntimeException("Couldn't convert value for column " + column.name(), e);
}
}
return super.convertDouble(column, fieldDefn, data);
}
@Override
protected Object convertDecimal(Column column, Field fieldDefn, Object data) {
if (data instanceof NUMBER) {
@ -70,17 +162,64 @@ protected Object convertNumeric(Column column, Field fieldDefn, Object data) {
return super.convertNumeric(column, fieldDefn, data);
}
@Override
protected Object convertTimestampToEpochMicros(Column column, Field fieldDefn, Object data) {
if (data instanceof TIMESTAMP) {
try {
data = ((TIMESTAMP)data).timestampValue();
protected Object convertOracleFloat(Column column, Field fieldDefn, Object data) {
data = convertNumeric(column, fieldDefn, data);
if (data == null) {
return null;
}
// TODO Need to handle special values, it is not supported in variable scale decimal
else if (data instanceof SpecialValueDecimal) {
return VariableScaleDecimal.fromLogical(fieldDefn.schema(), (SpecialValueDecimal)data);
}
else if (data instanceof BigDecimal) {
return VariableScaleDecimal.fromLogical(fieldDefn.schema(), new SpecialValueDecimal((BigDecimal)data));
}
return handleUnknownData(column, fieldDefn, data);
}
protected Object fromOracleTimeClasses(Column column, Object data) {
try {
if (data instanceof TIMESTAMP) {
data = ((TIMESTAMP) data).timestampValue();
}
catch (SQLException e) {
throw new RuntimeException("Couldn't convert value for column " + column.name(), e);
else if (data instanceof DATE) {
data = ((DATE) data).timestampValue();
}
else if (data instanceof TIMESTAMPTZ) {
final TIMESTAMPTZ ts = (TIMESTAMPTZ)data;
data = ZonedDateTime.ofInstant(ts.timestampValue(connection.connection()).toInstant(), ts.getTimeZone().toZoneId());
}
else if (data instanceof TIMESTAMPLTZ) {
// JDBC driver throws an exception
// final TIMESTAMPLTZ ts = (TIMESTAMPLTZ)data;
// data = ts.offsetDateTimeValue(connection.connection());
return null;
}
}
catch (SQLException e) {
throw new RuntimeException("Couldn't convert value for column " + column.name(), e);
}
return data;
}
return super.convertTimestampToEpochMicros(column, fieldDefn, data);
@Override
protected Object convertTimestampToEpochMicros(Column column, Field fieldDefn, Object data) {
return super.convertTimestampToEpochMicros(column, fieldDefn, fromOracleTimeClasses(column, data));
}
@Override
protected Object convertTimestampToEpochMillis(Column column, Field fieldDefn, Object data) {
return super.convertTimestampToEpochMillis(column, fieldDefn, fromOracleTimeClasses(column, data));
}
@Override
protected Object convertTimestampToEpochNanos(Column column, Field fieldDefn, Object data) {
return super.convertTimestampToEpochNanos(column, fieldDefn, fromOracleTimeClasses(column, data));
}
@Override
protected Object convertTimestampWithZone(Column column, Field fieldDefn, Object data) {
return super.convertTimestampWithZone(column, fieldDefn, fromOracleTimeClasses(column, data));
}
}

View File

@ -0,0 +1,239 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.connector.oracle;
import static org.fest.assertions.Assertions.assertThat;
import java.math.BigDecimal;
import java.sql.SQLException;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.util.Arrays;
import java.util.List;
import org.apache.kafka.connect.data.Decimal;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import io.debezium.connector.oracle.util.TestHelper;
import io.debezium.data.SchemaAndValueField;
import io.debezium.data.SpecialValueDecimal;
import io.debezium.data.VariableScaleDecimal;
import io.debezium.data.VerifyRecord;
import io.debezium.embedded.AbstractConnectorTest;
import io.debezium.time.MicroTimestamp;
import io.debezium.time.Timestamp;
import io.debezium.time.ZonedTimestamp;
import io.debezium.util.Testing;
/**
* Integration test to verify different Oracle datatypes.
*
* @author Jiri Pechanec
*/
public abstract class AbstractOracleDatatypesTest extends AbstractConnectorTest {
private static final Schema INTEGER_SCHEMA = Decimal.builder(0).optional().schema();
private static final String DDL_STRING = "create table debezium.type_string (" +
" id int not null, " +
" val_varchar2 varchar2(1000), " +
" val_nvarchar2 nvarchar2(1000), " +
" val_char char(3), " +
" val_nchar nchar(3), " +
// " val_character_varying character varying(1000), " +
// " val_national_char national char(4), " +
" primary key (id)" +
")";
private static final String DDL_FP = "create table debezium.type_fp (" +
" id int not null, " +
" val_bf binary_float, " +
" val_bd binary_double, " +
" val_f float, " +
" val_num number(10,6), " +
" primary key (id)" +
")";
private static final String DDL_INT = "create table debezium.type_int (" +
" id int not null, " +
" val_int int, " +
" val_integer integer, " +
" val_smallint smallint, " +
" primary key (id)" +
")";
private static final String DDL_TIME = "create table debezium.type_time (" +
" id int not null, " +
" val_date date, " +
" val_ts timestamp, " +
" val_tstz timestamp with time zone, " +
" val_tsltz timestamp with local time zone, " +
" primary key (id)" +
")";
private static final List<SchemaAndValueField> EXPECTED_STRING = Arrays.asList(
new SchemaAndValueField("VAL_VARCHAR2", Schema.OPTIONAL_STRING_SCHEMA, "v\u010d2"),
new SchemaAndValueField("VAL_NVARCHAR2", Schema.OPTIONAL_STRING_SCHEMA, "nv\u010d2"),
new SchemaAndValueField("VAL_CHAR", Schema.OPTIONAL_STRING_SCHEMA, "c "),
new SchemaAndValueField("VAL_NCHAR", Schema.OPTIONAL_STRING_SCHEMA, "n\u010d ")
// new SchemaAndValueField("VAL_CHARACTER_VARYING", Schema.OPTIONAL_STRING_SCHEMA, "av\u010d2"),
// new SchemaAndValueField("VAL_NATIONAL_CHAR", Schema.OPTIONAL_STRING_SCHEMA, "an\u010d ")
);
private static final List<SchemaAndValueField> EXPECTED_FP = Arrays.asList(
new SchemaAndValueField("VAL_BF", Schema.OPTIONAL_FLOAT32_SCHEMA, 1.1f),
new SchemaAndValueField("VAL_BD", Schema.OPTIONAL_FLOAT64_SCHEMA, 2.22),
new SchemaAndValueField("VAL_F", VariableScaleDecimal.builder().optional().schema(), VariableScaleDecimal.fromLogical(VariableScaleDecimal.builder().optional().schema(), new SpecialValueDecimal(new BigDecimal("3.33")))),
new SchemaAndValueField("VAL_NUM", Decimal.builder(6).optional().schema(), new BigDecimal("4.4444"))
);
private static final List<SchemaAndValueField> EXPECTED_INT = Arrays.asList(
new SchemaAndValueField("VAL_INT", INTEGER_SCHEMA, new BigDecimal("1")),
new SchemaAndValueField("VAL_INTEGER", INTEGER_SCHEMA, new BigDecimal("22")),
new SchemaAndValueField("VAL_SMALLINT", INTEGER_SCHEMA, new BigDecimal("333"))
);
private static final List<SchemaAndValueField> EXPECTED_TIME = Arrays.asList(
new SchemaAndValueField("VAL_DATE", Timestamp.builder().optional().build(), 1522108800_000l),
new SchemaAndValueField("VAL_TS", MicroTimestamp.builder().optional().build(), LocalDateTime.of(2018, 3, 27, 12, 34, 56).toEpochSecond(ZoneOffset.UTC) * 1_000_000 + 7890),
new SchemaAndValueField("VAL_TSTZ", ZonedTimestamp.builder().optional().build(), "2018-03-27T01:34:56.00789-11:00")
// new SchemaAndValueField("VAL_TSLTZ", ZonedTimestamp.builder().optional().build(), "2018-03-27T01:34:56.00789-11:00")
);
private static final String[] ALL_TABLES = {
"debezium.type_string",
"debezium.type_fp",
"debezium.type_int",
"debezium.type_time"
};
private static final String[] ALL_DDLS = {
DDL_STRING,
DDL_FP,
DDL_INT,
DDL_TIME
};
private static OracleConnection connection;
@BeforeClass
public static void dropTables() throws SQLException {
connection = TestHelper.testConnection();
for (String table: ALL_TABLES) {
TestHelper.dropTable(connection, table);
}
}
protected static void createTables() throws SQLException {
connection.execute(ALL_DDLS);
for (String table: ALL_TABLES) {
streamTable(table);
}
}
private static void streamTable(String table) throws SQLException {
connection.execute("GRANT SELECT ON " + table + " to c##xstrmadmin");
connection.execute("ALTER TABLE " + table + " ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS");
}
@AfterClass
public static void closeConnection() throws SQLException {
if (connection != null) {
connection.close();
}
}
@Test
public void stringTypes() throws Exception {
int expectedRecordCount = 0;
// connection.execute("INSERT INTO debezium.type_string VALUES (1, 'v\u010d2', 'nv\u010d2', 'c', 'n\u010d', 'av\u010d2', 'an\u010d')");
connection.execute("INSERT INTO debezium.type_string VALUES (1, 'v\u010d2', 'nv\u010d2', 'c', 'n\u010d')");
connection.execute("COMMIT");
Testing.debug("Inserted");
expectedRecordCount++;
final SourceRecords records = consumeRecordsByTopic(expectedRecordCount);
List<SourceRecord> testTableRecords = records.recordsForTopic("server1.ORCLPDB1.DEBEZIUM.TYPE_STRING");
assertThat(testTableRecords).hasSize(expectedRecordCount);
// insert
VerifyRecord.isValidInsert(testTableRecords.get(0));
Struct after = (Struct) ((Struct)testTableRecords.get(0).value()).get("after");
assertRecord(after, EXPECTED_STRING);
}
@Test
public void fpTypes() throws Exception {
int expectedRecordCount = 0;
connection.execute("INSERT INTO debezium.type_fp VALUES (1, 1.1, 2.22, 3.33, 4.4444)");
connection.execute("COMMIT");
Testing.debug("Inserted");
expectedRecordCount++;
final SourceRecords records = consumeRecordsByTopic(expectedRecordCount);
List<SourceRecord> testTableRecords = records.recordsForTopic("server1.ORCLPDB1.DEBEZIUM.TYPE_FP");
assertThat(testTableRecords).hasSize(expectedRecordCount);
// insert
VerifyRecord.isValidInsert(testTableRecords.get(0));
Struct after = (Struct) ((Struct)testTableRecords.get(0).value()).get("after");
assertRecord(after, EXPECTED_FP);
}
@Test
public void intTypes() throws Exception {
int expectedRecordCount = 0;
connection.execute("INSERT INTO debezium.type_int VALUES (1, 1, 22, 333)");
connection.execute("COMMIT");
Testing.debug("Inserted");
expectedRecordCount++;
final SourceRecords records = consumeRecordsByTopic(expectedRecordCount);
List<SourceRecord> testTableRecords = records.recordsForTopic("server1.ORCLPDB1.DEBEZIUM.TYPE_INT");
assertThat(testTableRecords).hasSize(expectedRecordCount);
// insert
VerifyRecord.isValidInsert(testTableRecords.get(0));
Struct after = (Struct) ((Struct)testTableRecords.get(0).value()).get("after");
assertRecord(after, EXPECTED_INT);
}
@Test
public void timeTypes() throws Exception {
int expectedRecordCount = 0;
connection.execute("INSERT INTO debezium.type_time VALUES (1, '27-MAR-2018', '27-MAR-2018 12:34:56.00789', '27-MAR-2018 01:34:56.00789 am -11:00', '27-MAR-2018 01:34:56.00789')");
connection.execute("COMMIT");
Testing.debug("Inserted");
expectedRecordCount++;
final SourceRecords records = consumeRecordsByTopic(expectedRecordCount);
List<SourceRecord> testTableRecords = records.recordsForTopic("server1.ORCLPDB1.DEBEZIUM.TYPE_TIME");
assertThat(testTableRecords).hasSize(expectedRecordCount);
// insert
VerifyRecord.isValidInsert(testTableRecords.get(0));
Struct after = (Struct) ((Struct)testTableRecords.get(0).value()).get("after");
assertRecord(after, EXPECTED_TIME);
}
private void assertRecord(Struct record, List<SchemaAndValueField> expected) {
expected.forEach(schemaAndValueField -> schemaAndValueField.assertFor(record));
}
}

View File

@ -50,6 +50,19 @@ public void shouldParseCreateTable() {
assertThat(id.jdbcType()).isEqualTo(Types.NUMERIC);
assertThat(id.typeName()).isEqualTo("NUMBER");
final Column name = table.columnWithName("NAME");
assertThat(name.isOptional()).isTrue();
assertThat(name.jdbcType()).isEqualTo(Types.VARCHAR);
assertThat(name.typeName()).isEqualTo("VARCHAR2");
assertThat(name.length()).isEqualTo(1000);
final Column score = table.columnWithName("SCORE");
assertThat(score.isOptional()).isTrue();
assertThat(score.jdbcType()).isEqualTo(Types.DECIMAL);
assertThat(score.typeName()).isEqualTo("DECIMAL");
assertThat(score.length()).isEqualTo(6);
assertThat(score.scale()).isEqualTo(2);
assertThat(table.columns()).hasSize(4);
assertThat(table.isPrimaryKeyColumn("ID"));
}

View File

@ -0,0 +1,40 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.connector.oracle;
import java.sql.SQLException;
import org.junit.Before;
import org.junit.BeforeClass;
import io.debezium.config.Configuration;
import io.debezium.connector.oracle.util.TestHelper;
import io.debezium.util.Testing;
/**
* Integration test to verify different Oracle datatypes.
*
* @author Jiri Pechanec
*/
public class SnapshotDatatypesIT extends AbstractOracleDatatypesTest {
@BeforeClass
public static void beforeClass() throws SQLException {
createTables();
}
@Before
public void before() throws Exception {
initializeConnectorTestFramework();
Testing.Debug.enable();
Testing.Files.delete(TestHelper.DB_HISTORY_PATH);
Configuration config = TestHelper.defaultConfig().build();
start(OracleConnector.class, config);
assertConnectorIsRunning();
Thread.sleep(1000);
}
}

View File

@ -0,0 +1,34 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.connector.oracle;
import org.junit.Before;
import io.debezium.config.Configuration;
import io.debezium.connector.oracle.util.TestHelper;
import io.debezium.util.Testing;
/**
* Integration test to verify different Oracle datatypes.
*
* @author Jiri Pechanec
*/
public class StreamingDatatypesIT extends AbstractOracleDatatypesTest {
@Before
public void before() throws Exception {
dropTables();
initializeConnectorTestFramework();
Testing.Files.delete(TestHelper.DB_HISTORY_PATH);
Configuration config = TestHelper.defaultConfig().build();
start(OracleConnector.class, config);
assertConnectorIsRunning();
Thread.sleep(1000);
createTables();
}
}

View File

@ -0,0 +1,107 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.data;
import java.util.List;
import java.util.function.Supplier;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.Struct;
import org.fest.assertions.Assertions;
public class SchemaAndValueField {
private final Object schema;
private final Object value;
private final String fieldName;
private Supplier<Boolean> assertValueOnlyIf = null;
public SchemaAndValueField(String fieldName, Object schema, Object value) {
this.schema = schema;
this.value = value;
this.fieldName = fieldName;
}
public SchemaAndValueField assertValueOnlyIf(final Supplier<Boolean> predicate) {
assertValueOnlyIf = predicate;
return this;
}
public void assertFor(Struct content) {
assertSchema(content);
assertValue(content);
}
private void assertValue(Struct content) {
if (assertValueOnlyIf != null && !assertValueOnlyIf.get()) {
return;
}
if (value == null) {
Assertions.assertThat(content.get(fieldName)).as(fieldName + " is present in the actual content").isNull();
return;
}
Object actualValue = content.get(fieldName);
Assertions.assertThat(actualValue).as(fieldName + " is not present in the actual content").isNotNull();
// assert the value type; for List all implementation types (e.g. immutable ones) are acceptable
if(actualValue instanceof List) {
Assertions.assertThat(value).as("Incorrect value type for " + fieldName).isInstanceOf(List.class);
final List<?> actualValueList = (List<?>)actualValue;
final List<?> valueList = (List<?>)value;
Assertions.assertThat(actualValueList).as("List size don't match for " + fieldName).hasSize(valueList.size());
if (!valueList.isEmpty() && valueList.iterator().next() instanceof Struct) {
for (int i = 0; i < valueList.size(); i++) {
assertStruct((Struct)valueList.get(i), (Struct)actualValueList.get(i));
}
return;
}
}
else {
Assertions.assertThat(actualValue.getClass()).as("Incorrect value type for " + fieldName).isEqualTo(value.getClass());
}
if (actualValue instanceof byte[]) {
Assertions.assertThat((byte[]) actualValue).as("Values don't match for " + fieldName).isEqualTo((byte[]) value);
} else if (actualValue instanceof Struct) {
assertStruct((Struct)value, (Struct)actualValue);
} else {
Assertions.assertThat(actualValue).as("Values don't match for " + fieldName).isEqualTo(value);
}
}
private void assertStruct(final Struct expectedStruct, final Struct actualStruct) {
expectedStruct.schema().fields().stream().forEach(field -> {
final Object expectedValue = expectedStruct.get(field);
if (expectedValue == null) {
Assertions.assertThat(actualStruct.get(field.name())).as(fieldName + " is present in the actual content").isNull();
return;
}
final Object actualValue = actualStruct.get(field.name());
Assertions.assertThat(actualValue).as("No value found for " + fieldName).isNotNull();
Assertions.assertThat(actualValue.getClass()).as("Incorrect value type for " + fieldName).isEqualTo(expectedValue.getClass());
if (actualValue instanceof byte[]) {
Assertions.assertThat(expectedValue).as("Array is not expected for " + fieldName).isInstanceOf(byte[].class);
Assertions.assertThat((byte[]) actualValue).as("Values don't match for " + fieldName).isEqualTo((byte[]) expectedValue);
}
else if (actualValue instanceof Struct) {
assertStruct((Struct)expectedValue, (Struct)actualValue);
}else {
Assertions.assertThat(actualValue).as("Values don't match for " + fieldName).isEqualTo(expectedValue);
}
});
}
private void assertSchema(Struct content) {
if (schema == null) {
return;
}
Schema schema = content.schema();
Field field = schema.field(fieldName);
Assertions.assertThat(field).as(fieldName + " not found in schema " + schema).isNotNull();
Assertions.assertThat(field.schema()).as("Schema for " + field.name() + " does not match the actual value").isEqualTo(this.schema);
}
}

View File

@ -82,7 +82,7 @@ public abstract class AbstractConnectorTest implements Testing {
private ExecutorService executor;
protected EmbeddedEngine engine;
private BlockingQueue<SourceRecord> consumedLines;
protected long pollTimeoutInMs = TimeUnit.SECONDS.toMillis(60);
protected long pollTimeoutInMs = TimeUnit.SECONDS.toMillis(120);
protected final Logger logger = LoggerFactory.getLogger(getClass());
private CountDownLatch latch;
private JsonConverter keyJsonConverter = new JsonConverter();