DBZ-1235 Oracle converted to use source info maker

This commit is contained in:
Jiri Pechanec 2019-05-27 12:52:53 +02:00 committed by Gunnar Morling
parent e640f9fdd5
commit 2f42bd74d2
10 changed files with 128 additions and 62 deletions

View File

@ -68,6 +68,7 @@ public void processLCR(LCR lcr) throws StreamsException {
offsetContext.setLcrPosition(lcrPosition); offsetContext.setLcrPosition(lcrPosition);
offsetContext.setTransactionId(lcr.getTransactionId()); offsetContext.setTransactionId(lcr.getTransactionId());
offsetContext.setSourceTime(lcr.getSourceTime().timestampValue().toInstant()); offsetContext.setSourceTime(lcr.getSourceTime().timestampValue().toInstant());
offsetContext.setTableId(new TableId(lcr.getSourceDatabaseName(), lcr.getObjectOwner(), lcr.getObjectName()));
try { try {
if(lcr instanceof RowLCR) { if(lcr instanceof RowLCR) {

View File

@ -14,6 +14,8 @@
import io.debezium.config.Configuration; import io.debezium.config.Configuration;
import io.debezium.config.EnumeratedValue; import io.debezium.config.EnumeratedValue;
import io.debezium.config.Field; import io.debezium.config.Field;
import io.debezium.connector.AbstractSourceInfo;
import io.debezium.connector.SourceInfoStructMaker;
import io.debezium.document.Document; import io.debezium.document.Document;
import io.debezium.heartbeat.Heartbeat; import io.debezium.heartbeat.Heartbeat;
import io.debezium.jdbc.JdbcConfiguration; import io.debezium.jdbc.JdbcConfiguration;
@ -341,4 +343,9 @@ public boolean isIncluded(TableId t) {
!t.schema().toLowerCase().equals("xdb"); !t.schema().toLowerCase().equals("xdb");
} }
} }
@Override
protected SourceInfoStructMaker<? extends AbstractSourceInfo> getSourceInfoStructMaker(Version version) {
return new OracleSourceInfoStructMaker(Module.name(), Module.version(), this);
}
} }

View File

@ -70,7 +70,7 @@ public void start(Configuration config) {
this.schema = new OracleDatabaseSchema(connectorConfig, schemaNameAdjuster, topicSelector, jdbcConnection); this.schema = new OracleDatabaseSchema(connectorConfig, schemaNameAdjuster, topicSelector, jdbcConnection);
this.schema.initializeStorage(); this.schema.initializeStorage();
OffsetContext previousOffset = getPreviousOffset(new OracleOffsetContext.Loader(connectorConfig.getLogicalName())); OffsetContext previousOffset = getPreviousOffset(new OracleOffsetContext.Loader(connectorConfig));
if (previousOffset != null) { if (previousOffset != null) {
schema.recover(previousOffset); schema.recover(previousOffset);
} }

View File

@ -6,6 +6,7 @@
package io.debezium.connector.oracle; package io.debezium.connector.oracle;
import io.debezium.connector.oracle.antlr.OracleDdlParser; import io.debezium.connector.oracle.antlr.OracleDdlParser;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -31,8 +32,12 @@ public class OracleDatabaseSchema extends HistorizedRelationalDatabaseSchema {
public OracleDatabaseSchema(OracleConnectorConfig connectorConfig, SchemaNameAdjuster schemaNameAdjuster, TopicSelector<TableId> topicSelector, OracleConnection connection) { public OracleDatabaseSchema(OracleConnectorConfig connectorConfig, SchemaNameAdjuster schemaNameAdjuster, TopicSelector<TableId> topicSelector, OracleConnection connection) {
super(connectorConfig, topicSelector, connectorConfig.getTableFilters().dataCollectionFilter(), null, super(connectorConfig, topicSelector, connectorConfig.getTableFilters().dataCollectionFilter(), null,
new TableSchemaBuilder(new OracleValueConverters(connection), schemaNameAdjuster, SourceInfo.SCHEMA), new TableSchemaBuilder(
connectorConfig.getTablenameCaseInsensitive()); new OracleValueConverters(connection),
schemaNameAdjuster,
connectorConfig.getSourceInfoStructMaker().schema()),
connectorConfig.getTablenameCaseInsensitive()
);
} }
@Override @Override

View File

@ -14,6 +14,7 @@
import org.apache.kafka.connect.data.Struct; import org.apache.kafka.connect.data.Struct;
import io.debezium.pipeline.spi.OffsetContext; import io.debezium.pipeline.spi.OffsetContext;
import io.debezium.relational.TableId;
public class OracleOffsetContext implements OffsetContext { public class OracleOffsetContext implements OffsetContext {
@ -30,10 +31,10 @@ public class OracleOffsetContext implements OffsetContext {
*/ */
private boolean snapshotCompleted; private boolean snapshotCompleted;
private OracleOffsetContext(String serverName, long scn, LcrPosition lcrPosition, boolean snapshot, boolean snapshotCompleted) { private OracleOffsetContext(OracleConnectorConfig connectorConfig, long scn, LcrPosition lcrPosition, boolean snapshot, boolean snapshotCompleted) {
partition = Collections.singletonMap(SERVER_PARTITION_KEY, serverName); partition = Collections.singletonMap(SERVER_PARTITION_KEY, connectorConfig.getLogicalName());
sourceInfo = new SourceInfo(serverName); sourceInfo = new SourceInfo(connectorConfig);
sourceInfo.setScn(scn); sourceInfo.setScn(scn);
sourceInfo.setLcrPosition(lcrPosition); sourceInfo.setLcrPosition(lcrPosition);
sourceInfoSchema = sourceInfo.schema(); sourceInfoSchema = sourceInfo.schema();
@ -49,14 +50,14 @@ private OracleOffsetContext(String serverName, long scn, LcrPosition lcrPosition
public static class Builder { public static class Builder {
private String logicalName; private OracleConnectorConfig connectorConfig;
private long scn; private long scn;
private LcrPosition lcrPosition; private LcrPosition lcrPosition;
private boolean snapshot; private boolean snapshot;
private boolean snapshotCompleted; private boolean snapshotCompleted;
public Builder logicalName(String logicalName) { public Builder logicalName(OracleConnectorConfig connectorConfig) {
this.logicalName = logicalName; this.connectorConfig = connectorConfig;
return this; return this;
} }
@ -81,7 +82,7 @@ public Builder snapshotCompleted(boolean snapshotCompleted) {
} }
OracleOffsetContext build() { OracleOffsetContext build() {
return new OracleOffsetContext(logicalName, scn, lcrPosition, snapshot, snapshotCompleted); return new OracleOffsetContext(connectorConfig, scn, lcrPosition, snapshot, snapshotCompleted);
} }
} }
@ -147,6 +148,10 @@ public void setSourceTime(Instant instant) {
sourceInfo.setSourceTime(instant); sourceInfo.setSourceTime(instant);
} }
public void setTableId(TableId tableId) {
sourceInfo.setTableId(tableId);
}
@Override @Override
public boolean isSnapshotRunning() { public boolean isSnapshotRunning() {
return sourceInfo.isSnapshot() && !snapshotCompleted; return sourceInfo.isSnapshot() && !snapshotCompleted;
@ -184,15 +189,15 @@ public String toString() {
public static class Loader implements OffsetContext.Loader { public static class Loader implements OffsetContext.Loader {
private final String logicalName; private final OracleConnectorConfig connectorConfig;
public Loader(String logicalName) { public Loader(OracleConnectorConfig connectorConfig) {
this.logicalName = logicalName; this.connectorConfig = connectorConfig;
} }
@Override @Override
public Map<String, ?> getPartition() { public Map<String, ?> getPartition() {
return Collections.singletonMap(SERVER_PARTITION_KEY, logicalName); return Collections.singletonMap(SERVER_PARTITION_KEY, connectorConfig.getLogicalName());
} }
@Override @Override
@ -202,7 +207,7 @@ public OffsetContext load(Map<String, ?> offset) {
boolean snapshot = Boolean.TRUE.equals(offset.get(SourceInfo.SNAPSHOT_KEY)); boolean snapshot = Boolean.TRUE.equals(offset.get(SourceInfo.SNAPSHOT_KEY));
boolean snapshotCompleted = Boolean.TRUE.equals(offset.get(SNAPSHOT_COMPLETED_KEY)); boolean snapshotCompleted = Boolean.TRUE.equals(offset.get(SNAPSHOT_COMPLETED_KEY));
return new OracleOffsetContext(logicalName, scn, lcrPosition, snapshot, snapshotCompleted); return new OracleOffsetContext(connectorConfig, scn, lcrPosition, snapshot, snapshotCompleted);
} }
} }
} }

View File

@ -121,7 +121,7 @@ protected void determineSnapshotOffset(SnapshotContext ctx) throws Exception {
while(areSameTimestamp(latestTableDdlScn.orElse(null), currentScn)); while(areSameTimestamp(latestTableDdlScn.orElse(null), currentScn));
ctx.offset = OracleOffsetContext.create() ctx.offset = OracleOffsetContext.create()
.logicalName(connectorConfig.getLogicalName()) .logicalName(connectorConfig)
.scn(currentScn) .scn(currentScn)
.build(); .build();
} }
@ -231,9 +231,10 @@ protected String getSnapshotSelect(SnapshotContext snapshotContext, TableId tabl
} }
@Override @Override
protected ChangeRecordEmitter getChangeRecordEmitter(SnapshotContext snapshotContext, Object[] row) { protected ChangeRecordEmitter getChangeRecordEmitter(SnapshotContext snapshotContext, TableId tableId, Object[] row) {
// TODO can this be done in a better way than doing it as a side-effect here? // TODO can this be done in a better way than doing it as a side-effect here?
((OracleOffsetContext) snapshotContext.offset).setSourceTime(Instant.ofEpochMilli(clock.currentTimeInMillis())); ((OracleOffsetContext) snapshotContext.offset).setSourceTime(Instant.ofEpochMilli(clock.currentTimeInMillis()));
((OracleOffsetContext) snapshotContext.offset).setTableId(tableId);
return new SnapshotChangeRecordEmitter(snapshotContext.offset, row, clock); return new SnapshotChangeRecordEmitter(snapshotContext.offset, row, clock);
} }

View File

@ -0,0 +1,48 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.connector.oracle;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.Struct;
import io.debezium.config.CommonConnectorConfig;
import io.debezium.connector.AbstractSourceInfoStructMaker;
public class OracleSourceInfoStructMaker extends AbstractSourceInfoStructMaker<SourceInfo> {
private final Schema schema;
public OracleSourceInfoStructMaker(String connector, String version, CommonConnectorConfig connectorConfig) {
super(connector, version, connectorConfig);
schema = commonSchemaBuilder()
.name("io.debezium.connector.oracle.Source")
.field(SourceInfo.SCHEMA_NAME_KEY, Schema.STRING_SCHEMA)
.field(SourceInfo.TABLE_NAME_KEY, Schema.STRING_SCHEMA)
.field(SourceInfo.TXID_KEY, Schema.OPTIONAL_STRING_SCHEMA)
.field(SourceInfo.SCN_KEY, Schema.OPTIONAL_INT64_SCHEMA)
.field(SourceInfo.LCR_POSITION_KEY, Schema.OPTIONAL_STRING_SCHEMA)
.build();
}
@Override
public Schema schema() {
return schema;
}
@Override
public Struct struct(SourceInfo sourceInfo) {
final Struct ret = super.commonStruct(sourceInfo)
.put(SourceInfo.SCHEMA_NAME_KEY, sourceInfo.getTableId().schema())
.put(SourceInfo.TABLE_NAME_KEY, sourceInfo.getTableId().table())
.put(SourceInfo.TXID_KEY, sourceInfo.getTransactionId())
.put(SourceInfo.SCN_KEY, sourceInfo.getScn());
if (sourceInfo.getLcrPosition() != null) {
ret.put(SourceInfo.LCR_POSITION_KEY, sourceInfo.getLcrPosition().toString());
}
return ret;
}
}

View File

@ -7,70 +7,36 @@
import java.time.Instant; import java.time.Instant;
import io.debezium.annotation.NotThreadSafe;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.Struct; import org.apache.kafka.connect.data.Struct;
import io.debezium.annotation.NotThreadSafe;
import io.debezium.connector.AbstractSourceInfo; import io.debezium.connector.AbstractSourceInfo;
import io.debezium.relational.TableId;
@NotThreadSafe @NotThreadSafe
public class SourceInfo extends AbstractSourceInfo { public class SourceInfo extends AbstractSourceInfo {
public static final String SERVER_NAME_KEY = "name";
public static final String TXID_KEY = "txId"; public static final String TXID_KEY = "txId";
public static final String TIMESTAMP_KEY = "ts_ms";
public static final String SCN_KEY = "scn"; public static final String SCN_KEY = "scn";
public static final String LCR_POSITION_KEY = "lcr_position"; public static final String LCR_POSITION_KEY = "lcr_position";
public static final String SNAPSHOT_KEY = "snapshot"; public static final String SNAPSHOT_KEY = "snapshot";
public static final Schema SCHEMA = schemaBuilder()
.name("io.debezium.connector.oracle.Source")
.field(SERVER_NAME_KEY, Schema.STRING_SCHEMA)
.field(TIMESTAMP_KEY, Schema.OPTIONAL_INT64_SCHEMA)
.field(TXID_KEY, Schema.OPTIONAL_STRING_SCHEMA)
.field(SCN_KEY, Schema.OPTIONAL_INT64_SCHEMA)
.field(LCR_POSITION_KEY, Schema.OPTIONAL_STRING_SCHEMA)
.field(SNAPSHOT_KEY, Schema.OPTIONAL_BOOLEAN_SCHEMA)
.build();
private final String serverName;
private long scn; private long scn;
private LcrPosition lcrPosition; private LcrPosition lcrPosition;
private String transactionId; private String transactionId;
private Instant sourceTime; private Instant sourceTime;
private boolean snapshot; private boolean snapshot;
private TableId tableId;
protected SourceInfo(String serverName) { protected SourceInfo(OracleConnectorConfig connectorConfig) {
super(Module.version()); super(connectorConfig);
this.serverName = serverName;
} }
@Override /**
protected Schema schema() { * @return the coordinates encoded as a {@code Struct}
return SCHEMA; */
}
@Override
protected String connector() {
return Module.name();
}
@Override
public Struct struct() { public Struct struct() {
final Struct r = super.struct() return structMaker().struct(this);
.put(SERVER_NAME_KEY, serverName)
.put(TIMESTAMP_KEY, sourceTime.toEpochMilli())
.put(TXID_KEY, transactionId)
.put(SCN_KEY, scn)
.put(SNAPSHOT_KEY, snapshot);
if (lcrPosition != null) {
r.put(LCR_POSITION_KEY, lcrPosition.toString());
}
return r;
}
public String getServerName() {
return serverName;
} }
public long getScn() { public long getScn() {
@ -112,4 +78,27 @@ public void setSnapshot(boolean snapshot) {
public boolean isSnapshot() { public boolean isSnapshot() {
return snapshot; return snapshot;
} }
public TableId getTableId() {
return tableId;
}
public void setTableId(TableId tableId) {
this.tableId = tableId;
}
@Override
protected Instant timestamp() {
return sourceTime;
}
@Override
protected boolean snapshot() {
return isSnapshot();
}
@Override
protected String database() {
return tableId.catalog();
}
} }

View File

@ -8,6 +8,9 @@
import org.junit.Before; import org.junit.Before;
import org.junit.Test; import org.junit.Test;
import io.debezium.config.Configuration;
import io.debezium.relational.TableId;
import java.time.Instant; import java.time.Instant;
import static org.fest.assertions.Assertions.assertThat; import static org.fest.assertions.Assertions.assertThat;
@ -18,8 +21,15 @@ public class SourceInfoTest {
@Before @Before
public void beforeEach() { public void beforeEach() {
source = new SourceInfo("serverX"); final OracleConnectorConfig connectorConfig = new OracleConnectorConfig(
Configuration.create()
.with(OracleConnectorConfig.SERVER_NAME, "serverX")
.with(OracleConnectorConfig.DATABASE_NAME, "mydb")
.build()
);
source = new SourceInfo(connectorConfig);
source.setSourceTime(Instant.now()); source.setSourceTime(Instant.now());
source.setTableId(new TableId("c", "s", "t"));
} }
@Test @Test

View File

@ -4,7 +4,7 @@
<parent> <parent>
<groupId>io.debezium</groupId> <groupId>io.debezium</groupId>
<artifactId>debezium-parent</artifactId> <artifactId>debezium-parent</artifactId>
<version>0.9.5.Final</version> <version>0.10.0-SNAPSHOT</version>
</parent> </parent>
<modelVersion>4.0.0</modelVersion> <modelVersion>4.0.0</modelVersion>