From 9a8aaa4b0195b965fb60bfe2f179bdd1f99b1598 Mon Sep 17 00:00:00 2001 From: Gunnar Morling Date: Tue, 10 Jul 2018 15:49:29 +0200 Subject: [PATCH] DBZ-720 Initial data snapshotting or Oracle --- .../connector/oracle/LcrEventHandler.java | 2 +- .../OracleChangeEventSourceFactory.java | 3 +- .../connector/oracle/OracleConnection.java | 35 +++-- .../oracle/OracleConnectorConfig.java | 110 ++++++++++++++- .../connector/oracle/OracleConnectorTask.java | 22 +-- .../connector/oracle/OracleOffsetContext.java | 121 +++++++++++++++- .../OracleSnapshotChangeEventSource.java | 130 ++++++++++++++++-- .../oracle/SnapshotChangeRecordEmitter.java | 43 ++++++ .../debezium/connector/oracle/SourceInfo.java | 11 +- ...r.java => XStreamChangeRecordEmitter.java} | 4 +- .../connector/oracle/OracleConnectorIT.java | 120 +++++++++++++++- 11 files changed, 545 insertions(+), 56 deletions(-) create mode 100644 debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/SnapshotChangeRecordEmitter.java rename debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/{OracleChangeRecordEmitter.java => XStreamChangeRecordEmitter.java} (90%) diff --git a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/LcrEventHandler.java b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/LcrEventHandler.java index f2db9b4b1..9ab7c9146 100644 --- a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/LcrEventHandler.java +++ b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/LcrEventHandler.java @@ -99,7 +99,7 @@ private void dispatchDataChangeEvent(RowLCR lcr) throws InterruptedException { dispatcher.dispatchDataChangeEvent( tableId, - new OracleChangeRecordEmitter(offsetContext, lcr, schema.tableFor(tableId), clock) + new XStreamChangeRecordEmitter(offsetContext, lcr, schema.tableFor(tableId), clock) ); } diff --git a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/OracleChangeEventSourceFactory.java b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/OracleChangeEventSourceFactory.java index 4a5327930..3c7d4c8d1 100644 --- a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/OracleChangeEventSourceFactory.java +++ b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/OracleChangeEventSourceFactory.java @@ -35,7 +35,8 @@ public OracleChangeEventSourceFactory(OracleConnectorConfig configuration, Oracl @Override public SnapshotChangeEventSource getSnapshotChangeEventSource(OffsetContext offsetContext) { - return new OracleSnapshotChangeEventSource(configuration, (OracleOffsetContext) offsetContext, jdbcConnection, schema); + return new OracleSnapshotChangeEventSource(configuration, (OracleOffsetContext) offsetContext, jdbcConnection, + schema, dispatcher, clock); } @Override diff --git a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/OracleConnection.java b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/OracleConnection.java index f70a9f854..0e3b97c7e 100644 --- a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/OracleConnection.java +++ b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/OracleConnection.java @@ -93,27 +93,34 @@ public Set readTableNames(String databaseCatalog, String schemaNamePatt public void readSchema(Tables tables, String databaseCatalog, String schemaNamePattern, TableFilter tableFilter, ColumnNameFilter columnFilter, boolean removeTablesNotFoundInJdbc) throws SQLException { - super.readSchema(tables, null, schemaNamePattern, tableFilter, columnFilter, removeTablesNotFoundInJdbc); + super.readSchema(tables, null, schemaNamePattern, null, columnFilter, removeTablesNotFoundInJdbc); Set tableIds = new HashSet<>(tables.tableIds()); for (TableId tableId : tableIds) { - TableEditor editor = tables.editTable(tableId); - editor.tableId(new TableId(databaseCatalog, tableId.schema(), tableId.table())); + // super.readSchema() populates ids without the catalog; hence we apply the filtering only + // here and if a table is included, overwrite it with a new id including the catalog + TableId tableIdWithCatalog = new TableId(databaseCatalog, tableId.schema(), tableId.table()); - List columnNames = new ArrayList<>(editor.columnNames()); - for (String columnName : columnNames) { - Column column = editor.columnWithName(columnName); - if (column.jdbcType() == Types.TIMESTAMP) { - editor.addColumn( - column.edit() - .length(column.scale().orElse(Column.UNSET_INT_VALUE)) - .scale(null) - .create() - ); + if (tableFilter.isIncluded(tableIdWithCatalog)) { + TableEditor editor = tables.editTable(tableId); + editor.tableId(tableIdWithCatalog); + + List columnNames = new ArrayList<>(editor.columnNames()); + for (String columnName : columnNames) { + Column column = editor.columnWithName(columnName); + if (column.jdbcType() == Types.TIMESTAMP) { + editor.addColumn( + column.edit() + .length(column.scale().orElse(Column.UNSET_INT_VALUE)) + .scale(null) + .create() + ); + } } + tables.overwriteTable(editor.create()); } - tables.overwriteTable(editor.create()); + tables.removeTable(tableId); } } diff --git a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/OracleConnectorConfig.java b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/OracleConnectorConfig.java index 8e2c3a922..5f76eb606 100644 --- a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/OracleConnectorConfig.java +++ b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/OracleConnectorConfig.java @@ -13,6 +13,7 @@ import io.debezium.config.CommonConnectorConfig; import io.debezium.config.Configuration; +import io.debezium.config.EnumeratedValue; import io.debezium.config.Field; import io.debezium.document.Document; import io.debezium.jdbc.JdbcConfiguration; @@ -81,6 +82,16 @@ public class OracleConnectorConfig extends RelationalDatabaseConnectorConfig { .withValidation(Field::isRequired) .withDescription("Name of the XStream Out server to connect to."); + public static final Field SNAPSHOT_MODE = Field.create("snapshot.mode") + .withDisplayName("Snapshot mode") + .withEnum(SnapshotMode.class, SnapshotMode.INITIAL) + .withWidth(Width.SHORT) + .withImportance(Importance.LOW) + .withDescription("The criteria for running a snapshot upon startup of the connector. " + + "Options include: " + + "'initial' (the default) to specify the connector should run a snapshot only when no offsets are available for the logical server name; " + + "'initial_schema_only' to specify the connector should run a snapshot of the schema when no offsets are available for the logical server name. "); + /** * The set of {@link Field}s defined as part of this configuration. */ @@ -89,6 +100,7 @@ public class OracleConnectorConfig extends RelationalDatabaseConnectorConfig { DATABASE_NAME, PDB_NAME, XSTREAM_SERVER_NAME, + SNAPSHOT_MODE, RelationalDatabaseConnectorConfig.TABLE_WHITELIST, RelationalDatabaseConnectorConfig.TABLE_BLACKLIST, RelationalDatabaseConnectorConfig.TABLE_IGNORE_BUILTIN, @@ -100,6 +112,7 @@ public class OracleConnectorConfig extends RelationalDatabaseConnectorConfig { private final String databaseName; private final String pdbName; private final String xoutServerName; + private final SnapshotMode snapshotMode; public OracleConnectorConfig(Configuration config) { super(config, config.getString(LOGICAL_NAME), new SystemTablesPredicate()); @@ -107,12 +120,13 @@ public OracleConnectorConfig(Configuration config) { this.databaseName = config.getString(DATABASE_NAME); this.pdbName = config.getString(PDB_NAME); this.xoutServerName = config.getString(XSTREAM_SERVER_NAME); + this.snapshotMode = SnapshotMode.parse(config.getString(SNAPSHOT_MODE)); } public static ConfigDef configDef() { ConfigDef config = new ConfigDef(); - Field.group(config, "Oracle", LOGICAL_NAME, DATABASE_NAME, PDB_NAME, XSTREAM_SERVER_NAME); + Field.group(config, "Oracle", LOGICAL_NAME, DATABASE_NAME, PDB_NAME, XSTREAM_SERVER_NAME, SNAPSHOT_MODE); Field.group(config, "Events", RelationalDatabaseConnectorConfig.TABLE_WHITELIST, RelationalDatabaseConnectorConfig.TABLE_BLACKLIST, RelationalDatabaseConnectorConfig.TABLE_IGNORE_BUILTIN @@ -134,6 +148,10 @@ public String getXoutServerName() { return xoutServerName; } + public SnapshotMode getSnapshotMode() { + return snapshotMode; + } + /** * Returns a configured (but not yet started) instance of the database history. */ @@ -163,6 +181,79 @@ protected boolean isPositionAtOrBefore(Document recorded, Document desired) { return databaseHistory; } + /** + * The set of predefined SnapshotMode options or aliases. + */ + public static enum SnapshotMode implements EnumeratedValue { + + /** + * Perform a snapshot of data and schema upon initial startup of a connector. + */ + INITIAL("initial", true), + + /** + * Perform a snapshot of data and schema upon initial startup of a connector. + */ + INITIAL_SCHEMA_ONLY("initial_schema_only", false); + + private final String value; + private final boolean includeData; + + private SnapshotMode(String value, boolean includeData) { + this.value = value; + this.includeData = includeData; + } + + @Override + public String getValue() { + return value; + } + + /** + * Whether this snapshotting mode should include the actual data or just the + * schema of captured tables. + */ + public boolean includeData() { + return includeData; + } + + /** + * Determine if the supplied value is one of the predefined options. + * + * @param value the configuration property value; may not be null + * @return the matching option, or null if no match is found + */ + public static SnapshotMode parse(String value) { + if (value == null) { + return null; + } + value = value.trim(); + + for (SnapshotMode option : SnapshotMode.values()) { + if (option.getValue().equalsIgnoreCase(value)) return option; + } + + return null; + } + + /** + * Determine if the supplied value is one of the predefined options. + * + * @param value the configuration property value; may not be null + * @param defaultValue the default value; may be null + * @return the matching option, or null if no match is found and the non-null default is invalid + */ + public static SnapshotMode parse(String value, String defaultValue) { + SnapshotMode mode = parse(value); + + if (mode == null && defaultValue != null) { + mode = parse(defaultValue); + } + + return mode; + } + } + /** * A {@link TableFilter} that excludes all Oracle system tables. * @@ -172,11 +263,22 @@ private static class SystemTablesPredicate implements TableFilter { @Override public boolean isIncluded(TableId t) { - return !t.schema().toLowerCase().equals("system") && - !t.schema().toLowerCase().equals("sys") && - !t.schema().toLowerCase().equals("mdsys") && + return !t.schema().toLowerCase().equals("appqossys") && !t.schema().toLowerCase().equals("ctxsys") && + !t.schema().toLowerCase().equals("dvsys") && + !t.schema().toLowerCase().equals("dbsfwuser") && + !t.schema().toLowerCase().equals("dbsnmp") && + !t.schema().toLowerCase().equals("gsmadmin_internal") && + !t.schema().toLowerCase().equals("lbacsys") && + !t.schema().toLowerCase().equals("mdsys") && + !t.schema().toLowerCase().equals("ojvmsys") && + !t.schema().toLowerCase().equals("olapsys") && + !t.schema().toLowerCase().equals("orddata") && + !t.schema().toLowerCase().equals("ordsys") && !t.schema().toLowerCase().equals("outln") && + !t.schema().toLowerCase().equals("sys") && + !t.schema().toLowerCase().equals("system") && + !t.schema().toLowerCase().equals("wmsys") && !t.schema().toLowerCase().equals("xdb"); } } diff --git a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/OracleConnectorTask.java b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/OracleConnectorTask.java index 94b7ac482..6eb426e7d 100644 --- a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/OracleConnectorTask.java +++ b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/OracleConnectorTask.java @@ -6,7 +6,6 @@ package io.debezium.connector.oracle; import java.sql.SQLException; -import java.util.Collections; import java.util.List; import java.util.Map; import java.util.concurrent.atomic.AtomicReference; @@ -25,6 +24,7 @@ import io.debezium.pipeline.DataChangeEvent; import io.debezium.pipeline.ErrorHandler; import io.debezium.pipeline.EventDispatcher; +import io.debezium.pipeline.spi.OffsetContext; import io.debezium.relational.TableId; import io.debezium.util.Clock; import io.debezium.util.SchemaNameAdjuster; @@ -83,7 +83,7 @@ public void start(Configuration config) { this.schema = new OracleDatabaseSchema(connectorConfig, schemaNameAdjuster, topicSelector, jdbcConnection); - OracleOffsetContext previousOffset = getPreviousOffset(connectorConfig); + OffsetContext previousOffset = getPreviousOffset(new OracleOffsetContext.Loader(connectorConfig.getLogicalName())); if (previousOffset != null) { schema.recover(previousOffset); } @@ -102,24 +102,6 @@ public void start(Configuration config) { coordinator.start(); } - private OracleOffsetContext getPreviousOffset(OracleConnectorConfig connectorConfig) { - OracleOffsetContext offsetContext = new OracleOffsetContext(connectorConfig.getLogicalName()); - - Map previousOffset = context.offsetStorageReader() - .offsets(Collections.singleton(offsetContext.getPartition())) - .get(offsetContext.getPartition()); - - if (previousOffset != null) { - long scn = (long) previousOffset.get(SourceInfo.SCN_KEY); - offsetContext.setScn(scn); - LOGGER.info("Found previous offset {}", offsetContext); - - return offsetContext; - } - - return null; - } - @Override public List poll() throws InterruptedException { // TODO diff --git a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/OracleOffsetContext.java b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/OracleOffsetContext.java index 34f4b2cfe..0c93825c2 100644 --- a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/OracleOffsetContext.java +++ b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/OracleOffsetContext.java @@ -7,6 +7,7 @@ import java.time.Instant; import java.util.Collections; +import java.util.HashMap; import java.util.Map; import org.apache.kafka.connect.data.Schema; @@ -17,16 +18,63 @@ public class OracleOffsetContext implements OffsetContext { private static final String SERVER_PARTITION_KEY = "server"; + private static final String SNAPSHOT_COMPLETED_KEY = "snapshot_completed"; private final Schema sourceInfoSchema; private final Map partition; private final SourceInfo sourceInfo; - public OracleOffsetContext(String serverName) { + /** + * Whether a snapshot has been completed or not. + */ + private boolean snapshotCompleted; + + private OracleOffsetContext(String serverName, long scn, boolean snapshot, boolean snapshotCompleted) { partition = Collections.singletonMap(SERVER_PARTITION_KEY, serverName); + sourceInfo = new SourceInfo(serverName); + sourceInfo.setScn(scn); + sourceInfo.setSnapshot(snapshot); sourceInfoSchema = sourceInfo.schema(); + + this.snapshotCompleted = snapshotCompleted; + } + + public static class Builder { + + private String logicalName; + private long scn; + private boolean snapshot; + private boolean snapshotCompleted; + + public Builder logicalName(String logicalName) { + this.logicalName = logicalName; + return this; + } + + public Builder scn(long scn) { + this.scn = scn; + return this; + } + + public Builder snapshot(boolean snapshot) { + this.snapshot = snapshot; + return this; + } + + public Builder snapshotCompleted(boolean snapshotCompleted) { + this.snapshotCompleted = snapshotCompleted; + return this; + } + + OracleOffsetContext build() { + return new OracleOffsetContext(logicalName, scn, snapshot, snapshotCompleted); + } + } + + public static Builder create() { + return new Builder(); } @Override @@ -36,7 +84,18 @@ public OracleOffsetContext(String serverName) { @Override public Map getOffset() { - return Collections.singletonMap(SourceInfo.SCN_KEY, sourceInfo.getScn()); + if (sourceInfo.isSnapshot()) { + Map offset = new HashMap<>(); + + offset.put(SourceInfo.SCN_KEY, sourceInfo.getScn()); + offset.put(SourceInfo.SNAPSHOT_KEY, true); + offset.put(SNAPSHOT_COMPLETED_KEY, snapshotCompleted); + + return offset; + } + else { + return Collections.singletonMap(SourceInfo.SCN_KEY, sourceInfo.getScn()); + } } @Override @@ -64,4 +123,62 @@ public void setTransactionId(String transactionId) { public void setSourceTime(Instant instant) { sourceInfo.setSourceTime(instant); } + + @Override + public boolean isSnapshotRunning() { + return sourceInfo.isSnapshot() && !snapshotCompleted; + } + + @Override + public void preSnapshotStart() { + sourceInfo.setSnapshot(true); + snapshotCompleted = false; + } + + @Override + public void preSnapshotCompletion() { + snapshotCompleted = true; + } + + @Override + public void postSnapshotCompletion() { + sourceInfo.setSnapshot(false); + } + + @Override + public String toString() { + StringBuilder sb = new StringBuilder("OracleOffsetContext [scn=").append(getScn()); + + if (sourceInfo.isSnapshot()) { + sb.append(", snapshot=").append(sourceInfo.isSnapshot()); + sb.append(", snapshot_completed=").append(snapshotCompleted); + } + + sb.append("]"); + + return sb.toString(); + } + + public static class Loader implements OffsetContext.Loader { + + private final String logicalName; + + public Loader(String logicalName) { + this.logicalName = logicalName; + } + + @Override + public Map getPartition() { + return Collections.singletonMap(SERVER_PARTITION_KEY, logicalName); + } + + @Override + public OffsetContext load(Map offset) { + long scn = (long) offset.get(SourceInfo.SCN_KEY); + boolean snapshot = Boolean.TRUE.equals(offset.get(SourceInfo.SNAPSHOT_KEY)); + boolean snapshotCompleted = Boolean.TRUE.equals(offset.get(SNAPSHOT_COMPLETED_KEY)); + + return new OracleOffsetContext(logicalName, scn, snapshot, snapshotCompleted); + } + } } diff --git a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/OracleSnapshotChangeEventSource.java b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/OracleSnapshotChangeEventSource.java index 183aceab1..abe8f62fa 100644 --- a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/OracleSnapshotChangeEventSource.java +++ b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/OracleSnapshotChangeEventSource.java @@ -8,31 +8,63 @@ import java.sql.Clob; import java.sql.ResultSet; import java.sql.SQLException; +import java.sql.Savepoint; import java.sql.Statement; +import java.time.Instant; +import java.util.Optional; import java.util.Set; import java.util.stream.Collectors; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import io.debezium.pipeline.EventDispatcher; +import io.debezium.pipeline.source.spi.StreamingChangeEventSource; +import io.debezium.pipeline.spi.ChangeRecordEmitter; +import io.debezium.pipeline.spi.OffsetContext; import io.debezium.relational.HistorizedRelationalSnapshotChangeEventSource; import io.debezium.relational.Table; import io.debezium.relational.TableId; import io.debezium.schema.SchemaChangeEvent; import io.debezium.schema.SchemaChangeEvent.SchemaChangeEventType; +import io.debezium.util.Clock; +/** + * A {@link StreamingChangeEventSource} for Oracle. + * + * @author Gunnar Morling + */ public class OracleSnapshotChangeEventSource extends HistorizedRelationalSnapshotChangeEventSource { private static final Logger LOGGER = LoggerFactory.getLogger(OracleSnapshotChangeEventSource.class); private final OracleConnectorConfig connectorConfig; private final OracleConnection jdbcConnection; + private final Clock clock; - public OracleSnapshotChangeEventSource(OracleConnectorConfig connectorConfig, OracleOffsetContext previousOffset, OracleConnection jdbcConnection, OracleDatabaseSchema schema) { - super(connectorConfig, previousOffset, jdbcConnection, schema); + public OracleSnapshotChangeEventSource(OracleConnectorConfig connectorConfig, OracleOffsetContext previousOffset, OracleConnection jdbcConnection, OracleDatabaseSchema schema, EventDispatcher dispatcher, Clock clock) { + super(connectorConfig, previousOffset, jdbcConnection, schema, dispatcher, clock); this.connectorConfig = connectorConfig; this.jdbcConnection = jdbcConnection; + this.clock = clock; + } + + @Override + protected SnapshottingTask getSnapshottingTask(OffsetContext previousOffset) { + boolean snapshotSchema = true; + boolean snapshotData = true; + + // found a previous offset and the earlier snapshot has completed + if (previousOffset != null && !previousOffset.isSnapshotRunning()) { + snapshotData = false; + snapshotData = false; + } + else { + snapshotData = connectorConfig.getSnapshotMode().includeData(); + } + + return new SnapshottingTask(snapshotSchema, snapshotData); } @Override @@ -52,7 +84,9 @@ protected Set getAllTableIds(SnapshotContext ctx) throws Exception { } @Override - protected boolean lockTables(ChangeEventSourceContext sourceContext, SnapshotContext snapshotContext) throws SQLException { + protected boolean lockTablesForSchemaSnapshot(ChangeEventSourceContext sourceContext, SnapshotContext snapshotContext) throws SQLException { + ((OracleSnapshotContext)snapshotContext).preSchemaSnapshotSavepoint = jdbcConnection.connection().setSavepoint("dbz_schema_snapshot"); + try (Statement statement = jdbcConnection.connection().createStatement()) { for (TableId tableId : snapshotContext.capturedTables) { if (!sourceContext.isRunning()) { @@ -68,21 +102,86 @@ protected boolean lockTables(ChangeEventSourceContext sourceContext, SnapshotCon return true; } + @Override + protected void releaseSchemaSnapshotLocks(SnapshotContext snapshotContext) throws SQLException { + jdbcConnection.connection().rollback(((OracleSnapshotContext)snapshotContext).preSchemaSnapshotSavepoint); + } + @Override protected void determineSnapshotOffset(SnapshotContext ctx) throws Exception { + Optional latestTableDdlScn = getLatestTableDdlScn(ctx); + long currentScn; + + // we must use an SCN for taking the snapshot that represents a later timestamp than the latest DDL change than + // any of the captured tables; this will not be a problem in practice, but during testing it may happen that the + // SCN of "now" represents the same timestamp as a newly created table that should be captured; in that case + // we'd get a ORA-01466 when running the flashback query for doing the snapshot + do { + currentScn = getCurrentScn(ctx); + } + while(areSameTimestamp(latestTableDdlScn.orElse(null), currentScn)); + + ctx.offset = OracleOffsetContext.create() + .logicalName(connectorConfig.getLogicalName()) + .scn(currentScn) + .snapshot(true) + .snapshotCompleted(false) + .build(); + } + + private long getCurrentScn(SnapshotContext ctx) throws SQLException { try(Statement statement = jdbcConnection.connection().createStatement(); - ResultSet rs = statement.executeQuery("select CURRENT_SCN from V$DATABASE") ) { + ResultSet rs = statement.executeQuery("select CURRENT_SCN from V$DATABASE")) { if (!rs.next()) { throw new IllegalStateException("Couldn't get SCN"); } - Long scn = rs.getLong(1); + return rs.getLong(1); + } + } - OracleOffsetContext offset = new OracleOffsetContext(connectorConfig.getLogicalName()); - offset.setScn(scn); + /** + * Whether the two SCNs represent the same timestamp or not (resolution is only 3 seconds). + */ + private boolean areSameTimestamp(Long scn1, long scn2) throws SQLException { + if (scn1 == null) { + return false; + } - ctx.offset = offset; + try(Statement statement = jdbcConnection.connection().createStatement(); + ResultSet rs = statement.executeQuery("SELECT 1 FROM DUAL WHERE SCN_TO_TIMESTAMP(" + scn1 + ") = SCN_TO_TIMESTAMP(" + scn2 + ")" )) { + + return rs.next(); + } + } + + /** + * Returns the SCN of the latest DDL change to the captured tables. The result will be empty if there's no table to + * capture as per the configuration. + */ + private Optional getLatestTableDdlScn(SnapshotContext ctx) throws SQLException { + if (ctx.capturedTables.isEmpty()) { + return Optional.empty(); + } + + StringBuilder lastDdlScnQuery = new StringBuilder("SELECT MAX(TIMESTAMP_TO_SCN(last_ddl_time))") + .append(" FROM all_objects") + .append(" WHERE"); + + for(TableId table : ctx.capturedTables) { + lastDdlScnQuery.append(" (owner = '" + table.schema() + "' AND object_name = '" + table.table() + "') OR"); + } + + String query = lastDdlScnQuery.substring(0, lastDdlScnQuery.length() - 3).toString(); + try(Statement statement = jdbcConnection.connection().createStatement(); + ResultSet rs = statement.executeQuery(query)) { + + if (!rs.next()) { + throw new IllegalStateException("Couldn't get latest table DDL SCN"); + } + + return Optional.of(rs.getLong(1)); } } @@ -130,6 +229,19 @@ protected SchemaChangeEvent getCreateTableEvent(SnapshotContext snapshotContext, } } + @Override + protected String getSnapshotSelect(SnapshotContext snapshotContext, TableId tableId) { + long snapshotOffset = (Long) snapshotContext.offset.getOffset().get("scn"); + return "SELECT * FROM " + tableId.schema() + "." + tableId.table() + " AS OF SCN " + snapshotOffset; + } + + @Override + protected ChangeRecordEmitter getChangeRecordEmitter(SnapshotContext snapshotContext, Object[] row) { + // TODO can this be done in a better way than doing it as a side-effect here? + ((OracleOffsetContext) snapshotContext.offset).setSourceTime(Instant.ofEpochMilli(clock.currentTimeInMillis())); + return new SnapshotChangeRecordEmitter(snapshotContext.offset, row, clock); + } + @Override protected void complete() { if (connectorConfig.getPdbName() != null) { @@ -142,6 +254,8 @@ protected void complete() { */ private static class OracleSnapshotContext extends SnapshotContext { + private Savepoint preSchemaSnapshotSavepoint; + public OracleSnapshotContext(String catalogName) throws SQLException { super(catalogName); } diff --git a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/SnapshotChangeRecordEmitter.java b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/SnapshotChangeRecordEmitter.java new file mode 100644 index 000000000..b5c551f6e --- /dev/null +++ b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/SnapshotChangeRecordEmitter.java @@ -0,0 +1,43 @@ +/* + * Copyright Debezium Authors. + * + * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 + */ +package io.debezium.connector.oracle; + +import io.debezium.data.Envelope.Operation; +import io.debezium.pipeline.spi.OffsetContext; +import io.debezium.relational.RelationalChangeRecordEmitter; +import io.debezium.util.Clock; +import oracle.streams.RowLCR; + +/** + * Emits change data based on a single {@link RowLCR} event. + * + * @author Gunnar Morling + */ +public class SnapshotChangeRecordEmitter extends RelationalChangeRecordEmitter { + + private final Object[] row; + + public SnapshotChangeRecordEmitter(OffsetContext offset, Object[] row, Clock clock) { + super(offset, clock); + + this.row = row; + } + + @Override + protected Operation getOperation() { + return Operation.READ; + } + + @Override + protected Object[] getOldColumnValues() { + throw new UnsupportedOperationException("Can't get old row values for READ record"); + } + + @Override + protected Object[] getNewColumnValues() { + return row; + } +} diff --git a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/SourceInfo.java b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/SourceInfo.java index 30caa8914..074bd8a7f 100644 --- a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/SourceInfo.java +++ b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/SourceInfo.java @@ -33,6 +33,7 @@ public class SourceInfo extends AbstractSourceInfo { private long scn; private String transactionId; private Instant sourceTime; + private boolean snapshot; protected SourceInfo(String serverName) { super(Module.version()); @@ -51,7 +52,7 @@ public Struct struct() { .put(TIMESTAMP_KEY, sourceTime.toEpochMilli()) .put(TXID_KEY, transactionId) .put(SCN_KEY, scn) - .put(SNAPSHOT_KEY, false); + .put(SNAPSHOT_KEY, snapshot); } public String getServerName() { @@ -81,4 +82,12 @@ public Instant getSourceTime() { public void setSourceTime(Instant sourceTime) { this.sourceTime = sourceTime; } + + public void setSnapshot(boolean snapshot) { + this.snapshot = snapshot; + } + + public boolean isSnapshot() { + return snapshot; + } } diff --git a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/OracleChangeRecordEmitter.java b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/XStreamChangeRecordEmitter.java similarity index 90% rename from debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/OracleChangeRecordEmitter.java rename to debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/XStreamChangeRecordEmitter.java index 5c7bdf07e..4f47e0753 100644 --- a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/OracleChangeRecordEmitter.java +++ b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/XStreamChangeRecordEmitter.java @@ -18,12 +18,12 @@ * * @author Gunnar Morling */ -public class OracleChangeRecordEmitter extends RelationalChangeRecordEmitter { +public class XStreamChangeRecordEmitter extends RelationalChangeRecordEmitter { private final RowLCR lcr; private final Table table; - public OracleChangeRecordEmitter(OffsetContext offset, RowLCR lcr, Table table, Clock clock) { + public XStreamChangeRecordEmitter(OffsetContext offset, RowLCR lcr, Table table, Clock clock) { super(offset, clock); this.lcr = lcr; diff --git a/debezium-connector-oracle/src/test/java/io/debezium/connector/oracle/OracleConnectorIT.java b/debezium-connector-oracle/src/test/java/io/debezium/connector/oracle/OracleConnectorIT.java index aa810ddff..aa835b6db 100644 --- a/debezium-connector-oracle/src/test/java/io/debezium/connector/oracle/OracleConnectorIT.java +++ b/debezium-connector-oracle/src/test/java/io/debezium/connector/oracle/OracleConnectorIT.java @@ -25,6 +25,7 @@ import io.debezium.connector.oracle.util.TestHelper; import io.debezium.data.VerifyRecord; import io.debezium.embedded.AbstractConnectorTest; +import io.debezium.relational.RelationalDatabaseConnectorConfig; import io.debezium.util.Testing; /** @@ -35,6 +36,8 @@ public class OracleConnectorIT extends AbstractConnectorTest { private static final long MICROS_PER_SECOND = TimeUnit.SECONDS.toMicros(1); + private static final String SNAPSHOT_COMPLETED_KEY = "snapshot_completed"; + private static OracleConnection connection; @BeforeClass @@ -64,15 +67,124 @@ public static void closeConnection() throws SQLException { } @Before - public void before() { + public void before() throws SQLException { + connection.execute("delete from debezium.customer"); setConsumeTimeout(TestHelper.defaultMessageConsumerPollTimeout(), TimeUnit.SECONDS); initializeConnectorTestFramework(); Testing.Files.delete(TestHelper.DB_HISTORY_PATH); } + @Test + public void shouldTakeSnapshot() throws Exception { + Configuration config = TestHelper.defaultConfig() + .with(RelationalDatabaseConnectorConfig.TABLE_WHITELIST, "ORCLPDB1\\.DEBEZIUM\\.CUSTOMER") + .build(); + + int expectedRecordCount = 0; + connection.execute("INSERT INTO debezium.customer VALUES (1, 'Billie-Bob', 1234.56, TO_DATE('2018/02/22', 'yyyy-mm-dd'))"); + connection.execute("INSERT INTO debezium.customer VALUES (2, 'Bruce', 2345.67, null)"); + connection.execute("COMMIT"); + expectedRecordCount += 2; + + start(OracleConnector.class, config); + assertConnectorIsRunning(); + + SourceRecords records = consumeRecordsByTopic(expectedRecordCount); + List testTableRecords = records.recordsForTopic("server1.DEBEZIUM.CUSTOMER"); + assertThat(testTableRecords).hasSize(expectedRecordCount); + + // read + SourceRecord record1 = testTableRecords.get(0); + VerifyRecord.isValidRead(record1); + Struct after = (Struct) ((Struct)record1.value()).get("after"); + assertThat(after.get("ID")).isEqualTo(BigDecimal.valueOf(1)); + assertThat(after.get("NAME")).isEqualTo("Billie-Bob"); + assertThat(after.get("SCORE")).isEqualTo(BigDecimal.valueOf(1234.56)); + assertThat(after.get("REGISTERED")).isEqualTo(toMicroSecondsSinceEpoch(LocalDateTime.of(2018, 2, 22, 0, 0, 0))); + + assertThat(record1.sourceOffset().get(SourceInfo.SNAPSHOT_KEY)).isEqualTo(true); + assertThat(record1.sourceOffset().get(SNAPSHOT_COMPLETED_KEY)).isEqualTo(false); + + Struct source = (Struct) ((Struct)record1.value()).get("source"); + assertThat(source.get(SourceInfo.SNAPSHOT_KEY)).isEqualTo(true); + + SourceRecord record2 = testTableRecords.get(1); + VerifyRecord.isValidRead(record2); + after = (Struct) ((Struct)record2.value()).get("after"); + assertThat(after.get("ID")).isEqualTo(BigDecimal.valueOf(2)); + assertThat(after.get("NAME")).isEqualTo("Bruce"); + assertThat(after.get("SCORE")).isEqualTo(BigDecimal.valueOf(2345.67)); + assertThat(after.get("REGISTERED")).isNull(); + + assertThat(record2.sourceOffset().get(SourceInfo.SNAPSHOT_KEY)).isEqualTo(true); + assertThat(record2.sourceOffset().get(SNAPSHOT_COMPLETED_KEY)).isEqualTo(true); + + source = (Struct) ((Struct)record2.value()).get("source"); + assertThat(source.get(SourceInfo.SNAPSHOT_KEY)).isEqualTo(true); + } + + @Test + public void shouldContinueWithStreamingAfterSnapshot() throws Exception { + Configuration config = TestHelper.defaultConfig() + .with(RelationalDatabaseConnectorConfig.TABLE_WHITELIST, "ORCLPDB1\\.DEBEZIUM\\.CUSTOMER") + .build(); + + int expectedRecordCount = 0; + connection.execute("INSERT INTO debezium.customer VALUES (1, 'Billie-Bob', 1234.56, TO_DATE('2018/02/22', 'yyyy-mm-dd'))"); + connection.execute("INSERT INTO debezium.customer VALUES (2, 'Bruce', 2345.67, null)"); + connection.execute("COMMIT"); + expectedRecordCount += 2; + + start(OracleConnector.class, config); + assertConnectorIsRunning(); + + SourceRecords records = consumeRecordsByTopic(expectedRecordCount); + List testTableRecords = records.recordsForTopic("server1.DEBEZIUM.CUSTOMER"); + assertThat(testTableRecords).hasSize(expectedRecordCount); + + // read + SourceRecord record1 = testTableRecords.get(0); + VerifyRecord.isValidRead(record1); + Struct after = (Struct) ((Struct)record1.value()).get("after"); + assertThat(after.get("ID")).isEqualTo(BigDecimal.valueOf(1)); + + assertThat(record1.sourceOffset().get(SourceInfo.SNAPSHOT_KEY)).isEqualTo(true); + assertThat(record1.sourceOffset().get(SNAPSHOT_COMPLETED_KEY)).isEqualTo(false); + + SourceRecord record2 = testTableRecords.get(1); + VerifyRecord.isValidRead(record2); + after = (Struct) ((Struct)record2.value()).get("after"); + assertThat(after.get("ID")).isEqualTo(BigDecimal.valueOf(2)); + + assertThat(record2.sourceOffset().get(SourceInfo.SNAPSHOT_KEY)).isEqualTo(true); + assertThat(record2.sourceOffset().get(SNAPSHOT_COMPLETED_KEY)).isEqualTo(true); + + expectedRecordCount = 0; + connection.execute("INSERT INTO debezium.customer VALUES (3, 'Brian', 2345.67, null)"); + connection.execute("COMMIT"); + expectedRecordCount += 1; + + records = consumeRecordsByTopic(expectedRecordCount); + testTableRecords = records.recordsForTopic("server1.DEBEZIUM.CUSTOMER"); + assertThat(testTableRecords).hasSize(expectedRecordCount); + + SourceRecord record3 = testTableRecords.get(0); + VerifyRecord.isValidInsert(record3); + after = (Struct) ((Struct)record3.value()).get("after"); + assertThat(after.get("ID")).isEqualTo(BigDecimal.valueOf(3)); + + assertThat(record3.sourceOffset().containsKey(SourceInfo.SNAPSHOT_KEY)).isFalse(); + assertThat(record3.sourceOffset().containsKey(SNAPSHOT_COMPLETED_KEY)).isFalse(); + + Struct source = (Struct) ((Struct)record3.value()).get("source"); + assertThat(source.get(SourceInfo.SNAPSHOT_KEY)).isEqualTo(false); + } + @Test public void shouldReadChangeStreamForExistingTable() throws Exception { - Configuration config = TestHelper.defaultConfig().build(); + Configuration config = TestHelper.defaultConfig() + .with(RelationalDatabaseConnectorConfig.TABLE_WHITELIST, "ORCLPDB1\\.DEBEZIUM\\.CUSTOMER") + .build(); start(OracleConnector.class, config); assertConnectorIsRunning(); @@ -156,7 +268,9 @@ public void shouldReadChangeStreamForExistingTable() throws Exception { public void shouldReadChangeStreamForTableCreatedWhileStreaming() throws Exception { TestHelper.dropTable(connection, "debezium.customer2"); - Configuration config = TestHelper.defaultConfig().build(); + Configuration config = TestHelper.defaultConfig() + .with(RelationalDatabaseConnectorConfig.TABLE_WHITELIST, "ORCLPDB1\\.DEBEZIUM\\.CUSTOMER2") + .build(); start(OracleConnector.class, config); assertConnectorIsRunning();