DBZ-1392 Reformat Oracle plugin

This commit is contained in:
Jiri Pechanec 2019-11-08 13:11:28 +01:00 committed by Gunnar Morling
parent d49f666b05
commit 4f9e9a94ca
28 changed files with 351 additions and 341 deletions

View File

@ -13,6 +13,7 @@
import io.debezium.relational.RelationalDatabaseSchema;
import io.debezium.relational.TableId;
import io.debezium.util.Clock;
import oracle.streams.ChunkColumnValue;
import oracle.streams.DDLLCR;
import oracle.streams.LCR;
@ -36,7 +37,8 @@ class LcrEventHandler implements XStreamLCRCallbackHandler {
private final OracleOffsetContext offsetContext;
private final boolean tablenameCaseInsensitive;
public LcrEventHandler(ErrorHandler errorHandler, EventDispatcher<TableId> dispatcher, Clock clock, RelationalDatabaseSchema schema, OracleOffsetContext offsetContext, boolean tablenameCaseInsensitive) {
public LcrEventHandler(ErrorHandler errorHandler, EventDispatcher<TableId> dispatcher, Clock clock, RelationalDatabaseSchema schema,
OracleOffsetContext offsetContext, boolean tablenameCaseInsensitive) {
this.errorHandler = errorHandler;
this.dispatcher = dispatcher;
this.clock = clock;
@ -58,8 +60,7 @@ public void processLCR(LCR lcr) throws StreamsException {
lcrPosition,
lcrPosition.getScn(),
recPosition != null ? recPosition : "none",
recPosition != null ? recPosition.getScn() : "none"
);
recPosition != null ? recPosition.getScn() : "none");
}
return;
}
@ -100,8 +101,7 @@ private void dispatchDataChangeEvent(RowLCR lcr) throws InterruptedException {
dispatcher.dispatchDataChangeEvent(
tableId,
new XStreamChangeRecordEmitter(offsetContext, lcr, schema.tableFor(tableId), clock)
);
new XStreamChangeRecordEmitter(offsetContext, lcr, schema.tableFor(tableId), clock));
}
private void dispatchSchemaChangeEvent(DDLLCR ddlLcr) throws InterruptedException {
@ -113,8 +113,7 @@ private void dispatchSchemaChangeEvent(DDLLCR ddlLcr) throws InterruptedExceptio
dispatcher.dispatchSchemaChangeEvent(
tableId,
new OracleSchemaChangeEventEmitter(offsetContext, tableId, ddlLcr)
);
new OracleSchemaChangeEventEmitter(offsetContext, tableId, ddlLcr));
}
private TableId getTableId(LCR lcr) {
@ -140,4 +139,4 @@ public LCR createLCR() throws StreamsException {
public ChunkColumnValue createChunk() throws StreamsException {
throw new UnsupportedOperationException("Should never be called");
}
}
}

View File

@ -13,6 +13,7 @@
import io.debezium.util.HexConverter;
import io.debezium.util.Strings;
import oracle.streams.StreamsException;
import oracle.streams.XStreamUtility;

View File

@ -25,7 +25,7 @@ public class OracleChangeEventSourceFactory implements ChangeEventSourceFactory
private final OracleDatabaseSchema schema;
public OracleChangeEventSourceFactory(OracleConnectorConfig configuration, OracleConnection jdbcConnection,
ErrorHandler errorHandler, EventDispatcher<TableId> dispatcher, Clock clock, OracleDatabaseSchema schema) {
ErrorHandler errorHandler, EventDispatcher<TableId> dispatcher, Clock clock, OracleDatabaseSchema schema) {
this.configuration = configuration;
this.jdbcConnection = jdbcConnection;
this.errorHandler = errorHandler;
@ -49,7 +49,6 @@ public StreamingChangeEventSource getStreamingChangeEventSource(OffsetContext of
dispatcher,
errorHandler,
clock,
schema
);
schema);
}
}

View File

@ -24,6 +24,7 @@
import io.debezium.relational.Tables;
import io.debezium.relational.Tables.ColumnNameFilter;
import io.debezium.relational.Tables.TableFilter;
import oracle.jdbc.OracleTypes;
public class OracleConnection extends JdbcConnection {
@ -85,7 +86,8 @@ public void resetSessionToCdb() {
@Override
public Set<TableId> readTableNames(String databaseCatalog, String schemaNamePattern, String tableNamePattern,
String[] tableTypes) throws SQLException {
String[] tableTypes)
throws SQLException {
Set<TableId> tableIds = super.readTableNames(null, schemaNamePattern, tableNamePattern, tableTypes);
@ -96,7 +98,8 @@ public Set<TableId> readTableNames(String databaseCatalog, String schemaNamePatt
@Override
public void readSchema(Tables tables, String databaseCatalog, String schemaNamePattern, TableFilter tableFilter,
ColumnNameFilter columnFilter, boolean removeTablesNotFoundInJdbc) throws SQLException {
ColumnNameFilter columnFilter, boolean removeTablesNotFoundInJdbc)
throws SQLException {
super.readSchema(tables, null, schemaNamePattern, null, columnFilter, removeTablesNotFoundInJdbc);
@ -117,23 +120,21 @@ public void readSchema(Tables tables, String databaseCatalog, String schemaNameP
if (column.jdbcType() == Types.TIMESTAMP) {
editor.addColumn(
column.edit()
.length(column.scale().orElse(Column.UNSET_INT_VALUE))
.scale(null)
.create()
);
.length(column.scale().orElse(Column.UNSET_INT_VALUE))
.scale(null)
.create());
}
// NUMBER columns without scale value have it set to -127 instead of null;
// let's rectify that
else if (column.jdbcType() == OracleTypes.NUMBER) {
column.scale()
.filter(s -> s == ORACLE_UNSET_SCALE)
.ifPresent(s -> {
editor.addColumn(
column.edit()
.scale(null)
.create()
);
});
.filter(s -> s == ORACLE_UNSET_SCALE)
.ifPresent(s -> {
editor.addColumn(
column.edit()
.scale(null)
.create());
});
}
}
tables.overwriteTable(editor.create());

View File

@ -23,7 +23,6 @@ public Connection connect(JdbcConfiguration config) throws SQLException {
String password = config.getPassword();
return DriverManager.getConnection(
"jdbc:oracle:oci:@" + hostName + ":" + port + "/" + database, user, password
);
"jdbc:oracle:oci:@" + hostName + ":" + port + "/" + database, user, password);
}
}

View File

@ -25,6 +25,7 @@
import io.debezium.relational.Tables.TableFilter;
import io.debezium.relational.history.HistoryRecordComparator;
import io.debezium.relational.history.KafkaDatabaseHistory;
import oracle.streams.XStreamUtility;
/**
@ -73,17 +74,17 @@ public class OracleConnectorConfig extends HistorizedRelationalDatabaseConnector
+ "'initial_schema_only' to specify the connector should run a snapshot of the schema when no offsets are available for the logical server name. ");
public static final Field TABLENAME_CASE_INSENSITIVE = Field.create("database.tablename.case.insensitive")
.withDisplayName("Case insensitive table names")
.withType(Type.BOOLEAN)
.withDefault(false)
.withImportance(Importance.LOW)
.withDescription("Case insensitive table names; set to 'true' for Oracle 11g, 'false' (default) otherwise.");
.withDisplayName("Case insensitive table names")
.withType(Type.BOOLEAN)
.withDefault(false)
.withImportance(Importance.LOW)
.withDescription("Case insensitive table names; set to 'true' for Oracle 11g, 'false' (default) otherwise.");
public static final Field ORACLE_VERSION = Field.create("database.oracle.version")
.withDisplayName("Oracle version, 11 or 12+")
.withEnum(OracleVersion.class, OracleVersion.V12Plus)
.withImportance(Importance.LOW)
.withDescription("For default Oracle 12+, use default pos_version value v2, for Oracle 11, use pos_version value v1.");
.withDisplayName("Oracle version, 11 or 12+")
.withEnum(OracleVersion.class, OracleVersion.V12Plus)
.withImportance(Importance.LOW)
.withDescription("For default Oracle 12+, use default pos_version value v2, for Oracle 11, use pos_version value v1.");
public static final Field SERVER_NAME = RelationalDatabaseConnectorConfig.SERVER_NAME
.withValidation(CommonConnectorConfig::validateServerNameIsDifferentFromHistoryTopicName);
@ -109,8 +110,7 @@ public class OracleConnectorConfig extends HistorizedRelationalDatabaseConnector
Heartbeat.HEARTBEAT_INTERVAL,
Heartbeat.HEARTBEAT_TOPICS_PREFIX,
TABLENAME_CASE_INSENSITIVE,
ORACLE_VERSION
);
ORACLE_VERSION);
private final String databaseName;
private final String pdbName;
@ -143,8 +143,7 @@ public static ConfigDef configDef() {
RelationalDatabaseConnectorConfig.TABLE_BLACKLIST,
RelationalDatabaseConnectorConfig.MSG_KEY_COLUMNS,
RelationalDatabaseConnectorConfig.TABLE_IGNORE_BUILTIN,
Heartbeat.HEARTBEAT_INTERVAL, Heartbeat.HEARTBEAT_TOPICS_PREFIX
);
Heartbeat.HEARTBEAT_INTERVAL, Heartbeat.HEARTBEAT_TOPICS_PREFIX);
Field.group(config, "Connector", CommonConnectorConfig.POLL_INTERVAL_MS, CommonConnectorConfig.MAX_BATCH_SIZE,
CommonConnectorConfig.MAX_QUEUE_SIZE, CommonConnectorConfig.SNAPSHOT_DELAY_MS);
@ -167,7 +166,7 @@ public SnapshotMode getSnapshotMode() {
return snapshotMode;
}
public boolean getTablenameCaseInsensitive() {
public boolean getTablenameCaseInsensitive() {
return tablenameCaseInsensitive;
}
@ -208,7 +207,7 @@ public String getValue() {
}
public int getPosVersion() {
switch(version) {
switch (version) {
case "11":
return XStreamUtility.POS_VERSION_V1;
case "12+":

View File

@ -36,7 +36,8 @@ public class OracleConnectorTask extends BaseSourceTask {
private static final String CONTEXT_NAME = "oracle-connector-task";
private static enum State {
RUNNING, STOPPED;
RUNNING,
STOPPED;
}
private final AtomicReference<State> state = new AtomicReference<State>(State.STOPPED);
@ -99,8 +100,7 @@ public void start(Configuration config) {
connectorConfig.getLogicalName(),
new OracleChangeEventSourceFactory(connectorConfig, jdbcConnection, errorHandler, dispatcher, clock, schema),
dispatcher,
schema
);
schema);
coordinator.start(taskContext, this.queue, new OracleEventMetadataProvider());
}
@ -110,8 +110,8 @@ public List<SourceRecord> poll() throws InterruptedException {
List<DataChangeEvent> records = queue.poll();
List<SourceRecord> sourceRecords = records.stream()
.map(DataChangeEvent::getRecord)
.collect(Collectors.toList());
.map(DataChangeEvent::getRecord)
.collect(Collectors.toList());
if (!sourceRecords.isEmpty()) {
this.lastOffset = sourceRecords.get(sourceRecords.size() - 1).sourceOffset();

View File

@ -29,16 +29,16 @@ public class OracleDatabaseSchema extends HistorizedRelationalDatabaseSchema {
private static final Logger LOGGER = LoggerFactory.getLogger(OracleDatabaseSchema.class);
public OracleDatabaseSchema(OracleConnectorConfig connectorConfig, SchemaNameAdjuster schemaNameAdjuster, TopicSelector<TableId> topicSelector, OracleConnection connection) {
public OracleDatabaseSchema(OracleConnectorConfig connectorConfig, SchemaNameAdjuster schemaNameAdjuster, TopicSelector<TableId> topicSelector,
OracleConnection connection) {
super(connectorConfig, topicSelector, connectorConfig.getTableFilters().dataCollectionFilter(), null,
new TableSchemaBuilder(
new OracleValueConverters(connection),
schemaNameAdjuster,
connectorConfig.getSourceInfoStructMaker().schema(),
connectorConfig.getSanitizeFieldNames()),
connectorConfig.getTablenameCaseInsensitive(),
connectorConfig.getKeyMapper()
);
new TableSchemaBuilder(
new OracleValueConverters(connection),
schemaNameAdjuster,
connectorConfig.getSourceInfoStructMaker().schema(),
connectorConfig.getSanitizeFieldNames()),
connectorConfig.getTablenameCaseInsensitive(),
connectorConfig.getKeyMapper());
}
@Override

View File

@ -42,8 +42,7 @@ public Map<String, String> getEventSourcePosition(DataCollectionId source, Offse
}
final Long scn = sourceInfo.getInt64(SourceInfo.SCN_KEY);
return Collect.hashMapOf(
SourceInfo.SCN_KEY, scn == null ? "null" : Long.toString(scn)
);
SourceInfo.SCN_KEY, scn == null ? "null" : Long.toString(scn));
}
@Override

View File

@ -7,16 +7,17 @@
import java.util.Set;
import io.debezium.connector.oracle.antlr.OracleDdlParser;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import io.debezium.connector.oracle.antlr.OracleDdlParser;
import io.debezium.pipeline.spi.SchemaChangeEventEmitter;
import io.debezium.relational.Table;
import io.debezium.relational.TableId;
import io.debezium.relational.Tables;
import io.debezium.schema.SchemaChangeEvent;
import io.debezium.schema.SchemaChangeEvent.SchemaChangeEventType;
import oracle.streams.DDLLCR;
/**
@ -59,11 +60,12 @@ public void emitSchemaChangeEvent(Receiver receiver) throws InterruptedException
Table table = tables.forTable(tableId);
receiver.schemaChangeEvent(new SchemaChangeEvent(offsetContext.getPartition(), offsetContext.getOffset(), ddlLcr.getSourceDatabaseName(), ddlLcr.getObjectOwner(), ddlLcr.getDDLText(), table, eventType, false));
receiver.schemaChangeEvent(new SchemaChangeEvent(offsetContext.getPartition(), offsetContext.getOffset(), ddlLcr.getSourceDatabaseName(), ddlLcr.getObjectOwner(),
ddlLcr.getDDLText(), table, eventType, false));
}
private SchemaChangeEventType getSchemaChangeEventType() {
switch(ddlLcr.getCommandType()) {
switch (ddlLcr.getCommandType()) {
case "CREATE TABLE":
return SchemaChangeEventType.CREATE;
case "ALTER TABLE":

View File

@ -40,7 +40,9 @@ public class OracleSnapshotChangeEventSource extends RelationalSnapshotChangeEve
private final OracleConnectorConfig connectorConfig;
private final OracleConnection jdbcConnection;
public OracleSnapshotChangeEventSource(OracleConnectorConfig connectorConfig, OracleOffsetContext previousOffset, OracleConnection jdbcConnection, OracleDatabaseSchema schema, EventDispatcher<TableId> dispatcher, Clock clock, SnapshotProgressListener snapshotProgressListener) {
public OracleSnapshotChangeEventSource(OracleConnectorConfig connectorConfig, OracleOffsetContext previousOffset, OracleConnection jdbcConnection,
OracleDatabaseSchema schema, EventDispatcher<TableId> dispatcher, Clock clock,
SnapshotProgressListener snapshotProgressListener) {
super(connectorConfig, previousOffset, jdbcConnection, schema, dispatcher, clock, snapshotProgressListener);
this.connectorConfig = connectorConfig;
@ -71,13 +73,12 @@ protected SnapshotContext prepare(ChangeEventSourceContext context) throws Excep
}
return new OracleSnapshotContext(
connectorConfig.getPdbName() != null ? connectorConfig.getPdbName() : connectorConfig.getDatabaseName()
);
connectorConfig.getPdbName() != null ? connectorConfig.getPdbName() : connectorConfig.getDatabaseName());
}
@Override
protected Set<TableId> getAllTableIds(SnapshotContext ctx) throws Exception {
return jdbcConnection.readTableNames(ctx.catalogName, null, null, new String[] {"TABLE"} );
return jdbcConnection.readTableNames(ctx.catalogName, null, null, new String[]{ "TABLE" });
}
@Override
@ -113,8 +114,7 @@ protected void determineSnapshotOffset(SnapshotContext ctx) throws Exception {
// we'd get a ORA-01466 when running the flashback query for doing the snapshot
do {
currentScn = getCurrentScn(ctx);
}
while (areSameTimestamp(latestTableDdlScn.orElse(null), currentScn));
} while (areSameTimestamp(latestTableDdlScn.orElse(null), currentScn));
ctx.offset = OracleOffsetContext.create()
.logicalName(connectorConfig)
@ -123,7 +123,7 @@ protected void determineSnapshotOffset(SnapshotContext ctx) throws Exception {
}
private long getCurrentScn(SnapshotContext ctx) throws SQLException {
try(Statement statement = jdbcConnection.connection().createStatement();
try (Statement statement = jdbcConnection.connection().createStatement();
ResultSet rs = statement.executeQuery("select CURRENT_SCN from V$DATABASE")) {
if (!rs.next()) {
@ -142,8 +142,8 @@ private boolean areSameTimestamp(Long scn1, long scn2) throws SQLException {
return false;
}
try(Statement statement = jdbcConnection.connection().createStatement();
ResultSet rs = statement.executeQuery("SELECT 1 FROM DUAL WHERE SCN_TO_TIMESTAMP(" + scn1 + ") = SCN_TO_TIMESTAMP(" + scn2 + ")" )) {
try (Statement statement = jdbcConnection.connection().createStatement();
ResultSet rs = statement.executeQuery("SELECT 1 FROM DUAL WHERE SCN_TO_TIMESTAMP(" + scn1 + ") = SCN_TO_TIMESTAMP(" + scn2 + ")")) {
return rs.next();
}
@ -181,8 +181,8 @@ private Optional<Long> getLatestTableDdlScn(SnapshotContext ctx) throws SQLExcep
@Override
protected void readTableStructure(ChangeEventSourceContext sourceContext, SnapshotContext snapshotContext) throws SQLException, InterruptedException {
Set<String> schemas = snapshotContext.capturedTables.stream()
.map(TableId::schema)
.collect(Collectors.toSet());
.map(TableId::schema)
.collect(Collectors.toSet());
// reading info only for the schemas we're interested in as per the set of captured tables;
// while the passed table name filter alone would skip all non-included tables, reading the schema
@ -198,15 +198,14 @@ protected void readTableStructure(ChangeEventSourceContext sourceContext, Snapsh
schema,
connectorConfig.getTableFilters().dataCollectionFilter(),
null,
false
);
false);
}
}
@Override
protected SchemaChangeEvent getCreateTableEvent(SnapshotContext snapshotContext, Table table) throws SQLException {
try (Statement statement = jdbcConnection.connection().createStatement();
ResultSet rs = statement.executeQuery("select dbms_metadata.get_ddl( 'TABLE', '" + table.id().table() + "', '" + table.id().schema() + "' ) from dual")) {
ResultSet rs = statement.executeQuery("select dbms_metadata.get_ddl( 'TABLE', '" + table.id().table() + "', '" + table.id().schema() + "' ) from dual")) {
if (!rs.next()) {
throw new IllegalStateException("Couldn't get metadata");

View File

@ -16,6 +16,7 @@
import io.debezium.pipeline.source.spi.StreamingChangeEventSource;
import io.debezium.relational.TableId;
import io.debezium.util.Clock;
import oracle.jdbc.OracleConnection;
import oracle.sql.NUMBER;
import oracle.streams.StreamsException;
@ -43,7 +44,8 @@ public class OracleStreamingChangeEventSource implements StreamingChangeEventSou
private final boolean tablenameCaseInsensitive;
private final int posVersion;
public OracleStreamingChangeEventSource(OracleConnectorConfig connectorConfig, OracleOffsetContext offsetContext, JdbcConnection jdbcConnection, EventDispatcher<TableId> dispatcher, ErrorHandler errorHandler, Clock clock, OracleDatabaseSchema schema) {
public OracleStreamingChangeEventSource(OracleConnectorConfig connectorConfig, OracleOffsetContext offsetContext, JdbcConnection jdbcConnection,
EventDispatcher<TableId> dispatcher, ErrorHandler errorHandler, Clock clock, OracleDatabaseSchema schema) {
this.jdbcConnection = jdbcConnection;
this.dispatcher = dispatcher;
this.errorHandler = errorHandler;
@ -59,7 +61,8 @@ public OracleStreamingChangeEventSource(OracleConnectorConfig connectorConfig, O
public void execute(ChangeEventSourceContext context) throws InterruptedException {
try {
// 1. connect
final byte[] startPosition = offsetContext.getLcrPosition() != null ? offsetContext.getLcrPosition().getRawPosition() : convertScnToPosition(offsetContext.getScn());
final byte[] startPosition = offsetContext.getLcrPosition() != null ? offsetContext.getLcrPosition().getRawPosition()
: convertScnToPosition(offsetContext.getScn());
xsOut = XStreamOut.attach((OracleConnection) jdbcConnection.connection(), xStreamServerName,
startPosition, 1, 1, XStreamOut.DEFAULT_MODE);
@ -102,8 +105,7 @@ public void commitOffset(Map<String, ?> offset) {
}
xsOut.setProcessedLowWatermark(
lcrPosition.getRawPosition(),
XStreamOut.DEFAULT_MODE
);
XStreamOut.DEFAULT_MODE);
}
else if (scn != null) {
if (LOGGER.isDebugEnabled()) {
@ -111,8 +113,7 @@ else if (scn != null) {
}
xsOut.setProcessedLowWatermark(
convertScnToPosition(scn),
XStreamOut.DEFAULT_MODE
);
XStreamOut.DEFAULT_MODE);
}
else {
LOGGER.warn("Nothing in offsets could be recorded to Oracle");

View File

@ -26,6 +26,7 @@
import io.debezium.time.ZonedTimestamp;
import io.debezium.util.NumberConversions;
import io.debezium.util.Strings;
import oracle.jdbc.OracleTypes;
import oracle.sql.BINARY_DOUBLE;
import oracle.sql.BINARY_FLOAT;
@ -55,8 +56,7 @@ public SchemaBuilder schemaBuilder(Column column) {
column.jdbcType(),
column.typeName(),
column.length(),
column.scale()
);
column.scale());
switch (column.jdbcType()) {
// Oracle's float is not float as in Java but a NUMERIC without scale
@ -112,7 +112,7 @@ else if (width < 19) {
@Override
public ValueConverter converter(Column column, Field fieldDefn) {
switch(column.jdbcType()) {
switch (column.jdbcType()) {
case Types.CHAR:
case Types.VARCHAR:
case Types.NCHAR:
@ -123,7 +123,7 @@ public ValueConverter converter(Column column, Field fieldDefn) {
case OracleTypes.BINARY_DOUBLE:
return data -> convertDouble(column, fieldDefn, data);
case Types.NUMERIC:
return getNumericConverter(column, fieldDefn);
return getNumericConverter(column, fieldDefn);
case Types.FLOAT:
return data -> convertVariableScale(column, fieldDefn, data);
case OracleTypes.TIMESTAMPTZ:
@ -358,8 +358,8 @@ else if (data instanceof TIMESTAMPTZ) {
}
else if (data instanceof TIMESTAMPLTZ) {
// JDBC driver throws an exception
// final TIMESTAMPLTZ ts = (TIMESTAMPLTZ)data;
// data = ts.offsetDateTimeValue(connection.connection());
// final TIMESTAMPLTZ ts = (TIMESTAMPLTZ)data;
// data = ts.offsetDateTimeValue(connection.connection());
return null;
}
}

View File

@ -9,6 +9,7 @@
import io.debezium.pipeline.spi.OffsetContext;
import io.debezium.relational.RelationalChangeRecordEmitter;
import io.debezium.util.Clock;
import oracle.streams.RowLCR;
/**

View File

@ -10,6 +10,7 @@
import io.debezium.relational.RelationalChangeRecordEmitter;
import io.debezium.relational.Table;
import io.debezium.util.Clock;
import oracle.streams.ColumnValue;
import oracle.streams.RowLCR;
@ -32,11 +33,15 @@ public XStreamChangeRecordEmitter(OffsetContext offset, RowLCR lcr, Table table,
@Override
protected Operation getOperation() {
switch(lcr.getCommandType()) {
case RowLCR.INSERT: return Operation.CREATE;
case RowLCR.DELETE: return Operation.DELETE;
case RowLCR.UPDATE: return Operation.UPDATE;
default: throw new IllegalArgumentException("Received event of unexpected command type: " + lcr);
switch (lcr.getCommandType()) {
case RowLCR.INSERT:
return Operation.CREATE;
case RowLCR.DELETE:
return Operation.DELETE;
case RowLCR.UPDATE:
return Operation.UPDATE;
default:
throw new IllegalArgumentException("Received event of unexpected command type: " + lcr);
}
}

View File

@ -5,6 +5,14 @@
*/
package io.debezium.connector.oracle.antlr;
import java.sql.Types;
import java.util.Arrays;
import java.util.Locale;
import org.antlr.v4.runtime.CharStream;
import org.antlr.v4.runtime.CommonTokenStream;
import org.antlr.v4.runtime.tree.ParseTree;
import io.debezium.antlr.AntlrDdlParser;
import io.debezium.antlr.AntlrDdlParserListener;
import io.debezium.antlr.DataTypeResolver;
@ -14,14 +22,8 @@
import io.debezium.ddl.parser.oracle.generated.PlSqlParser;
import io.debezium.relational.SystemVariables;
import io.debezium.relational.Tables;
import oracle.jdbc.OracleTypes;
import org.antlr.v4.runtime.CharStream;
import org.antlr.v4.runtime.CommonTokenStream;
import org.antlr.v4.runtime.tree.ParseTree;
import java.sql.Types;
import java.util.Arrays;
import java.util.Locale;
import oracle.jdbc.OracleTypes;
/**
* This is the main Oracle Antlr DDL parser
@ -104,8 +106,7 @@ protected DataTypeResolver initializeDataTypeResolver() {
new DataTypeEntry(Types.FLOAT, PlSqlParser.FLOAT),
new DataTypeEntry(Types.FLOAT, PlSqlParser.REAL),
new DataTypeEntry(Types.BLOB, PlSqlParser.BLOB),
new DataTypeEntry(Types.CLOB, PlSqlParser.CLOB)
));
new DataTypeEntry(Types.CLOB, PlSqlParser.CLOB)));
return dataTypeResolverBuilder.build();
}

View File

@ -5,6 +5,13 @@
*/
package io.debezium.connector.oracle.antlr.listener;
import static io.debezium.antlr.AntlrDdlParser.getText;
import java.util.ArrayList;
import java.util.List;
import org.antlr.v4.runtime.tree.ParseTreeListener;
import io.debezium.connector.oracle.antlr.OracleDdlParser;
import io.debezium.ddl.parser.oracle.generated.PlSqlParser;
import io.debezium.relational.Column;
@ -12,12 +19,6 @@
import io.debezium.relational.TableEditor;
import io.debezium.relational.TableId;
import io.debezium.text.ParsingException;
import org.antlr.v4.runtime.tree.ParseTreeListener;
import java.util.ArrayList;
import java.util.List;
import static io.debezium.antlr.AntlrDdlParser.getText;
/**
* Parser listener that is parsing Oracle ALTER TABLE statements
@ -67,7 +68,7 @@ public void exitAlter_table(PlSqlParser.Alter_tableContext ctx) {
parser.runIfNotNull(() -> {
listeners.remove(columnDefinitionParserListener);
parser.databaseTables().overwriteTable(tableEditor.create());
//parser.signalAlterTable(tableEditor.tableId(), null, ctx.getParent());// todo?
// parser.signalAlterTable(tableEditor.tableId(), null, ctx.getParent());// todo?
}, tableEditor);
super.exitAlter_table(ctx);
}
@ -88,7 +89,6 @@ public void enterAdd_column_clause(PlSqlParser.Add_column_clauseContext ctx) {
super.enterAdd_column_clause(ctx);
}
@Override
public void exitAdd_column_clause(PlSqlParser.Add_column_clauseContext ctx) {
parser.runIfNotNull(() -> {

View File

@ -5,14 +5,15 @@
*/
package io.debezium.connector.oracle.antlr.listener;
import java.sql.Types;
import io.debezium.antlr.DataTypeResolver;
import io.debezium.ddl.parser.oracle.generated.PlSqlParser;
import io.debezium.relational.Column;
import io.debezium.relational.ColumnEditor;
import io.debezium.relational.TableEditor;
import oracle.jdbc.OracleTypes;
import java.sql.Types;
import oracle.jdbc.OracleTypes;
/**
* This class parses column definitions of Oracle statements.
@ -24,7 +25,7 @@ public class ColumnDefinitionParserListener extends BaseParserListener {
private ColumnEditor columnEditor;
ColumnDefinitionParserListener(final TableEditor tableEditor, final ColumnEditor columnEditor,
final DataTypeResolver dataTypeResolver) {
final DataTypeResolver dataTypeResolver) {
this.dataTypeResolver = dataTypeResolver;
this.tableEditor = tableEditor;
this.columnEditor = columnEditor;
@ -56,197 +57,197 @@ public void enterPrimary_key_clause(PlSqlParser.Primary_key_clauseContext ctx) {
// todo use dataTypeResolver instead
private void resolveColumnDataType(PlSqlParser.Column_definitionContext ctx) {
PlSqlParser.Precision_partContext precisionPart = ctx.datatype().precision_part();
PlSqlParser.Precision_partContext precisionPart = ctx.datatype().precision_part();
columnEditor.name(getColumnName(ctx.column_name()));
columnEditor.name(getColumnName(ctx.column_name()));
if (ctx.datatype().native_datatype_element() != null) {
if (ctx.datatype().native_datatype_element().INT() != null
|| ctx.datatype().native_datatype_element().INTEGER() != null
|| ctx.datatype().native_datatype_element().SMALLINT() != null
|| ctx.datatype().native_datatype_element().NUMERIC() != null
|| ctx.datatype().native_datatype_element().DECIMAL() != null) {
// NUMERIC and DECIMAL types have by default zero scale
columnEditor
.jdbcType(Types.NUMERIC)
.type("NUMBER");
if (ctx.datatype().native_datatype_element() != null) {
if (ctx.datatype().native_datatype_element().INT() != null
|| ctx.datatype().native_datatype_element().INTEGER() != null
|| ctx.datatype().native_datatype_element().SMALLINT() != null
|| ctx.datatype().native_datatype_element().NUMERIC() != null
|| ctx.datatype().native_datatype_element().DECIMAL() != null) {
// NUMERIC and DECIMAL types have by default zero scale
columnEditor
.jdbcType(Types.NUMERIC)
.type("NUMBER");
if (precisionPart == null) {
columnEditor.length(38)
.scale(0);
}
else {
setPrecision(precisionPart, columnEditor);
setScale(precisionPart, columnEditor);
}
if (precisionPart == null) {
columnEditor.length(38)
.scale(0);
}
else if (ctx.datatype().native_datatype_element().DATE() != null) {
// JDBC driver reports type as timestamp but name DATE
columnEditor
.jdbcType(Types.TIMESTAMP)
.type("DATE");
else {
setPrecision(precisionPart, columnEditor);
setScale(precisionPart, columnEditor);
}
else if (ctx.datatype().native_datatype_element().TIMESTAMP() != null) {
if (ctx.datatype().WITH() != null
&& ctx.datatype().TIME() != null
&& ctx.datatype().ZONE() != null) {
if (ctx.datatype().LOCAL() != null) {
columnEditor
.jdbcType(OracleTypes.TIMESTAMPLTZ)
.type("TIMESTAMP WITH LOCAL TIME ZONE");
}
else {
columnEditor
.jdbcType(OracleTypes.TIMESTAMPTZ)
.type("TIMESTAMP WITH TIME ZONE");
}
}
else if (ctx.datatype().native_datatype_element().DATE() != null) {
// JDBC driver reports type as timestamp but name DATE
columnEditor
.jdbcType(Types.TIMESTAMP)
.type("DATE");
}
else if (ctx.datatype().native_datatype_element().TIMESTAMP() != null) {
if (ctx.datatype().WITH() != null
&& ctx.datatype().TIME() != null
&& ctx.datatype().ZONE() != null) {
if (ctx.datatype().LOCAL() != null) {
columnEditor
.jdbcType(OracleTypes.TIMESTAMPLTZ)
.type("TIMESTAMP WITH LOCAL TIME ZONE");
}
else {
columnEditor
.jdbcType(Types.TIMESTAMP)
.type("TIMESTAMP");
.jdbcType(OracleTypes.TIMESTAMPTZ)
.type("TIMESTAMP WITH TIME ZONE");
}
if (precisionPart == null) {
columnEditor.length(6);
}
else {
setPrecision(precisionPart, columnEditor);
}
}
// VARCHAR is the same as VARCHAR2 in Oracle
else if (ctx.datatype().native_datatype_element().VARCHAR2() != null ||
ctx.datatype().native_datatype_element().VARCHAR() != null) {
columnEditor
.jdbcType(Types.VARCHAR)
.type("VARCHAR2");
if (precisionPart == null) {
columnEditor.length(getVarCharDefaultLength());
}
else {
setPrecision(precisionPart, columnEditor);
}
}
else if (ctx.datatype().native_datatype_element().NVARCHAR2() != null) {
columnEditor
.jdbcType(Types.NVARCHAR)
.type("NVARCHAR2");
if (precisionPart == null) {
columnEditor.length(getVarCharDefaultLength());
}
else {
setPrecision(precisionPart, columnEditor);
}
}
else if (ctx.datatype().native_datatype_element().CHAR() != null) {
columnEditor
.jdbcType(Types.CHAR)
.type("CHAR")
.length(1);
}
else if (ctx.datatype().native_datatype_element().NCHAR() != null) {
columnEditor
.jdbcType(Types.NCHAR)
.type("NCHAR")
.length(1);
}
else if (ctx.datatype().native_datatype_element().BINARY_FLOAT() != null) {
columnEditor
.jdbcType(OracleTypes.BINARY_FLOAT)
.type("BINARY_FLOAT");
}
else if (ctx.datatype().native_datatype_element().BINARY_DOUBLE() != null) {
columnEditor
.jdbcType(OracleTypes.BINARY_DOUBLE)
.type("BINARY_DOUBLE");
}
// PRECISION keyword is mandatory
else if (ctx.datatype().native_datatype_element().FLOAT() != null ||
(ctx.datatype().native_datatype_element().DOUBLE() != null && ctx.datatype().native_datatype_element().PRECISION() != null)) {
columnEditor
.jdbcType(Types.FLOAT)
.type("FLOAT")
.length(126);
// TODO float's precision is about bits not decimal digits; should be ok for now to over-size
if (precisionPart != null) {
setPrecision(precisionPart, columnEditor);
}
}
else if (ctx.datatype().native_datatype_element().REAL() != null) {
columnEditor
.jdbcType(Types.FLOAT)
.type("FLOAT")
// TODO float's precision is about bits not decimal digits; should be ok for now to over-size
.length(63);
}
else if (ctx.datatype().native_datatype_element().NUMBER() != null) {
columnEditor
.jdbcType(Types.NUMERIC)
.type("NUMBER");
if (precisionPart == null) {
columnEditor.length(38);
}
else {
setPrecision(precisionPart, columnEditor);
setScale(precisionPart, columnEditor);
}
}
else if (ctx.datatype().native_datatype_element().BLOB() != null) {
columnEditor
.jdbcType(Types.BLOB)
.type("BLOB");
}
else if (ctx.datatype().native_datatype_element().CLOB() != null) {
columnEditor
.jdbcType(Types.CLOB)
.type("CLOB");
}
else {
throw new IllegalArgumentException("Unsupported column type: " + ctx.datatype().native_datatype_element().getText());
columnEditor
.jdbcType(Types.TIMESTAMP)
.type("TIMESTAMP");
}
if (precisionPart == null) {
columnEditor.length(6);
}
else {
setPrecision(precisionPart, columnEditor);
}
}
else if (ctx.datatype().INTERVAL() != null
&& ctx.datatype().YEAR() != null
&& ctx.datatype().TO() != null
&& ctx.datatype().MONTH() != null) {
// VARCHAR is the same as VARCHAR2 in Oracle
else if (ctx.datatype().native_datatype_element().VARCHAR2() != null ||
ctx.datatype().native_datatype_element().VARCHAR() != null) {
columnEditor
.jdbcType(OracleTypes.INTERVALYM)
.type("INTERVAL YEAR TO MONTH")
.length(2);
if (!ctx.datatype().expression().isEmpty()) {
columnEditor.length(Integer.valueOf((ctx.datatype().expression(0).getText())));
.jdbcType(Types.VARCHAR)
.type("VARCHAR2");
if (precisionPart == null) {
columnEditor.length(getVarCharDefaultLength());
}
else {
setPrecision(precisionPart, columnEditor);
}
}
else if (ctx.datatype().INTERVAL() != null
&& ctx.datatype().DAY() != null
&& ctx.datatype().TO() != null
&& ctx.datatype().SECOND() != null) {
else if (ctx.datatype().native_datatype_element().NVARCHAR2() != null) {
columnEditor
.jdbcType(OracleTypes.INTERVALDS)
.type("INTERVAL DAY TO SECOND")
.length(2)
.scale(6);
for (final PlSqlParser.ExpressionContext e : ctx.datatype().expression()) {
if (e.getSourceInterval().startsAfter(ctx.datatype().TO().getSourceInterval())) {
columnEditor.scale(Integer.valueOf(e.getText()));
}
else {
columnEditor.length(Integer.valueOf(e.getText()));
}
.jdbcType(Types.NVARCHAR)
.type("NVARCHAR2");
if (precisionPart == null) {
columnEditor.length(getVarCharDefaultLength());
}
if (!ctx.datatype().expression().isEmpty()) {
columnEditor.length(Integer.valueOf((ctx.datatype().expression(0).getText())));
else {
setPrecision(precisionPart, columnEditor);
}
}
else if (ctx.datatype().native_datatype_element().CHAR() != null) {
columnEditor
.jdbcType(Types.CHAR)
.type("CHAR")
.length(1);
}
else if (ctx.datatype().native_datatype_element().NCHAR() != null) {
columnEditor
.jdbcType(Types.NCHAR)
.type("NCHAR")
.length(1);
}
else if (ctx.datatype().native_datatype_element().BINARY_FLOAT() != null) {
columnEditor
.jdbcType(OracleTypes.BINARY_FLOAT)
.type("BINARY_FLOAT");
}
else if (ctx.datatype().native_datatype_element().BINARY_DOUBLE() != null) {
columnEditor
.jdbcType(OracleTypes.BINARY_DOUBLE)
.type("BINARY_DOUBLE");
}
// PRECISION keyword is mandatory
else if (ctx.datatype().native_datatype_element().FLOAT() != null ||
(ctx.datatype().native_datatype_element().DOUBLE() != null && ctx.datatype().native_datatype_element().PRECISION() != null)) {
columnEditor
.jdbcType(Types.FLOAT)
.type("FLOAT")
.length(126);
// TODO float's precision is about bits not decimal digits; should be ok for now to over-size
if (precisionPart != null) {
setPrecision(precisionPart, columnEditor);
}
}
else if (ctx.datatype().native_datatype_element().REAL() != null) {
columnEditor
.jdbcType(Types.FLOAT)
.type("FLOAT")
// TODO float's precision is about bits not decimal digits; should be ok for now to over-size
.length(63);
}
else if (ctx.datatype().native_datatype_element().NUMBER() != null) {
columnEditor
.jdbcType(Types.NUMERIC)
.type("NUMBER");
if (precisionPart == null) {
columnEditor.length(38);
}
else {
setPrecision(precisionPart, columnEditor);
setScale(precisionPart, columnEditor);
}
}
else if (ctx.datatype().native_datatype_element().BLOB() != null) {
columnEditor
.jdbcType(Types.BLOB)
.type("BLOB");
}
else if (ctx.datatype().native_datatype_element().CLOB() != null) {
columnEditor
.jdbcType(Types.CLOB)
.type("CLOB");
}
else {
throw new IllegalArgumentException("Unsupported column type: " + ctx.datatype().getText());
throw new IllegalArgumentException("Unsupported column type: " + ctx.datatype().native_datatype_element().getText());
}
}
else if (ctx.datatype().INTERVAL() != null
&& ctx.datatype().YEAR() != null
&& ctx.datatype().TO() != null
&& ctx.datatype().MONTH() != null) {
columnEditor
.jdbcType(OracleTypes.INTERVALYM)
.type("INTERVAL YEAR TO MONTH")
.length(2);
if (!ctx.datatype().expression().isEmpty()) {
columnEditor.length(Integer.valueOf((ctx.datatype().expression(0).getText())));
}
}
else if (ctx.datatype().INTERVAL() != null
&& ctx.datatype().DAY() != null
&& ctx.datatype().TO() != null
&& ctx.datatype().SECOND() != null) {
columnEditor
.jdbcType(OracleTypes.INTERVALDS)
.type("INTERVAL DAY TO SECOND")
.length(2)
.scale(6);
for (final PlSqlParser.ExpressionContext e : ctx.datatype().expression()) {
if (e.getSourceInterval().startsAfter(ctx.datatype().TO().getSourceInterval())) {
columnEditor.scale(Integer.valueOf(e.getText()));
}
else {
columnEditor.length(Integer.valueOf(e.getText()));
}
}
if (!ctx.datatype().expression().isEmpty()) {
columnEditor.length(Integer.valueOf((ctx.datatype().expression(0).getText())));
}
}
else {
throw new IllegalArgumentException("Unsupported column type: " + ctx.datatype().getText());
}
boolean hasNotNullConstraint = ctx.inline_constraint().stream().anyMatch(c -> c.NOT() != null);
boolean hasNotNullConstraint = ctx.inline_constraint().stream().anyMatch(c -> c.NOT() != null);
// todo move to enterExpression and apply type conversion
if (ctx.DEFAULT() != null) {
@ -255,10 +256,10 @@ else if (ctx.datatype().INTERVAL() != null
}
// todo move to nonNull
columnEditor.optional(!hasNotNullConstraint);
}
}
private int getVarCharDefaultLength() {
// TODO replace with value from select name, value from v$parameter where name='max_string_size';
// TODO replace with value from select name, value from v$parameter where name='max_string_size';
return 4000;
}

View File

@ -28,7 +28,7 @@ public class CreateTableParserListener extends BaseParserListener {
private ColumnDefinitionParserListener columnDefinitionParserListener;
CreateTableParserListener(final String catalogName, final String schemaName, final OracleDdlParser parser,
final List<ParseTreeListener> listeners) {
final List<ParseTreeListener> listeners) {
this.catalogName = catalogName;
this.schemaName = schemaName;
this.parser = parser;
@ -53,7 +53,7 @@ public void exitCreate_table(PlSqlParser.Create_tableContext ctx) {
listeners.remove(columnDefinitionParserListener);
columnDefinitionParserListener = null;
parser.databaseTables().overwriteTable(table);
//parser.signalCreateTable(tableEditor.tableId(), ctx); todo ?
// parser.signalCreateTable(tableEditor.tableId(), ctx); todo ?
}, tableEditor, table);
super.exitCreate_table(ctx);
}

View File

@ -5,18 +5,19 @@
*/
package io.debezium.connector.oracle.antlr.listener;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import org.antlr.v4.runtime.ParserRuleContext;
import org.antlr.v4.runtime.tree.ParseTreeListener;
import io.debezium.antlr.AntlrDdlParserListener;
import io.debezium.antlr.ProxyParseTreeListenerUtil;
import io.debezium.connector.oracle.antlr.OracleDdlParser;
import io.debezium.ddl.parser.oracle.generated.PlSqlParserBaseListener;
import io.debezium.text.ParsingException;
import org.antlr.v4.runtime.ParserRuleContext;
import org.antlr.v4.runtime.tree.ParseTreeListener;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
/**
* This class is Oracle main DDL parser listener class.

View File

@ -110,21 +110,24 @@ public abstract class AbstractOracleDatatypesTest extends AbstractConnectorTest
new SchemaAndValueField("VAL_VARCHAR2", Schema.OPTIONAL_STRING_SCHEMA, "v\u010d2"),
new SchemaAndValueField("VAL_NVARCHAR2", Schema.OPTIONAL_STRING_SCHEMA, "nv\u010d2"),
new SchemaAndValueField("VAL_CHAR", Schema.OPTIONAL_STRING_SCHEMA, "c "),
new SchemaAndValueField("VAL_NCHAR", Schema.OPTIONAL_STRING_SCHEMA, "n\u010d ")
);
new SchemaAndValueField("VAL_NCHAR", Schema.OPTIONAL_STRING_SCHEMA, "n\u010d "));
private static final List<SchemaAndValueField> EXPECTED_FP = Arrays.asList(
new SchemaAndValueField("VAL_BF", Schema.OPTIONAL_FLOAT32_SCHEMA, 1.1f),
new SchemaAndValueField("VAL_BD", Schema.OPTIONAL_FLOAT64_SCHEMA, 2.22),
new SchemaAndValueField("VAL_F", VariableScaleDecimal.builder().optional().build(), VariableScaleDecimal.fromLogical(VariableScaleDecimal.builder().optional().build(), new BigDecimal("3.33"))),
new SchemaAndValueField("VAL_F_10", VariableScaleDecimal.builder().optional().build(), VariableScaleDecimal.fromLogical(VariableScaleDecimal.builder().optional().build(), new BigDecimal("8.888"))),
new SchemaAndValueField("VAL_F", VariableScaleDecimal.builder().optional().build(),
VariableScaleDecimal.fromLogical(VariableScaleDecimal.builder().optional().build(), new BigDecimal("3.33"))),
new SchemaAndValueField("VAL_F_10", VariableScaleDecimal.builder().optional().build(),
VariableScaleDecimal.fromLogical(VariableScaleDecimal.builder().optional().build(), new BigDecimal("8.888"))),
new SchemaAndValueField("VAL_NUM", Decimal.builder(6).parameter(PRECISION_PARAMETER_KEY, "10").optional().build(), new BigDecimal("4.444400")),
new SchemaAndValueField("VAL_DP", VariableScaleDecimal.builder().optional().build(), VariableScaleDecimal.fromLogical(VariableScaleDecimal.builder().optional().build(), new BigDecimal("5.555"))),
new SchemaAndValueField("VAL_R", VariableScaleDecimal.builder().optional().build(), VariableScaleDecimal.fromLogical(VariableScaleDecimal.builder().optional().build(), new BigDecimal("6.66"))),
new SchemaAndValueField("VAL_DP", VariableScaleDecimal.builder().optional().build(),
VariableScaleDecimal.fromLogical(VariableScaleDecimal.builder().optional().build(), new BigDecimal("5.555"))),
new SchemaAndValueField("VAL_R", VariableScaleDecimal.builder().optional().build(),
VariableScaleDecimal.fromLogical(VariableScaleDecimal.builder().optional().build(), new BigDecimal("6.66"))),
new SchemaAndValueField("VAL_DECIMAL", Decimal.builder(6).parameter(PRECISION_PARAMETER_KEY, "10").optional().build(), new BigDecimal("1234.567891")),
new SchemaAndValueField("VAL_NUMERIC", Decimal.builder(6).parameter(PRECISION_PARAMETER_KEY, "10").optional().build(), new BigDecimal("1234.567891")),
new SchemaAndValueField("VAL_NUM_VS", VariableScaleDecimal.builder().optional().build(), VariableScaleDecimal.fromLogical(VariableScaleDecimal.builder().optional().build(), new BigDecimal("77.323")))
);
new SchemaAndValueField("VAL_NUM_VS", VariableScaleDecimal.builder().optional().build(),
VariableScaleDecimal.fromLogical(VariableScaleDecimal.builder().optional().build(), new BigDecimal("77.323"))));
private static final List<SchemaAndValueField> EXPECTED_INT = Arrays.asList(
new SchemaAndValueField("VAL_INT", NUMBER_SCHEMA, new BigDecimal("1")),
@ -141,18 +144,20 @@ public abstract class AbstractOracleDatatypesTest extends AbstractConnectorTest
new SchemaAndValueField("VAL_NUMBER_9_NEGATIVE_SCALE", Schema.OPTIONAL_INT32_SCHEMA, 9999_99990),
new SchemaAndValueField("VAL_NUMBER_18_NEGATIVE_SCALE", Schema.OPTIONAL_INT64_SCHEMA, 999_99999_99999_99900L),
new SchemaAndValueField("VAL_DECIMAL", Schema.OPTIONAL_INT64_SCHEMA, 99999_99999L),
new SchemaAndValueField("VAL_NUMERIC", Schema.OPTIONAL_INT64_SCHEMA, 99999_99999L)
);
new SchemaAndValueField("VAL_NUMERIC", Schema.OPTIONAL_INT64_SCHEMA, 99999_99999L));
private static final List<SchemaAndValueField> EXPECTED_TIME = Arrays.asList(
new SchemaAndValueField("VAL_DATE", Timestamp.builder().optional().build(), 1522108800_000l),
new SchemaAndValueField("VAL_TS", MicroTimestamp.builder().optional().build(), LocalDateTime.of(2018, 3, 27, 12, 34, 56).toEpochSecond(ZoneOffset.UTC) * 1_000_000 + 7890),
new SchemaAndValueField("VAL_TS_PRECISION2", Timestamp.builder().optional().build(), LocalDateTime.of(2018, 3, 27, 12, 34, 56).toEpochSecond(ZoneOffset.UTC) * 1_000 + 130),
new SchemaAndValueField("VAL_TS_PRECISION4", MicroTimestamp.builder().optional().build(), LocalDateTime.of(2018, 3, 27, 12, 34, 56).toEpochSecond(ZoneOffset.UTC) * 1_000_000 + 125500),
new SchemaAndValueField("VAL_TS", MicroTimestamp.builder().optional().build(),
LocalDateTime.of(2018, 3, 27, 12, 34, 56).toEpochSecond(ZoneOffset.UTC) * 1_000_000 + 7890),
new SchemaAndValueField("VAL_TS_PRECISION2", Timestamp.builder().optional().build(),
LocalDateTime.of(2018, 3, 27, 12, 34, 56).toEpochSecond(ZoneOffset.UTC) * 1_000 + 130),
new SchemaAndValueField("VAL_TS_PRECISION4", MicroTimestamp.builder().optional().build(),
LocalDateTime.of(2018, 3, 27, 12, 34, 56).toEpochSecond(ZoneOffset.UTC) * 1_000_000 + 125500),
new SchemaAndValueField("VAL_TSTZ", ZonedTimestamp.builder().optional().build(), "2018-03-27T01:34:56.00789-11:00"),
new SchemaAndValueField("VAL_INT_YTM", MicroDuration.builder().optional().build(), -110451600_000_000L),
new SchemaAndValueField("VAL_INT_DTS", MicroDuration.builder().optional().build(), -93784_560_000L)
// new SchemaAndValueField("VAL_TSLTZ", ZonedTimestamp.builder().optional().build(), "2018-03-27T01:34:56.00789-11:00")
// new SchemaAndValueField("VAL_TSLTZ", ZonedTimestamp.builder().optional().build(), "2018-03-27T01:34:56.00789-11:00")
);
private static final String[] ALL_TABLES = {
@ -174,14 +179,14 @@ public abstract class AbstractOracleDatatypesTest extends AbstractConnectorTest
@BeforeClass
public static void dropTables() throws SQLException {
connection = TestHelper.testConnection();
for (String table: ALL_TABLES) {
for (String table : ALL_TABLES) {
TestHelper.dropTable(connection, table);
}
}
protected static void createTables() throws SQLException {
connection.execute(ALL_DDLS);
for (String table: ALL_TABLES) {
for (String table : ALL_TABLES) {
streamTable(table);
}
}
@ -193,7 +198,7 @@ protected List<String> getAllTables() {
protected abstract boolean insertRecordsDuringTest();
private static void streamTable(String table) throws SQLException {
connection.execute("GRANT SELECT ON " + table + " to " + TestHelper.CONNECTOR_USER);
connection.execute("GRANT SELECT ON " + table + " to " + TestHelper.CONNECTOR_USER);
connection.execute("ALTER TABLE " + table + " ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS");
}
@ -339,7 +344,8 @@ protected static void insertFpTypes() throws SQLException {
}
protected static void insertIntTypes() throws SQLException {
connection.execute("INSERT INTO debezium.type_int VALUES (1, 1, 22, 333, 4444, 5555, 99, 9999, 999999999, 999999999999999999, 94, 9949, 999999994, 999999999999999949, 9999999999, 9999999999)");
connection.execute(
"INSERT INTO debezium.type_int VALUES (1, 1, 22, 333, 4444, 5555, 99, 9999, 999999999, 999999999999999999, 94, 9949, 999999994, 999999999999999949, 9999999999, 9999999999)");
connection.execute("COMMIT");
}

View File

@ -74,8 +74,7 @@ public void before() throws SQLException {
"GRANT ALL PRIVILEGES ON debezium2.table2 to debezium",
"GRANT SELECT ON debezium2.table2 to " + TestHelper.CONNECTOR_USER,
"GRANT SELECT ON debezium2.nopk to " + TestHelper.CONNECTOR_USER,
"ALTER TABLE debezium2.table2 ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS"
);
"ALTER TABLE debezium2.table2 ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS");
String ddl = "create table debezium.table1 (" +
" id numeric(9,0) not null, " +
" name varchar2(1000), " +

View File

@ -525,7 +525,6 @@ public void shouldReadChangeStreamForTableCreatedWhileStreaming() throws Excepti
connection.execute("INSERT INTO debezium.customer2 VALUES (2, 'Billie-Bob', 1234.56, TO_DATE('2018/02/22', 'yyyy-mm-dd'))");
connection.execute("COMMIT");
SourceRecords records = consumeRecordsByTopic(1);
List<SourceRecord> testTableRecords = records.recordsForTopic("server1.DEBEZIUM.CUSTOMER2");

View File

@ -23,7 +23,6 @@
import io.debezium.relational.Tables;
import io.debezium.util.IoUtil;
/**
* This is the test suite for Oracle Antlr parser unit testing
*/
@ -66,20 +65,21 @@ public void shouldParseCreateAndAlterTable() throws Exception {
// float(126)
testColumn(table, "COL6", true, Types.FLOAT, "FLOAT", 126, 0, true, null);
// date
testColumn(table, "COL7", true, Types.TIMESTAMP, "DATE", -1, null, true, null);
testColumn(table, "COL7", true, Types.TIMESTAMP, "DATE", -1, null, true, null);
// timestamp
testColumn(table, "COL8", true, Types.TIMESTAMP, "TIMESTAMP", 6, null, true, null);
// blob
testColumn(table, "COL9", true, Types.BLOB, "BLOB", -1, null, true, null);
testColumn(table, "COL9", true, Types.BLOB, "BLOB", -1, null, true, null);
// clob
testColumn(table, "COL10", true, Types.CLOB, "CLOB", -1, null, true, null);
testColumn(table, "COL10", true, Types.CLOB, "CLOB", -1, null, true, null);
// todo sdo_geometry
//testColumn(table, "col12", true, Types.STRUCT, "MDSYS.SDO_GEOMETRY", -1, null,true);
// testColumn(table, "col12", true, Types.STRUCT, "MDSYS.SDO_GEOMETRY", -1, null,true);
String ddl = "alter table " + TABLE_NAME + " add (col21 varchar2(20), col22 number(19));";
parser.parse(ddl, tables);
Table alteredTable = tables.forTable(new TableId(null, null, TABLE_NAME));
assertThat(alteredTable.retrieveColumnNames()).containsExactly("ID", "COL1", "COL2", "COL3", "COL4", "COL5", "COL6", "COL7", "COL8", "COL9", "COL10", "COL21", "COL22");
assertThat(alteredTable.retrieveColumnNames()).containsExactly("ID", "COL1", "COL2", "COL3", "COL4", "COL5", "COL6", "COL7", "COL8", "COL9", "COL10", "COL21",
"COL22");
// varchar2(255)
testColumn(alteredTable, "COL21", true, Types.VARCHAR, "VARCHAR2", 20, null, true, null);
testColumn(alteredTable, "COL22", true, Types.NUMERIC, "NUMBER", 19, 0, true, null);
@ -95,7 +95,8 @@ public void shouldParseCreateAndAlterTable() throws Exception {
ddl = "alter table " + TABLE_NAME + " add (col23 varchar2(20) not null);";
parser.parse(ddl, tables);
alteredTable = tables.forTable(new TableId(null, null, TABLE_NAME));
assertThat(alteredTable.retrieveColumnNames()).containsExactly("ID", "COL1", "COL2", "COL3", "COL4", "COL5", "COL6", "COL7", "COL8", "COL9", "COL10", "COL21", "COL22", "COL23");
assertThat(alteredTable.retrieveColumnNames()).containsExactly("ID", "COL1", "COL2", "COL3", "COL4", "COL5", "COL6", "COL7", "COL8", "COL9", "COL10", "COL21",
"COL22", "COL23");
testColumn(alteredTable, "COL23", false, Types.VARCHAR, "VARCHAR2", 20, null, false, null);
ddl = "alter table " + TABLE_NAME + " drop (col22, col23);";
@ -103,14 +104,14 @@ public void shouldParseCreateAndAlterTable() throws Exception {
alteredTable = tables.forTable(new TableId(null, null, TABLE_NAME));
assertThat(alteredTable.retrieveColumnNames()).containsExactly("ID", "COL1", "COL2", "COL3", "COL4", "COL5", "COL6", "COL7", "COL8", "COL9", "COL10", "COL21");
ddl = "drop table " + TABLE_NAME +";";
ddl = "drop table " + TABLE_NAME + ";";
parser.parse(ddl, tables);
assertThat(tables.size()).isZero();
// todo
ddl = "drop table " + TABLE_NAME + " cascade constrains purge;";
ddl = "ALTER TABLE "+TABLE_NAME+" ADD CONSTRAINT FKB97209E040C4205 FOREIGN KEY (col1) REFERENCES debezium_ref(ID);";
ddl = "ALTER TABLE "+TABLE_NAME+" MODIFY COL1 varchar2(50) not null;";
ddl = "ALTER TABLE " + TABLE_NAME + " ADD CONSTRAINT FKB97209E040C4205 FOREIGN KEY (col1) REFERENCES debezium_ref(ID);";
ddl = "ALTER TABLE " + TABLE_NAME + " MODIFY COL1 varchar2(50) not null;";
}
/**

View File

@ -24,7 +24,8 @@
*/
public class SnapshotDatatypesIT extends AbstractOracleDatatypesTest {
@Rule public TestName name = new TestName();
@Rule
public TestName name = new TestName();
@BeforeClass
public static void beforeClass() throws SQLException {
@ -54,7 +55,7 @@ public void before() throws Exception {
}
private String getTableWhitelist() {
switch(name.getMethodName()) {
switch (name.getMethodName()) {
case "stringTypes":
return "ORCLPDB1.debezium.type_string";
case "fpTypes":

View File

@ -5,6 +5,10 @@
*/
package io.debezium.connector.oracle;
import static org.fest.assertions.Assertions.assertThat;
import java.time.Instant;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.junit.Before;
@ -15,10 +19,6 @@
import io.debezium.data.VerifyRecord;
import io.debezium.relational.TableId;
import java.time.Instant;
import static org.fest.assertions.Assertions.assertThat;
public class SourceInfoTest {
private SourceInfo source;
@ -27,10 +27,9 @@ public class SourceInfoTest {
public void beforeEach() {
final OracleConnectorConfig connectorConfig = new OracleConnectorConfig(
Configuration.create()
.with(OracleConnectorConfig.SERVER_NAME, "serverX")
.with(OracleConnectorConfig.DATABASE_NAME, "mydb")
.build()
);
.with(OracleConnectorConfig.SERVER_NAME, "serverX")
.with(OracleConnectorConfig.DATABASE_NAME, "mydb")
.build());
source = new SourceInfo(connectorConfig);
source.setSourceTime(Instant.now());
source.setTableId(new TableId("c", "s", "t"));

View File

@ -30,9 +30,9 @@ public void before() throws Exception {
Testing.Files.delete(TestHelper.DB_HISTORY_PATH);
String whitelistedTables = getAllTables().stream()
.map(t -> "ORCLPDB1." + t)
.map(t -> t.replaceAll("\\.", "\\\\."))
.collect(Collectors.joining(","));
.map(t -> "ORCLPDB1." + t)
.map(t -> t.replaceAll("\\.", "\\\\."))
.collect(Collectors.joining(","));
Configuration config = TestHelper.defaultConfig()
.with(OracleConnectorConfig.TABLE_WHITELIST, whitelistedTables)

View File

@ -47,8 +47,7 @@ public static Configuration.Builder defaultConfig() {
Configuration.Builder builder = Configuration.create();
jdbcConfiguration.forEach(
(field, value) -> builder.with(OracleConnectorConfig.DATABASE_CONFIG_PREFIX + field, value)
);
(field, value) -> builder.with(OracleConnectorConfig.DATABASE_CONFIG_PREFIX + field, value));
return builder.with(RelationalDatabaseConnectorConfig.SERVER_NAME, "server1")
.with(OracleConnectorConfig.PDB_NAME, "ORCLPDB1")
@ -103,8 +102,7 @@ private static Configuration.Builder testConfig() {
Configuration.Builder builder = Configuration.create();
jdbcConfiguration.forEach(
(field, value) -> builder.with(OracleConnectorConfig.DATABASE_CONFIG_PREFIX + field, value)
);
(field, value) -> builder.with(OracleConnectorConfig.DATABASE_CONFIG_PREFIX + field, value));
return builder;
}
@ -114,8 +112,7 @@ private static Configuration.Builder adminConfig() {
Configuration.Builder builder = Configuration.create();
jdbcConfiguration.forEach(
(field, value) -> builder.with(OracleConnectorConfig.DATABASE_CONFIG_PREFIX + field, value)
);
(field, value) -> builder.with(OracleConnectorConfig.DATABASE_CONFIG_PREFIX + field, value));
return builder;
}