DBZ-777 Snapshot event metadata set in common snapshot source

This commit is contained in:
Jiri Pechanec 2019-08-15 06:53:55 +02:00 committed by Gunnar Morling
parent e7348c9c3b
commit 796c0933b0
3 changed files with 11 additions and 17 deletions

View File

@ -193,6 +193,12 @@ public void markLastSnapshotRecord() {
sourceInfo.setSnapshot(SnapshotRecord.LAST);
}
@Override
public void event(TableId tableId, Instant timestamp) {
sourceInfo.setTableId(tableId);
sourceInfo.setSourceTime(timestamp);
}
public static class Loader implements OffsetContext.Loader {
private final OracleConnectorConfig connectorConfig;

View File

@ -10,7 +10,6 @@
import java.sql.SQLException;
import java.sql.Savepoint;
import java.sql.Statement;
import java.time.Instant;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
@ -21,9 +20,8 @@
import io.debezium.pipeline.EventDispatcher;
import io.debezium.pipeline.source.spi.SnapshotProgressListener;
import io.debezium.pipeline.source.spi.StreamingChangeEventSource;
import io.debezium.pipeline.spi.ChangeRecordEmitter;
import io.debezium.pipeline.spi.OffsetContext;
import io.debezium.relational.HistorizedRelationalSnapshotChangeEventSource;
import io.debezium.relational.RelationalSnapshotChangeEventSource;
import io.debezium.relational.Table;
import io.debezium.relational.TableId;
import io.debezium.schema.SchemaChangeEvent;
@ -35,20 +33,18 @@
*
* @author Gunnar Morling
*/
public class OracleSnapshotChangeEventSource extends HistorizedRelationalSnapshotChangeEventSource {
public class OracleSnapshotChangeEventSource extends RelationalSnapshotChangeEventSource {
private static final Logger LOGGER = LoggerFactory.getLogger(OracleSnapshotChangeEventSource.class);
private final OracleConnectorConfig connectorConfig;
private final OracleConnection jdbcConnection;
private final Clock clock;
public OracleSnapshotChangeEventSource(OracleConnectorConfig connectorConfig, OracleOffsetContext previousOffset, OracleConnection jdbcConnection, OracleDatabaseSchema schema, EventDispatcher<TableId> dispatcher, Clock clock, SnapshotProgressListener snapshotProgressListener) {
super(connectorConfig, previousOffset, jdbcConnection, schema, dispatcher, clock, snapshotProgressListener);
this.connectorConfig = connectorConfig;
this.jdbcConnection = jdbcConnection;
this.clock = clock;
}
@Override
@ -225,17 +221,9 @@ protected SchemaChangeEvent getCreateTableEvent(SnapshotContext snapshotContext,
}
@Override
protected String getSnapshotSelect(SnapshotContext snapshotContext, TableId tableId) {
protected Optional<String> getSnapshotSelect(SnapshotContext snapshotContext, TableId tableId) {
long snapshotOffset = (Long) snapshotContext.offset.getOffset().get("scn");
return "SELECT * FROM " + tableId.schema() + "." + tableId.table() + " AS OF SCN " + snapshotOffset;
}
@Override
protected ChangeRecordEmitter getChangeRecordEmitter(SnapshotContext snapshotContext, TableId tableId, Object[] row) {
// TODO can this be done in a better way than doing it as a side-effect here?
((OracleOffsetContext) snapshotContext.offset).setSourceTime(Instant.ofEpochMilli(clock.currentTimeInMillis()));
((OracleOffsetContext) snapshotContext.offset).setTableId(tableId);
return new SnapshotChangeRecordEmitter(snapshotContext.offset, row, clock);
return Optional.of("SELECT * FROM " + tableId.schema() + "." + tableId.table() + " AS OF SCN " + snapshotOffset);
}
@Override

View File

@ -4,7 +4,7 @@
<parent>
<groupId>io.debezium</groupId>
<artifactId>debezium-parent</artifactId>
<version>0.10.0.Beta3</version>
<version>0.10.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>