diff --git a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/LcrEventHandler.java b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/LcrEventHandler.java index 3862037b0..f57a8ad1a 100644 --- a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/LcrEventHandler.java +++ b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/LcrEventHandler.java @@ -13,6 +13,7 @@ import io.debezium.relational.RelationalDatabaseSchema; import io.debezium.relational.TableId; import io.debezium.util.Clock; + import oracle.streams.ChunkColumnValue; import oracle.streams.DDLLCR; import oracle.streams.LCR; @@ -36,7 +37,8 @@ class LcrEventHandler implements XStreamLCRCallbackHandler { private final OracleOffsetContext offsetContext; private final boolean tablenameCaseInsensitive; - public LcrEventHandler(ErrorHandler errorHandler, EventDispatcher dispatcher, Clock clock, RelationalDatabaseSchema schema, OracleOffsetContext offsetContext, boolean tablenameCaseInsensitive) { + public LcrEventHandler(ErrorHandler errorHandler, EventDispatcher dispatcher, Clock clock, RelationalDatabaseSchema schema, + OracleOffsetContext offsetContext, boolean tablenameCaseInsensitive) { this.errorHandler = errorHandler; this.dispatcher = dispatcher; this.clock = clock; @@ -58,8 +60,7 @@ public void processLCR(LCR lcr) throws StreamsException { lcrPosition, lcrPosition.getScn(), recPosition != null ? recPosition : "none", - recPosition != null ? recPosition.getScn() : "none" - ); + recPosition != null ? recPosition.getScn() : "none"); } return; } @@ -100,8 +101,7 @@ private void dispatchDataChangeEvent(RowLCR lcr) throws InterruptedException { dispatcher.dispatchDataChangeEvent( tableId, - new XStreamChangeRecordEmitter(offsetContext, lcr, schema.tableFor(tableId), clock) - ); + new XStreamChangeRecordEmitter(offsetContext, lcr, schema.tableFor(tableId), clock)); } private void dispatchSchemaChangeEvent(DDLLCR ddlLcr) throws InterruptedException { @@ -113,8 +113,7 @@ private void dispatchSchemaChangeEvent(DDLLCR ddlLcr) throws InterruptedExceptio dispatcher.dispatchSchemaChangeEvent( tableId, - new OracleSchemaChangeEventEmitter(offsetContext, tableId, ddlLcr) - ); + new OracleSchemaChangeEventEmitter(offsetContext, tableId, ddlLcr)); } private TableId getTableId(LCR lcr) { @@ -140,4 +139,4 @@ public LCR createLCR() throws StreamsException { public ChunkColumnValue createChunk() throws StreamsException { throw new UnsupportedOperationException("Should never be called"); } -} \ No newline at end of file +} diff --git a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/LcrPosition.java b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/LcrPosition.java index a5aeda32e..53b255561 100644 --- a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/LcrPosition.java +++ b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/LcrPosition.java @@ -13,6 +13,7 @@ import io.debezium.util.HexConverter; import io.debezium.util.Strings; + import oracle.streams.StreamsException; import oracle.streams.XStreamUtility; diff --git a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/OracleChangeEventSourceFactory.java b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/OracleChangeEventSourceFactory.java index 2820dfc67..4d229d5e3 100644 --- a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/OracleChangeEventSourceFactory.java +++ b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/OracleChangeEventSourceFactory.java @@ -25,7 +25,7 @@ public class OracleChangeEventSourceFactory implements ChangeEventSourceFactory private final OracleDatabaseSchema schema; public OracleChangeEventSourceFactory(OracleConnectorConfig configuration, OracleConnection jdbcConnection, - ErrorHandler errorHandler, EventDispatcher dispatcher, Clock clock, OracleDatabaseSchema schema) { + ErrorHandler errorHandler, EventDispatcher dispatcher, Clock clock, OracleDatabaseSchema schema) { this.configuration = configuration; this.jdbcConnection = jdbcConnection; this.errorHandler = errorHandler; @@ -49,7 +49,6 @@ public StreamingChangeEventSource getStreamingChangeEventSource(OffsetContext of dispatcher, errorHandler, clock, - schema - ); + schema); } } diff --git a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/OracleConnection.java b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/OracleConnection.java index bf4bf46ef..e7cb88ab9 100644 --- a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/OracleConnection.java +++ b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/OracleConnection.java @@ -24,6 +24,7 @@ import io.debezium.relational.Tables; import io.debezium.relational.Tables.ColumnNameFilter; import io.debezium.relational.Tables.TableFilter; + import oracle.jdbc.OracleTypes; public class OracleConnection extends JdbcConnection { @@ -85,7 +86,8 @@ public void resetSessionToCdb() { @Override public Set readTableNames(String databaseCatalog, String schemaNamePattern, String tableNamePattern, - String[] tableTypes) throws SQLException { + String[] tableTypes) + throws SQLException { Set tableIds = super.readTableNames(null, schemaNamePattern, tableNamePattern, tableTypes); @@ -96,7 +98,8 @@ public Set readTableNames(String databaseCatalog, String schemaNamePatt @Override public void readSchema(Tables tables, String databaseCatalog, String schemaNamePattern, TableFilter tableFilter, - ColumnNameFilter columnFilter, boolean removeTablesNotFoundInJdbc) throws SQLException { + ColumnNameFilter columnFilter, boolean removeTablesNotFoundInJdbc) + throws SQLException { super.readSchema(tables, null, schemaNamePattern, null, columnFilter, removeTablesNotFoundInJdbc); @@ -117,23 +120,21 @@ public void readSchema(Tables tables, String databaseCatalog, String schemaNameP if (column.jdbcType() == Types.TIMESTAMP) { editor.addColumn( column.edit() - .length(column.scale().orElse(Column.UNSET_INT_VALUE)) - .scale(null) - .create() - ); + .length(column.scale().orElse(Column.UNSET_INT_VALUE)) + .scale(null) + .create()); } // NUMBER columns without scale value have it set to -127 instead of null; // let's rectify that else if (column.jdbcType() == OracleTypes.NUMBER) { column.scale() - .filter(s -> s == ORACLE_UNSET_SCALE) - .ifPresent(s -> { - editor.addColumn( - column.edit() - .scale(null) - .create() - ); - }); + .filter(s -> s == ORACLE_UNSET_SCALE) + .ifPresent(s -> { + editor.addColumn( + column.edit() + .scale(null) + .create()); + }); } } tables.overwriteTable(editor.create()); diff --git a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/OracleConnectionFactory.java b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/OracleConnectionFactory.java index 42a69b211..34bbb2492 100644 --- a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/OracleConnectionFactory.java +++ b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/OracleConnectionFactory.java @@ -23,7 +23,6 @@ public Connection connect(JdbcConfiguration config) throws SQLException { String password = config.getPassword(); return DriverManager.getConnection( - "jdbc:oracle:oci:@" + hostName + ":" + port + "/" + database, user, password - ); + "jdbc:oracle:oci:@" + hostName + ":" + port + "/" + database, user, password); } } diff --git a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/OracleConnectorConfig.java b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/OracleConnectorConfig.java index 18fbb3abb..d2819ff48 100644 --- a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/OracleConnectorConfig.java +++ b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/OracleConnectorConfig.java @@ -25,6 +25,7 @@ import io.debezium.relational.Tables.TableFilter; import io.debezium.relational.history.HistoryRecordComparator; import io.debezium.relational.history.KafkaDatabaseHistory; + import oracle.streams.XStreamUtility; /** @@ -73,17 +74,17 @@ public class OracleConnectorConfig extends HistorizedRelationalDatabaseConnector + "'initial_schema_only' to specify the connector should run a snapshot of the schema when no offsets are available for the logical server name. "); public static final Field TABLENAME_CASE_INSENSITIVE = Field.create("database.tablename.case.insensitive") - .withDisplayName("Case insensitive table names") - .withType(Type.BOOLEAN) - .withDefault(false) - .withImportance(Importance.LOW) - .withDescription("Case insensitive table names; set to 'true' for Oracle 11g, 'false' (default) otherwise."); + .withDisplayName("Case insensitive table names") + .withType(Type.BOOLEAN) + .withDefault(false) + .withImportance(Importance.LOW) + .withDescription("Case insensitive table names; set to 'true' for Oracle 11g, 'false' (default) otherwise."); public static final Field ORACLE_VERSION = Field.create("database.oracle.version") - .withDisplayName("Oracle version, 11 or 12+") - .withEnum(OracleVersion.class, OracleVersion.V12Plus) - .withImportance(Importance.LOW) - .withDescription("For default Oracle 12+, use default pos_version value v2, for Oracle 11, use pos_version value v1."); + .withDisplayName("Oracle version, 11 or 12+") + .withEnum(OracleVersion.class, OracleVersion.V12Plus) + .withImportance(Importance.LOW) + .withDescription("For default Oracle 12+, use default pos_version value v2, for Oracle 11, use pos_version value v1."); public static final Field SERVER_NAME = RelationalDatabaseConnectorConfig.SERVER_NAME .withValidation(CommonConnectorConfig::validateServerNameIsDifferentFromHistoryTopicName); @@ -109,8 +110,7 @@ public class OracleConnectorConfig extends HistorizedRelationalDatabaseConnector Heartbeat.HEARTBEAT_INTERVAL, Heartbeat.HEARTBEAT_TOPICS_PREFIX, TABLENAME_CASE_INSENSITIVE, - ORACLE_VERSION - ); + ORACLE_VERSION); private final String databaseName; private final String pdbName; @@ -143,8 +143,7 @@ public static ConfigDef configDef() { RelationalDatabaseConnectorConfig.TABLE_BLACKLIST, RelationalDatabaseConnectorConfig.MSG_KEY_COLUMNS, RelationalDatabaseConnectorConfig.TABLE_IGNORE_BUILTIN, - Heartbeat.HEARTBEAT_INTERVAL, Heartbeat.HEARTBEAT_TOPICS_PREFIX - ); + Heartbeat.HEARTBEAT_INTERVAL, Heartbeat.HEARTBEAT_TOPICS_PREFIX); Field.group(config, "Connector", CommonConnectorConfig.POLL_INTERVAL_MS, CommonConnectorConfig.MAX_BATCH_SIZE, CommonConnectorConfig.MAX_QUEUE_SIZE, CommonConnectorConfig.SNAPSHOT_DELAY_MS); @@ -167,7 +166,7 @@ public SnapshotMode getSnapshotMode() { return snapshotMode; } - public boolean getTablenameCaseInsensitive() { + public boolean getTablenameCaseInsensitive() { return tablenameCaseInsensitive; } @@ -208,7 +207,7 @@ public String getValue() { } public int getPosVersion() { - switch(version) { + switch (version) { case "11": return XStreamUtility.POS_VERSION_V1; case "12+": diff --git a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/OracleConnectorTask.java b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/OracleConnectorTask.java index e94736c46..16daccd18 100644 --- a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/OracleConnectorTask.java +++ b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/OracleConnectorTask.java @@ -36,7 +36,8 @@ public class OracleConnectorTask extends BaseSourceTask { private static final String CONTEXT_NAME = "oracle-connector-task"; private static enum State { - RUNNING, STOPPED; + RUNNING, + STOPPED; } private final AtomicReference state = new AtomicReference(State.STOPPED); @@ -99,8 +100,7 @@ public void start(Configuration config) { connectorConfig.getLogicalName(), new OracleChangeEventSourceFactory(connectorConfig, jdbcConnection, errorHandler, dispatcher, clock, schema), dispatcher, - schema - ); + schema); coordinator.start(taskContext, this.queue, new OracleEventMetadataProvider()); } @@ -110,8 +110,8 @@ public List poll() throws InterruptedException { List records = queue.poll(); List sourceRecords = records.stream() - .map(DataChangeEvent::getRecord) - .collect(Collectors.toList()); + .map(DataChangeEvent::getRecord) + .collect(Collectors.toList()); if (!sourceRecords.isEmpty()) { this.lastOffset = sourceRecords.get(sourceRecords.size() - 1).sourceOffset(); diff --git a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/OracleDatabaseSchema.java b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/OracleDatabaseSchema.java index 2e6a5a4a5..0ea3bc9b3 100644 --- a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/OracleDatabaseSchema.java +++ b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/OracleDatabaseSchema.java @@ -29,16 +29,16 @@ public class OracleDatabaseSchema extends HistorizedRelationalDatabaseSchema { private static final Logger LOGGER = LoggerFactory.getLogger(OracleDatabaseSchema.class); - public OracleDatabaseSchema(OracleConnectorConfig connectorConfig, SchemaNameAdjuster schemaNameAdjuster, TopicSelector topicSelector, OracleConnection connection) { + public OracleDatabaseSchema(OracleConnectorConfig connectorConfig, SchemaNameAdjuster schemaNameAdjuster, TopicSelector topicSelector, + OracleConnection connection) { super(connectorConfig, topicSelector, connectorConfig.getTableFilters().dataCollectionFilter(), null, - new TableSchemaBuilder( - new OracleValueConverters(connection), - schemaNameAdjuster, - connectorConfig.getSourceInfoStructMaker().schema(), - connectorConfig.getSanitizeFieldNames()), - connectorConfig.getTablenameCaseInsensitive(), - connectorConfig.getKeyMapper() - ); + new TableSchemaBuilder( + new OracleValueConverters(connection), + schemaNameAdjuster, + connectorConfig.getSourceInfoStructMaker().schema(), + connectorConfig.getSanitizeFieldNames()), + connectorConfig.getTablenameCaseInsensitive(), + connectorConfig.getKeyMapper()); } @Override diff --git a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/OracleEventMetadataProvider.java b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/OracleEventMetadataProvider.java index dfe7eff1a..ade082d71 100644 --- a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/OracleEventMetadataProvider.java +++ b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/OracleEventMetadataProvider.java @@ -42,8 +42,7 @@ public Map getEventSourcePosition(DataCollectionId source, Offse } final Long scn = sourceInfo.getInt64(SourceInfo.SCN_KEY); return Collect.hashMapOf( - SourceInfo.SCN_KEY, scn == null ? "null" : Long.toString(scn) - ); + SourceInfo.SCN_KEY, scn == null ? "null" : Long.toString(scn)); } @Override diff --git a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/OracleSchemaChangeEventEmitter.java b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/OracleSchemaChangeEventEmitter.java index aa66a8044..fb2676b18 100644 --- a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/OracleSchemaChangeEventEmitter.java +++ b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/OracleSchemaChangeEventEmitter.java @@ -7,16 +7,17 @@ import java.util.Set; -import io.debezium.connector.oracle.antlr.OracleDdlParser; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import io.debezium.connector.oracle.antlr.OracleDdlParser; import io.debezium.pipeline.spi.SchemaChangeEventEmitter; import io.debezium.relational.Table; import io.debezium.relational.TableId; import io.debezium.relational.Tables; import io.debezium.schema.SchemaChangeEvent; import io.debezium.schema.SchemaChangeEvent.SchemaChangeEventType; + import oracle.streams.DDLLCR; /** @@ -59,11 +60,12 @@ public void emitSchemaChangeEvent(Receiver receiver) throws InterruptedException Table table = tables.forTable(tableId); - receiver.schemaChangeEvent(new SchemaChangeEvent(offsetContext.getPartition(), offsetContext.getOffset(), ddlLcr.getSourceDatabaseName(), ddlLcr.getObjectOwner(), ddlLcr.getDDLText(), table, eventType, false)); + receiver.schemaChangeEvent(new SchemaChangeEvent(offsetContext.getPartition(), offsetContext.getOffset(), ddlLcr.getSourceDatabaseName(), ddlLcr.getObjectOwner(), + ddlLcr.getDDLText(), table, eventType, false)); } private SchemaChangeEventType getSchemaChangeEventType() { - switch(ddlLcr.getCommandType()) { + switch (ddlLcr.getCommandType()) { case "CREATE TABLE": return SchemaChangeEventType.CREATE; case "ALTER TABLE": diff --git a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/OracleSnapshotChangeEventSource.java b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/OracleSnapshotChangeEventSource.java index 97f80fdaa..80992f9b6 100644 --- a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/OracleSnapshotChangeEventSource.java +++ b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/OracleSnapshotChangeEventSource.java @@ -40,7 +40,9 @@ public class OracleSnapshotChangeEventSource extends RelationalSnapshotChangeEve private final OracleConnectorConfig connectorConfig; private final OracleConnection jdbcConnection; - public OracleSnapshotChangeEventSource(OracleConnectorConfig connectorConfig, OracleOffsetContext previousOffset, OracleConnection jdbcConnection, OracleDatabaseSchema schema, EventDispatcher dispatcher, Clock clock, SnapshotProgressListener snapshotProgressListener) { + public OracleSnapshotChangeEventSource(OracleConnectorConfig connectorConfig, OracleOffsetContext previousOffset, OracleConnection jdbcConnection, + OracleDatabaseSchema schema, EventDispatcher dispatcher, Clock clock, + SnapshotProgressListener snapshotProgressListener) { super(connectorConfig, previousOffset, jdbcConnection, schema, dispatcher, clock, snapshotProgressListener); this.connectorConfig = connectorConfig; @@ -71,13 +73,12 @@ protected SnapshotContext prepare(ChangeEventSourceContext context) throws Excep } return new OracleSnapshotContext( - connectorConfig.getPdbName() != null ? connectorConfig.getPdbName() : connectorConfig.getDatabaseName() - ); + connectorConfig.getPdbName() != null ? connectorConfig.getPdbName() : connectorConfig.getDatabaseName()); } @Override protected Set getAllTableIds(SnapshotContext ctx) throws Exception { - return jdbcConnection.readTableNames(ctx.catalogName, null, null, new String[] {"TABLE"} ); + return jdbcConnection.readTableNames(ctx.catalogName, null, null, new String[]{ "TABLE" }); } @Override @@ -113,8 +114,7 @@ protected void determineSnapshotOffset(SnapshotContext ctx) throws Exception { // we'd get a ORA-01466 when running the flashback query for doing the snapshot do { currentScn = getCurrentScn(ctx); - } - while (areSameTimestamp(latestTableDdlScn.orElse(null), currentScn)); + } while (areSameTimestamp(latestTableDdlScn.orElse(null), currentScn)); ctx.offset = OracleOffsetContext.create() .logicalName(connectorConfig) @@ -123,7 +123,7 @@ protected void determineSnapshotOffset(SnapshotContext ctx) throws Exception { } private long getCurrentScn(SnapshotContext ctx) throws SQLException { - try(Statement statement = jdbcConnection.connection().createStatement(); + try (Statement statement = jdbcConnection.connection().createStatement(); ResultSet rs = statement.executeQuery("select CURRENT_SCN from V$DATABASE")) { if (!rs.next()) { @@ -142,8 +142,8 @@ private boolean areSameTimestamp(Long scn1, long scn2) throws SQLException { return false; } - try(Statement statement = jdbcConnection.connection().createStatement(); - ResultSet rs = statement.executeQuery("SELECT 1 FROM DUAL WHERE SCN_TO_TIMESTAMP(" + scn1 + ") = SCN_TO_TIMESTAMP(" + scn2 + ")" )) { + try (Statement statement = jdbcConnection.connection().createStatement(); + ResultSet rs = statement.executeQuery("SELECT 1 FROM DUAL WHERE SCN_TO_TIMESTAMP(" + scn1 + ") = SCN_TO_TIMESTAMP(" + scn2 + ")")) { return rs.next(); } @@ -181,8 +181,8 @@ private Optional getLatestTableDdlScn(SnapshotContext ctx) throws SQLExcep @Override protected void readTableStructure(ChangeEventSourceContext sourceContext, SnapshotContext snapshotContext) throws SQLException, InterruptedException { Set schemas = snapshotContext.capturedTables.stream() - .map(TableId::schema) - .collect(Collectors.toSet()); + .map(TableId::schema) + .collect(Collectors.toSet()); // reading info only for the schemas we're interested in as per the set of captured tables; // while the passed table name filter alone would skip all non-included tables, reading the schema @@ -198,15 +198,14 @@ protected void readTableStructure(ChangeEventSourceContext sourceContext, Snapsh schema, connectorConfig.getTableFilters().dataCollectionFilter(), null, - false - ); + false); } } @Override protected SchemaChangeEvent getCreateTableEvent(SnapshotContext snapshotContext, Table table) throws SQLException { try (Statement statement = jdbcConnection.connection().createStatement(); - ResultSet rs = statement.executeQuery("select dbms_metadata.get_ddl( 'TABLE', '" + table.id().table() + "', '" + table.id().schema() + "' ) from dual")) { + ResultSet rs = statement.executeQuery("select dbms_metadata.get_ddl( 'TABLE', '" + table.id().table() + "', '" + table.id().schema() + "' ) from dual")) { if (!rs.next()) { throw new IllegalStateException("Couldn't get metadata"); diff --git a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/OracleStreamingChangeEventSource.java b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/OracleStreamingChangeEventSource.java index 6bfde5c0d..8e0343374 100644 --- a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/OracleStreamingChangeEventSource.java +++ b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/OracleStreamingChangeEventSource.java @@ -16,6 +16,7 @@ import io.debezium.pipeline.source.spi.StreamingChangeEventSource; import io.debezium.relational.TableId; import io.debezium.util.Clock; + import oracle.jdbc.OracleConnection; import oracle.sql.NUMBER; import oracle.streams.StreamsException; @@ -43,7 +44,8 @@ public class OracleStreamingChangeEventSource implements StreamingChangeEventSou private final boolean tablenameCaseInsensitive; private final int posVersion; - public OracleStreamingChangeEventSource(OracleConnectorConfig connectorConfig, OracleOffsetContext offsetContext, JdbcConnection jdbcConnection, EventDispatcher dispatcher, ErrorHandler errorHandler, Clock clock, OracleDatabaseSchema schema) { + public OracleStreamingChangeEventSource(OracleConnectorConfig connectorConfig, OracleOffsetContext offsetContext, JdbcConnection jdbcConnection, + EventDispatcher dispatcher, ErrorHandler errorHandler, Clock clock, OracleDatabaseSchema schema) { this.jdbcConnection = jdbcConnection; this.dispatcher = dispatcher; this.errorHandler = errorHandler; @@ -59,7 +61,8 @@ public OracleStreamingChangeEventSource(OracleConnectorConfig connectorConfig, O public void execute(ChangeEventSourceContext context) throws InterruptedException { try { // 1. connect - final byte[] startPosition = offsetContext.getLcrPosition() != null ? offsetContext.getLcrPosition().getRawPosition() : convertScnToPosition(offsetContext.getScn()); + final byte[] startPosition = offsetContext.getLcrPosition() != null ? offsetContext.getLcrPosition().getRawPosition() + : convertScnToPosition(offsetContext.getScn()); xsOut = XStreamOut.attach((OracleConnection) jdbcConnection.connection(), xStreamServerName, startPosition, 1, 1, XStreamOut.DEFAULT_MODE); @@ -102,8 +105,7 @@ public void commitOffset(Map offset) { } xsOut.setProcessedLowWatermark( lcrPosition.getRawPosition(), - XStreamOut.DEFAULT_MODE - ); + XStreamOut.DEFAULT_MODE); } else if (scn != null) { if (LOGGER.isDebugEnabled()) { @@ -111,8 +113,7 @@ else if (scn != null) { } xsOut.setProcessedLowWatermark( convertScnToPosition(scn), - XStreamOut.DEFAULT_MODE - ); + XStreamOut.DEFAULT_MODE); } else { LOGGER.warn("Nothing in offsets could be recorded to Oracle"); diff --git a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/OracleValueConverters.java b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/OracleValueConverters.java index 1362f22be..25b377081 100644 --- a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/OracleValueConverters.java +++ b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/OracleValueConverters.java @@ -26,6 +26,7 @@ import io.debezium.time.ZonedTimestamp; import io.debezium.util.NumberConversions; import io.debezium.util.Strings; + import oracle.jdbc.OracleTypes; import oracle.sql.BINARY_DOUBLE; import oracle.sql.BINARY_FLOAT; @@ -55,8 +56,7 @@ public SchemaBuilder schemaBuilder(Column column) { column.jdbcType(), column.typeName(), column.length(), - column.scale() - ); + column.scale()); switch (column.jdbcType()) { // Oracle's float is not float as in Java but a NUMERIC without scale @@ -112,7 +112,7 @@ else if (width < 19) { @Override public ValueConverter converter(Column column, Field fieldDefn) { - switch(column.jdbcType()) { + switch (column.jdbcType()) { case Types.CHAR: case Types.VARCHAR: case Types.NCHAR: @@ -123,7 +123,7 @@ public ValueConverter converter(Column column, Field fieldDefn) { case OracleTypes.BINARY_DOUBLE: return data -> convertDouble(column, fieldDefn, data); case Types.NUMERIC: - return getNumericConverter(column, fieldDefn); + return getNumericConverter(column, fieldDefn); case Types.FLOAT: return data -> convertVariableScale(column, fieldDefn, data); case OracleTypes.TIMESTAMPTZ: @@ -358,8 +358,8 @@ else if (data instanceof TIMESTAMPTZ) { } else if (data instanceof TIMESTAMPLTZ) { // JDBC driver throws an exception -// final TIMESTAMPLTZ ts = (TIMESTAMPLTZ)data; -// data = ts.offsetDateTimeValue(connection.connection()); + // final TIMESTAMPLTZ ts = (TIMESTAMPLTZ)data; + // data = ts.offsetDateTimeValue(connection.connection()); return null; } } diff --git a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/SnapshotChangeRecordEmitter.java b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/SnapshotChangeRecordEmitter.java index b5c551f6e..704cb79aa 100644 --- a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/SnapshotChangeRecordEmitter.java +++ b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/SnapshotChangeRecordEmitter.java @@ -9,6 +9,7 @@ import io.debezium.pipeline.spi.OffsetContext; import io.debezium.relational.RelationalChangeRecordEmitter; import io.debezium.util.Clock; + import oracle.streams.RowLCR; /** diff --git a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/XStreamChangeRecordEmitter.java b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/XStreamChangeRecordEmitter.java index e860d961d..d1d8f7c75 100644 --- a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/XStreamChangeRecordEmitter.java +++ b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/XStreamChangeRecordEmitter.java @@ -10,6 +10,7 @@ import io.debezium.relational.RelationalChangeRecordEmitter; import io.debezium.relational.Table; import io.debezium.util.Clock; + import oracle.streams.ColumnValue; import oracle.streams.RowLCR; @@ -32,11 +33,15 @@ public XStreamChangeRecordEmitter(OffsetContext offset, RowLCR lcr, Table table, @Override protected Operation getOperation() { - switch(lcr.getCommandType()) { - case RowLCR.INSERT: return Operation.CREATE; - case RowLCR.DELETE: return Operation.DELETE; - case RowLCR.UPDATE: return Operation.UPDATE; - default: throw new IllegalArgumentException("Received event of unexpected command type: " + lcr); + switch (lcr.getCommandType()) { + case RowLCR.INSERT: + return Operation.CREATE; + case RowLCR.DELETE: + return Operation.DELETE; + case RowLCR.UPDATE: + return Operation.UPDATE; + default: + throw new IllegalArgumentException("Received event of unexpected command type: " + lcr); } } diff --git a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/antlr/OracleDdlParser.java b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/antlr/OracleDdlParser.java index cc8df18ee..e99f212e5 100644 --- a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/antlr/OracleDdlParser.java +++ b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/antlr/OracleDdlParser.java @@ -5,6 +5,14 @@ */ package io.debezium.connector.oracle.antlr; +import java.sql.Types; +import java.util.Arrays; +import java.util.Locale; + +import org.antlr.v4.runtime.CharStream; +import org.antlr.v4.runtime.CommonTokenStream; +import org.antlr.v4.runtime.tree.ParseTree; + import io.debezium.antlr.AntlrDdlParser; import io.debezium.antlr.AntlrDdlParserListener; import io.debezium.antlr.DataTypeResolver; @@ -14,14 +22,8 @@ import io.debezium.ddl.parser.oracle.generated.PlSqlParser; import io.debezium.relational.SystemVariables; import io.debezium.relational.Tables; -import oracle.jdbc.OracleTypes; -import org.antlr.v4.runtime.CharStream; -import org.antlr.v4.runtime.CommonTokenStream; -import org.antlr.v4.runtime.tree.ParseTree; -import java.sql.Types; -import java.util.Arrays; -import java.util.Locale; +import oracle.jdbc.OracleTypes; /** * This is the main Oracle Antlr DDL parser @@ -104,8 +106,7 @@ protected DataTypeResolver initializeDataTypeResolver() { new DataTypeEntry(Types.FLOAT, PlSqlParser.FLOAT), new DataTypeEntry(Types.FLOAT, PlSqlParser.REAL), new DataTypeEntry(Types.BLOB, PlSqlParser.BLOB), - new DataTypeEntry(Types.CLOB, PlSqlParser.CLOB) - )); + new DataTypeEntry(Types.CLOB, PlSqlParser.CLOB))); return dataTypeResolverBuilder.build(); } diff --git a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/antlr/listener/AlterTableParserListener.java b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/antlr/listener/AlterTableParserListener.java index 8eebddbed..606f3a61e 100644 --- a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/antlr/listener/AlterTableParserListener.java +++ b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/antlr/listener/AlterTableParserListener.java @@ -5,6 +5,13 @@ */ package io.debezium.connector.oracle.antlr.listener; +import static io.debezium.antlr.AntlrDdlParser.getText; + +import java.util.ArrayList; +import java.util.List; + +import org.antlr.v4.runtime.tree.ParseTreeListener; + import io.debezium.connector.oracle.antlr.OracleDdlParser; import io.debezium.ddl.parser.oracle.generated.PlSqlParser; import io.debezium.relational.Column; @@ -12,12 +19,6 @@ import io.debezium.relational.TableEditor; import io.debezium.relational.TableId; import io.debezium.text.ParsingException; -import org.antlr.v4.runtime.tree.ParseTreeListener; - -import java.util.ArrayList; -import java.util.List; - -import static io.debezium.antlr.AntlrDdlParser.getText; /** * Parser listener that is parsing Oracle ALTER TABLE statements @@ -67,7 +68,7 @@ public void exitAlter_table(PlSqlParser.Alter_tableContext ctx) { parser.runIfNotNull(() -> { listeners.remove(columnDefinitionParserListener); parser.databaseTables().overwriteTable(tableEditor.create()); - //parser.signalAlterTable(tableEditor.tableId(), null, ctx.getParent());// todo? + // parser.signalAlterTable(tableEditor.tableId(), null, ctx.getParent());// todo? }, tableEditor); super.exitAlter_table(ctx); } @@ -88,7 +89,6 @@ public void enterAdd_column_clause(PlSqlParser.Add_column_clauseContext ctx) { super.enterAdd_column_clause(ctx); } - @Override public void exitAdd_column_clause(PlSqlParser.Add_column_clauseContext ctx) { parser.runIfNotNull(() -> { diff --git a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/antlr/listener/ColumnDefinitionParserListener.java b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/antlr/listener/ColumnDefinitionParserListener.java index 71412e99b..94bd2224b 100644 --- a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/antlr/listener/ColumnDefinitionParserListener.java +++ b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/antlr/listener/ColumnDefinitionParserListener.java @@ -5,14 +5,15 @@ */ package io.debezium.connector.oracle.antlr.listener; +import java.sql.Types; + import io.debezium.antlr.DataTypeResolver; import io.debezium.ddl.parser.oracle.generated.PlSqlParser; import io.debezium.relational.Column; import io.debezium.relational.ColumnEditor; import io.debezium.relational.TableEditor; -import oracle.jdbc.OracleTypes; -import java.sql.Types; +import oracle.jdbc.OracleTypes; /** * This class parses column definitions of Oracle statements. @@ -24,7 +25,7 @@ public class ColumnDefinitionParserListener extends BaseParserListener { private ColumnEditor columnEditor; ColumnDefinitionParserListener(final TableEditor tableEditor, final ColumnEditor columnEditor, - final DataTypeResolver dataTypeResolver) { + final DataTypeResolver dataTypeResolver) { this.dataTypeResolver = dataTypeResolver; this.tableEditor = tableEditor; this.columnEditor = columnEditor; @@ -56,197 +57,197 @@ public void enterPrimary_key_clause(PlSqlParser.Primary_key_clauseContext ctx) { // todo use dataTypeResolver instead private void resolveColumnDataType(PlSqlParser.Column_definitionContext ctx) { - PlSqlParser.Precision_partContext precisionPart = ctx.datatype().precision_part(); + PlSqlParser.Precision_partContext precisionPart = ctx.datatype().precision_part(); - columnEditor.name(getColumnName(ctx.column_name())); + columnEditor.name(getColumnName(ctx.column_name())); - if (ctx.datatype().native_datatype_element() != null) { - if (ctx.datatype().native_datatype_element().INT() != null - || ctx.datatype().native_datatype_element().INTEGER() != null - || ctx.datatype().native_datatype_element().SMALLINT() != null - || ctx.datatype().native_datatype_element().NUMERIC() != null - || ctx.datatype().native_datatype_element().DECIMAL() != null) { - // NUMERIC and DECIMAL types have by default zero scale - columnEditor - .jdbcType(Types.NUMERIC) - .type("NUMBER"); + if (ctx.datatype().native_datatype_element() != null) { + if (ctx.datatype().native_datatype_element().INT() != null + || ctx.datatype().native_datatype_element().INTEGER() != null + || ctx.datatype().native_datatype_element().SMALLINT() != null + || ctx.datatype().native_datatype_element().NUMERIC() != null + || ctx.datatype().native_datatype_element().DECIMAL() != null) { + // NUMERIC and DECIMAL types have by default zero scale + columnEditor + .jdbcType(Types.NUMERIC) + .type("NUMBER"); - if (precisionPart == null) { - columnEditor.length(38) - .scale(0); - } - else { - setPrecision(precisionPart, columnEditor); - setScale(precisionPart, columnEditor); - } + if (precisionPart == null) { + columnEditor.length(38) + .scale(0); } - else if (ctx.datatype().native_datatype_element().DATE() != null) { - // JDBC driver reports type as timestamp but name DATE - columnEditor - .jdbcType(Types.TIMESTAMP) - .type("DATE"); + else { + setPrecision(precisionPart, columnEditor); + setScale(precisionPart, columnEditor); } - else if (ctx.datatype().native_datatype_element().TIMESTAMP() != null) { - if (ctx.datatype().WITH() != null - && ctx.datatype().TIME() != null - && ctx.datatype().ZONE() != null) { - if (ctx.datatype().LOCAL() != null) { - columnEditor - .jdbcType(OracleTypes.TIMESTAMPLTZ) - .type("TIMESTAMP WITH LOCAL TIME ZONE"); - } - else { - columnEditor - .jdbcType(OracleTypes.TIMESTAMPTZ) - .type("TIMESTAMP WITH TIME ZONE"); - } + } + else if (ctx.datatype().native_datatype_element().DATE() != null) { + // JDBC driver reports type as timestamp but name DATE + columnEditor + .jdbcType(Types.TIMESTAMP) + .type("DATE"); + } + else if (ctx.datatype().native_datatype_element().TIMESTAMP() != null) { + if (ctx.datatype().WITH() != null + && ctx.datatype().TIME() != null + && ctx.datatype().ZONE() != null) { + if (ctx.datatype().LOCAL() != null) { + columnEditor + .jdbcType(OracleTypes.TIMESTAMPLTZ) + .type("TIMESTAMP WITH LOCAL TIME ZONE"); } else { columnEditor - .jdbcType(Types.TIMESTAMP) - .type("TIMESTAMP"); + .jdbcType(OracleTypes.TIMESTAMPTZ) + .type("TIMESTAMP WITH TIME ZONE"); } - - if (precisionPart == null) { - columnEditor.length(6); - } - else { - setPrecision(precisionPart, columnEditor); - } - } - // VARCHAR is the same as VARCHAR2 in Oracle - else if (ctx.datatype().native_datatype_element().VARCHAR2() != null || - ctx.datatype().native_datatype_element().VARCHAR() != null) { - columnEditor - .jdbcType(Types.VARCHAR) - .type("VARCHAR2"); - - if (precisionPart == null) { - columnEditor.length(getVarCharDefaultLength()); - } - else { - setPrecision(precisionPart, columnEditor); - } - } - else if (ctx.datatype().native_datatype_element().NVARCHAR2() != null) { - columnEditor - .jdbcType(Types.NVARCHAR) - .type("NVARCHAR2"); - - if (precisionPart == null) { - columnEditor.length(getVarCharDefaultLength()); - } - else { - setPrecision(precisionPart, columnEditor); - } - } - else if (ctx.datatype().native_datatype_element().CHAR() != null) { - columnEditor - .jdbcType(Types.CHAR) - .type("CHAR") - .length(1); - } - else if (ctx.datatype().native_datatype_element().NCHAR() != null) { - columnEditor - .jdbcType(Types.NCHAR) - .type("NCHAR") - .length(1); - } - else if (ctx.datatype().native_datatype_element().BINARY_FLOAT() != null) { - columnEditor - .jdbcType(OracleTypes.BINARY_FLOAT) - .type("BINARY_FLOAT"); - } - else if (ctx.datatype().native_datatype_element().BINARY_DOUBLE() != null) { - columnEditor - .jdbcType(OracleTypes.BINARY_DOUBLE) - .type("BINARY_DOUBLE"); - } - // PRECISION keyword is mandatory - else if (ctx.datatype().native_datatype_element().FLOAT() != null || - (ctx.datatype().native_datatype_element().DOUBLE() != null && ctx.datatype().native_datatype_element().PRECISION() != null)) { - columnEditor - .jdbcType(Types.FLOAT) - .type("FLOAT") - .length(126); - - // TODO float's precision is about bits not decimal digits; should be ok for now to over-size - if (precisionPart != null) { - setPrecision(precisionPart, columnEditor); - } - } - else if (ctx.datatype().native_datatype_element().REAL() != null) { - columnEditor - .jdbcType(Types.FLOAT) - .type("FLOAT") - // TODO float's precision is about bits not decimal digits; should be ok for now to over-size - .length(63); - } - else if (ctx.datatype().native_datatype_element().NUMBER() != null) { - columnEditor - .jdbcType(Types.NUMERIC) - .type("NUMBER"); - - if (precisionPart == null) { - columnEditor.length(38); - } - else { - setPrecision(precisionPart, columnEditor); - setScale(precisionPart, columnEditor); - } - } - else if (ctx.datatype().native_datatype_element().BLOB() != null) { - columnEditor - .jdbcType(Types.BLOB) - .type("BLOB"); - } - else if (ctx.datatype().native_datatype_element().CLOB() != null) { - columnEditor - .jdbcType(Types.CLOB) - .type("CLOB"); } else { - throw new IllegalArgumentException("Unsupported column type: " + ctx.datatype().native_datatype_element().getText()); + columnEditor + .jdbcType(Types.TIMESTAMP) + .type("TIMESTAMP"); + } + + if (precisionPart == null) { + columnEditor.length(6); + } + else { + setPrecision(precisionPart, columnEditor); } } - else if (ctx.datatype().INTERVAL() != null - && ctx.datatype().YEAR() != null - && ctx.datatype().TO() != null - && ctx.datatype().MONTH() != null) { + // VARCHAR is the same as VARCHAR2 in Oracle + else if (ctx.datatype().native_datatype_element().VARCHAR2() != null || + ctx.datatype().native_datatype_element().VARCHAR() != null) { columnEditor - .jdbcType(OracleTypes.INTERVALYM) - .type("INTERVAL YEAR TO MONTH") - .length(2); - if (!ctx.datatype().expression().isEmpty()) { - columnEditor.length(Integer.valueOf((ctx.datatype().expression(0).getText()))); + .jdbcType(Types.VARCHAR) + .type("VARCHAR2"); + + if (precisionPart == null) { + columnEditor.length(getVarCharDefaultLength()); + } + else { + setPrecision(precisionPart, columnEditor); } } - else if (ctx.datatype().INTERVAL() != null - && ctx.datatype().DAY() != null - && ctx.datatype().TO() != null - && ctx.datatype().SECOND() != null) { + else if (ctx.datatype().native_datatype_element().NVARCHAR2() != null) { columnEditor - .jdbcType(OracleTypes.INTERVALDS) - .type("INTERVAL DAY TO SECOND") - .length(2) - .scale(6); - for (final PlSqlParser.ExpressionContext e : ctx.datatype().expression()) { - if (e.getSourceInterval().startsAfter(ctx.datatype().TO().getSourceInterval())) { - columnEditor.scale(Integer.valueOf(e.getText())); - } - else { - columnEditor.length(Integer.valueOf(e.getText())); - } + .jdbcType(Types.NVARCHAR) + .type("NVARCHAR2"); + + if (precisionPart == null) { + columnEditor.length(getVarCharDefaultLength()); } - if (!ctx.datatype().expression().isEmpty()) { - columnEditor.length(Integer.valueOf((ctx.datatype().expression(0).getText()))); + else { + setPrecision(precisionPart, columnEditor); } } + else if (ctx.datatype().native_datatype_element().CHAR() != null) { + columnEditor + .jdbcType(Types.CHAR) + .type("CHAR") + .length(1); + } + else if (ctx.datatype().native_datatype_element().NCHAR() != null) { + columnEditor + .jdbcType(Types.NCHAR) + .type("NCHAR") + .length(1); + } + else if (ctx.datatype().native_datatype_element().BINARY_FLOAT() != null) { + columnEditor + .jdbcType(OracleTypes.BINARY_FLOAT) + .type("BINARY_FLOAT"); + } + else if (ctx.datatype().native_datatype_element().BINARY_DOUBLE() != null) { + columnEditor + .jdbcType(OracleTypes.BINARY_DOUBLE) + .type("BINARY_DOUBLE"); + } + // PRECISION keyword is mandatory + else if (ctx.datatype().native_datatype_element().FLOAT() != null || + (ctx.datatype().native_datatype_element().DOUBLE() != null && ctx.datatype().native_datatype_element().PRECISION() != null)) { + columnEditor + .jdbcType(Types.FLOAT) + .type("FLOAT") + .length(126); + + // TODO float's precision is about bits not decimal digits; should be ok for now to over-size + if (precisionPart != null) { + setPrecision(precisionPart, columnEditor); + } + } + else if (ctx.datatype().native_datatype_element().REAL() != null) { + columnEditor + .jdbcType(Types.FLOAT) + .type("FLOAT") + // TODO float's precision is about bits not decimal digits; should be ok for now to over-size + .length(63); + } + else if (ctx.datatype().native_datatype_element().NUMBER() != null) { + columnEditor + .jdbcType(Types.NUMERIC) + .type("NUMBER"); + + if (precisionPart == null) { + columnEditor.length(38); + } + else { + setPrecision(precisionPart, columnEditor); + setScale(precisionPart, columnEditor); + } + } + else if (ctx.datatype().native_datatype_element().BLOB() != null) { + columnEditor + .jdbcType(Types.BLOB) + .type("BLOB"); + } + else if (ctx.datatype().native_datatype_element().CLOB() != null) { + columnEditor + .jdbcType(Types.CLOB) + .type("CLOB"); + } else { - throw new IllegalArgumentException("Unsupported column type: " + ctx.datatype().getText()); + throw new IllegalArgumentException("Unsupported column type: " + ctx.datatype().native_datatype_element().getText()); } + } + else if (ctx.datatype().INTERVAL() != null + && ctx.datatype().YEAR() != null + && ctx.datatype().TO() != null + && ctx.datatype().MONTH() != null) { + columnEditor + .jdbcType(OracleTypes.INTERVALYM) + .type("INTERVAL YEAR TO MONTH") + .length(2); + if (!ctx.datatype().expression().isEmpty()) { + columnEditor.length(Integer.valueOf((ctx.datatype().expression(0).getText()))); + } + } + else if (ctx.datatype().INTERVAL() != null + && ctx.datatype().DAY() != null + && ctx.datatype().TO() != null + && ctx.datatype().SECOND() != null) { + columnEditor + .jdbcType(OracleTypes.INTERVALDS) + .type("INTERVAL DAY TO SECOND") + .length(2) + .scale(6); + for (final PlSqlParser.ExpressionContext e : ctx.datatype().expression()) { + if (e.getSourceInterval().startsAfter(ctx.datatype().TO().getSourceInterval())) { + columnEditor.scale(Integer.valueOf(e.getText())); + } + else { + columnEditor.length(Integer.valueOf(e.getText())); + } + } + if (!ctx.datatype().expression().isEmpty()) { + columnEditor.length(Integer.valueOf((ctx.datatype().expression(0).getText()))); + } + } + else { + throw new IllegalArgumentException("Unsupported column type: " + ctx.datatype().getText()); + } - boolean hasNotNullConstraint = ctx.inline_constraint().stream().anyMatch(c -> c.NOT() != null); + boolean hasNotNullConstraint = ctx.inline_constraint().stream().anyMatch(c -> c.NOT() != null); // todo move to enterExpression and apply type conversion if (ctx.DEFAULT() != null) { @@ -255,10 +256,10 @@ else if (ctx.datatype().INTERVAL() != null } // todo move to nonNull columnEditor.optional(!hasNotNullConstraint); - } + } private int getVarCharDefaultLength() { - // TODO replace with value from select name, value from v$parameter where name='max_string_size'; + // TODO replace with value from select name, value from v$parameter where name='max_string_size'; return 4000; } diff --git a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/antlr/listener/CreateTableParserListener.java b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/antlr/listener/CreateTableParserListener.java index 15db38b91..167dfb415 100644 --- a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/antlr/listener/CreateTableParserListener.java +++ b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/antlr/listener/CreateTableParserListener.java @@ -28,7 +28,7 @@ public class CreateTableParserListener extends BaseParserListener { private ColumnDefinitionParserListener columnDefinitionParserListener; CreateTableParserListener(final String catalogName, final String schemaName, final OracleDdlParser parser, - final List listeners) { + final List listeners) { this.catalogName = catalogName; this.schemaName = schemaName; this.parser = parser; @@ -53,7 +53,7 @@ public void exitCreate_table(PlSqlParser.Create_tableContext ctx) { listeners.remove(columnDefinitionParserListener); columnDefinitionParserListener = null; parser.databaseTables().overwriteTable(table); - //parser.signalCreateTable(tableEditor.tableId(), ctx); todo ? + // parser.signalCreateTable(tableEditor.tableId(), ctx); todo ? }, tableEditor, table); super.exitCreate_table(ctx); } diff --git a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/antlr/listener/OracleDdlParserListener.java b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/antlr/listener/OracleDdlParserListener.java index 20366fcfe..7af5a534d 100644 --- a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/antlr/listener/OracleDdlParserListener.java +++ b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/antlr/listener/OracleDdlParserListener.java @@ -5,18 +5,19 @@ */ package io.debezium.connector.oracle.antlr.listener; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.concurrent.CopyOnWriteArrayList; + +import org.antlr.v4.runtime.ParserRuleContext; +import org.antlr.v4.runtime.tree.ParseTreeListener; + import io.debezium.antlr.AntlrDdlParserListener; import io.debezium.antlr.ProxyParseTreeListenerUtil; import io.debezium.connector.oracle.antlr.OracleDdlParser; import io.debezium.ddl.parser.oracle.generated.PlSqlParserBaseListener; import io.debezium.text.ParsingException; -import org.antlr.v4.runtime.ParserRuleContext; -import org.antlr.v4.runtime.tree.ParseTreeListener; - -import java.util.ArrayList; -import java.util.Collection; -import java.util.List; -import java.util.concurrent.CopyOnWriteArrayList; /** * This class is Oracle main DDL parser listener class. diff --git a/debezium-connector-oracle/src/test/java/io/debezium/connector/oracle/AbstractOracleDatatypesTest.java b/debezium-connector-oracle/src/test/java/io/debezium/connector/oracle/AbstractOracleDatatypesTest.java index d02258fa0..7d9ea8c5d 100644 --- a/debezium-connector-oracle/src/test/java/io/debezium/connector/oracle/AbstractOracleDatatypesTest.java +++ b/debezium-connector-oracle/src/test/java/io/debezium/connector/oracle/AbstractOracleDatatypesTest.java @@ -110,21 +110,24 @@ public abstract class AbstractOracleDatatypesTest extends AbstractConnectorTest new SchemaAndValueField("VAL_VARCHAR2", Schema.OPTIONAL_STRING_SCHEMA, "v\u010d2"), new SchemaAndValueField("VAL_NVARCHAR2", Schema.OPTIONAL_STRING_SCHEMA, "nv\u010d2"), new SchemaAndValueField("VAL_CHAR", Schema.OPTIONAL_STRING_SCHEMA, "c "), - new SchemaAndValueField("VAL_NCHAR", Schema.OPTIONAL_STRING_SCHEMA, "n\u010d ") - ); + new SchemaAndValueField("VAL_NCHAR", Schema.OPTIONAL_STRING_SCHEMA, "n\u010d ")); private static final List EXPECTED_FP = Arrays.asList( new SchemaAndValueField("VAL_BF", Schema.OPTIONAL_FLOAT32_SCHEMA, 1.1f), new SchemaAndValueField("VAL_BD", Schema.OPTIONAL_FLOAT64_SCHEMA, 2.22), - new SchemaAndValueField("VAL_F", VariableScaleDecimal.builder().optional().build(), VariableScaleDecimal.fromLogical(VariableScaleDecimal.builder().optional().build(), new BigDecimal("3.33"))), - new SchemaAndValueField("VAL_F_10", VariableScaleDecimal.builder().optional().build(), VariableScaleDecimal.fromLogical(VariableScaleDecimal.builder().optional().build(), new BigDecimal("8.888"))), + new SchemaAndValueField("VAL_F", VariableScaleDecimal.builder().optional().build(), + VariableScaleDecimal.fromLogical(VariableScaleDecimal.builder().optional().build(), new BigDecimal("3.33"))), + new SchemaAndValueField("VAL_F_10", VariableScaleDecimal.builder().optional().build(), + VariableScaleDecimal.fromLogical(VariableScaleDecimal.builder().optional().build(), new BigDecimal("8.888"))), new SchemaAndValueField("VAL_NUM", Decimal.builder(6).parameter(PRECISION_PARAMETER_KEY, "10").optional().build(), new BigDecimal("4.444400")), - new SchemaAndValueField("VAL_DP", VariableScaleDecimal.builder().optional().build(), VariableScaleDecimal.fromLogical(VariableScaleDecimal.builder().optional().build(), new BigDecimal("5.555"))), - new SchemaAndValueField("VAL_R", VariableScaleDecimal.builder().optional().build(), VariableScaleDecimal.fromLogical(VariableScaleDecimal.builder().optional().build(), new BigDecimal("6.66"))), + new SchemaAndValueField("VAL_DP", VariableScaleDecimal.builder().optional().build(), + VariableScaleDecimal.fromLogical(VariableScaleDecimal.builder().optional().build(), new BigDecimal("5.555"))), + new SchemaAndValueField("VAL_R", VariableScaleDecimal.builder().optional().build(), + VariableScaleDecimal.fromLogical(VariableScaleDecimal.builder().optional().build(), new BigDecimal("6.66"))), new SchemaAndValueField("VAL_DECIMAL", Decimal.builder(6).parameter(PRECISION_PARAMETER_KEY, "10").optional().build(), new BigDecimal("1234.567891")), new SchemaAndValueField("VAL_NUMERIC", Decimal.builder(6).parameter(PRECISION_PARAMETER_KEY, "10").optional().build(), new BigDecimal("1234.567891")), - new SchemaAndValueField("VAL_NUM_VS", VariableScaleDecimal.builder().optional().build(), VariableScaleDecimal.fromLogical(VariableScaleDecimal.builder().optional().build(), new BigDecimal("77.323"))) - ); + new SchemaAndValueField("VAL_NUM_VS", VariableScaleDecimal.builder().optional().build(), + VariableScaleDecimal.fromLogical(VariableScaleDecimal.builder().optional().build(), new BigDecimal("77.323")))); private static final List EXPECTED_INT = Arrays.asList( new SchemaAndValueField("VAL_INT", NUMBER_SCHEMA, new BigDecimal("1")), @@ -141,18 +144,20 @@ public abstract class AbstractOracleDatatypesTest extends AbstractConnectorTest new SchemaAndValueField("VAL_NUMBER_9_NEGATIVE_SCALE", Schema.OPTIONAL_INT32_SCHEMA, 9999_99990), new SchemaAndValueField("VAL_NUMBER_18_NEGATIVE_SCALE", Schema.OPTIONAL_INT64_SCHEMA, 999_99999_99999_99900L), new SchemaAndValueField("VAL_DECIMAL", Schema.OPTIONAL_INT64_SCHEMA, 99999_99999L), - new SchemaAndValueField("VAL_NUMERIC", Schema.OPTIONAL_INT64_SCHEMA, 99999_99999L) - ); + new SchemaAndValueField("VAL_NUMERIC", Schema.OPTIONAL_INT64_SCHEMA, 99999_99999L)); private static final List EXPECTED_TIME = Arrays.asList( new SchemaAndValueField("VAL_DATE", Timestamp.builder().optional().build(), 1522108800_000l), - new SchemaAndValueField("VAL_TS", MicroTimestamp.builder().optional().build(), LocalDateTime.of(2018, 3, 27, 12, 34, 56).toEpochSecond(ZoneOffset.UTC) * 1_000_000 + 7890), - new SchemaAndValueField("VAL_TS_PRECISION2", Timestamp.builder().optional().build(), LocalDateTime.of(2018, 3, 27, 12, 34, 56).toEpochSecond(ZoneOffset.UTC) * 1_000 + 130), - new SchemaAndValueField("VAL_TS_PRECISION4", MicroTimestamp.builder().optional().build(), LocalDateTime.of(2018, 3, 27, 12, 34, 56).toEpochSecond(ZoneOffset.UTC) * 1_000_000 + 125500), + new SchemaAndValueField("VAL_TS", MicroTimestamp.builder().optional().build(), + LocalDateTime.of(2018, 3, 27, 12, 34, 56).toEpochSecond(ZoneOffset.UTC) * 1_000_000 + 7890), + new SchemaAndValueField("VAL_TS_PRECISION2", Timestamp.builder().optional().build(), + LocalDateTime.of(2018, 3, 27, 12, 34, 56).toEpochSecond(ZoneOffset.UTC) * 1_000 + 130), + new SchemaAndValueField("VAL_TS_PRECISION4", MicroTimestamp.builder().optional().build(), + LocalDateTime.of(2018, 3, 27, 12, 34, 56).toEpochSecond(ZoneOffset.UTC) * 1_000_000 + 125500), new SchemaAndValueField("VAL_TSTZ", ZonedTimestamp.builder().optional().build(), "2018-03-27T01:34:56.00789-11:00"), new SchemaAndValueField("VAL_INT_YTM", MicroDuration.builder().optional().build(), -110451600_000_000L), new SchemaAndValueField("VAL_INT_DTS", MicroDuration.builder().optional().build(), -93784_560_000L) -// new SchemaAndValueField("VAL_TSLTZ", ZonedTimestamp.builder().optional().build(), "2018-03-27T01:34:56.00789-11:00") + // new SchemaAndValueField("VAL_TSLTZ", ZonedTimestamp.builder().optional().build(), "2018-03-27T01:34:56.00789-11:00") ); private static final String[] ALL_TABLES = { @@ -174,14 +179,14 @@ public abstract class AbstractOracleDatatypesTest extends AbstractConnectorTest @BeforeClass public static void dropTables() throws SQLException { connection = TestHelper.testConnection(); - for (String table: ALL_TABLES) { + for (String table : ALL_TABLES) { TestHelper.dropTable(connection, table); } } protected static void createTables() throws SQLException { connection.execute(ALL_DDLS); - for (String table: ALL_TABLES) { + for (String table : ALL_TABLES) { streamTable(table); } } @@ -193,7 +198,7 @@ protected List getAllTables() { protected abstract boolean insertRecordsDuringTest(); private static void streamTable(String table) throws SQLException { - connection.execute("GRANT SELECT ON " + table + " to " + TestHelper.CONNECTOR_USER); + connection.execute("GRANT SELECT ON " + table + " to " + TestHelper.CONNECTOR_USER); connection.execute("ALTER TABLE " + table + " ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS"); } @@ -339,7 +344,8 @@ protected static void insertFpTypes() throws SQLException { } protected static void insertIntTypes() throws SQLException { - connection.execute("INSERT INTO debezium.type_int VALUES (1, 1, 22, 333, 4444, 5555, 99, 9999, 999999999, 999999999999999999, 94, 9949, 999999994, 999999999999999949, 9999999999, 9999999999)"); + connection.execute( + "INSERT INTO debezium.type_int VALUES (1, 1, 22, 333, 4444, 5555, 99, 9999, 999999999, 999999999999999999, 94, 9949, 999999994, 999999999999999949, 9999999999, 9999999999)"); connection.execute("COMMIT"); } diff --git a/debezium-connector-oracle/src/test/java/io/debezium/connector/oracle/OracleConnectorFilterIT.java b/debezium-connector-oracle/src/test/java/io/debezium/connector/oracle/OracleConnectorFilterIT.java index f41476aab..694bf5dab 100644 --- a/debezium-connector-oracle/src/test/java/io/debezium/connector/oracle/OracleConnectorFilterIT.java +++ b/debezium-connector-oracle/src/test/java/io/debezium/connector/oracle/OracleConnectorFilterIT.java @@ -74,8 +74,7 @@ public void before() throws SQLException { "GRANT ALL PRIVILEGES ON debezium2.table2 to debezium", "GRANT SELECT ON debezium2.table2 to " + TestHelper.CONNECTOR_USER, "GRANT SELECT ON debezium2.nopk to " + TestHelper.CONNECTOR_USER, - "ALTER TABLE debezium2.table2 ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS" - ); + "ALTER TABLE debezium2.table2 ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS"); String ddl = "create table debezium.table1 (" + " id numeric(9,0) not null, " + " name varchar2(1000), " + diff --git a/debezium-connector-oracle/src/test/java/io/debezium/connector/oracle/OracleConnectorIT.java b/debezium-connector-oracle/src/test/java/io/debezium/connector/oracle/OracleConnectorIT.java index 51a3b2df6..16cf50a3c 100644 --- a/debezium-connector-oracle/src/test/java/io/debezium/connector/oracle/OracleConnectorIT.java +++ b/debezium-connector-oracle/src/test/java/io/debezium/connector/oracle/OracleConnectorIT.java @@ -525,7 +525,6 @@ public void shouldReadChangeStreamForTableCreatedWhileStreaming() throws Excepti connection.execute("INSERT INTO debezium.customer2 VALUES (2, 'Billie-Bob', 1234.56, TO_DATE('2018/02/22', 'yyyy-mm-dd'))"); connection.execute("COMMIT"); - SourceRecords records = consumeRecordsByTopic(1); List testTableRecords = records.recordsForTopic("server1.DEBEZIUM.CUSTOMER2"); diff --git a/debezium-connector-oracle/src/test/java/io/debezium/connector/oracle/OracleDdlParserTest.java b/debezium-connector-oracle/src/test/java/io/debezium/connector/oracle/OracleDdlParserTest.java index 9f35ff395..856bfc91f 100644 --- a/debezium-connector-oracle/src/test/java/io/debezium/connector/oracle/OracleDdlParserTest.java +++ b/debezium-connector-oracle/src/test/java/io/debezium/connector/oracle/OracleDdlParserTest.java @@ -23,7 +23,6 @@ import io.debezium.relational.Tables; import io.debezium.util.IoUtil; - /** * This is the test suite for Oracle Antlr parser unit testing */ @@ -66,20 +65,21 @@ public void shouldParseCreateAndAlterTable() throws Exception { // float(126) testColumn(table, "COL6", true, Types.FLOAT, "FLOAT", 126, 0, true, null); // date - testColumn(table, "COL7", true, Types.TIMESTAMP, "DATE", -1, null, true, null); + testColumn(table, "COL7", true, Types.TIMESTAMP, "DATE", -1, null, true, null); // timestamp testColumn(table, "COL8", true, Types.TIMESTAMP, "TIMESTAMP", 6, null, true, null); // blob - testColumn(table, "COL9", true, Types.BLOB, "BLOB", -1, null, true, null); + testColumn(table, "COL9", true, Types.BLOB, "BLOB", -1, null, true, null); // clob - testColumn(table, "COL10", true, Types.CLOB, "CLOB", -1, null, true, null); + testColumn(table, "COL10", true, Types.CLOB, "CLOB", -1, null, true, null); // todo sdo_geometry - //testColumn(table, "col12", true, Types.STRUCT, "MDSYS.SDO_GEOMETRY", -1, null,true); + // testColumn(table, "col12", true, Types.STRUCT, "MDSYS.SDO_GEOMETRY", -1, null,true); String ddl = "alter table " + TABLE_NAME + " add (col21 varchar2(20), col22 number(19));"; parser.parse(ddl, tables); Table alteredTable = tables.forTable(new TableId(null, null, TABLE_NAME)); - assertThat(alteredTable.retrieveColumnNames()).containsExactly("ID", "COL1", "COL2", "COL3", "COL4", "COL5", "COL6", "COL7", "COL8", "COL9", "COL10", "COL21", "COL22"); + assertThat(alteredTable.retrieveColumnNames()).containsExactly("ID", "COL1", "COL2", "COL3", "COL4", "COL5", "COL6", "COL7", "COL8", "COL9", "COL10", "COL21", + "COL22"); // varchar2(255) testColumn(alteredTable, "COL21", true, Types.VARCHAR, "VARCHAR2", 20, null, true, null); testColumn(alteredTable, "COL22", true, Types.NUMERIC, "NUMBER", 19, 0, true, null); @@ -95,7 +95,8 @@ public void shouldParseCreateAndAlterTable() throws Exception { ddl = "alter table " + TABLE_NAME + " add (col23 varchar2(20) not null);"; parser.parse(ddl, tables); alteredTable = tables.forTable(new TableId(null, null, TABLE_NAME)); - assertThat(alteredTable.retrieveColumnNames()).containsExactly("ID", "COL1", "COL2", "COL3", "COL4", "COL5", "COL6", "COL7", "COL8", "COL9", "COL10", "COL21", "COL22", "COL23"); + assertThat(alteredTable.retrieveColumnNames()).containsExactly("ID", "COL1", "COL2", "COL3", "COL4", "COL5", "COL6", "COL7", "COL8", "COL9", "COL10", "COL21", + "COL22", "COL23"); testColumn(alteredTable, "COL23", false, Types.VARCHAR, "VARCHAR2", 20, null, false, null); ddl = "alter table " + TABLE_NAME + " drop (col22, col23);"; @@ -103,14 +104,14 @@ public void shouldParseCreateAndAlterTable() throws Exception { alteredTable = tables.forTable(new TableId(null, null, TABLE_NAME)); assertThat(alteredTable.retrieveColumnNames()).containsExactly("ID", "COL1", "COL2", "COL3", "COL4", "COL5", "COL6", "COL7", "COL8", "COL9", "COL10", "COL21"); - ddl = "drop table " + TABLE_NAME +";"; + ddl = "drop table " + TABLE_NAME + ";"; parser.parse(ddl, tables); assertThat(tables.size()).isZero(); // todo ddl = "drop table " + TABLE_NAME + " cascade constrains purge;"; - ddl = "ALTER TABLE "+TABLE_NAME+" ADD CONSTRAINT FKB97209E040C4205 FOREIGN KEY (col1) REFERENCES debezium_ref(ID);"; - ddl = "ALTER TABLE "+TABLE_NAME+" MODIFY COL1 varchar2(50) not null;"; + ddl = "ALTER TABLE " + TABLE_NAME + " ADD CONSTRAINT FKB97209E040C4205 FOREIGN KEY (col1) REFERENCES debezium_ref(ID);"; + ddl = "ALTER TABLE " + TABLE_NAME + " MODIFY COL1 varchar2(50) not null;"; } /** diff --git a/debezium-connector-oracle/src/test/java/io/debezium/connector/oracle/SnapshotDatatypesIT.java b/debezium-connector-oracle/src/test/java/io/debezium/connector/oracle/SnapshotDatatypesIT.java index d813eb8af..2ddff85df 100644 --- a/debezium-connector-oracle/src/test/java/io/debezium/connector/oracle/SnapshotDatatypesIT.java +++ b/debezium-connector-oracle/src/test/java/io/debezium/connector/oracle/SnapshotDatatypesIT.java @@ -24,7 +24,8 @@ */ public class SnapshotDatatypesIT extends AbstractOracleDatatypesTest { - @Rule public TestName name = new TestName(); + @Rule + public TestName name = new TestName(); @BeforeClass public static void beforeClass() throws SQLException { @@ -54,7 +55,7 @@ public void before() throws Exception { } private String getTableWhitelist() { - switch(name.getMethodName()) { + switch (name.getMethodName()) { case "stringTypes": return "ORCLPDB1.debezium.type_string"; case "fpTypes": diff --git a/debezium-connector-oracle/src/test/java/io/debezium/connector/oracle/SourceInfoTest.java b/debezium-connector-oracle/src/test/java/io/debezium/connector/oracle/SourceInfoTest.java index cfe38de4c..60e817367 100644 --- a/debezium-connector-oracle/src/test/java/io/debezium/connector/oracle/SourceInfoTest.java +++ b/debezium-connector-oracle/src/test/java/io/debezium/connector/oracle/SourceInfoTest.java @@ -5,6 +5,10 @@ */ package io.debezium.connector.oracle; +import static org.fest.assertions.Assertions.assertThat; + +import java.time.Instant; + import org.apache.kafka.connect.data.Schema; import org.apache.kafka.connect.data.SchemaBuilder; import org.junit.Before; @@ -15,10 +19,6 @@ import io.debezium.data.VerifyRecord; import io.debezium.relational.TableId; -import java.time.Instant; - -import static org.fest.assertions.Assertions.assertThat; - public class SourceInfoTest { private SourceInfo source; @@ -27,10 +27,9 @@ public class SourceInfoTest { public void beforeEach() { final OracleConnectorConfig connectorConfig = new OracleConnectorConfig( Configuration.create() - .with(OracleConnectorConfig.SERVER_NAME, "serverX") - .with(OracleConnectorConfig.DATABASE_NAME, "mydb") - .build() - ); + .with(OracleConnectorConfig.SERVER_NAME, "serverX") + .with(OracleConnectorConfig.DATABASE_NAME, "mydb") + .build()); source = new SourceInfo(connectorConfig); source.setSourceTime(Instant.now()); source.setTableId(new TableId("c", "s", "t")); diff --git a/debezium-connector-oracle/src/test/java/io/debezium/connector/oracle/StreamingDatatypesIT.java b/debezium-connector-oracle/src/test/java/io/debezium/connector/oracle/StreamingDatatypesIT.java index a1fd65b1c..b5f372055 100644 --- a/debezium-connector-oracle/src/test/java/io/debezium/connector/oracle/StreamingDatatypesIT.java +++ b/debezium-connector-oracle/src/test/java/io/debezium/connector/oracle/StreamingDatatypesIT.java @@ -30,9 +30,9 @@ public void before() throws Exception { Testing.Files.delete(TestHelper.DB_HISTORY_PATH); String whitelistedTables = getAllTables().stream() - .map(t -> "ORCLPDB1." + t) - .map(t -> t.replaceAll("\\.", "\\\\.")) - .collect(Collectors.joining(",")); + .map(t -> "ORCLPDB1." + t) + .map(t -> t.replaceAll("\\.", "\\\\.")) + .collect(Collectors.joining(",")); Configuration config = TestHelper.defaultConfig() .with(OracleConnectorConfig.TABLE_WHITELIST, whitelistedTables) diff --git a/debezium-connector-oracle/src/test/java/io/debezium/connector/oracle/util/TestHelper.java b/debezium-connector-oracle/src/test/java/io/debezium/connector/oracle/util/TestHelper.java index f7dd71bc7..dd9637c0c 100644 --- a/debezium-connector-oracle/src/test/java/io/debezium/connector/oracle/util/TestHelper.java +++ b/debezium-connector-oracle/src/test/java/io/debezium/connector/oracle/util/TestHelper.java @@ -47,8 +47,7 @@ public static Configuration.Builder defaultConfig() { Configuration.Builder builder = Configuration.create(); jdbcConfiguration.forEach( - (field, value) -> builder.with(OracleConnectorConfig.DATABASE_CONFIG_PREFIX + field, value) - ); + (field, value) -> builder.with(OracleConnectorConfig.DATABASE_CONFIG_PREFIX + field, value)); return builder.with(RelationalDatabaseConnectorConfig.SERVER_NAME, "server1") .with(OracleConnectorConfig.PDB_NAME, "ORCLPDB1") @@ -103,8 +102,7 @@ private static Configuration.Builder testConfig() { Configuration.Builder builder = Configuration.create(); jdbcConfiguration.forEach( - (field, value) -> builder.with(OracleConnectorConfig.DATABASE_CONFIG_PREFIX + field, value) - ); + (field, value) -> builder.with(OracleConnectorConfig.DATABASE_CONFIG_PREFIX + field, value)); return builder; } @@ -114,8 +112,7 @@ private static Configuration.Builder adminConfig() { Configuration.Builder builder = Configuration.create(); jdbcConfiguration.forEach( - (field, value) -> builder.with(OracleConnectorConfig.DATABASE_CONFIG_PREFIX + field, value) - ); + (field, value) -> builder.with(OracleConnectorConfig.DATABASE_CONFIG_PREFIX + field, value)); return builder; }