DBZ-5136 reintroduce the CommonOffsetContext class and refactor common functionality into it

This commit is contained in:
Mark Bereznitsky 2022-06-01 20:48:59 +10:00 committed by Jiri Pechanec
parent 2d72dae649
commit c64b8af31a
7 changed files with 79 additions and 135 deletions

View File

@ -12,12 +12,11 @@
import java.util.concurrent.ConcurrentHashMap;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.Struct;
import org.bson.BsonDocument;
import io.debezium.connector.SnapshotRecord;
import io.debezium.pipeline.CommonOffsetContext;
import io.debezium.pipeline.source.snapshot.incremental.IncrementalSnapshotContext;
import io.debezium.pipeline.spi.OffsetContext;
import io.debezium.pipeline.txmetadata.TransactionContext;
import io.debezium.schema.DataCollectionId;
@ -26,7 +25,7 @@
*
* @author Chris Cranford
*/
public class MongoDbOffsetContext implements OffsetContext {
public class MongoDbOffsetContext extends CommonOffsetContext {
private final SourceInfo sourceInfo;
private final TransactionContext transactionContext;
@ -55,6 +54,11 @@ void stopReplicaSetSnapshot(String replicaSetName) {
sourceInfo.stopInitialSync(replicaSetName);
}
@Override
public SourceInfo getSourceInfoObject() {
return sourceInfo;
}
@Override
public Map<String, ?> getOffset() {
// Any common framework API that needs to call this function should be provided with a ReplicaSetOffsetContext
@ -66,11 +70,6 @@ public Schema getSourceInfoSchema() {
return sourceInfo.schema();
}
@Override
public Struct getSourceInfo() {
return sourceInfo.struct();
}
@Override
public boolean isSnapshotRunning() {
return sourceInfo.isSnapshot() && sourceInfo.isSnapshotRunning();
@ -85,26 +84,11 @@ public void preSnapshotStart() {
public void preSnapshotCompletion() {
}
@Override
public void postSnapshotCompletion() {
sourceInfo.setSnapshot(SnapshotRecord.FALSE);
}
@Override
public void markSnapshotRecord(SnapshotRecord record) {
sourceInfo.setSnapshot(record);
}
@Override
public TransactionContext getTransactionContext() {
return transactionContext;
}
@Override
public void incrementalSnapshotEvents() {
sourceInfo.setSnapshot(SnapshotRecord.INCREMENTAL);
}
@Override
public IncrementalSnapshotContext<?> getIncrementalSnapshotContext() {
return incrementalSnapshotContext;

View File

@ -9,14 +9,13 @@
import java.util.Map;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.Struct;
import org.bson.BsonDocument;
import org.bson.BsonTimestamp;
import com.mongodb.client.model.changestream.ChangeStreamDocument;
import io.debezium.annotation.ThreadSafe;
import io.debezium.connector.SnapshotRecord;
import io.debezium.pipeline.CommonOffsetContext;
import io.debezium.pipeline.source.snapshot.incremental.IncrementalSnapshotContext;
import io.debezium.pipeline.spi.OffsetContext;
import io.debezium.pipeline.txmetadata.TransactionContext;
@ -32,7 +31,7 @@
* @author Chris Cranford
*/
@ThreadSafe
public class ReplicaSetOffsetContext implements OffsetContext {
public class ReplicaSetOffsetContext extends CommonOffsetContext {
private final MongoDbOffsetContext offsetContext;
private final String replicaSetName;
@ -47,6 +46,11 @@ public ReplicaSetOffsetContext(MongoDbOffsetContext offsetContext, ReplicaSet re
this.incrementalSnapshotContext = incrementalSnapshotContext;
}
@Override
public SourceInfo getSourceInfoObject() {
return offsetContext.getSourceInfoObject();
}
@Override
public Map<String, ?> getOffset() {
@SuppressWarnings("unchecked")
@ -60,21 +64,11 @@ public Schema getSourceInfoSchema() {
return offsetContext.getSourceInfoSchema();
}
@Override
public Struct getSourceInfo() {
return offsetContext.getSourceInfo();
}
@Override
public boolean isSnapshotRunning() {
return offsetContext.isSnapshotRunning();
}
@Override
public void markSnapshotRecord(SnapshotRecord record) {
offsetContext.markSnapshotRecord(record);
}
@Override
public void preSnapshotStart() {
offsetContext.preSnapshotStart();
@ -85,11 +79,6 @@ public void preSnapshotCompletion() {
offsetContext.preSnapshotCompletion();
}
@Override
public void postSnapshotCompletion() {
offsetContext.postSnapshotCompletion();
}
@Override
public void event(DataCollectionId collectionId, Instant timestamp) {
// Not used by the ReplicaSetOffsetContext, see readEvent and oplogEvent
@ -130,11 +119,6 @@ public String lastResumeToken() {
return sourceInfo.lastResumeToken(replicaSetName);
}
@Override
public void incrementalSnapshotEvents() {
offsetContext.incrementalSnapshotEvents();
}
@Override
public IncrementalSnapshotContext<?> getIncrementalSnapshotContext() {
return offsetContext.getIncrementalSnapshotContext();

View File

@ -11,10 +11,10 @@
import java.util.Set;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.errors.ConnectException;
import io.debezium.connector.SnapshotRecord;
import io.debezium.pipeline.CommonOffsetContext;
import io.debezium.pipeline.source.snapshot.incremental.IncrementalSnapshotContext;
import io.debezium.pipeline.source.snapshot.incremental.SignalBasedIncrementalSnapshotContext;
import io.debezium.pipeline.spi.OffsetContext;
@ -22,7 +22,7 @@
import io.debezium.relational.TableId;
import io.debezium.schema.DataCollectionId;
public class MySqlOffsetContext implements OffsetContext {
public class MySqlOffsetContext extends CommonOffsetContext {
private static final String SNAPSHOT_COMPLETED_KEY = "snapshot_completed";
public static final String EVENTS_TO_SKIP_OFFSET_KEY = "event";
@ -67,6 +67,11 @@ public MySqlOffsetContext(MySqlConnectorConfig connectorConfig, boolean snapshot
sourceInfo);
}
@Override
public SourceInfo getSourceInfoObject() {
return sourceInfo;
}
@Override
public Map<String, ?> getOffset() {
final Map<String, Object> offset = offsetUsingPosition(restartRowsToSkip);
@ -109,11 +114,6 @@ public Schema getSourceInfoSchema() {
return sourceInfoSchema;
}
@Override
public Struct getSourceInfo() {
return sourceInfo.struct();
}
@Override
public boolean isSnapshotRunning() {
return sourceInfo.isSnapshot() && !snapshotCompleted;
@ -134,11 +134,6 @@ public void preSnapshotCompletion() {
snapshotCompleted = true;
}
@Override
public void postSnapshotCompletion() {
sourceInfo.setSnapshot(SnapshotRecord.FALSE);
}
private void setTransactionId() {
// use GTID if it is available
if (sourceInfo.getCurrentGtid() != null) {
@ -221,11 +216,6 @@ private long longOffsetValue(Map<String, ?> values, String key) {
}
}
@Override
public void markSnapshotRecord(SnapshotRecord record) {
sourceInfo.setSnapshot(record);
}
@Override
public void event(DataCollectionId tableId, Instant timestamp) {
sourceInfo.setSourceTime(timestamp);
@ -249,11 +239,6 @@ public TransactionContext getTransactionContext() {
return transactionContext;
}
@Override
public void incrementalSnapshotEvents() {
sourceInfo.setSnapshot(SnapshotRecord.INCREMENTAL);
}
@Override
public IncrementalSnapshotContext<?> getIncrementalSnapshotContext() {
return incrementalSnapshotContext;

View File

@ -13,16 +13,15 @@
import java.util.stream.Collectors;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.Struct;
import io.debezium.connector.SnapshotRecord;
import io.debezium.pipeline.CommonOffsetContext;
import io.debezium.pipeline.source.snapshot.incremental.IncrementalSnapshotContext;
import io.debezium.pipeline.spi.OffsetContext;
import io.debezium.pipeline.txmetadata.TransactionContext;
import io.debezium.relational.TableId;
import io.debezium.schema.DataCollectionId;
public class OracleOffsetContext implements OffsetContext {
public class OracleOffsetContext extends CommonOffsetContext {
public static final String SNAPSHOT_COMPLETED_KEY = "snapshot_completed";
public static final String SNAPSHOT_PENDING_TRANSACTIONS_KEY = "snapshot_pending_tx";
@ -156,6 +155,11 @@ public static Builder create() {
return new Builder();
}
@Override
public SourceInfo getSourceInfoObject() {
return sourceInfo;
}
@Override
public Map<String, ?> getOffset() {
if (sourceInfo.isSnapshot()) {
@ -204,11 +208,6 @@ public Schema getSourceInfoSchema() {
return sourceInfoSchema;
}
@Override
public Struct getSourceInfo() {
return sourceInfo.struct();
}
public void setScn(Scn scn) {
sourceInfo.setScn(scn);
}
@ -273,11 +272,6 @@ public void preSnapshotCompletion() {
snapshotCompleted = true;
}
@Override
public void postSnapshotCompletion() {
sourceInfo.setSnapshot(SnapshotRecord.FALSE);
}
@Override
public String toString() {
StringBuilder sb = new StringBuilder("OracleOffsetContext [scn=").append(getScn());
@ -294,11 +288,6 @@ public String toString() {
return sb.toString();
}
@Override
public void markSnapshotRecord(SnapshotRecord record) {
sourceInfo.setSnapshot(record);
}
@Override
public void event(DataCollectionId tableId, Instant timestamp) {
sourceInfo.tableEvent((TableId) tableId);
@ -320,11 +309,6 @@ public TransactionContext getTransactionContext() {
return transactionContext;
}
@Override
public void incrementalSnapshotEvents() {
sourceInfo.setSnapshot(SnapshotRecord.INCREMENTAL);
}
@Override
public IncrementalSnapshotContext<?> getIncrementalSnapshotContext() {
return incrementalSnapshotContext;

View File

@ -11,7 +11,6 @@
import java.util.Map;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.errors.ConnectException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -20,6 +19,7 @@
import io.debezium.connector.postgresql.connection.Lsn;
import io.debezium.connector.postgresql.connection.PostgresConnection;
import io.debezium.connector.postgresql.spi.OffsetState;
import io.debezium.pipeline.CommonOffsetContext;
import io.debezium.pipeline.source.snapshot.incremental.IncrementalSnapshotContext;
import io.debezium.pipeline.source.snapshot.incremental.SignalBasedIncrementalSnapshotContext;
import io.debezium.pipeline.spi.OffsetContext;
@ -29,7 +29,7 @@
import io.debezium.time.Conversions;
import io.debezium.util.Clock;
public class PostgresOffsetContext implements OffsetContext {
public class PostgresOffsetContext extends CommonOffsetContext {
private static final Logger LOGGER = LoggerFactory.getLogger(PostgresSnapshotChangeEventSource.class);
public static final String LAST_COMPLETELY_PROCESSED_LSN_KEY = "lsn_proc";
@ -67,6 +67,11 @@ private PostgresOffsetContext(PostgresConnectorConfig connectorConfig, Lsn lsn,
this.incrementalSnapshotContext = incrementalSnapshotContext;
}
@Override
public SourceInfo getSourceInfoObject() {
return sourceInfo;
}
@Override
public Map<String, ?> getOffset() {
Map<String, Object> result = new HashMap<>();
@ -100,11 +105,6 @@ public Schema getSourceInfoSchema() {
return sourceInfoSchema;
}
@Override
public Struct getSourceInfo() {
return sourceInfo.struct();
}
@Override
public boolean isSnapshotRunning() {
return sourceInfo.isSnapshot();
@ -121,11 +121,6 @@ public void preSnapshotCompletion() {
lastSnapshotRecord = true;
}
@Override
public void postSnapshotCompletion() {
sourceInfo.setSnapshot(SnapshotRecord.FALSE);
}
public void updateWalPosition(Lsn lsn, Lsn lastCompletelyProcessedLsn, Instant commitTime, Long txId, Long xmin, TableId tableId) {
this.lastCompletelyProcessedLsn = lastCompletelyProcessedLsn;
sourceInfo.update(lsn, commitTime, txId, xmin, tableId);
@ -258,11 +253,6 @@ public OffsetState asOffsetState() {
sourceInfo.isSnapshot());
}
@Override
public void markSnapshotRecord(SnapshotRecord record) {
sourceInfo.setSnapshot(record);
}
@Override
public void event(DataCollectionId tableId, Instant instant) {
sourceInfo.update(instant, (TableId) tableId);
@ -273,11 +263,6 @@ public TransactionContext getTransactionContext() {
return transactionContext;
}
@Override
public void incrementalSnapshotEvents() {
sourceInfo.setSnapshot(SnapshotRecord.INCREMENTAL);
}
@Override
public IncrementalSnapshotContext<?> getIncrementalSnapshotContext() {
return incrementalSnapshotContext;

View File

@ -9,9 +9,9 @@
import java.util.Map;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.Struct;
import io.debezium.connector.SnapshotRecord;
import io.debezium.pipeline.CommonOffsetContext;
import io.debezium.pipeline.source.snapshot.incremental.IncrementalSnapshotContext;
import io.debezium.pipeline.source.snapshot.incremental.SignalBasedIncrementalSnapshotContext;
import io.debezium.pipeline.spi.OffsetContext;
@ -20,7 +20,7 @@
import io.debezium.schema.DataCollectionId;
import io.debezium.util.Collect;
public class SqlServerOffsetContext implements OffsetContext {
public class SqlServerOffsetContext extends CommonOffsetContext {
private static final String SNAPSHOT_COMPLETED_KEY = "snapshot_completed";
@ -60,6 +60,11 @@ public SqlServerOffsetContext(SqlServerConnectorConfig connectorConfig, TxLogPos
this(connectorConfig, position, snapshot, snapshotCompleted, 1, new TransactionContext(), new SignalBasedIncrementalSnapshotContext<>());
}
@Override
public SourceInfo getSourceInfoObject() {
return sourceInfo;
}
@Override
public Map<String, ?> getOffset() {
if (sourceInfo.isSnapshot()) {
@ -82,11 +87,6 @@ public Schema getSourceInfoSchema() {
return sourceInfoSchema;
}
@Override
public Struct getSourceInfo() {
return sourceInfo.struct();
}
public TxLogPosition getChangePosition() {
return TxLogPosition.valueOf(sourceInfo.getCommitLsn(), sourceInfo.getChangeLsn());
}
@ -127,11 +127,6 @@ public void preSnapshotCompletion() {
snapshotCompleted = true;
}
@Override
public void postSnapshotCompletion() {
sourceInfo.setSnapshot(SnapshotRecord.FALSE);
}
public static class Loader implements OffsetContext.Loader<SqlServerOffsetContext> {
private final SqlServerConnectorConfig connectorConfig;
@ -168,11 +163,6 @@ public String toString() {
"]";
}
@Override
public void markSnapshotRecord(SnapshotRecord record) {
sourceInfo.setSnapshot(record);
}
@Override
public void event(DataCollectionId tableId, Instant timestamp) {
sourceInfo.setSourceTime(timestamp);
@ -184,11 +174,6 @@ public TransactionContext getTransactionContext() {
return transactionContext;
}
@Override
public void incrementalSnapshotEvents() {
sourceInfo.setSnapshot(SnapshotRecord.INCREMENTAL);
}
@Override
public IncrementalSnapshotContext<?> getIncrementalSnapshotContext() {
return incrementalSnapshotContext;

View File

@ -0,0 +1,37 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.pipeline;
import org.apache.kafka.connect.data.Struct;
import io.debezium.connector.SnapshotRecord;
import io.debezium.connector.common.BaseSourceInfo;
import io.debezium.pipeline.spi.OffsetContext;
/**
 * Base class for {@link OffsetContext} implementations that factors out the
 * snapshot-marker behavior shared by the connector-specific offset contexts:
 * each shared operation simply updates the snapshot state on the underlying
 * source info object supplied by the concrete subclass.
 */
public abstract class CommonOffsetContext implements OffsetContext {

    /**
     * @return the connector-specific source info holder that this offset context delegates to
     */
    public abstract BaseSourceInfo getSourceInfoObject();

    @Override
    public Struct getSourceInfo() {
        // The serialized source block is always derived from the delegate's struct.
        return getSourceInfoObject().struct();
    }

    @Override
    public void incrementalSnapshotEvents() {
        getSourceInfoObject().setSnapshot(SnapshotRecord.INCREMENTAL);
    }

    @Override
    public void markSnapshotRecord(SnapshotRecord record) {
        getSourceInfoObject().setSnapshot(record);
    }

    @Override
    public void postSnapshotCompletion() {
        getSourceInfoObject().setSnapshot(SnapshotRecord.FALSE);
    }
}