diff --git a/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/MongoDbOffsetContext.java b/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/MongoDbOffsetContext.java index 8eaad16e1..b71df4935 100644 --- a/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/MongoDbOffsetContext.java +++ b/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/MongoDbOffsetContext.java @@ -12,12 +12,11 @@ import java.util.concurrent.ConcurrentHashMap; import org.apache.kafka.connect.data.Schema; -import org.apache.kafka.connect.data.Struct; import org.bson.BsonDocument; import io.debezium.connector.SnapshotRecord; +import io.debezium.pipeline.CommonOffsetContext; import io.debezium.pipeline.source.snapshot.incremental.IncrementalSnapshotContext; -import io.debezium.pipeline.spi.OffsetContext; import io.debezium.pipeline.txmetadata.TransactionContext; import io.debezium.schema.DataCollectionId; @@ -26,7 +25,7 @@ * * @author Chris Cranford */ -public class MongoDbOffsetContext implements OffsetContext { +public class MongoDbOffsetContext extends CommonOffsetContext { private final SourceInfo sourceInfo; private final TransactionContext transactionContext; @@ -55,6 +54,11 @@ void stopReplicaSetSnapshot(String replicaSetName) { sourceInfo.stopInitialSync(replicaSetName); } + @Override + public SourceInfo getSourceInfoObject() { + return sourceInfo; + } + @Override public Map getOffset() { // Any common framework API that needs to call this function should be provided with a ReplicaSetOffsetContext @@ -66,11 +70,6 @@ public Schema getSourceInfoSchema() { return sourceInfo.schema(); } - @Override - public Struct getSourceInfo() { - return sourceInfo.struct(); - } - @Override public boolean isSnapshotRunning() { return sourceInfo.isSnapshot() && sourceInfo.isSnapshotRunning(); @@ -85,26 +84,11 @@ public void preSnapshotStart() { public void preSnapshotCompletion() { } - @Override - public void postSnapshotCompletion() { - sourceInfo.setSnapshot(SnapshotRecord.FALSE); - } - - @Override - public void markSnapshotRecord(SnapshotRecord record) { - sourceInfo.setSnapshot(record); - } - @Override public TransactionContext getTransactionContext() { return transactionContext; } - @Override - public void incrementalSnapshotEvents() { - sourceInfo.setSnapshot(SnapshotRecord.INCREMENTAL); - } - @Override public IncrementalSnapshotContext getIncrementalSnapshotContext() { return incrementalSnapshotContext; diff --git a/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/ReplicaSetOffsetContext.java b/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/ReplicaSetOffsetContext.java index 9447f4922..120fe2fa9 100644 --- a/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/ReplicaSetOffsetContext.java +++ b/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/ReplicaSetOffsetContext.java @@ -9,14 +9,13 @@ import java.util.Map; import org.apache.kafka.connect.data.Schema; -import org.apache.kafka.connect.data.Struct; import org.bson.BsonDocument; import org.bson.BsonTimestamp; import com.mongodb.client.model.changestream.ChangeStreamDocument; import io.debezium.annotation.ThreadSafe; -import io.debezium.connector.SnapshotRecord; +import io.debezium.pipeline.CommonOffsetContext; import io.debezium.pipeline.source.snapshot.incremental.IncrementalSnapshotContext; import io.debezium.pipeline.spi.OffsetContext; import io.debezium.pipeline.txmetadata.TransactionContext; @@ -32,7 +31,7 @@ * @author Chris Cranford */ @ThreadSafe -public class ReplicaSetOffsetContext implements OffsetContext { +public class ReplicaSetOffsetContext extends CommonOffsetContext { private final MongoDbOffsetContext offsetContext; private final String replicaSetName; @@ -47,6 +46,11 @@ public ReplicaSetOffsetContext(MongoDbOffsetContext offsetContext, ReplicaSet re this.incrementalSnapshotContext = incrementalSnapshotContext; } + @Override + public SourceInfo getSourceInfoObject() { + return offsetContext.getSourceInfoObject(); + } + @Override public Map getOffset() { @SuppressWarnings("unchecked") @@ -60,21 +64,11 @@ public Schema getSourceInfoSchema() { return offsetContext.getSourceInfoSchema(); } - @Override - public Struct getSourceInfo() { - return offsetContext.getSourceInfo(); - } - @Override public boolean isSnapshotRunning() { return offsetContext.isSnapshotRunning(); } - @Override - public void markSnapshotRecord(SnapshotRecord record) { - offsetContext.markSnapshotRecord(record); - } - @Override public void preSnapshotStart() { offsetContext.preSnapshotStart(); @@ -85,11 +79,6 @@ public void preSnapshotCompletion() { offsetContext.preSnapshotCompletion(); } - @Override - public void postSnapshotCompletion() { - offsetContext.postSnapshotCompletion(); - } - @Override public void event(DataCollectionId collectionId, Instant timestamp) { // Not used by the ReplicaSetOffsetContext, see readEvent and oplogEvent @@ -130,11 +119,6 @@ public String lastResumeToken() { return sourceInfo.lastResumeToken(replicaSetName); } - @Override - public void incrementalSnapshotEvents() { - offsetContext.incrementalSnapshotEvents(); - } - @Override public IncrementalSnapshotContext getIncrementalSnapshotContext() { return offsetContext.getIncrementalSnapshotContext(); diff --git a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlOffsetContext.java b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlOffsetContext.java index 01f9b60e6..f81a6cbf8 100644 --- a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlOffsetContext.java +++ b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlOffsetContext.java @@ -11,10 +11,10 @@ import java.util.Set; import org.apache.kafka.connect.data.Schema; -import org.apache.kafka.connect.data.Struct; import org.apache.kafka.connect.errors.ConnectException; import io.debezium.connector.SnapshotRecord; +import io.debezium.pipeline.CommonOffsetContext; import io.debezium.pipeline.source.snapshot.incremental.IncrementalSnapshotContext; import io.debezium.pipeline.source.snapshot.incremental.SignalBasedIncrementalSnapshotContext; import io.debezium.pipeline.spi.OffsetContext; @@ -22,7 +22,7 @@ import io.debezium.relational.TableId; import io.debezium.schema.DataCollectionId; -public class MySqlOffsetContext implements OffsetContext { +public class MySqlOffsetContext extends CommonOffsetContext { private static final String SNAPSHOT_COMPLETED_KEY = "snapshot_completed"; public static final String EVENTS_TO_SKIP_OFFSET_KEY = "event"; @@ -67,6 +67,11 @@ public MySqlOffsetContext(MySqlConnectorConfig connectorConfig, boolean snapshot sourceInfo); } + @Override + public SourceInfo getSourceInfoObject() { + return sourceInfo; + } + @Override public Map getOffset() { final Map offset = offsetUsingPosition(restartRowsToSkip); @@ -109,11 +114,6 @@ public Schema getSourceInfoSchema() { return sourceInfoSchema; } - @Override - public Struct getSourceInfo() { - return sourceInfo.struct(); - } - @Override public boolean isSnapshotRunning() { return sourceInfo.isSnapshot() && !snapshotCompleted; @@ -134,11 +134,6 @@ public void preSnapshotCompletion() { snapshotCompleted = true; } - @Override - public void postSnapshotCompletion() { - sourceInfo.setSnapshot(SnapshotRecord.FALSE); - } - private void setTransactionId() { // use GTID if it is available if (sourceInfo.getCurrentGtid() != null) { @@ -221,11 +216,6 @@ private long longOffsetValue(Map values, String key) { } } - @Override - public void markSnapshotRecord(SnapshotRecord record) { - sourceInfo.setSnapshot(record); - } - @Override public void event(DataCollectionId tableId, Instant timestamp) { sourceInfo.setSourceTime(timestamp); @@ -249,11 +239,6 @@ public TransactionContext getTransactionContext() { return transactionContext; } - @Override - public void incrementalSnapshotEvents() { - sourceInfo.setSnapshot(SnapshotRecord.INCREMENTAL); - } - @Override public IncrementalSnapshotContext getIncrementalSnapshotContext() { return incrementalSnapshotContext; diff --git a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/OracleOffsetContext.java b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/OracleOffsetContext.java index a1c14effc..01bb6914e 100644 --- a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/OracleOffsetContext.java +++ b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/OracleOffsetContext.java @@ -13,16 +13,15 @@ import java.util.stream.Collectors; import org.apache.kafka.connect.data.Schema; -import org.apache.kafka.connect.data.Struct; import io.debezium.connector.SnapshotRecord; +import io.debezium.pipeline.CommonOffsetContext; import io.debezium.pipeline.source.snapshot.incremental.IncrementalSnapshotContext; -import io.debezium.pipeline.spi.OffsetContext; import io.debezium.pipeline.txmetadata.TransactionContext; import io.debezium.relational.TableId; import io.debezium.schema.DataCollectionId; -public class OracleOffsetContext implements OffsetContext { +public class OracleOffsetContext extends CommonOffsetContext { public static final String SNAPSHOT_COMPLETED_KEY = "snapshot_completed"; public static final String SNAPSHOT_PENDING_TRANSACTIONS_KEY = "snapshot_pending_tx"; @@ -156,6 +155,11 @@ public static Builder create() { return new Builder(); } + @Override + public SourceInfo getSourceInfoObject() { + return sourceInfo; + } + @Override public Map getOffset() { if (sourceInfo.isSnapshot()) { @@ -204,11 +208,6 @@ public Schema getSourceInfoSchema() { return sourceInfoSchema; } - @Override - public Struct getSourceInfo() { - return sourceInfo.struct(); - } - public void setScn(Scn scn) { sourceInfo.setScn(scn); } @@ -273,11 +272,6 @@ public void preSnapshotCompletion() { snapshotCompleted = true; } - @Override - public void postSnapshotCompletion() { - sourceInfo.setSnapshot(SnapshotRecord.FALSE); - } - @Override public String toString() { StringBuilder sb = new StringBuilder("OracleOffsetContext [scn=").append(getScn()); @@ -294,11 +288,6 @@ public String toString() { return sb.toString(); } - @Override - public void markSnapshotRecord(SnapshotRecord record) { - sourceInfo.setSnapshot(record); - } - @Override public void event(DataCollectionId tableId, Instant timestamp) { sourceInfo.tableEvent((TableId) tableId); @@ -320,11 +309,6 @@ public TransactionContext getTransactionContext() { return transactionContext; } - @Override - public void incrementalSnapshotEvents() { - sourceInfo.setSnapshot(SnapshotRecord.INCREMENTAL); - } - @Override public IncrementalSnapshotContext getIncrementalSnapshotContext() { return incrementalSnapshotContext; diff --git a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresOffsetContext.java b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresOffsetContext.java index 16394d9b0..b38831622 100644 --- a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresOffsetContext.java +++ b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresOffsetContext.java @@ -11,7 +11,6 @@ import java.util.Map; import org.apache.kafka.connect.data.Schema; -import org.apache.kafka.connect.data.Struct; import org.apache.kafka.connect.errors.ConnectException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -20,6 +19,7 @@ import io.debezium.connector.postgresql.connection.Lsn; import io.debezium.connector.postgresql.connection.PostgresConnection; import io.debezium.connector.postgresql.spi.OffsetState; +import io.debezium.pipeline.CommonOffsetContext; import io.debezium.pipeline.source.snapshot.incremental.IncrementalSnapshotContext; import io.debezium.pipeline.source.snapshot.incremental.SignalBasedIncrementalSnapshotContext; import io.debezium.pipeline.spi.OffsetContext; @@ -29,7 +29,7 @@ import io.debezium.time.Conversions; import io.debezium.util.Clock; -public class PostgresOffsetContext implements OffsetContext { +public class PostgresOffsetContext extends CommonOffsetContext { private static final Logger LOGGER = LoggerFactory.getLogger(PostgresSnapshotChangeEventSource.class); public static final String LAST_COMPLETELY_PROCESSED_LSN_KEY = "lsn_proc"; @@ -67,6 +67,11 @@ private PostgresOffsetContext(PostgresConnectorConfig connectorConfig, Lsn lsn, this.incrementalSnapshotContext = incrementalSnapshotContext; } + @Override + public SourceInfo getSourceInfoObject() { + return sourceInfo; + } + @Override public Map getOffset() { Map result = new HashMap<>(); @@ -100,11 +105,6 @@ public Schema getSourceInfoSchema() { return sourceInfoSchema; } - @Override - public Struct getSourceInfo() { - return sourceInfo.struct(); - } - @Override public boolean isSnapshotRunning() { return sourceInfo.isSnapshot(); @@ -121,11 +121,6 @@ public void preSnapshotCompletion() { lastSnapshotRecord = true; } - @Override - public void postSnapshotCompletion() { - sourceInfo.setSnapshot(SnapshotRecord.FALSE); - } - public void updateWalPosition(Lsn lsn, Lsn lastCompletelyProcessedLsn, Instant commitTime, Long txId, Long xmin, TableId tableId) { this.lastCompletelyProcessedLsn = lastCompletelyProcessedLsn; sourceInfo.update(lsn, commitTime, txId, xmin, tableId); @@ -258,11 +253,6 @@ public OffsetState asOffsetState() { sourceInfo.isSnapshot()); } - @Override - public void markSnapshotRecord(SnapshotRecord record) { - sourceInfo.setSnapshot(record); - } - @Override public void event(DataCollectionId tableId, Instant instant) { sourceInfo.update(instant, (TableId) tableId); @@ -273,11 +263,6 @@ public TransactionContext getTransactionContext() { return transactionContext; } - @Override - public void incrementalSnapshotEvents() { - sourceInfo.setSnapshot(SnapshotRecord.INCREMENTAL); - } - @Override public IncrementalSnapshotContext getIncrementalSnapshotContext() { return incrementalSnapshotContext; diff --git a/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerOffsetContext.java b/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerOffsetContext.java index f397a9a5c..7ddcf8c67 100644 --- a/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerOffsetContext.java +++ b/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerOffsetContext.java @@ -9,9 +9,9 @@ import java.util.Map; import org.apache.kafka.connect.data.Schema; -import org.apache.kafka.connect.data.Struct; import io.debezium.connector.SnapshotRecord; +import io.debezium.pipeline.CommonOffsetContext; import io.debezium.pipeline.source.snapshot.incremental.IncrementalSnapshotContext; import io.debezium.pipeline.source.snapshot.incremental.SignalBasedIncrementalSnapshotContext; import io.debezium.pipeline.spi.OffsetContext; @@ -20,7 +20,7 @@ import io.debezium.schema.DataCollectionId; import io.debezium.util.Collect; -public class SqlServerOffsetContext implements OffsetContext { +public class SqlServerOffsetContext extends CommonOffsetContext { private static final String SNAPSHOT_COMPLETED_KEY = "snapshot_completed"; @@ -60,6 +60,11 @@ public SqlServerOffsetContext(SqlServerConnectorConfig connectorConfig, TxLogPos this(connectorConfig, position, snapshot, snapshotCompleted, 1, new TransactionContext(), new SignalBasedIncrementalSnapshotContext<>()); } + @Override + public SourceInfo getSourceInfoObject() { + return sourceInfo; + } + @Override public Map getOffset() { if (sourceInfo.isSnapshot()) { @@ -82,11 +87,6 @@ public Schema getSourceInfoSchema() { return sourceInfoSchema; } - @Override - public Struct getSourceInfo() { - return sourceInfo.struct(); - } - public TxLogPosition getChangePosition() { return TxLogPosition.valueOf(sourceInfo.getCommitLsn(), sourceInfo.getChangeLsn()); } @@ -127,11 +127,6 @@ public void preSnapshotCompletion() { snapshotCompleted = true; } - @Override - public void postSnapshotCompletion() { - sourceInfo.setSnapshot(SnapshotRecord.FALSE); - } - public static class Loader implements OffsetContext.Loader { private final SqlServerConnectorConfig connectorConfig; @@ -168,11 +163,6 @@ public String toString() { "]"; } - @Override - public void markSnapshotRecord(SnapshotRecord record) { - sourceInfo.setSnapshot(record); - } - @Override public void event(DataCollectionId tableId, Instant timestamp) { sourceInfo.setSourceTime(timestamp); @@ -184,11 +174,6 @@ public TransactionContext getTransactionContext() { return transactionContext; } - @Override - public void incrementalSnapshotEvents() { - sourceInfo.setSnapshot(SnapshotRecord.INCREMENTAL); - } - @Override public IncrementalSnapshotContext getIncrementalSnapshotContext() { return incrementalSnapshotContext; diff --git a/debezium-core/src/main/java/io/debezium/pipeline/CommonOffsetContext.java b/debezium-core/src/main/java/io/debezium/pipeline/CommonOffsetContext.java new file mode 100644 index 000000000..cd7782a32 --- /dev/null +++ b/debezium-core/src/main/java/io/debezium/pipeline/CommonOffsetContext.java @@ -0,0 +1,37 @@ +/* + * Copyright Debezium Authors. + * + * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 + */ +package io.debezium.pipeline; + +import org.apache.kafka.connect.data.Struct; + +import io.debezium.connector.SnapshotRecord; +import io.debezium.connector.common.BaseSourceInfo; +import io.debezium.pipeline.spi.OffsetContext; + +public abstract class CommonOffsetContext implements OffsetContext { + + public abstract BaseSourceInfo getSourceInfoObject(); + + @Override + public Struct getSourceInfo() { + return getSourceInfoObject().struct(); + } + + @Override + public void markSnapshotRecord(SnapshotRecord record) { + getSourceInfoObject().setSnapshot(record); + } + + @Override + public void postSnapshotCompletion() { + getSourceInfoObject().setSnapshot(SnapshotRecord.FALSE); + } + + @Override + public void incrementalSnapshotEvents() { + getSourceInfoObject().setSnapshot(SnapshotRecord.INCREMENTAL); + } +}