DBZ-720 Initial data snapshotting for Oracle

Gunnar Morling 2018-07-10 15:49:29 +02:00 committed by Jiri Pechanec
parent 235346cc9f
commit 9a8aaa4b01
11 changed files with 545 additions and 56 deletions

View File

@ -99,7 +99,7 @@ private void dispatchDataChangeEvent(RowLCR lcr) throws InterruptedException {
dispatcher.dispatchDataChangeEvent(
tableId,
new OracleChangeRecordEmitter(offsetContext, lcr, schema.tableFor(tableId), clock)
new XStreamChangeRecordEmitter(offsetContext, lcr, schema.tableFor(tableId), clock)
);
}

View File

@ -35,7 +35,8 @@ public OracleChangeEventSourceFactory(OracleConnectorConfig configuration, Oracl
@Override
public SnapshotChangeEventSource getSnapshotChangeEventSource(OffsetContext offsetContext) {
return new OracleSnapshotChangeEventSource(configuration, (OracleOffsetContext) offsetContext, jdbcConnection, schema);
return new OracleSnapshotChangeEventSource(configuration, (OracleOffsetContext) offsetContext, jdbcConnection,
schema, dispatcher, clock);
}
@Override

View File

@ -93,27 +93,34 @@ public Set<TableId> readTableNames(String databaseCatalog, String schemaNamePatt
public void readSchema(Tables tables, String databaseCatalog, String schemaNamePattern, TableFilter tableFilter,
ColumnNameFilter columnFilter, boolean removeTablesNotFoundInJdbc) throws SQLException {
super.readSchema(tables, null, schemaNamePattern, tableFilter, columnFilter, removeTablesNotFoundInJdbc);
super.readSchema(tables, null, schemaNamePattern, null, columnFilter, removeTablesNotFoundInJdbc);
Set<TableId> tableIds = new HashSet<>(tables.tableIds());
for (TableId tableId : tableIds) {
TableEditor editor = tables.editTable(tableId);
editor.tableId(new TableId(databaseCatalog, tableId.schema(), tableId.table()));
// super.readSchema() populates ids without the catalog; hence we apply the filtering only
// here and if a table is included, overwrite it with a new id including the catalog
TableId tableIdWithCatalog = new TableId(databaseCatalog, tableId.schema(), tableId.table());
List<String> columnNames = new ArrayList<>(editor.columnNames());
for (String columnName : columnNames) {
Column column = editor.columnWithName(columnName);
if (column.jdbcType() == Types.TIMESTAMP) {
editor.addColumn(
column.edit()
.length(column.scale().orElse(Column.UNSET_INT_VALUE))
.scale(null)
.create()
);
if (tableFilter.isIncluded(tableIdWithCatalog)) {
TableEditor editor = tables.editTable(tableId);
editor.tableId(tableIdWithCatalog);
List<String> columnNames = new ArrayList<>(editor.columnNames());
for (String columnName : columnNames) {
Column column = editor.columnWithName(columnName);
if (column.jdbcType() == Types.TIMESTAMP) {
editor.addColumn(
column.edit()
.length(column.scale().orElse(Column.UNSET_INT_VALUE))
.scale(null)
.create()
);
}
}
tables.overwriteTable(editor.create());
}
tables.overwriteTable(editor.create());
tables.removeTable(tableId);
}
}

View File

@ -13,6 +13,7 @@
import io.debezium.config.CommonConnectorConfig;
import io.debezium.config.Configuration;
import io.debezium.config.EnumeratedValue;
import io.debezium.config.Field;
import io.debezium.document.Document;
import io.debezium.jdbc.JdbcConfiguration;
@ -81,6 +82,16 @@ public class OracleConnectorConfig extends RelationalDatabaseConnectorConfig {
.withValidation(Field::isRequired)
.withDescription("Name of the XStream Out server to connect to.");
public static final Field SNAPSHOT_MODE = Field.create("snapshot.mode")
.withDisplayName("Snapshot mode")
.withEnum(SnapshotMode.class, SnapshotMode.INITIAL)
.withWidth(Width.SHORT)
.withImportance(Importance.LOW)
.withDescription("The criteria for running a snapshot upon startup of the connector. "
+ "Options include: "
+ "'initial' (the default) to specify the connector should run a snapshot only when no offsets are available for the logical server name; "
+ "'initial_schema_only' to specify the connector should run a snapshot of the schema when no offsets are available for the logical server name. ");
/**
* The set of {@link Field}s defined as part of this configuration.
*/
@ -89,6 +100,7 @@ public class OracleConnectorConfig extends RelationalDatabaseConnectorConfig {
DATABASE_NAME,
PDB_NAME,
XSTREAM_SERVER_NAME,
SNAPSHOT_MODE,
RelationalDatabaseConnectorConfig.TABLE_WHITELIST,
RelationalDatabaseConnectorConfig.TABLE_BLACKLIST,
RelationalDatabaseConnectorConfig.TABLE_IGNORE_BUILTIN,
@ -100,6 +112,7 @@ public class OracleConnectorConfig extends RelationalDatabaseConnectorConfig {
private final String databaseName;
private final String pdbName;
private final String xoutServerName;
private final SnapshotMode snapshotMode;
public OracleConnectorConfig(Configuration config) {
super(config, config.getString(LOGICAL_NAME), new SystemTablesPredicate());
@ -107,12 +120,13 @@ public OracleConnectorConfig(Configuration config) {
this.databaseName = config.getString(DATABASE_NAME);
this.pdbName = config.getString(PDB_NAME);
this.xoutServerName = config.getString(XSTREAM_SERVER_NAME);
this.snapshotMode = SnapshotMode.parse(config.getString(SNAPSHOT_MODE));
}
public static ConfigDef configDef() {
ConfigDef config = new ConfigDef();
Field.group(config, "Oracle", LOGICAL_NAME, DATABASE_NAME, PDB_NAME, XSTREAM_SERVER_NAME);
Field.group(config, "Oracle", LOGICAL_NAME, DATABASE_NAME, PDB_NAME, XSTREAM_SERVER_NAME, SNAPSHOT_MODE);
Field.group(config, "Events", RelationalDatabaseConnectorConfig.TABLE_WHITELIST,
RelationalDatabaseConnectorConfig.TABLE_BLACKLIST,
RelationalDatabaseConnectorConfig.TABLE_IGNORE_BUILTIN
@ -134,6 +148,10 @@ public String getXoutServerName() {
return xoutServerName;
}
public SnapshotMode getSnapshotMode() {
return snapshotMode;
}
/**
* Returns a configured (but not yet started) instance of the database history.
*/
@ -163,6 +181,79 @@ protected boolean isPositionAtOrBefore(Document recorded, Document desired) {
return databaseHistory;
}
/**
* The set of predefined SnapshotMode options or aliases.
*/
public static enum SnapshotMode implements EnumeratedValue {
/**
* Perform a snapshot of data and schema upon initial startup of a connector.
*/
INITIAL("initial", true),
/**
* Perform a snapshot of the schema but no data upon initial startup of a connector.
*/
INITIAL_SCHEMA_ONLY("initial_schema_only", false);
private final String value;
private final boolean includeData;
private SnapshotMode(String value, boolean includeData) {
this.value = value;
this.includeData = includeData;
}
@Override
public String getValue() {
return value;
}
/**
* Whether this snapshotting mode should include the actual data or just the
* schema of captured tables.
*/
public boolean includeData() {
return includeData;
}
/**
* Determine if the supplied value is one of the predefined options.
*
* @param value the configuration property value; may not be null
* @return the matching option, or null if no match is found
*/
public static SnapshotMode parse(String value) {
if (value == null) {
return null;
}
value = value.trim();
for (SnapshotMode option : SnapshotMode.values()) {
if (option.getValue().equalsIgnoreCase(value)) return option;
}
return null;
}
/**
* Determine if the supplied value is one of the predefined options.
*
* @param value the configuration property value; may not be null
* @param defaultValue the default value; may be null
* @return the matching option, or null if no match is found and the non-null default is invalid
*/
public static SnapshotMode parse(String value, String defaultValue) {
SnapshotMode mode = parse(value);
if (mode == null && defaultValue != null) {
mode = parse(defaultValue);
}
return mode;
}
}
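For illustration, parsing is case-insensitive and trims surrounding whitespace; unrecognized values fall through to null (example inputs hypothetical):
SnapshotMode mode = SnapshotMode.parse(" Initial ");          // -> SnapshotMode.INITIAL
SnapshotMode unknown = SnapshotMode.parse("never");           // -> null
SnapshotMode fallback = SnapshotMode.parse(null, "initial");  // -> SnapshotMode.INITIAL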
/**
* A {@link TableFilter} that excludes all Oracle system tables.
*
@ -172,11 +263,22 @@ private static class SystemTablesPredicate implements TableFilter {
@Override
public boolean isIncluded(TableId t) {
return !t.schema().toLowerCase().equals("system") &&
!t.schema().toLowerCase().equals("sys") &&
!t.schema().toLowerCase().equals("mdsys") &&
return !t.schema().toLowerCase().equals("appqossys") &&
!t.schema().toLowerCase().equals("ctxsys") &&
!t.schema().toLowerCase().equals("dvsys") &&
!t.schema().toLowerCase().equals("dbsfwuser") &&
!t.schema().toLowerCase().equals("dbsnmp") &&
!t.schema().toLowerCase().equals("gsmadmin_internal") &&
!t.schema().toLowerCase().equals("lbacsys") &&
!t.schema().toLowerCase().equals("mdsys") &&
!t.schema().toLowerCase().equals("ojvmsys") &&
!t.schema().toLowerCase().equals("olapsys") &&
!t.schema().toLowerCase().equals("orddata") &&
!t.schema().toLowerCase().equals("ordsys") &&
!t.schema().toLowerCase().equals("outln") &&
!t.schema().toLowerCase().equals("sys") &&
!t.schema().toLowerCase().equals("system") &&
!t.schema().toLowerCase().equals("wmsys") &&
!t.schema().toLowerCase().equals("xdb");
}
}
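Wiring the new option into a connector configuration is a one-liner; a minimal sketch, reusing TestHelper and the table whitelist from the integration test further down (the value "initial_schema_only" skips the data portion of the snapshot, "initial" is the default):
Configuration config = TestHelper.defaultConfig()
        // snapshot the schema but skip the initial data read
        .with(OracleConnectorConfig.SNAPSHOT_MODE, "initial_schema_only")
        .with(RelationalDatabaseConnectorConfig.TABLE_WHITELIST, "ORCLPDB1\\.DEBEZIUM\\.CUSTOMER")
        .build();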

View File

@ -6,7 +6,6 @@
package io.debezium.connector.oracle;
import java.sql.SQLException;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;
@ -25,6 +24,7 @@
import io.debezium.pipeline.DataChangeEvent;
import io.debezium.pipeline.ErrorHandler;
import io.debezium.pipeline.EventDispatcher;
import io.debezium.pipeline.spi.OffsetContext;
import io.debezium.relational.TableId;
import io.debezium.util.Clock;
import io.debezium.util.SchemaNameAdjuster;
@ -83,7 +83,7 @@ public void start(Configuration config) {
this.schema = new OracleDatabaseSchema(connectorConfig, schemaNameAdjuster, topicSelector, jdbcConnection);
OracleOffsetContext previousOffset = getPreviousOffset(connectorConfig);
OffsetContext previousOffset = getPreviousOffset(new OracleOffsetContext.Loader(connectorConfig.getLogicalName()));
if (previousOffset != null) {
schema.recover(previousOffset);
}
@ -102,24 +102,6 @@ public void start(Configuration config) {
coordinator.start();
}
private OracleOffsetContext getPreviousOffset(OracleConnectorConfig connectorConfig) {
OracleOffsetContext offsetContext = new OracleOffsetContext(connectorConfig.getLogicalName());
Map<String, Object> previousOffset = context.offsetStorageReader()
.offsets(Collections.singleton(offsetContext.getPartition()))
.get(offsetContext.getPartition());
if (previousOffset != null) {
long scn = (long) previousOffset.get(SourceInfo.SCN_KEY);
offsetContext.setScn(scn);
LOGGER.info("Found previous offset {}", offsetContext);
return offsetContext;
}
return null;
}
@Override
public List<SourceRecord> poll() throws InterruptedException {
// TODO

View File

@ -7,6 +7,7 @@
import java.time.Instant;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.connect.data.Schema;
@ -17,16 +18,63 @@
public class OracleOffsetContext implements OffsetContext {
private static final String SERVER_PARTITION_KEY = "server";
private static final String SNAPSHOT_COMPLETED_KEY = "snapshot_completed";
private final Schema sourceInfoSchema;
private final Map<String, String> partition;
private final SourceInfo sourceInfo;
public OracleOffsetContext(String serverName) {
/**
* Whether a snapshot has been completed or not.
*/
private boolean snapshotCompleted;
private OracleOffsetContext(String serverName, long scn, boolean snapshot, boolean snapshotCompleted) {
partition = Collections.singletonMap(SERVER_PARTITION_KEY, serverName);
sourceInfo = new SourceInfo(serverName);
sourceInfo.setScn(scn);
sourceInfo.setSnapshot(snapshot);
sourceInfoSchema = sourceInfo.schema();
this.snapshotCompleted = snapshotCompleted;
}
public static class Builder {
private String logicalName;
private long scn;
private boolean snapshot;
private boolean snapshotCompleted;
public Builder logicalName(String logicalName) {
this.logicalName = logicalName;
return this;
}
public Builder scn(long scn) {
this.scn = scn;
return this;
}
public Builder snapshot(boolean snapshot) {
this.snapshot = snapshot;
return this;
}
public Builder snapshotCompleted(boolean snapshotCompleted) {
this.snapshotCompleted = snapshotCompleted;
return this;
}
OracleOffsetContext build() {
return new OracleOffsetContext(logicalName, scn, snapshot, snapshotCompleted);
}
}
public static Builder create() {
return new Builder();
}
@Override
@ -36,7 +84,18 @@ public OracleOffsetContext(String serverName) {
@Override
public Map<String, ?> getOffset() {
return Collections.singletonMap(SourceInfo.SCN_KEY, sourceInfo.getScn());
if (sourceInfo.isSnapshot()) {
Map<String, Object> offset = new HashMap<>();
offset.put(SourceInfo.SCN_KEY, sourceInfo.getScn());
offset.put(SourceInfo.SNAPSHOT_KEY, true);
offset.put(SNAPSHOT_COMPLETED_KEY, snapshotCompleted);
return offset;
}
else {
return Collections.singletonMap(SourceInfo.SCN_KEY, sourceInfo.getScn());
}
}
@Override
@ -64,4 +123,62 @@ public void setTransactionId(String transactionId) {
public void setSourceTime(Instant instant) {
sourceInfo.setSourceTime(instant);
}
@Override
public boolean isSnapshotRunning() {
return sourceInfo.isSnapshot() && !snapshotCompleted;
}
@Override
public void preSnapshotStart() {
sourceInfo.setSnapshot(true);
snapshotCompleted = false;
}
@Override
public void preSnapshotCompletion() {
snapshotCompleted = true;
}
@Override
public void postSnapshotCompletion() {
sourceInfo.setSnapshot(false);
}
@Override
public String toString() {
StringBuilder sb = new StringBuilder("OracleOffsetContext [scn=").append(getScn());
if (sourceInfo.isSnapshot()) {
sb.append(", snapshot=").append(sourceInfo.isSnapshot());
sb.append(", snapshot_completed=").append(snapshotCompleted);
}
sb.append("]");
return sb.toString();
}
public static class Loader implements OffsetContext.Loader {
private final String logicalName;
public Loader(String logicalName) {
this.logicalName = logicalName;
}
@Override
public Map<String, ?> getPartition() {
return Collections.singletonMap(SERVER_PARTITION_KEY, logicalName);
}
@Override
public OffsetContext load(Map<String, ?> offset) {
long scn = (long) offset.get(SourceInfo.SCN_KEY);
boolean snapshot = Boolean.TRUE.equals(offset.get(SourceInfo.SNAPSHOT_KEY));
boolean snapshotCompleted = Boolean.TRUE.equals(offset.get(SNAPSHOT_COMPLETED_KEY));
return new OracleOffsetContext(logicalName, scn, snapshot, snapshotCompleted);
}
}
}
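To illustrate the round trip, a sketch of how the snapshot source builds the context and what the persisted offset then contains (values hypothetical; build() is package-visible, so this only applies inside the connector package):
OracleOffsetContext offset = OracleOffsetContext.create()
        .logicalName("server1")
        .scn(12345678L)
        .snapshot(true)
        .snapshotCompleted(false)
        .build();
// getOffset() while the snapshot runs:  {scn=12345678, snapshot=true, snapshot_completed=false}
// after postSnapshotCompletion():       {scn=12345678}
// Loader.load() reverses the mapping when the connector restarts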

View File

@ -8,31 +8,63 @@
import java.sql.Clob;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Savepoint;
import java.sql.Statement;
import java.time.Instant;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import io.debezium.pipeline.EventDispatcher;
import io.debezium.pipeline.source.spi.StreamingChangeEventSource;
import io.debezium.pipeline.spi.ChangeRecordEmitter;
import io.debezium.pipeline.spi.OffsetContext;
import io.debezium.relational.HistorizedRelationalSnapshotChangeEventSource;
import io.debezium.relational.Table;
import io.debezium.relational.TableId;
import io.debezium.schema.SchemaChangeEvent;
import io.debezium.schema.SchemaChangeEvent.SchemaChangeEventType;
import io.debezium.util.Clock;
/**
* A {@link HistorizedRelationalSnapshotChangeEventSource} for Oracle.
*
* @author Gunnar Morling
*/
public class OracleSnapshotChangeEventSource extends HistorizedRelationalSnapshotChangeEventSource {
private static final Logger LOGGER = LoggerFactory.getLogger(OracleSnapshotChangeEventSource.class);
private final OracleConnectorConfig connectorConfig;
private final OracleConnection jdbcConnection;
private final Clock clock;
public OracleSnapshotChangeEventSource(OracleConnectorConfig connectorConfig, OracleOffsetContext previousOffset, OracleConnection jdbcConnection, OracleDatabaseSchema schema) {
super(connectorConfig, previousOffset, jdbcConnection, schema);
public OracleSnapshotChangeEventSource(OracleConnectorConfig connectorConfig, OracleOffsetContext previousOffset, OracleConnection jdbcConnection, OracleDatabaseSchema schema, EventDispatcher<TableId> dispatcher, Clock clock) {
super(connectorConfig, previousOffset, jdbcConnection, schema, dispatcher, clock);
this.connectorConfig = connectorConfig;
this.jdbcConnection = jdbcConnection;
this.clock = clock;
}
@Override
protected SnapshottingTask getSnapshottingTask(OffsetContext previousOffset) {
boolean snapshotSchema = true;
boolean snapshotData = true;
// found a previous offset and the earlier snapshot has completed
if (previousOffset != null && !previousOffset.isSnapshotRunning()) {
snapshotSchema = false;
snapshotData = false;
}
else {
snapshotData = connectorConfig.getSnapshotMode().includeData();
}
return new SnapshottingTask(snapshotSchema, snapshotData);
}
@Override
@ -52,7 +84,9 @@ protected Set<TableId> getAllTableIds(SnapshotContext ctx) throws Exception {
}
@Override
protected boolean lockTables(ChangeEventSourceContext sourceContext, SnapshotContext snapshotContext) throws SQLException {
protected boolean lockTablesForSchemaSnapshot(ChangeEventSourceContext sourceContext, SnapshotContext snapshotContext) throws SQLException {
((OracleSnapshotContext)snapshotContext).preSchemaSnapshotSavepoint = jdbcConnection.connection().setSavepoint("dbz_schema_snapshot");
try (Statement statement = jdbcConnection.connection().createStatement()) {
for (TableId tableId : snapshotContext.capturedTables) {
if (!sourceContext.isRunning()) {
@ -68,21 +102,86 @@ protected boolean lockTables(ChangeEventSourceContext sourceContext, SnapshotCon
return true;
}
@Override
protected void releaseSchemaSnapshotLocks(SnapshotContext snapshotContext) throws SQLException {
jdbcConnection.connection().rollback(((OracleSnapshotContext)snapshotContext).preSchemaSnapshotSavepoint);
}
@Override
protected void determineSnapshotOffset(SnapshotContext ctx) throws Exception {
Optional<Long> latestTableDdlScn = getLatestTableDdlScn(ctx);
long currentScn;
// we must use an SCN for taking the snapshot that represents a later timestamp than the latest DDL change to
// any of the captured tables; this will not be a problem in practice, but during testing it may happen that the
// SCN of "now" represents the same timestamp as a newly created table that should be captured; in that case
// we'd get an ORA-01466 when running the flashback query for doing the snapshot
do {
currentScn = getCurrentScn(ctx);
}
while(areSameTimestamp(latestTableDdlScn.orElse(null), currentScn));
ctx.offset = OracleOffsetContext.create()
.logicalName(connectorConfig.getLogicalName())
.scn(currentScn)
.snapshot(true)
.snapshotCompleted(false)
.build();
}
private long getCurrentScn(SnapshotContext ctx) throws SQLException {
try(Statement statement = jdbcConnection.connection().createStatement();
ResultSet rs = statement.executeQuery("select CURRENT_SCN from V$DATABASE") ) {
ResultSet rs = statement.executeQuery("select CURRENT_SCN from V$DATABASE")) {
if (!rs.next()) {
throw new IllegalStateException("Couldn't get SCN");
}
Long scn = rs.getLong(1);
return rs.getLong(1);
}
}
OracleOffsetContext offset = new OracleOffsetContext(connectorConfig.getLogicalName());
offset.setScn(scn);
/**
* Whether the two SCNs represent the same timestamp or not (resolution is only 3 seconds).
*/
private boolean areSameTimestamp(Long scn1, long scn2) throws SQLException {
if (scn1 == null) {
return false;
}
ctx.offset = offset;
try(Statement statement = jdbcConnection.connection().createStatement();
ResultSet rs = statement.executeQuery("SELECT 1 FROM DUAL WHERE SCN_TO_TIMESTAMP(" + scn1 + ") = SCN_TO_TIMESTAMP(" + scn2 + ")" )) {
return rs.next();
}
}
/**
* Returns the SCN of the latest DDL change to the captured tables. The result will be empty if there's no table to
* capture as per the configuration.
*/
private Optional<Long> getLatestTableDdlScn(SnapshotContext ctx) throws SQLException {
if (ctx.capturedTables.isEmpty()) {
return Optional.empty();
}
StringBuilder lastDdlScnQuery = new StringBuilder("SELECT MAX(TIMESTAMP_TO_SCN(last_ddl_time))")
.append(" FROM all_objects")
.append(" WHERE");
for(TableId table : ctx.capturedTables) {
lastDdlScnQuery.append(" (owner = '" + table.schema() + "' AND object_name = '" + table.table() + "') OR");
}
String query = lastDdlScnQuery.substring(0, lastDdlScnQuery.length() - 3).toString();
try(Statement statement = jdbcConnection.connection().createStatement();
ResultSet rs = statement.executeQuery(query)) {
if (!rs.next()) {
throw new IllegalStateException("Couldn't get latest table DDL SCN");
}
return Optional.of(rs.getLong(1));
}
}
@ -130,6 +229,19 @@ protected SchemaChangeEvent getCreateTableEvent(SnapshotContext snapshotContext,
}
}
@Override
protected String getSnapshotSelect(SnapshotContext snapshotContext, TableId tableId) {
long snapshotOffset = (Long) snapshotContext.offset.getOffset().get("scn");
return "SELECT * FROM " + tableId.schema() + "." + tableId.table() + " AS OF SCN " + snapshotOffset;
}
@Override
protected ChangeRecordEmitter getChangeRecordEmitter(SnapshotContext snapshotContext, Object[] row) {
// TODO can this be done in a better way than doing it as a side-effect here?
((OracleOffsetContext) snapshotContext.offset).setSourceTime(Instant.ofEpochMilli(clock.currentTimeInMillis()));
return new SnapshotChangeRecordEmitter(snapshotContext.offset, row, clock);
}
@Override
protected void complete() {
if (connectorConfig.getPdbName() != null) {
@ -142,6 +254,8 @@ protected void complete() {
*/
private static class OracleSnapshotContext extends SnapshotContext {
private Savepoint preSchemaSnapshotSavepoint;
public OracleSnapshotContext(String catalogName) throws SQLException {
super(catalogName);
}
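For a single captured table DEBEZIUM.CUSTOMER, the query assembled by getLatestTableDdlScn() and the flashback read issued per table via getSnapshotSelect() would come out roughly as follows (SCN value hypothetical):
SELECT MAX(TIMESTAMP_TO_SCN(last_ddl_time)) FROM all_objects WHERE (owner = 'DEBEZIUM' AND object_name = 'CUSTOMER')
SELECT * FROM DEBEZIUM.CUSTOMER AS OF SCN 12345678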

View File

@ -0,0 +1,43 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.connector.oracle;
import io.debezium.data.Envelope.Operation;
import io.debezium.pipeline.spi.OffsetContext;
import io.debezium.relational.RelationalChangeRecordEmitter;
import io.debezium.util.Clock;
/**
* Emits READ change records for rows retrieved during the initial snapshot.
*
* @author Gunnar Morling
*/
public class SnapshotChangeRecordEmitter extends RelationalChangeRecordEmitter {
private final Object[] row;
public SnapshotChangeRecordEmitter(OffsetContext offset, Object[] row, Clock clock) {
super(offset, clock);
this.row = row;
}
@Override
protected Operation getOperation() {
return Operation.READ;
}
@Override
protected Object[] getOldColumnValues() {
throw new UnsupportedOperationException("Can't get old row values for READ record");
}
@Override
protected Object[] getNewColumnValues() {
return row;
}
}

View File

@ -33,6 +33,7 @@ public class SourceInfo extends AbstractSourceInfo {
private long scn;
private String transactionId;
private Instant sourceTime;
private boolean snapshot;
protected SourceInfo(String serverName) {
super(Module.version());
@ -51,7 +52,7 @@ public Struct struct() {
.put(TIMESTAMP_KEY, sourceTime.toEpochMilli())
.put(TXID_KEY, transactionId)
.put(SCN_KEY, scn)
.put(SNAPSHOT_KEY, false);
.put(SNAPSHOT_KEY, snapshot);
}
public String getServerName() {
@ -81,4 +82,12 @@ public Instant getSourceTime() {
public void setSourceTime(Instant sourceTime) {
this.sourceTime = sourceTime;
}
public void setSnapshot(boolean snapshot) {
this.snapshot = snapshot;
}
public boolean isSnapshot() {
return snapshot;
}
}

View File

@ -18,12 +18,12 @@
*
* @author Gunnar Morling
*/
public class OracleChangeRecordEmitter extends RelationalChangeRecordEmitter {
public class XStreamChangeRecordEmitter extends RelationalChangeRecordEmitter {
private final RowLCR lcr;
private final Table table;
public OracleChangeRecordEmitter(OffsetContext offset, RowLCR lcr, Table table, Clock clock) {
public XStreamChangeRecordEmitter(OffsetContext offset, RowLCR lcr, Table table, Clock clock) {
super(offset, clock);
this.lcr = lcr;

View File

@ -25,6 +25,7 @@
import io.debezium.connector.oracle.util.TestHelper;
import io.debezium.data.VerifyRecord;
import io.debezium.embedded.AbstractConnectorTest;
import io.debezium.relational.RelationalDatabaseConnectorConfig;
import io.debezium.util.Testing;
/**
@ -35,6 +36,8 @@
public class OracleConnectorIT extends AbstractConnectorTest {
private static final long MICROS_PER_SECOND = TimeUnit.SECONDS.toMicros(1);
private static final String SNAPSHOT_COMPLETED_KEY = "snapshot_completed";
private static OracleConnection connection;
@BeforeClass
@ -64,15 +67,124 @@ public static void closeConnection() throws SQLException {
}
@Before
public void before() {
public void before() throws SQLException {
connection.execute("delete from debezium.customer");
setConsumeTimeout(TestHelper.defaultMessageConsumerPollTimeout(), TimeUnit.SECONDS);
initializeConnectorTestFramework();
Testing.Files.delete(TestHelper.DB_HISTORY_PATH);
}
@Test
public void shouldTakeSnapshot() throws Exception {
Configuration config = TestHelper.defaultConfig()
.with(RelationalDatabaseConnectorConfig.TABLE_WHITELIST, "ORCLPDB1\\.DEBEZIUM\\.CUSTOMER")
.build();
int expectedRecordCount = 0;
connection.execute("INSERT INTO debezium.customer VALUES (1, 'Billie-Bob', 1234.56, TO_DATE('2018/02/22', 'yyyy-mm-dd'))");
connection.execute("INSERT INTO debezium.customer VALUES (2, 'Bruce', 2345.67, null)");
connection.execute("COMMIT");
expectedRecordCount += 2;
start(OracleConnector.class, config);
assertConnectorIsRunning();
SourceRecords records = consumeRecordsByTopic(expectedRecordCount);
List<SourceRecord> testTableRecords = records.recordsForTopic("server1.DEBEZIUM.CUSTOMER");
assertThat(testTableRecords).hasSize(expectedRecordCount);
// read
SourceRecord record1 = testTableRecords.get(0);
VerifyRecord.isValidRead(record1);
Struct after = (Struct) ((Struct)record1.value()).get("after");
assertThat(after.get("ID")).isEqualTo(BigDecimal.valueOf(1));
assertThat(after.get("NAME")).isEqualTo("Billie-Bob");
assertThat(after.get("SCORE")).isEqualTo(BigDecimal.valueOf(1234.56));
assertThat(after.get("REGISTERED")).isEqualTo(toMicroSecondsSinceEpoch(LocalDateTime.of(2018, 2, 22, 0, 0, 0)));
assertThat(record1.sourceOffset().get(SourceInfo.SNAPSHOT_KEY)).isEqualTo(true);
assertThat(record1.sourceOffset().get(SNAPSHOT_COMPLETED_KEY)).isEqualTo(false);
Struct source = (Struct) ((Struct)record1.value()).get("source");
assertThat(source.get(SourceInfo.SNAPSHOT_KEY)).isEqualTo(true);
SourceRecord record2 = testTableRecords.get(1);
VerifyRecord.isValidRead(record2);
after = (Struct) ((Struct)record2.value()).get("after");
assertThat(after.get("ID")).isEqualTo(BigDecimal.valueOf(2));
assertThat(after.get("NAME")).isEqualTo("Bruce");
assertThat(after.get("SCORE")).isEqualTo(BigDecimal.valueOf(2345.67));
assertThat(after.get("REGISTERED")).isNull();
assertThat(record2.sourceOffset().get(SourceInfo.SNAPSHOT_KEY)).isEqualTo(true);
assertThat(record2.sourceOffset().get(SNAPSHOT_COMPLETED_KEY)).isEqualTo(true);
source = (Struct) ((Struct)record2.value()).get("source");
assertThat(source.get(SourceInfo.SNAPSHOT_KEY)).isEqualTo(true);
}
@Test
public void shouldContinueWithStreamingAfterSnapshot() throws Exception {
Configuration config = TestHelper.defaultConfig()
.with(RelationalDatabaseConnectorConfig.TABLE_WHITELIST, "ORCLPDB1\\.DEBEZIUM\\.CUSTOMER")
.build();
int expectedRecordCount = 0;
connection.execute("INSERT INTO debezium.customer VALUES (1, 'Billie-Bob', 1234.56, TO_DATE('2018/02/22', 'yyyy-mm-dd'))");
connection.execute("INSERT INTO debezium.customer VALUES (2, 'Bruce', 2345.67, null)");
connection.execute("COMMIT");
expectedRecordCount += 2;
start(OracleConnector.class, config);
assertConnectorIsRunning();
SourceRecords records = consumeRecordsByTopic(expectedRecordCount);
List<SourceRecord> testTableRecords = records.recordsForTopic("server1.DEBEZIUM.CUSTOMER");
assertThat(testTableRecords).hasSize(expectedRecordCount);
// read
SourceRecord record1 = testTableRecords.get(0);
VerifyRecord.isValidRead(record1);
Struct after = (Struct) ((Struct)record1.value()).get("after");
assertThat(after.get("ID")).isEqualTo(BigDecimal.valueOf(1));
assertThat(record1.sourceOffset().get(SourceInfo.SNAPSHOT_KEY)).isEqualTo(true);
assertThat(record1.sourceOffset().get(SNAPSHOT_COMPLETED_KEY)).isEqualTo(false);
SourceRecord record2 = testTableRecords.get(1);
VerifyRecord.isValidRead(record2);
after = (Struct) ((Struct)record2.value()).get("after");
assertThat(after.get("ID")).isEqualTo(BigDecimal.valueOf(2));
assertThat(record2.sourceOffset().get(SourceInfo.SNAPSHOT_KEY)).isEqualTo(true);
assertThat(record2.sourceOffset().get(SNAPSHOT_COMPLETED_KEY)).isEqualTo(true);
expectedRecordCount = 0;
connection.execute("INSERT INTO debezium.customer VALUES (3, 'Brian', 2345.67, null)");
connection.execute("COMMIT");
expectedRecordCount += 1;
records = consumeRecordsByTopic(expectedRecordCount);
testTableRecords = records.recordsForTopic("server1.DEBEZIUM.CUSTOMER");
assertThat(testTableRecords).hasSize(expectedRecordCount);
SourceRecord record3 = testTableRecords.get(0);
VerifyRecord.isValidInsert(record3);
after = (Struct) ((Struct)record3.value()).get("after");
assertThat(after.get("ID")).isEqualTo(BigDecimal.valueOf(3));
assertThat(record3.sourceOffset().containsKey(SourceInfo.SNAPSHOT_KEY)).isFalse();
assertThat(record3.sourceOffset().containsKey(SNAPSHOT_COMPLETED_KEY)).isFalse();
Struct source = (Struct) ((Struct)record3.value()).get("source");
assertThat(source.get(SourceInfo.SNAPSHOT_KEY)).isEqualTo(false);
}
@Test
public void shouldReadChangeStreamForExistingTable() throws Exception {
Configuration config = TestHelper.defaultConfig().build();
Configuration config = TestHelper.defaultConfig()
.with(RelationalDatabaseConnectorConfig.TABLE_WHITELIST, "ORCLPDB1\\.DEBEZIUM\\.CUSTOMER")
.build();
start(OracleConnector.class, config);
assertConnectorIsRunning();
@ -156,7 +268,9 @@ public void shouldReadChangeStreamForExistingTable() throws Exception {
public void shouldReadChangeStreamForTableCreatedWhileStreaming() throws Exception {
TestHelper.dropTable(connection, "debezium.customer2");
Configuration config = TestHelper.defaultConfig().build();
Configuration config = TestHelper.defaultConfig()
.with(RelationalDatabaseConnectorConfig.TABLE_WHITELIST, "ORCLPDB1\\.DEBEZIUM\\.CUSTOMER2")
.build();
start(OracleConnector.class, config);
assertConnectorIsRunning();