DBZ-3710 Support Oracle column default values
This commit is contained in:
parent
3fa9e73348
commit
343318b5d4
@ -11,7 +11,6 @@
|
||||
import java.sql.ResultSet;
|
||||
import java.sql.SQLException;
|
||||
import java.sql.Statement;
|
||||
import java.sql.Types;
|
||||
import java.time.Duration;
|
||||
import java.time.OffsetDateTime;
|
||||
import java.util.ArrayList;
|
||||
@ -35,6 +34,7 @@
|
||||
import io.debezium.connector.oracle.OracleConnectorConfig.ConnectorAdapter;
|
||||
import io.debezium.jdbc.JdbcConnection;
|
||||
import io.debezium.relational.Column;
|
||||
import io.debezium.relational.ColumnEditor;
|
||||
import io.debezium.relational.TableEditor;
|
||||
import io.debezium.relational.TableId;
|
||||
import io.debezium.relational.Tables;
|
||||
@ -261,11 +261,6 @@ public void readSchemaForCapturedTables(Tables tables, String databaseCatalog, S
|
||||
tableIdsBefore.removeAll(columnsByTable.keySet());
|
||||
tableIdsBefore.forEach(tables::removeTable);
|
||||
}
|
||||
|
||||
for (TableId tableId : capturedTables) {
|
||||
overrideOracleSpecificColumnTypes(tables, tableId, tableId);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -283,7 +278,9 @@ public void readSchema(Tables tables, String databaseCatalog, String schemaNameP
|
||||
TableId tableIdWithCatalog = new TableId(databaseCatalog, tableId.schema(), tableId.table());
|
||||
|
||||
if (tableFilter.isIncluded(tableIdWithCatalog)) {
|
||||
overrideOracleSpecificColumnTypes(tables, tableId, tableIdWithCatalog);
|
||||
TableEditor editor = tables.editTable(tableId);
|
||||
editor.tableId(tableIdWithCatalog);
|
||||
tables.overwriteTable(editor.create());
|
||||
}
|
||||
|
||||
tables.removeTable(tableId);
|
||||
@ -303,36 +300,6 @@ protected boolean isTableUniqueIndexIncluded(String indexName, String columnName
|
||||
return false;
|
||||
}
|
||||
|
||||
private void overrideOracleSpecificColumnTypes(Tables tables, TableId tableId, TableId tableIdWithCatalog) {
|
||||
TableEditor editor = tables.editTable(tableId);
|
||||
editor.tableId(tableIdWithCatalog);
|
||||
|
||||
List<String> columnNames = new ArrayList<>(editor.columnNames());
|
||||
for (String columnName : columnNames) {
|
||||
Column column = editor.columnWithName(columnName);
|
||||
if (column.jdbcType() == Types.TIMESTAMP) {
|
||||
editor.addColumn(
|
||||
column.edit()
|
||||
.length(column.scale().orElse(Column.UNSET_INT_VALUE))
|
||||
.scale(null)
|
||||
.create());
|
||||
}
|
||||
// NUMBER columns without scale value have it set to -127 instead of null;
|
||||
// let's rectify that
|
||||
else if (column.jdbcType() == OracleTypes.NUMBER) {
|
||||
column.scale()
|
||||
.filter(s -> s == ORACLE_UNSET_SCALE)
|
||||
.ifPresent(s -> {
|
||||
editor.addColumn(
|
||||
column.edit()
|
||||
.scale(null)
|
||||
.create());
|
||||
});
|
||||
}
|
||||
}
|
||||
tables.overwriteTable(editor.create());
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the current, most recent system change number.
|
||||
*
|
||||
@ -571,4 +538,17 @@ public Optional<OffsetDateTime> getScnToTimestamp(Scn scn) throws SQLException {
|
||||
throw e;
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
protected ColumnEditor overrideColumn(ColumnEditor column) {
|
||||
// This allows the column state to be overridden before default-value resolution so that the
|
||||
// output of the default value is within the same precision as that of the column values.
|
||||
if (OracleTypes.TIMESTAMP == column.jdbcType()) {
|
||||
column.length(column.scale().orElse(Column.UNSET_INT_VALUE)).scale(null);
|
||||
}
|
||||
else if (OracleTypes.NUMBER == column.jdbcType()) {
|
||||
column.scale().filter(s -> s == ORACLE_UNSET_SCALE).ifPresent(s -> column.scale(null));
|
||||
}
|
||||
return column;
|
||||
}
|
||||
}
|
||||
|
@ -57,8 +57,10 @@ public ChangeEventSourceCoordinator<OraclePartition, OracleOffsetContext> start(
|
||||
validateRedoLogConfiguration();
|
||||
|
||||
OracleValueConverters valueConverters = new OracleValueConverters(connectorConfig, jdbcConnection);
|
||||
OracleDefaultValueConverter defaultValueConverter = new OracleDefaultValueConverter(valueConverters, jdbcConnection);
|
||||
TableNameCaseSensitivity tableNameCaseSensitivity = connectorConfig.getAdapter().getTableNameCaseSensitivity(jdbcConnection);
|
||||
this.schema = new OracleDatabaseSchema(connectorConfig, valueConverters, schemaNameAdjuster, topicSelector, tableNameCaseSensitivity);
|
||||
this.schema = new OracleDatabaseSchema(connectorConfig, valueConverters, defaultValueConverter, schemaNameAdjuster,
|
||||
topicSelector, tableNameCaseSensitivity);
|
||||
|
||||
Offsets<OraclePartition, OracleOffsetContext> previousOffsets = getPreviousOffsets(new OraclePartition.Provider(connectorConfig),
|
||||
connectorConfig.getAdapter().getOffsetContextLoader());
|
||||
|
@ -10,6 +10,7 @@
|
||||
|
||||
import io.debezium.connector.oracle.StreamingAdapter.TableNameCaseSensitivity;
|
||||
import io.debezium.connector.oracle.antlr.OracleDdlParser;
|
||||
import io.debezium.relational.DefaultValueConverter;
|
||||
import io.debezium.relational.HistorizedRelationalDatabaseSchema;
|
||||
import io.debezium.relational.Table;
|
||||
import io.debezium.relational.TableId;
|
||||
@ -29,16 +30,16 @@ public class OracleDatabaseSchema extends HistorizedRelationalDatabaseSchema {
|
||||
private static final Logger LOGGER = LoggerFactory.getLogger(OracleDatabaseSchema.class);
|
||||
|
||||
private final OracleDdlParser ddlParser;
|
||||
private final OracleValueConverters valueConverters;
|
||||
private boolean storageInitializationExecuted = false;
|
||||
|
||||
public OracleDatabaseSchema(OracleConnectorConfig connectorConfig, OracleValueConverters valueConverters,
|
||||
SchemaNameAdjuster schemaNameAdjuster, TopicSelector<TableId> topicSelector,
|
||||
TableNameCaseSensitivity tableNameCaseSensitivity) {
|
||||
DefaultValueConverter defaultValueConverter, SchemaNameAdjuster schemaNameAdjuster,
|
||||
TopicSelector<TableId> topicSelector, TableNameCaseSensitivity tableNameCaseSensitivity) {
|
||||
super(connectorConfig, topicSelector, connectorConfig.getTableFilters().dataCollectionFilter(),
|
||||
connectorConfig.getColumnFilter(),
|
||||
new TableSchemaBuilder(
|
||||
valueConverters,
|
||||
defaultValueConverter,
|
||||
schemaNameAdjuster,
|
||||
connectorConfig.customConverterRegistry(),
|
||||
connectorConfig.getSourceInfoStructMaker().schema(),
|
||||
@ -53,17 +54,12 @@ public OracleDatabaseSchema(OracleConnectorConfig connectorConfig, OracleValueCo
|
||||
connectorConfig.isSchemaCommentsHistoryEnabled(),
|
||||
valueConverters,
|
||||
connectorConfig.getTableFilters().dataCollectionFilter());
|
||||
this.valueConverters = valueConverters;
|
||||
}
|
||||
|
||||
public Tables getTables() {
|
||||
return tables();
|
||||
}
|
||||
|
||||
public OracleValueConverters getValueConverters() {
|
||||
return valueConverters;
|
||||
}
|
||||
|
||||
@Override
|
||||
public OracleDdlParser getDdlParser() {
|
||||
return ddlParser;
|
||||
|
@ -0,0 +1,243 @@
|
||||
/*
|
||||
* Copyright Debezium Authors.
|
||||
*
|
||||
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
|
||||
*/
|
||||
package io.debezium.connector.oracle;
|
||||
|
||||
import java.math.BigDecimal;
|
||||
import java.math.RoundingMode;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
import java.util.Optional;
|
||||
|
||||
import org.apache.kafka.connect.data.Field;
|
||||
import org.apache.kafka.connect.data.Schema;
|
||||
import org.apache.kafka.connect.data.SchemaBuilder;
|
||||
import org.apache.kafka.connect.data.Struct;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import io.debezium.annotation.ThreadSafe;
|
||||
import io.debezium.jdbc.JdbcConnection;
|
||||
import io.debezium.relational.Column;
|
||||
import io.debezium.relational.DefaultValueConverter;
|
||||
import io.debezium.relational.ValueConverter;
|
||||
import io.debezium.util.Strings;
|
||||
|
||||
import oracle.jdbc.OracleTypes;
|
||||
import oracle.sql.INTERVALDS;
|
||||
import oracle.sql.INTERVALYM;
|
||||
import oracle.sql.TIMESTAMP;
|
||||
import oracle.sql.TIMESTAMPLTZ;
|
||||
import oracle.sql.TIMESTAMPTZ;
|
||||
|
||||
/**
|
||||
* @author Chris Cranford
|
||||
*/
|
||||
@ThreadSafe
|
||||
public class OracleDefaultValueConverter implements DefaultValueConverter {
|
||||
|
||||
private static Logger LOGGER = LoggerFactory.getLogger(OracleDefaultValueConverter.class);
|
||||
|
||||
/**
|
||||
* Converts JDBC string representation of a default column value to an object.
|
||||
*/
|
||||
@FunctionalInterface
|
||||
private interface DefaultValueMapper {
|
||||
/**
|
||||
* Parses string to an object.
|
||||
*
|
||||
* @param column the column, never {@code null}
|
||||
* @param value string representation
|
||||
* @return the parsed value
|
||||
* @throws Exception if there is a parsing error
|
||||
*/
|
||||
Object parse(Column column, String value) throws Exception;
|
||||
}
|
||||
|
||||
private final OracleValueConverters valueConverters;
|
||||
private final OracleConnection jdbcConnection;
|
||||
private final Map<Integer, DefaultValueMapper> defaultValueMappers;
|
||||
|
||||
public OracleDefaultValueConverter(OracleValueConverters valueConverters, OracleConnection jdbcConnection) {
|
||||
this.valueConverters = valueConverters;
|
||||
this.jdbcConnection = jdbcConnection;
|
||||
this.defaultValueMappers = Collections.unmodifiableMap(createDefaultValueMappers());
|
||||
}
|
||||
|
||||
@Override
|
||||
public Optional<Object> parseDefaultValue(Column column, String defaultValue) {
|
||||
final int dataType = column.jdbcType();
|
||||
final DefaultValueMapper mapper = defaultValueMappers.get(dataType);
|
||||
if (mapper == null) {
|
||||
LOGGER.warn("Mapper for type '{}' not found.", dataType);
|
||||
return Optional.empty();
|
||||
}
|
||||
|
||||
try {
|
||||
Object rawDefaultValue = mapper.parse(column, defaultValue);
|
||||
Object convertedDefaultValue = convertDefaultValue(rawDefaultValue, column);
|
||||
if (convertedDefaultValue instanceof Struct) {
|
||||
// Workaround for KAFKA-12694
|
||||
LOGGER.warn("Struct can't be used as default value for column '{}', will use null instead.", column.name());
|
||||
return Optional.empty();
|
||||
}
|
||||
return Optional.ofNullable(convertedDefaultValue);
|
||||
}
|
||||
catch (Exception e) {
|
||||
LOGGER.warn("Cannot parse column default value '{}' to type '{}'. Expression evaluation is not supported.", defaultValue, dataType);
|
||||
LOGGER.debug("Parsing failed due to error", e);
|
||||
return Optional.empty();
|
||||
}
|
||||
}
|
||||
|
||||
private Object convertDefaultValue(Object defaultValue, Column column) {
|
||||
// if converters is not null and the default value is not null, we need to convert default value
|
||||
if (valueConverters != null && defaultValue != null) {
|
||||
final SchemaBuilder schemaBuilder = valueConverters.schemaBuilder(column);
|
||||
if (schemaBuilder != null) {
|
||||
final Schema schema = schemaBuilder.build();
|
||||
// In order to get the valueConverter for this column, we have to create a field;
|
||||
// The index value -1 in the field will never used when converting default value;
|
||||
// So we can set any number here;
|
||||
final Field field = new Field(column.name(), -1, schema);
|
||||
final ValueConverter valueConverter = valueConverters.converter(column, field);
|
||||
|
||||
Object result = valueConverter.convert(defaultValue);
|
||||
if ((result instanceof BigDecimal) && column.scale().isPresent() && column.scale().get() > ((BigDecimal) result).scale()) {
|
||||
// Note as the scale is increased only, the rounding is more cosmetic.
|
||||
result = ((BigDecimal) result).setScale(column.scale().get(), RoundingMode.HALF_EVEN);
|
||||
}
|
||||
return result;
|
||||
}
|
||||
}
|
||||
return defaultValue;
|
||||
}
|
||||
|
||||
private Map<Integer, DefaultValueMapper> createDefaultValueMappers() {
|
||||
// Data types that are supported should be registered in the map. Many of the data types
|
||||
// have String-based conversions defined in OracleValueConverters since LogMiner provides
|
||||
// column values as strings. The only special handling that is needed here is if a type
|
||||
// is formatted with unique characteristics such as single/double quotes for strings.
|
||||
//
|
||||
// Additionally, we use the OracleTypes numeric representation for data types rather than
|
||||
// the type name like we do for SQL Server since the type names can include precision
|
||||
// and scale, i.e. TIMESTAMP(6) or INTERVAL YEAR(2) TO MONTH.
|
||||
final Map<Integer, DefaultValueMapper> result = new HashMap<>();
|
||||
|
||||
// Numeric types
|
||||
result.put(OracleTypes.NUMERIC, nullableDefaultValueMapper());
|
||||
|
||||
// Approximate numerics
|
||||
result.put(OracleTypes.BINARY_FLOAT, nullableDefaultValueMapper());
|
||||
result.put(OracleTypes.BINARY_DOUBLE, nullableDefaultValueMapper());
|
||||
result.put(OracleTypes.FLOAT, nullableDefaultValueMapper());
|
||||
|
||||
// Date and time
|
||||
result.put(OracleTypes.DATE, nullableDefaultValueMapper(castTemporalFunctionCall()));
|
||||
result.put(OracleTypes.TIME, nullableDefaultValueMapper(castTemporalFunctionCall()));
|
||||
result.put(OracleTypes.TIMESTAMP, nullableDefaultValueMapper(castTemporalFunctionCall()));
|
||||
result.put(OracleTypes.TIMESTAMPTZ, nullableDefaultValueMapper(castTemporalFunctionCall()));
|
||||
result.put(OracleTypes.TIMESTAMPLTZ, nullableDefaultValueMapper(castTemporalFunctionCall()));
|
||||
result.put(OracleTypes.INTERVALYM, nullableDefaultValueMapper(convertIntervalYearMonthStringLiteral()));
|
||||
result.put(OracleTypes.INTERVALDS, nullableDefaultValueMapper(convertIntervalDaySecondStringLiteral()));
|
||||
|
||||
// Character strings
|
||||
result.put(OracleTypes.CHAR, nullableDefaultValueMapper(enforceCharFieldPadding()));
|
||||
result.put(OracleTypes.VARCHAR, nullableDefaultValueMapper(enforceStringUnquote()));
|
||||
|
||||
// Unicode character strings
|
||||
result.put(OracleTypes.NCHAR, nullableDefaultValueMapper(enforceCharFieldPadding()));
|
||||
result.put(OracleTypes.NVARCHAR, nullableDefaultValueMapper(enforceStringUnquote()));
|
||||
|
||||
// Other data types have been omitted.
|
||||
return result;
|
||||
}
|
||||
|
||||
private DefaultValueMapper nullableDefaultValueMapper() {
|
||||
return nullableDefaultValueMapper(null);
|
||||
}
|
||||
|
||||
private DefaultValueMapper nullableDefaultValueMapper(DefaultValueMapper mapper) {
|
||||
return (column, value) -> {
|
||||
if ("NULL".equalsIgnoreCase(value)) {
|
||||
return null;
|
||||
}
|
||||
if (mapper != null) {
|
||||
return mapper.parse(column, value);
|
||||
}
|
||||
return value;
|
||||
};
|
||||
}
|
||||
|
||||
private DefaultValueMapper convertIntervalDaySecondStringLiteral() {
|
||||
return (column, value) -> {
|
||||
if (value != null && value.length() > 2 && value.startsWith("'") && value.endsWith("'")) {
|
||||
// When supplied as a string literal, pass to value converter as the Oracle type
|
||||
return new INTERVALDS(value.substring(1, value.length() - 1));
|
||||
}
|
||||
return value;
|
||||
};
|
||||
}
|
||||
|
||||
private DefaultValueMapper convertIntervalYearMonthStringLiteral() {
|
||||
return (column, value) -> {
|
||||
if (value != null && value.length() > 2 && value.startsWith("'") && value.endsWith("'")) {
|
||||
// When supplied as a string literal, pass to value converter as the Oracle type
|
||||
return new INTERVALYM(value.substring(1, value.length() - 1));
|
||||
}
|
||||
return value;
|
||||
};
|
||||
}
|
||||
|
||||
private DefaultValueMapper enforceCharFieldPadding() {
|
||||
return (column, value) -> value != null ? Strings.pad(unquote(value), column.length(), ' ') : null;
|
||||
}
|
||||
|
||||
private DefaultValueMapper enforceStringUnquote() {
|
||||
return (column, value) -> value != null ? unquote(value) : null;
|
||||
}
|
||||
|
||||
private DefaultValueMapper castTemporalFunctionCall() {
|
||||
return (column, value) -> {
|
||||
if ("SYSDATE".equalsIgnoreCase(value.trim())) {
|
||||
return column.isOptional() ? null : "0";
|
||||
}
|
||||
else if (value.toUpperCase().startsWith("TO_TIMESTAMP")) {
|
||||
switch (column.jdbcType()) {
|
||||
case OracleTypes.TIME:
|
||||
case OracleTypes.TIMESTAMP:
|
||||
return JdbcConnection.querySingleValue(jdbcConnection.connection(),
|
||||
"SELECT CAST(" + value + " AS TIMESTAMP) FROM DUAL",
|
||||
st -> {
|
||||
}, rs -> rs.getObject(1, TIMESTAMP.class));
|
||||
case OracleTypes.TIMESTAMPTZ:
|
||||
return JdbcConnection.querySingleValue(jdbcConnection.connection(),
|
||||
"SELECT CAST(" + value + " AS TIMESTAMP WITH TIME ZONE) FROM DUAL",
|
||||
st -> {
|
||||
}, rs -> rs.getObject(1, TIMESTAMPTZ.class));
|
||||
case OracleTypes.TIMESTAMPLTZ:
|
||||
return JdbcConnection.querySingleValue(jdbcConnection.connection(),
|
||||
"SELECT CAST(" + value + " AS TIMESTAMP WITH LOCAL TIME ZONE) FROM DUAL",
|
||||
st -> {
|
||||
}, rs -> rs.getObject(1, TIMESTAMPLTZ.class));
|
||||
}
|
||||
}
|
||||
else if (value.toUpperCase().startsWith("TO_DATE")) {
|
||||
if (column.jdbcType() == OracleTypes.DATE || column.jdbcType() == OracleTypes.TIMESTAMP) {
|
||||
return JdbcConnection.querySingleValue(jdbcConnection.connection(),
|
||||
"SELECT CAST(" + value + " AS TIMESTAMP) FROM DUAL",
|
||||
st -> {
|
||||
}, rs -> rs.getObject(1, TIMESTAMP.class));
|
||||
}
|
||||
}
|
||||
return value;
|
||||
};
|
||||
}
|
||||
|
||||
private static String unquote(String value) {
|
||||
return value.substring(1, value.length() - 1);
|
||||
}
|
||||
}
|
@ -28,7 +28,6 @@ public class ColumnDefinitionParserListener extends BaseParserListener {
|
||||
private final DataTypeResolver dataTypeResolver;
|
||||
private final TableEditor tableEditor;
|
||||
private final List<ParseTreeListener> listeners;
|
||||
|
||||
private ColumnEditor columnEditor;
|
||||
|
||||
ColumnDefinitionParserListener(final TableEditor tableEditor, final ColumnEditor columnEditor, OracleDdlParser parser,
|
||||
@ -51,6 +50,9 @@ Column getColumn() {
|
||||
@Override
|
||||
public void enterColumn_definition(PlSqlParser.Column_definitionContext ctx) {
|
||||
resolveColumnDataType(ctx);
|
||||
if (ctx.DEFAULT() != null) {
|
||||
columnEditor.defaultValueExpression(ctx.column_default_value().getText());
|
||||
}
|
||||
super.enterColumn_definition(ctx);
|
||||
}
|
||||
|
||||
@ -67,6 +69,9 @@ public void enterPrimary_key_clause(PlSqlParser.Primary_key_clauseContext ctx) {
|
||||
@Override
|
||||
public void enterModify_col_properties(PlSqlParser.Modify_col_propertiesContext ctx) {
|
||||
resolveColumnDataType(ctx);
|
||||
if (ctx.DEFAULT() != null) {
|
||||
columnEditor.defaultValueExpression(ctx.column_default_value().getText());
|
||||
}
|
||||
super.enterModify_col_properties(ctx);
|
||||
}
|
||||
|
||||
@ -74,6 +79,9 @@ public void enterModify_col_properties(PlSqlParser.Modify_col_propertiesContext
|
||||
private void resolveColumnDataType(PlSqlParser.Column_definitionContext ctx) {
|
||||
columnEditor.name(getColumnName(ctx.column_name()));
|
||||
|
||||
boolean hasNotNullConstraint = ctx.inline_constraint().stream().anyMatch(c -> c.NOT() != null);
|
||||
columnEditor.optional(!hasNotNullConstraint);
|
||||
|
||||
if (ctx.datatype() == null) {
|
||||
if (ctx.type_name() != null && "\"MDSYS\".\"SDO_GEOMETRY\"".equalsIgnoreCase(ctx.type_name().getText())) {
|
||||
columnEditor.jdbcType(Types.STRUCT).type("MDSYS.SDO_GEOMETRY");
|
||||
@ -81,17 +89,7 @@ private void resolveColumnDataType(PlSqlParser.Column_definitionContext ctx) {
|
||||
}
|
||||
else {
|
||||
resolveColumnDataType(ctx.datatype());
|
||||
|
||||
// todo move to enterExpression and apply type conversion
|
||||
if (ctx.DEFAULT() != null) {
|
||||
String defaultValue = ctx.expression().getText();
|
||||
columnEditor.defaultValueExpression(defaultValue);
|
||||
}
|
||||
}
|
||||
|
||||
boolean hasNotNullConstraint = ctx.inline_constraint().stream().anyMatch(c -> c.NOT() != null);
|
||||
// todo move to nonNull
|
||||
columnEditor.optional(!hasNotNullConstraint);
|
||||
}
|
||||
|
||||
private void resolveColumnDataType(PlSqlParser.Modify_col_propertiesContext ctx) {
|
||||
|
@ -0,0 +1,650 @@
|
||||
/*
|
||||
* Copyright Debezium Authors.
|
||||
*
|
||||
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
|
||||
*/
|
||||
package io.debezium.connector.oracle;
|
||||
|
||||
import static org.fest.assertions.Assertions.assertThat;
|
||||
|
||||
import java.math.BigDecimal;
|
||||
import java.util.Arrays;
|
||||
import java.util.Iterator;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.function.Consumer;
|
||||
|
||||
import org.apache.kafka.connect.data.Field;
|
||||
import org.apache.kafka.connect.data.Struct;
|
||||
import org.apache.kafka.connect.source.SourceRecord;
|
||||
import org.junit.After;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
|
||||
import io.debezium.config.Configuration;
|
||||
import io.debezium.connector.oracle.util.TestHelper;
|
||||
import io.debezium.data.Envelope;
|
||||
import io.debezium.doc.FixFor;
|
||||
import io.debezium.embedded.AbstractConnectorTest;
|
||||
import io.debezium.time.MicroDuration;
|
||||
import io.debezium.util.Testing;
|
||||
|
||||
/**
|
||||
* Integration test that tests all supported data types and default value combinations during
|
||||
* both snapshot and streaming phases in conjunction with schema changes to the table.
|
||||
*
|
||||
* @author Chris Cranford
|
||||
*/
|
||||
public class OracleDefaultValueIT extends AbstractConnectorTest {
|
||||
|
||||
private OracleConnection connection;
|
||||
|
||||
@Before
|
||||
public void before() throws Exception {
|
||||
connection = TestHelper.testConnection();
|
||||
setConsumeTimeout(TestHelper.defaultMessageConsumerPollTimeout(), TimeUnit.SECONDS);
|
||||
initializeConnectorTestFramework();
|
||||
Testing.Files.delete(TestHelper.DB_HISTORY_PATH);
|
||||
TestHelper.dropTable(connection, "default_value_test");
|
||||
}
|
||||
|
||||
@After
|
||||
public void after() throws Exception {
|
||||
if (connection != null && connection.isConnected()) {
|
||||
TestHelper.dropTable(connection, "default_value_test");
|
||||
connection.close();
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
@FixFor("DBZ-3710")
|
||||
public void shouldHandleNumericDefaultTypes() throws Exception {
|
||||
List<ColumnDefinition> columnDefinitions = Arrays.asList(
|
||||
new ColumnDefinition("val_int", "int",
|
||||
"1", "2",
|
||||
BigDecimal.valueOf(1L), BigDecimal.valueOf(2L),
|
||||
AssertionType.FIELD_DEFAULT_EQUAL),
|
||||
new ColumnDefinition("val_integer", "integer",
|
||||
"1", "2",
|
||||
BigDecimal.valueOf(1L), BigDecimal.valueOf(2L),
|
||||
AssertionType.FIELD_DEFAULT_EQUAL),
|
||||
new ColumnDefinition("val_smallint", "smallint",
|
||||
"1", "2",
|
||||
BigDecimal.valueOf(1L), BigDecimal.valueOf(2L),
|
||||
AssertionType.FIELD_DEFAULT_EQUAL),
|
||||
new ColumnDefinition("val_num", "number",
|
||||
"1", "2",
|
||||
BigDecimal.valueOf(1L), BigDecimal.valueOf(2L),
|
||||
AssertionType.FIELD_NO_DEFAULT),
|
||||
new ColumnDefinition("val_number_38_no_scale", "number(38)",
|
||||
"1", "2",
|
||||
BigDecimal.valueOf(1L), BigDecimal.valueOf(2L),
|
||||
AssertionType.FIELD_DEFAULT_EQUAL),
|
||||
new ColumnDefinition("val_number_38_scale_0", "number(38,0)",
|
||||
"1", "2",
|
||||
BigDecimal.valueOf(1L), BigDecimal.valueOf(2L),
|
||||
AssertionType.FIELD_DEFAULT_EQUAL),
|
||||
new ColumnDefinition("val_number_2", "number(2)",
|
||||
"1", "2",
|
||||
(byte) 1, (byte) 2,
|
||||
AssertionType.FIELD_DEFAULT_EQUAL),
|
||||
new ColumnDefinition("val_number_4", "number(4)",
|
||||
"1", "2",
|
||||
(short) 1, (short) 2,
|
||||
AssertionType.FIELD_DEFAULT_EQUAL),
|
||||
new ColumnDefinition("val_number_9", "number(9)",
|
||||
"1", "2",
|
||||
1, 2,
|
||||
AssertionType.FIELD_DEFAULT_EQUAL),
|
||||
new ColumnDefinition("val_number_18", "number(18)",
|
||||
"1", "2",
|
||||
1L, 2L,
|
||||
AssertionType.FIELD_DEFAULT_EQUAL),
|
||||
new ColumnDefinition("val_number_2_neg_scale", "number(1,-1)",
|
||||
"10", "20",
|
||||
(byte) 10, (byte) 20,
|
||||
AssertionType.FIELD_DEFAULT_EQUAL),
|
||||
new ColumnDefinition("val_number_4_neg_scale", "number(2,-2)",
|
||||
"100", "200",
|
||||
(short) 100, (short) 200,
|
||||
AssertionType.FIELD_DEFAULT_EQUAL),
|
||||
new ColumnDefinition("val_number_9_neg_scale", "number(8,-1)",
|
||||
"10", "20",
|
||||
10, 20,
|
||||
AssertionType.FIELD_DEFAULT_EQUAL),
|
||||
new ColumnDefinition("val_number_918_neg_scale", "number(16,-2)",
|
||||
"100", "200",
|
||||
100L, 200L,
|
||||
AssertionType.FIELD_DEFAULT_EQUAL),
|
||||
new ColumnDefinition("val_decimal", "decimal(10)",
|
||||
"125", "250",
|
||||
125L, 250L,
|
||||
AssertionType.FIELD_DEFAULT_EQUAL),
|
||||
new ColumnDefinition("val_numeric", "numeric(10)",
|
||||
"125", "250",
|
||||
125L, 250L,
|
||||
AssertionType.FIELD_DEFAULT_EQUAL),
|
||||
new ColumnDefinition("val_number_1", "number(1)",
|
||||
"1", "2",
|
||||
(byte) 1, (byte) 2,
|
||||
AssertionType.FIELD_DEFAULT_EQUAL));
|
||||
|
||||
shouldHandleDefaultValuesCommon(columnDefinitions);
|
||||
}
|
||||
|
||||
@Test
|
||||
@FixFor("DBZ-3710")
|
||||
public void shouldHandleFloatPointDefaultTypes() throws Exception {
|
||||
List<ColumnDefinition> columnDefinitions = Arrays.asList(
|
||||
new ColumnDefinition("val_bf", "binary_float",
|
||||
"3.14", "6.28",
|
||||
3.14f, 6.28f,
|
||||
AssertionType.FIELD_DEFAULT_EQUAL),
|
||||
new ColumnDefinition("val_bd", "binary_double",
|
||||
"3.14", "6.28",
|
||||
3.14d, 6.28d,
|
||||
AssertionType.FIELD_DEFAULT_EQUAL),
|
||||
new ColumnDefinition("val_float", "float",
|
||||
"3.14", "6.28",
|
||||
3.14f, 6.28f,
|
||||
AssertionType.FIELD_NO_DEFAULT),
|
||||
new ColumnDefinition("val_float_10", "float(10)",
|
||||
"3.14", "6.28",
|
||||
3.14f, 6.28f,
|
||||
AssertionType.FIELD_NO_DEFAULT),
|
||||
new ColumnDefinition("val_double_precision", "double precision",
|
||||
"3.14", "6.28",
|
||||
3.14f, 6.28f,
|
||||
AssertionType.FIELD_NO_DEFAULT),
|
||||
new ColumnDefinition("val_real", "real",
|
||||
"3.14", "6.28",
|
||||
3.14f, 6.28f,
|
||||
AssertionType.FIELD_NO_DEFAULT),
|
||||
new ColumnDefinition("val_number_10_6", "number(10,6)",
|
||||
"123.45", "234.57",
|
||||
BigDecimal.valueOf(123450000, 6), BigDecimal.valueOf(234570000, 6),
|
||||
AssertionType.FIELD_DEFAULT_EQUAL),
|
||||
new ColumnDefinition("val_decimal_10_6", "decimal(10,6)",
|
||||
"3.14", "6.28",
|
||||
BigDecimal.valueOf(3140000, 6), BigDecimal.valueOf(6280000, 6),
|
||||
AssertionType.FIELD_DEFAULT_EQUAL),
|
||||
new ColumnDefinition("val_numeric_10_6", "numeric(10,6)",
|
||||
"3.14", "6.28",
|
||||
BigDecimal.valueOf(3140000, 6), BigDecimal.valueOf(6280000, 6),
|
||||
AssertionType.FIELD_DEFAULT_EQUAL),
|
||||
new ColumnDefinition("val_number_vs", "number",
|
||||
"3.14", "6.28",
|
||||
3.14f, 6.28f,
|
||||
AssertionType.FIELD_NO_DEFAULT));
|
||||
|
||||
shouldHandleDefaultValuesCommon(columnDefinitions);
|
||||
}
|
||||
|
||||
@Test
|
||||
@FixFor("DBZ-3710")
|
||||
public void shouldHandleCharacterDefaultTypes() throws Exception {
|
||||
List<ColumnDefinition> columnDefinitions = Arrays.asList(
|
||||
new ColumnDefinition("val_varchar", "varchar(100)",
|
||||
"'hello'", "'world'",
|
||||
"hello", "world",
|
||||
AssertionType.FIELD_DEFAULT_EQUAL),
|
||||
new ColumnDefinition("val_varchar2", "varchar2(100)",
|
||||
"'red'", "'green'",
|
||||
"red", "green",
|
||||
AssertionType.FIELD_DEFAULT_EQUAL),
|
||||
new ColumnDefinition("val_nvarchar2", "nvarchar2(100)",
|
||||
"'cedric'", "'entertainer'",
|
||||
"cedric", "entertainer",
|
||||
AssertionType.FIELD_DEFAULT_EQUAL),
|
||||
new ColumnDefinition("val_char", "char(5)",
|
||||
"'YES'", "'NO'",
|
||||
"YES ", "NO ",
|
||||
AssertionType.FIELD_DEFAULT_EQUAL),
|
||||
new ColumnDefinition("val_nchar", "nchar(5)",
|
||||
"'ON'", "'OFF'",
|
||||
"ON ", "OFF ",
|
||||
AssertionType.FIELD_DEFAULT_EQUAL));
|
||||
|
||||
shouldHandleDefaultValuesCommon(columnDefinitions);
|
||||
}
|
||||
|
||||
@Test
|
||||
@FixFor("DBZ-3710")
|
||||
public void shouldHandleDateTimeDefaultTypes() throws Exception {
|
||||
List<ColumnDefinition> columnDefinitions = Arrays.asList(
|
||||
new ColumnDefinition("val_date", "date",
|
||||
"TO_DATE('2001-02-03 00:00:00', 'YYYY-MM-DD HH24:MI:SS')",
|
||||
"TO_DATE('2005-01-31 00:00:00', 'YYYY-MM-DD HH24:MI:SS')",
|
||||
981158400000L, 1107129600000L,
|
||||
AssertionType.FIELD_DEFAULT_EQUAL),
|
||||
new ColumnDefinition("val_ts", "timestamp",
|
||||
"TO_DATE('2001-02-03 01:02:03', 'YYYY-MM-DD HH24:MI:SS')",
|
||||
"TO_DATE('2005-01-31 02:03:04', 'YYYY-MM-DD HH24:MI:SS')",
|
||||
981162123000000L, 1107136984000000L,
|
||||
AssertionType.FIELD_DEFAULT_EQUAL),
|
||||
new ColumnDefinition("val_ts_prec2", "timestamp(2)",
|
||||
"TO_DATE('2001-02-03 01:02:03', 'YYYY-MM-DD HH24:MI:SS')",
|
||||
"TO_DATE('2005-01-31 02:03:04', 'YYYY-MM-DD HH24:MI:SS')",
|
||||
981162123000L, 1107136984000L,
|
||||
AssertionType.FIELD_DEFAULT_EQUAL),
|
||||
new ColumnDefinition("val_ts_prec4", "timestamp(4)",
|
||||
"TO_DATE('2001-02-03 01:02:03', 'YYYY-MM-DD HH24:MI:SS')",
|
||||
"TO_DATE('2005-01-31 02:03:04', 'YYYY-MM-DD HH24:MI:SS')",
|
||||
981162123000000L, 1107136984000000L,
|
||||
AssertionType.FIELD_DEFAULT_EQUAL),
|
||||
new ColumnDefinition("val_ts_prec9", "timestamp(9)",
|
||||
"TO_DATE('2001-02-03 01:02:03', 'YYYY-MM-DD HH24:MI:SS')",
|
||||
"TO_DATE('2005-01-31 02:03:04', 'YYYY-MM-DD HH24:MI:SS')",
|
||||
981162123000000000L, 1107136984000000000L,
|
||||
AssertionType.FIELD_DEFAULT_EQUAL),
|
||||
new ColumnDefinition("val_tstz", "timestamp with time zone",
|
||||
"TO_TIMESTAMP_TZ('2018-03-27 01:34:56.00789 -11:00', 'yyyy-mm-dd HH24:MI:SS.FF5 TZH:TZM')",
|
||||
"TO_TIMESTAMP_TZ('2019-04-28 02:35:57.00891 -10:00', 'yyyy-mm-dd HH24:MI:SS.FF5 TZH:TZM')",
|
||||
"2018-03-27T01:34:56.00789-11:00",
|
||||
"2019-04-28T02:35:57.00891-10:00",
|
||||
AssertionType.FIELD_DEFAULT_EQUAL),
|
||||
new ColumnDefinition("val_tsltz", "timestamp with local time zone",
|
||||
"TO_TIMESTAMP_TZ('2018-03-27 01:34:56.00789 -11:00', 'yyyy-mm-dd HH24:MI:SS.FF5 TZH:TZM')",
|
||||
"TO_TIMESTAMP_TZ('2019-04-28 02:35:57.00891 -10:00', 'yyyy-mm-dd HH24:MI:SS.FF5 TZH:TZM')",
|
||||
"2018-03-27T12:34:56.00789Z", // 1am + 11 hours, stored in UTC and returned in UTC
|
||||
"2019-04-28T12:35:57.00891Z", // 2am + 10 hours, stored in UTC and returned in UTC
|
||||
AssertionType.FIELD_DEFAULT_EQUAL));
|
||||
|
||||
shouldHandleDefaultValuesCommon(columnDefinitions);
|
||||
}
|
||||
|
||||
@Test
|
||||
@FixFor("DBZ-3710")
|
||||
public void shouldHandleIntervalDefaultTypes() throws Exception {
|
||||
List<ColumnDefinition> columnDefinitions = Arrays.asList(
|
||||
new ColumnDefinition("val_int_ytm", "interval year to month",
|
||||
"'5-3'", "'7-4'",
|
||||
getOracleIntervalYearMonth(5, 3), getOracleIntervalYearMonth(7, 4),
|
||||
AssertionType.FIELD_DEFAULT_EQUAL),
|
||||
new ColumnDefinition("val_int_dts", "interval day(3) to second(3)",
|
||||
"'5 1:2:3.456'", "'3 2:1:4.567'",
|
||||
getOracleIntervalDaySecond(5, 1, 2, 3, 456000), getOracleIntervalDaySecond(3, 2, 1, 4, 567000),
|
||||
AssertionType.FIELD_DEFAULT_EQUAL));
|
||||
|
||||
shouldHandleDefaultValuesCommon(columnDefinitions);
|
||||
}
|
||||
|
||||
private long getOracleIntervalYearMonth(int years, int month) {
|
||||
return MicroDuration.durationMicros(years, month, 0, 0, 0, 0, MicroDuration.DAYS_PER_MONTH_AVG);
|
||||
}
|
||||
|
||||
private long getOracleIntervalDaySecond(int days, int hours, int minutes, int seconds, int micros) {
|
||||
return MicroDuration.durationMicros(0, 0, days, hours, minutes, seconds, micros, MicroDuration.DAYS_PER_MONTH_AVG);
|
||||
}
|
||||
|
||||
/**
|
||||
* Handles executing the full common set of default value tests for the supplied column definitions.
|
||||
*
|
||||
* @param columnDefinitions list of column definitions, should not be {@code null}
|
||||
* @throws Exception if an exception occurred
|
||||
*/
|
||||
private void shouldHandleDefaultValuesCommon(List<ColumnDefinition> columnDefinitions) throws Exception {
|
||||
testDefaultValuesCreateTableAndSnapshot(columnDefinitions);
|
||||
testDefaultValuesAlterTableModifyExisting(columnDefinitions);
|
||||
testDefaultValuesAlterTableAdd(columnDefinitions);
|
||||
}
|
||||
|
||||
/**
|
||||
* Creates the table and pre-inserts a record captured during the snapshot phase. The snapshot
|
||||
* record will be validated against the supplied column definitions.
|
||||
*
|
||||
* The goal of this method is to test that when a table is snapshot which uses default values
|
||||
* that both the in-memory schema representation and the snapshot pipeline change event have
|
||||
* the right default value resolution.
|
||||
*
|
||||
* @param columnDefinitions list of column definitions, should not be {@code null}
|
||||
* @throws Exception if an exception occurred
|
||||
*/
|
||||
private void testDefaultValuesCreateTableAndSnapshot(List<ColumnDefinition> columnDefinitions) throws Exception {
|
||||
// Build SQL
|
||||
final StringBuilder createSql = new StringBuilder();
|
||||
createSql.append("CREATE TABLE default_value_test (id numeric(9,0) not null");
|
||||
for (ColumnDefinition column : columnDefinitions) {
|
||||
createSql.append(", ")
|
||||
.append(column.name)
|
||||
.append(" ").append(column.definition)
|
||||
.append(" ").append("default ").append(column.addDefaultValue);
|
||||
createSql.append(", ")
|
||||
.append(column.name).append("_null")
|
||||
.append(" ").append(column.definition)
|
||||
.append(" ").append("default null");
|
||||
if (column.temporalType) {
|
||||
createSql.append(", ")
|
||||
.append(column.name).append("_sysdate")
|
||||
.append(" ").append(column.definition)
|
||||
.append(" ").append("default sysdate");
|
||||
createSql.append(", ")
|
||||
.append(column.name).append("_sysdate_nonnull")
|
||||
.append(" ").append(column.definition)
|
||||
.append(" ").append("default sysdate not null");
|
||||
}
|
||||
}
|
||||
createSql.append(", primary key(id))");
|
||||
|
||||
// Create table and add logging support
|
||||
connection.execute(createSql.toString());
|
||||
TestHelper.streamTable(connection, "default_value_test");
|
||||
|
||||
// Insert snapshot record
|
||||
connection.execute("INSERT INTO default_value_test (id) values (1)");
|
||||
|
||||
Configuration config = TestHelper.defaultConfig()
|
||||
.with(OracleConnectorConfig.TABLE_INCLUDE_LIST, "DEBEZIUM\\.DEFAULT_VALUE_TEST")
|
||||
.build();
|
||||
|
||||
// Start connector
|
||||
start(OracleConnector.class, config);
|
||||
assertConnectorIsRunning();
|
||||
|
||||
// Wait and capture snapshot records
|
||||
waitForSnapshotToBeCompleted(TestHelper.CONNECTOR_NAME, TestHelper.SERVER_NAME);
|
||||
SourceRecords records = consumeRecordsByTopic(1);
|
||||
|
||||
// Verify we got only 1 record for our test
|
||||
List<SourceRecord> tableRecords = records.recordsForTopic("server1.DEBEZIUM.DEFAULT_VALUE_TEST");
|
||||
assertThat(tableRecords).hasSize(1);
|
||||
|
||||
SourceRecord record = tableRecords.get(0);
|
||||
for (ColumnDefinition column : columnDefinitions) {
|
||||
switch (column.assertionType) {
|
||||
case FIELD_DEFAULT_EQUAL:
|
||||
assertSchemaFieldWithSameDefaultAndValue(record, column.name.toUpperCase(), column.expectedAddDefaultValue);
|
||||
assertSchemaFieldWithSameDefaultAndValue(record, column.name.toUpperCase() + "_NULL", null);
|
||||
break;
|
||||
case FIELD_NO_DEFAULT:
|
||||
assertSchemaFieldNoDefaultWithValue(record, column.name.toUpperCase(), column.expectedAddDefaultValue);
|
||||
assertSchemaFieldNoDefaultWithValue(record, column.name.toUpperCase() + "_NULL", null);
|
||||
break;
|
||||
default:
|
||||
throw new RuntimeException("Unexpected assertion type: " + column.assertionType);
|
||||
}
|
||||
|
||||
if (column.temporalType) {
|
||||
assertSchemaFieldWithDefaultSysdate(record, column.name.toUpperCase() + "_SYSDATE", null);
|
||||
if (column.expectedAddDefaultValue instanceof String) {
|
||||
assertSchemaFieldDefaultAndNonNullValue(record, column.name.toUpperCase() + "_SYSDATE_NONNULL", "0");
|
||||
}
|
||||
else {
|
||||
assertSchemaFieldWithDefaultSysdate(record, column.name.toUpperCase() + "_SYSDATE_NONNULL", 0L);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Alters the underlying table changing the default value to its second form. This method then inserts
|
||||
* a new record that is then validated against the supplied column definitions.
|
||||
*
|
||||
* The goal of this method is to test that when DDL modifies an existing column in an existing table
|
||||
* that the right default value resolution occurs and that the in-memory schema representation is
|
||||
* correct as well as the change event capture pipeline.
|
||||
*
|
||||
* @param columnDefinitions list of column definitions, should not be {@code null}
|
||||
* @throws Exception if an exception occurred
|
||||
*/
|
||||
private void testDefaultValuesAlterTableModifyExisting(List<ColumnDefinition> columnDefinitions) throws Exception {
|
||||
// Build SQL
|
||||
final StringBuilder alterSql = new StringBuilder();
|
||||
alterSql.append("ALTER TABLE default_value_test modify (");
|
||||
Iterator<ColumnDefinition> iterator = columnDefinitions.iterator();
|
||||
while (iterator.hasNext()) {
|
||||
final ColumnDefinition column = iterator.next();
|
||||
alterSql.append(column.name)
|
||||
.append(" ").append(column.definition)
|
||||
.append(" ").append("default ").append(column.modifyDefaultValue);
|
||||
alterSql.append(", ")
|
||||
.append(column.name).append("_null")
|
||||
.append(" ").append(column.definition)
|
||||
.append(" ").append("default null");
|
||||
if (column.temporalType) {
|
||||
alterSql.append(", ")
|
||||
.append(column.name).append("_sysdate")
|
||||
.append(" ").append(column.definition)
|
||||
.append(" ").append("default sysdate");
|
||||
// cannot add alter for column to not null since it is already not null
|
||||
// see creation of table method for where we define this field as not null
|
||||
}
|
||||
if (iterator.hasNext()) {
|
||||
alterSql.append(", ");
|
||||
}
|
||||
}
|
||||
alterSql.append(")");
|
||||
|
||||
// Wait until we're in streaming phase if we're not already
|
||||
waitForStreamingRunning(TestHelper.CONNECTOR_NAME, TestHelper.SERVER_NAME);
|
||||
|
||||
connection.execute(alterSql.toString());
|
||||
connection.execute("INSERT INTO default_value_test (id) values (2)");
|
||||
|
||||
SourceRecords records = consumeRecordsByTopic(1);
|
||||
|
||||
// Verify we got only 1 record for our test
|
||||
List<SourceRecord> tableRecords = records.recordsForTopic("server1.DEBEZIUM.DEFAULT_VALUE_TEST");
|
||||
assertThat(tableRecords).hasSize(1);
|
||||
|
||||
SourceRecord record = tableRecords.get(0);
|
||||
for (ColumnDefinition column : columnDefinitions) {
|
||||
switch (column.assertionType) {
|
||||
case FIELD_DEFAULT_EQUAL:
|
||||
assertSchemaFieldWithSameDefaultAndValue(record, column.name.toUpperCase(), column.expectedModifyDefaultValue);
|
||||
assertSchemaFieldWithSameDefaultAndValue(record, column.name.toUpperCase() + "_NULL", null);
|
||||
break;
|
||||
case FIELD_NO_DEFAULT:
|
||||
assertSchemaFieldNoDefaultWithValue(record, column.name.toUpperCase(), column.expectedModifyDefaultValue);
|
||||
assertSchemaFieldNoDefaultWithValue(record, column.name.toUpperCase() + "_NULL", null);
|
||||
break;
|
||||
default:
|
||||
throw new RuntimeException("Unexpected assertion type: " + column.assertionType);
|
||||
}
|
||||
|
||||
if (column.temporalType) {
|
||||
assertSchemaFieldWithDefaultSysdate(record, column.name.toUpperCase() + "_SYSDATE", null);
|
||||
if (column.expectedAddDefaultValue instanceof String) {
|
||||
assertSchemaFieldDefaultAndNonNullValue(record, column.name.toUpperCase() + "_SYSDATE_NONNULL", "0");
|
||||
}
|
||||
else {
|
||||
assertSchemaFieldWithDefaultSysdate(record, column.name.toUpperCase() + "_SYSDATE_NONNULL", 0L);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Alters the underlying table changing adding a new column prefixed with {@code A} to each of the column
|
||||
* definition with the initial default value definition.
|
||||
*
|
||||
* The goal of this method is to test that when DDL adds a new column to an existing table that the right
|
||||
* default value resolution occurs and that the in-memory schema representation is correct as well as the
|
||||
* change event capture pipeline.
|
||||
*
|
||||
* @param columnDefinitions list of column definitions, should not be {@code null}
|
||||
* @throws Exception if an exception occurred
|
||||
*/
|
||||
private void testDefaultValuesAlterTableAdd(List<ColumnDefinition> columnDefinitions) throws Exception {
|
||||
// Build SQL
|
||||
final StringBuilder alterSql = new StringBuilder();
|
||||
alterSql.append("ALTER TABLE default_value_test add (");
|
||||
Iterator<ColumnDefinition> iterator = columnDefinitions.iterator();
|
||||
while (iterator.hasNext()) {
|
||||
final ColumnDefinition column = iterator.next();
|
||||
alterSql.append("a").append(column.name)
|
||||
.append(" ").append(column.definition)
|
||||
.append(" ").append("default ").append(column.addDefaultValue);
|
||||
alterSql.append(", ")
|
||||
.append("a").append(column.name).append("_null")
|
||||
.append(" ").append(column.definition)
|
||||
.append(" ").append("default null");
|
||||
if (column.temporalType) {
|
||||
alterSql.append(", ")
|
||||
.append("a").append(column.name).append("_sysdate")
|
||||
.append(" ").append(column.definition)
|
||||
.append(" ").append("default sysdate");
|
||||
alterSql.append(", ")
|
||||
.append("a").append(column.name).append("_sysdate_nonnull")
|
||||
.append(" ").append(column.definition)
|
||||
.append(" ").append("default sysdate not null");
|
||||
}
|
||||
if (iterator.hasNext()) {
|
||||
alterSql.append(", ");
|
||||
}
|
||||
}
|
||||
alterSql.append(")");
|
||||
|
||||
// Wait until we're in streaming phase if we're not already
|
||||
waitForStreamingRunning(TestHelper.CONNECTOR_NAME, TestHelper.SERVER_NAME);
|
||||
|
||||
connection.execute(alterSql.toString());
|
||||
connection.execute("INSERT INTO default_value_test (id) values (3)");
|
||||
|
||||
SourceRecords records = consumeRecordsByTopic(1);
|
||||
|
||||
// Verify we got only 1 record for our test
|
||||
List<SourceRecord> tableRecords = records.recordsForTopic("server1.DEBEZIUM.DEFAULT_VALUE_TEST");
|
||||
assertThat(tableRecords).hasSize(1);
|
||||
|
||||
SourceRecord record = tableRecords.get(0);
|
||||
for (ColumnDefinition column : columnDefinitions) {
|
||||
switch (column.assertionType) {
|
||||
case FIELD_DEFAULT_EQUAL:
|
||||
assertSchemaFieldWithSameDefaultAndValue(record, column.name.toUpperCase(), column.expectedModifyDefaultValue);
|
||||
assertSchemaFieldWithSameDefaultAndValue(record, column.name.toUpperCase() + "_NULL", null);
|
||||
assertSchemaFieldWithSameDefaultAndValue(record, "A" + column.name.toUpperCase(), column.expectedAddDefaultValue);
|
||||
assertSchemaFieldWithSameDefaultAndValue(record, "A" + column.name.toUpperCase() + "_NULL", null);
|
||||
break;
|
||||
case FIELD_NO_DEFAULT:
|
||||
assertSchemaFieldNoDefaultWithValue(record, column.name.toUpperCase(), column.expectedModifyDefaultValue);
|
||||
assertSchemaFieldNoDefaultWithValue(record, column.name.toUpperCase() + "_NULL", null);
|
||||
assertSchemaFieldNoDefaultWithValue(record, "A" + column.name.toUpperCase(), column.expectedAddDefaultValue);
|
||||
assertSchemaFieldNoDefaultWithValue(record, "A" + column.name.toUpperCase() + "_NULL", null);
|
||||
break;
|
||||
default:
|
||||
throw new RuntimeException("Unexpected assertion type: " + column.assertionType);
|
||||
}
|
||||
|
||||
if (column.temporalType) {
|
||||
assertSchemaFieldWithDefaultSysdate(record, column.name.toUpperCase() + "_SYSDATE", null);
|
||||
assertSchemaFieldWithDefaultSysdate(record, "A" + column.name.toUpperCase() + "_SYSDATE", null);
|
||||
if (column.expectedAddDefaultValue instanceof String) {
|
||||
assertSchemaFieldDefaultAndNonNullValue(record, column.name.toUpperCase() + "_SYSDATE_NONNULL", "0");
|
||||
assertSchemaFieldDefaultAndNonNullValue(record, "A" + column.name.toUpperCase() + "_SYSDATE_NONNULL", "0");
|
||||
}
|
||||
else {
|
||||
assertSchemaFieldWithDefaultSysdate(record, column.name.toUpperCase() + "_SYSDATE_NONNULL", 0L);
|
||||
assertSchemaFieldWithDefaultSysdate(record, "A" + column.name.toUpperCase() + "_SYSDATE_NONNULL", 0L);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Asserts that the schema field's default value and after emitted event value are the same.
|
||||
*
|
||||
* @param record the change event record, never {@code null}
|
||||
* @param fieldName the field name, never {@code null}
|
||||
* @param expectedValue the expected value in the field's default and "after" struct
|
||||
*/
|
||||
private static void assertSchemaFieldWithSameDefaultAndValue(SourceRecord record, String fieldName, Object expectedValue) {
|
||||
assertSchemaFieldValueWithDefault(record, fieldName, expectedValue, r -> {
|
||||
assertThat(r).as("Unexpected field value: " + fieldName).isEqualTo(expectedValue);
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* Asserts that the schema field's default value is not set and that the emitted event value matches.
|
||||
*
|
||||
* @param record the change event record, never {@code null}
|
||||
* @param fieldName the field name, never {@code null}
|
||||
* @param fieldValue the expected value in the field's "after" struct
|
||||
*/
|
||||
// asserts that the field schema has no default value and an emitted value
|
||||
private static void assertSchemaFieldNoDefaultWithValue(SourceRecord record, String fieldName, Object fieldValue) {
|
||||
assertSchemaFieldValueWithDefault(record, fieldName, null, r -> {
|
||||
assertThat(r).as("Unexpected field value: " + fieldName).isEqualTo(fieldValue);
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* Asserts that the schema field's default value is the supplied value and that the emitted events field
|
||||
* value is at least a non-null value; expectation is that the emitted event value is dynamic, likely
|
||||
* based on some database function call, like {@code TO_DATE} or {@code TO_TIMESTAMP}.
|
||||
*
|
||||
* @param record the change event record, never {@code null}
|
||||
* @param fieldName the field name, never {@code null}
|
||||
* @param defaultValue the expected schema field's default value
|
||||
*/
|
||||
// asserts that the field schema has a given default value and a non-null emitted event value
|
||||
private static void assertSchemaFieldDefaultAndNonNullValue(SourceRecord record, String fieldName, Object defaultValue) {
|
||||
assertSchemaFieldValueWithDefault(record, fieldName, defaultValue, r -> {
|
||||
assertThat(r).as("Unexpected field value: " + fieldName).isNotNull();
|
||||
});
|
||||
}
|
||||
|
||||
private static void assertSchemaFieldWithDefaultSysdate(SourceRecord record, String fieldName, Object expectedValue) {
|
||||
assertSchemaFieldValueWithDefault(record, fieldName, expectedValue, r -> {
|
||||
if (expectedValue == null) {
|
||||
assertThat(r).isNull();
|
||||
}
|
||||
else {
|
||||
assertThat((long) r).as("Unexpected field value: " + fieldName).isGreaterThanOrEqualTo(1L);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
private static void assertSchemaFieldValueWithDefault(SourceRecord record, String fieldName, Object expectedDefault, Consumer<Object> valueCheck) {
|
||||
final Struct after = ((Struct) record.value()).getStruct(Envelope.FieldName.AFTER);
|
||||
final Field field = after.schema().field(fieldName);
|
||||
assertThat(field).as("Expected non-null field for " + fieldName).isNotNull();
|
||||
final Object defaultValue = field.schema().defaultValue();
|
||||
if (expectedDefault == null) {
|
||||
assertThat(defaultValue).isNull();
|
||||
return;
|
||||
}
|
||||
else {
|
||||
assertThat(defaultValue).as("Expected non-null default value for field " + fieldName).isNotNull();
|
||||
}
|
||||
assertThat(defaultValue.getClass()).isEqualTo(expectedDefault.getClass());
|
||||
assertThat(defaultValue).as("Unexpected default value: " + fieldName + " with field value: " + after.get(fieldName)).isEqualTo(expectedDefault);
|
||||
valueCheck.accept(after.get(fieldName));
|
||||
}
|
||||
|
||||
/**
|
||||
* Defines the different assertion types for a given column definition.
|
||||
*/
|
||||
enum AssertionType {
|
||||
// field and default values are identical
|
||||
FIELD_DEFAULT_EQUAL,
|
||||
// schema has no default value specified
|
||||
FIELD_NO_DEFAULT,
|
||||
}
|
||||
|
||||
/**
|
||||
* Defines a column definition and its attributes that are used by tests.
|
||||
*/
|
||||
private static class ColumnDefinition {
|
||||
public final String name;
|
||||
public final String definition;
|
||||
public final String addDefaultValue;
|
||||
public final String modifyDefaultValue;
|
||||
public final Object expectedAddDefaultValue;
|
||||
public final Object expectedModifyDefaultValue;
|
||||
public final AssertionType assertionType;
|
||||
public final boolean temporalType;
|
||||
|
||||
public ColumnDefinition(String name, String definition, String addDefaultValue, String modifyDefaultValue,
|
||||
Object expectedAddDefaultValue, Object expectedModifyDefaultValue, AssertionType assertionType) {
|
||||
this.name = name;
|
||||
this.definition = definition;
|
||||
this.addDefaultValue = addDefaultValue;
|
||||
this.modifyDefaultValue = modifyDefaultValue;
|
||||
this.expectedAddDefaultValue = expectedAddDefaultValue;
|
||||
this.expectedModifyDefaultValue = expectedModifyDefaultValue;
|
||||
this.assertionType = assertionType;
|
||||
this.temporalType = definition.equalsIgnoreCase("date") || definition.toUpperCase().startsWith("TIMESTAMP");
|
||||
}
|
||||
}
|
||||
}
|
@ -25,6 +25,7 @@
|
||||
import io.debezium.connector.oracle.OracleConnection;
|
||||
import io.debezium.connector.oracle.OracleConnectorConfig;
|
||||
import io.debezium.connector.oracle.OracleDatabaseSchema;
|
||||
import io.debezium.connector.oracle.OracleDefaultValueConverter;
|
||||
import io.debezium.connector.oracle.OracleTopicSelector;
|
||||
import io.debezium.connector.oracle.OracleValueConverters;
|
||||
import io.debezium.connector.oracle.StreamingAdapter.TableNameCaseSensitivity;
|
||||
@ -269,11 +270,12 @@ private String resolveLogMineryContentQueryFromTemplate(OracleConnectorConfig co
|
||||
private OracleDatabaseSchema createSchema(OracleConnectorConfig connectorConfig) {
|
||||
OracleConnection connection = Mockito.mock(OracleConnection.class);
|
||||
OracleValueConverters converters = new OracleValueConverters(connectorConfig, connection);
|
||||
OracleDefaultValueConverter defaultValueConverter = new OracleDefaultValueConverter(converters, connection);
|
||||
TableNameCaseSensitivity tableNameSensitivity = connectorConfig.getAdapter().getTableNameCaseSensitivity(connection);
|
||||
|
||||
TopicSelector<TableId> topicSelector = OracleTopicSelector.defaultSelector(connectorConfig);
|
||||
SchemaNameAdjuster schemaNameAdjuster = SchemaNameAdjuster.create();
|
||||
|
||||
return new OracleDatabaseSchema(connectorConfig, converters, schemaNameAdjuster, topicSelector, tableNameSensitivity);
|
||||
return new OracleDatabaseSchema(connectorConfig, converters, defaultValueConverter, schemaNameAdjuster, topicSelector, tableNameSensitivity);
|
||||
}
|
||||
}
|
||||
|
@ -29,6 +29,7 @@
|
||||
import io.debezium.connector.oracle.OracleConnection;
|
||||
import io.debezium.connector.oracle.OracleConnectorConfig;
|
||||
import io.debezium.connector.oracle.OracleDatabaseSchema;
|
||||
import io.debezium.connector.oracle.OracleDefaultValueConverter;
|
||||
import io.debezium.connector.oracle.OracleOffsetContext;
|
||||
import io.debezium.connector.oracle.OraclePartition;
|
||||
import io.debezium.connector.oracle.OracleStreamingChangeEventSourceMetrics;
|
||||
@ -255,10 +256,12 @@ private OracleDatabaseSchema createOracleDatabaseSchema() throws Exception {
|
||||
final TopicSelector<TableId> topicSelector = OracleTopicSelector.defaultSelector(connectorConfig);
|
||||
final SchemaNameAdjuster schemaNameAdjuster = SchemaNameAdjuster.create();
|
||||
final OracleValueConverters converters = new OracleValueConverters(connectorConfig, connection);
|
||||
final OracleDefaultValueConverter defaultValueConverter = new OracleDefaultValueConverter(converters, connection);
|
||||
final TableNameCaseSensitivity sensitivity = connectorConfig.getAdapter().getTableNameCaseSensitivity(connection);
|
||||
|
||||
final OracleDatabaseSchema schema = new OracleDatabaseSchema(connectorConfig,
|
||||
converters,
|
||||
defaultValueConverter,
|
||||
schemaNameAdjuster,
|
||||
topicSelector,
|
||||
sensitivity);
|
||||
|
@ -1261,7 +1261,7 @@ protected Optional<ColumnEditor> readTableColumn(ResultSet columnMetadata, Table
|
||||
|
||||
final String columnName = columnMetadata.getString(4);
|
||||
if (columnFilter == null || columnFilter.matches(tableId.catalog(), tableId.schema(), tableId.table(), columnName)) {
|
||||
final ColumnEditor column = Column.editor().name(columnName);
|
||||
ColumnEditor column = Column.editor().name(columnName);
|
||||
column.type(columnMetadata.getString(6));
|
||||
column.length(columnMetadata.getInt(7));
|
||||
if (columnMetadata.getObject(9) != null) {
|
||||
@ -1281,6 +1281,10 @@ protected Optional<ColumnEditor> readTableColumn(ResultSet columnMetadata, Table
|
||||
|
||||
column.nativeType(resolveNativeType(column.typeName()));
|
||||
column.jdbcType(resolveJdbcType(columnMetadata.getInt(5), column.nativeType()));
|
||||
|
||||
// Allow implementation to make column changes if required before being added to table
|
||||
column = overrideColumn(column);
|
||||
|
||||
if (defaultValue != null) {
|
||||
column.defaultValueExpression(defaultValue);
|
||||
}
|
||||
@ -1290,6 +1294,11 @@ protected Optional<ColumnEditor> readTableColumn(ResultSet columnMetadata, Table
|
||||
return Optional.empty();
|
||||
}
|
||||
|
||||
protected ColumnEditor overrideColumn(ColumnEditor column) {
|
||||
// allows the implementation to override column-specifics; the default does no overrides
|
||||
return column;
|
||||
}
|
||||
|
||||
public List<String> readPrimaryKeyNames(DatabaseMetaData metadata, TableId id) throws SQLException {
|
||||
final List<String> pkColumnNames = new ArrayList<>();
|
||||
try (ResultSet rs = metadata.getPrimaryKeys(id.catalog(), id.schema(), id.table())) {
|
||||
|
@ -2786,7 +2786,7 @@ modify_column_clauses
|
||||
;
|
||||
|
||||
modify_col_properties
|
||||
: column_name datatype? (DEFAULT expression)? (ENCRYPT encryption_spec | DECRYPT)? inline_constraint* lob_storage_clause? //TODO alter_xmlschema_clause
|
||||
: column_name datatype? (DEFAULT column_default_value)? (ENCRYPT encryption_spec | DECRYPT)? inline_constraint* lob_storage_clause? //TODO alter_xmlschema_clause
|
||||
;
|
||||
|
||||
modify_col_substitutable
|
||||
@ -2922,9 +2922,13 @@ end_time_column
|
||||
|
||||
column_definition
|
||||
: column_name (datatype | type_name)
|
||||
SORT? (DEFAULT expression)? (ENCRYPT (USING CHAR_STRING)? (IDENTIFIED BY regular_id)? CHAR_STRING? (NO? SALT)? )? (inline_constraint* | inline_ref_constraint)
|
||||
SORT? (DEFAULT column_default_value)? (ENCRYPT (USING CHAR_STRING)? (IDENTIFIED BY regular_id)? CHAR_STRING? (NO? SALT)? )? (inline_constraint* | inline_ref_constraint)
|
||||
;
|
||||
|
||||
column_default_value
|
||||
: constant
|
||||
| expression;
|
||||
|
||||
virtual_column_definition
|
||||
: column_name datatype? autogenerated_sequence_definition?
|
||||
VIRTUAL? inline_constraint*
|
||||
|
Loading…
Reference in New Issue
Block a user