diff --git a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/LcrEventHandler.java b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/LcrEventHandler.java index 4a3e61a51..cf292fce3 100644 --- a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/LcrEventHandler.java +++ b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/LcrEventHandler.java @@ -68,6 +68,7 @@ public void processLCR(LCR lcr) throws StreamsException { offsetContext.setLcrPosition(lcrPosition); offsetContext.setTransactionId(lcr.getTransactionId()); offsetContext.setSourceTime(lcr.getSourceTime().timestampValue().toInstant()); + offsetContext.setTableId(new TableId(lcr.getSourceDatabaseName(), lcr.getObjectOwner(), lcr.getObjectName())); try { if(lcr instanceof RowLCR) { diff --git a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/OracleConnectorConfig.java b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/OracleConnectorConfig.java index 625c6fb11..52eb78b25 100644 --- a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/OracleConnectorConfig.java +++ b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/OracleConnectorConfig.java @@ -14,6 +14,8 @@ import io.debezium.config.Configuration; import io.debezium.config.EnumeratedValue; import io.debezium.config.Field; +import io.debezium.connector.AbstractSourceInfo; +import io.debezium.connector.SourceInfoStructMaker; import io.debezium.document.Document; import io.debezium.heartbeat.Heartbeat; import io.debezium.jdbc.JdbcConfiguration; @@ -341,4 +343,9 @@ public boolean isIncluded(TableId t) { !t.schema().toLowerCase().equals("xdb"); } } + + @Override + protected SourceInfoStructMaker getSourceInfoStructMaker(Version version) { + return new OracleSourceInfoStructMaker(Module.name(), Module.version(), this); + } } diff --git a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/OracleConnectorTask.java b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/OracleConnectorTask.java index 16c1609b5..0f419cef9 100644 --- a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/OracleConnectorTask.java +++ b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/OracleConnectorTask.java @@ -70,7 +70,7 @@ public void start(Configuration config) { this.schema = new OracleDatabaseSchema(connectorConfig, schemaNameAdjuster, topicSelector, jdbcConnection); this.schema.initializeStorage(); - OffsetContext previousOffset = getPreviousOffset(new OracleOffsetContext.Loader(connectorConfig.getLogicalName())); + OffsetContext previousOffset = getPreviousOffset(new OracleOffsetContext.Loader(connectorConfig)); if (previousOffset != null) { schema.recover(previousOffset); } diff --git a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/OracleDatabaseSchema.java b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/OracleDatabaseSchema.java index a013dd2f3..05ffe6c15 100644 --- a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/OracleDatabaseSchema.java +++ b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/OracleDatabaseSchema.java @@ -6,6 +6,7 @@ package io.debezium.connector.oracle; import io.debezium.connector.oracle.antlr.OracleDdlParser; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -31,8 +32,12 @@ public class OracleDatabaseSchema extends HistorizedRelationalDatabaseSchema { public OracleDatabaseSchema(OracleConnectorConfig connectorConfig, SchemaNameAdjuster schemaNameAdjuster, TopicSelector topicSelector, OracleConnection connection) { super(connectorConfig, topicSelector, connectorConfig.getTableFilters().dataCollectionFilter(), null, - new TableSchemaBuilder(new OracleValueConverters(connection), schemaNameAdjuster, SourceInfo.SCHEMA), - connectorConfig.getTablenameCaseInsensitive()); + new TableSchemaBuilder( + new OracleValueConverters(connection), + schemaNameAdjuster, + connectorConfig.getSourceInfoStructMaker().schema()), + connectorConfig.getTablenameCaseInsensitive() + ); } @Override diff --git a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/OracleOffsetContext.java b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/OracleOffsetContext.java index 22aee8263..bca098338 100644 --- a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/OracleOffsetContext.java +++ b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/OracleOffsetContext.java @@ -14,6 +14,7 @@ import org.apache.kafka.connect.data.Struct; import io.debezium.pipeline.spi.OffsetContext; +import io.debezium.relational.TableId; public class OracleOffsetContext implements OffsetContext { @@ -30,10 +31,10 @@ public class OracleOffsetContext implements OffsetContext { */ private boolean snapshotCompleted; - private OracleOffsetContext(String serverName, long scn, LcrPosition lcrPosition, boolean snapshot, boolean snapshotCompleted) { - partition = Collections.singletonMap(SERVER_PARTITION_KEY, serverName); + private OracleOffsetContext(OracleConnectorConfig connectorConfig, long scn, LcrPosition lcrPosition, boolean snapshot, boolean snapshotCompleted) { + partition = Collections.singletonMap(SERVER_PARTITION_KEY, connectorConfig.getLogicalName()); - sourceInfo = new SourceInfo(serverName); + sourceInfo = new SourceInfo(connectorConfig); sourceInfo.setScn(scn); sourceInfo.setLcrPosition(lcrPosition); sourceInfoSchema = sourceInfo.schema(); @@ -49,14 +50,14 @@ private OracleOffsetContext(String serverName, long scn, LcrPosition lcrPosition public static class Builder { - private String logicalName; + private OracleConnectorConfig connectorConfig; private long scn; private LcrPosition lcrPosition; private boolean snapshot; private boolean snapshotCompleted; - public Builder logicalName(String logicalName) { - this.logicalName = logicalName; + public Builder logicalName(OracleConnectorConfig connectorConfig) { + this.connectorConfig = connectorConfig; return this; } @@ -81,7 +82,7 @@ public Builder snapshotCompleted(boolean snapshotCompleted) { } OracleOffsetContext build() { - return new OracleOffsetContext(logicalName, scn, lcrPosition, snapshot, snapshotCompleted); + return new OracleOffsetContext(connectorConfig, scn, lcrPosition, snapshot, snapshotCompleted); } } @@ -147,6 +148,10 @@ public void setSourceTime(Instant instant) { sourceInfo.setSourceTime(instant); } + public void setTableId(TableId tableId) { + sourceInfo.setTableId(tableId); + } + @Override public boolean isSnapshotRunning() { return sourceInfo.isSnapshot() && !snapshotCompleted; @@ -184,15 +189,15 @@ public String toString() { public static class Loader implements OffsetContext.Loader { - private final String logicalName; + private final OracleConnectorConfig connectorConfig; - public Loader(String logicalName) { - this.logicalName = logicalName; + public Loader(OracleConnectorConfig connectorConfig) { + this.connectorConfig = connectorConfig; } @Override public Map getPartition() { - return Collections.singletonMap(SERVER_PARTITION_KEY, logicalName); + return Collections.singletonMap(SERVER_PARTITION_KEY, connectorConfig.getLogicalName()); } @Override @@ -202,7 +207,7 @@ public OffsetContext load(Map offset) { boolean snapshot = Boolean.TRUE.equals(offset.get(SourceInfo.SNAPSHOT_KEY)); boolean snapshotCompleted = Boolean.TRUE.equals(offset.get(SNAPSHOT_COMPLETED_KEY)); - return new OracleOffsetContext(logicalName, scn, lcrPosition, snapshot, snapshotCompleted); + return new OracleOffsetContext(connectorConfig, scn, lcrPosition, snapshot, snapshotCompleted); } } } diff --git a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/OracleSnapshotChangeEventSource.java b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/OracleSnapshotChangeEventSource.java index cd443e3e8..1f7e1a29c 100644 --- a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/OracleSnapshotChangeEventSource.java +++ b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/OracleSnapshotChangeEventSource.java @@ -121,7 +121,7 @@ protected void determineSnapshotOffset(SnapshotContext ctx) throws Exception { while(areSameTimestamp(latestTableDdlScn.orElse(null), currentScn)); ctx.offset = OracleOffsetContext.create() - .logicalName(connectorConfig.getLogicalName()) + .logicalName(connectorConfig) .scn(currentScn) .build(); } @@ -231,9 +231,10 @@ protected String getSnapshotSelect(SnapshotContext snapshotContext, TableId tabl } @Override - protected ChangeRecordEmitter getChangeRecordEmitter(SnapshotContext snapshotContext, Object[] row) { + protected ChangeRecordEmitter getChangeRecordEmitter(SnapshotContext snapshotContext, TableId tableId, Object[] row) { // TODO can this be done in a better way than doing it as a side-effect here? ((OracleOffsetContext) snapshotContext.offset).setSourceTime(Instant.ofEpochMilli(clock.currentTimeInMillis())); + ((OracleOffsetContext) snapshotContext.offset).setTableId(tableId); return new SnapshotChangeRecordEmitter(snapshotContext.offset, row, clock); } diff --git a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/OracleSourceInfoStructMaker.java b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/OracleSourceInfoStructMaker.java new file mode 100644 index 000000000..5ece61a9c --- /dev/null +++ b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/OracleSourceInfoStructMaker.java @@ -0,0 +1,48 @@ +/* + * Copyright Debezium Authors. + * + * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 + */ +package io.debezium.connector.oracle; + +import org.apache.kafka.connect.data.Schema; +import org.apache.kafka.connect.data.Struct; + +import io.debezium.config.CommonConnectorConfig; +import io.debezium.connector.AbstractSourceInfoStructMaker; + +public class OracleSourceInfoStructMaker extends AbstractSourceInfoStructMaker { + + private final Schema schema; + + public OracleSourceInfoStructMaker(String connector, String version, CommonConnectorConfig connectorConfig) { + super(connector, version, connectorConfig); + schema = commonSchemaBuilder() + .name("io.debezium.connector.oracle.Source") + .field(SourceInfo.SCHEMA_NAME_KEY, Schema.STRING_SCHEMA) + .field(SourceInfo.TABLE_NAME_KEY, Schema.STRING_SCHEMA) + .field(SourceInfo.TXID_KEY, Schema.OPTIONAL_STRING_SCHEMA) + .field(SourceInfo.SCN_KEY, Schema.OPTIONAL_INT64_SCHEMA) + .field(SourceInfo.LCR_POSITION_KEY, Schema.OPTIONAL_STRING_SCHEMA) + .build(); + } + + @Override + public Schema schema() { + return schema; + } + + @Override + public Struct struct(SourceInfo sourceInfo) { + final Struct ret = super.commonStruct(sourceInfo) + .put(SourceInfo.SCHEMA_NAME_KEY, sourceInfo.getTableId().schema()) + .put(SourceInfo.TABLE_NAME_KEY, sourceInfo.getTableId().table()) + .put(SourceInfo.TXID_KEY, sourceInfo.getTransactionId()) + .put(SourceInfo.SCN_KEY, sourceInfo.getScn()); + + if (sourceInfo.getLcrPosition() != null) { + ret.put(SourceInfo.LCR_POSITION_KEY, sourceInfo.getLcrPosition().toString()); + } + return ret; + } +} diff --git a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/SourceInfo.java b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/SourceInfo.java index bc49f50ed..829bc4889 100644 --- a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/SourceInfo.java +++ b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/SourceInfo.java @@ -7,70 +7,36 @@ import java.time.Instant; -import io.debezium.annotation.NotThreadSafe; -import org.apache.kafka.connect.data.Schema; import org.apache.kafka.connect.data.Struct; +import io.debezium.annotation.NotThreadSafe; import io.debezium.connector.AbstractSourceInfo; +import io.debezium.relational.TableId; @NotThreadSafe public class SourceInfo extends AbstractSourceInfo { - public static final String SERVER_NAME_KEY = "name"; public static final String TXID_KEY = "txId"; - public static final String TIMESTAMP_KEY = "ts_ms"; public static final String SCN_KEY = "scn"; public static final String LCR_POSITION_KEY = "lcr_position"; public static final String SNAPSHOT_KEY = "snapshot"; - public static final Schema SCHEMA = schemaBuilder() - .name("io.debezium.connector.oracle.Source") - .field(SERVER_NAME_KEY, Schema.STRING_SCHEMA) - .field(TIMESTAMP_KEY, Schema.OPTIONAL_INT64_SCHEMA) - .field(TXID_KEY, Schema.OPTIONAL_STRING_SCHEMA) - .field(SCN_KEY, Schema.OPTIONAL_INT64_SCHEMA) - .field(LCR_POSITION_KEY, Schema.OPTIONAL_STRING_SCHEMA) - .field(SNAPSHOT_KEY, Schema.OPTIONAL_BOOLEAN_SCHEMA) - .build(); - - private final String serverName; private long scn; private LcrPosition lcrPosition; private String transactionId; private Instant sourceTime; private boolean snapshot; + private TableId tableId; - protected SourceInfo(String serverName) { - super(Module.version()); - this.serverName = serverName; + protected SourceInfo(OracleConnectorConfig connectorConfig) { + super(connectorConfig); } - @Override - protected Schema schema() { - return SCHEMA; - } - - @Override - protected String connector() { - return Module.name(); - } - - @Override + /** + * @return the coordinates encoded as a {@code Struct} + */ public Struct struct() { - final Struct r = super.struct() - .put(SERVER_NAME_KEY, serverName) - .put(TIMESTAMP_KEY, sourceTime.toEpochMilli()) - .put(TXID_KEY, transactionId) - .put(SCN_KEY, scn) - .put(SNAPSHOT_KEY, snapshot); - if (lcrPosition != null) { - r.put(LCR_POSITION_KEY, lcrPosition.toString()); - } - return r; - } - - public String getServerName() { - return serverName; + return structMaker().struct(this); } public long getScn() { @@ -112,4 +78,27 @@ public void setSnapshot(boolean snapshot) { public boolean isSnapshot() { return snapshot; } + + public TableId getTableId() { + return tableId; + } + + public void setTableId(TableId tableId) { + this.tableId = tableId; + } + + @Override + protected Instant timestamp() { + return sourceTime; + } + + @Override + protected boolean snapshot() { + return isSnapshot(); + } + + @Override + protected String database() { + return tableId.catalog(); + } } diff --git a/debezium-connector-oracle/src/test/java/io/debezium/connector/oracle/SourceInfoTest.java b/debezium-connector-oracle/src/test/java/io/debezium/connector/oracle/SourceInfoTest.java index 761b27b83..e957d70c9 100644 --- a/debezium-connector-oracle/src/test/java/io/debezium/connector/oracle/SourceInfoTest.java +++ b/debezium-connector-oracle/src/test/java/io/debezium/connector/oracle/SourceInfoTest.java @@ -8,6 +8,9 @@ import org.junit.Before; import org.junit.Test; +import io.debezium.config.Configuration; +import io.debezium.relational.TableId; + import java.time.Instant; import static org.fest.assertions.Assertions.assertThat; @@ -18,8 +21,15 @@ public class SourceInfoTest { @Before public void beforeEach() { - source = new SourceInfo("serverX"); + final OracleConnectorConfig connectorConfig = new OracleConnectorConfig( + Configuration.create() + .with(OracleConnectorConfig.SERVER_NAME, "serverX") + .with(OracleConnectorConfig.DATABASE_NAME, "mydb") + .build() + ); + source = new SourceInfo(connectorConfig); source.setSourceTime(Instant.now()); + source.setTableId(new TableId("c", "s", "t")); } @Test diff --git a/pom.xml b/pom.xml index b16490b3d..48914bc6a 100644 --- a/pom.xml +++ b/pom.xml @@ -4,7 +4,7 @@ io.debezium debezium-parent - 0.9.5.Final + 0.10.0-SNAPSHOT 4.0.0