DBZ-5136: Convert RelationalOffsetContext to CommonOffsetContext

This commit is contained in:
Andrew Walker 2022-05-26 11:32:40 +10:00 committed by Jiri Pechanec
parent 01a81f8bc1
commit d0759461a4
8 changed files with 24 additions and 46 deletions

View File

@ -11,15 +11,15 @@
import java.util.Map; import java.util.Map;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import io.debezium.connector.common.BaseSourceInfo;
import org.apache.kafka.connect.data.Schema; import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.Struct; import org.apache.kafka.connect.data.Struct;
import org.bson.BsonDocument; import org.bson.BsonDocument;
import io.debezium.connector.SnapshotRecord; import io.debezium.connector.SnapshotRecord;
import io.debezium.connector.common.BaseSourceInfo;
import io.debezium.pipeline.source.snapshot.incremental.IncrementalSnapshotContext; import io.debezium.pipeline.source.snapshot.incremental.IncrementalSnapshotContext;
import io.debezium.pipeline.spi.OffsetContext;
import io.debezium.pipeline.txmetadata.TransactionContext; import io.debezium.pipeline.txmetadata.TransactionContext;
import io.debezium.relational.CommonOffsetContext;
import io.debezium.schema.DataCollectionId; import io.debezium.schema.DataCollectionId;
/** /**
@ -27,7 +27,7 @@
* *
* @author Chris Cranford * @author Chris Cranford
*/ */
public class MongoDbOffsetContext implements OffsetContext { public class MongoDbOffsetContext extends CommonOffsetContext {
private final SourceInfo sourceInfo; private final SourceInfo sourceInfo;
private final TransactionContext transactionContext; private final TransactionContext transactionContext;
@ -95,31 +95,6 @@ public void postSnapshotCompletion() {
sourceInfo.setSnapshot(SnapshotRecord.FALSE); sourceInfo.setSnapshot(SnapshotRecord.FALSE);
} }
@Override
public void markSnapshotRecord() {
sourceInfo.setSnapshot(SnapshotRecord.TRUE);
}
@Override
public void markFirstRecordInDataCollection() {
sourceInfo.setSnapshot(SnapshotRecord.FIRST_IN_DATA_COLLECTION);
}
@Override
public void markFirstSnapshotRecord() {
sourceInfo.setSnapshot(SnapshotRecord.FIRST);
}
@Override
public void markLastRecordInDataCollection() {
sourceInfo.setSnapshot(SnapshotRecord.LAST_IN_DATA_COLLECTION);
}
@Override
public void markLastSnapshotRecord() {
sourceInfo.setSnapshot(SnapshotRecord.LAST);
}
@Override @Override
public TransactionContext getTransactionContext() { public TransactionContext getTransactionContext() {
return transactionContext; return transactionContext;

View File

@ -8,7 +8,6 @@
import java.time.Instant; import java.time.Instant;
import java.util.Map; import java.util.Map;
import io.debezium.connector.common.BaseSourceInfo;
import org.apache.kafka.connect.data.Schema; import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.Struct; import org.apache.kafka.connect.data.Struct;
import org.bson.BsonDocument; import org.bson.BsonDocument;
@ -17,9 +16,11 @@
import com.mongodb.client.model.changestream.ChangeStreamDocument; import com.mongodb.client.model.changestream.ChangeStreamDocument;
import io.debezium.annotation.ThreadSafe; import io.debezium.annotation.ThreadSafe;
import io.debezium.connector.common.BaseSourceInfo;
import io.debezium.pipeline.source.snapshot.incremental.IncrementalSnapshotContext; import io.debezium.pipeline.source.snapshot.incremental.IncrementalSnapshotContext;
import io.debezium.pipeline.spi.OffsetContext; import io.debezium.pipeline.spi.OffsetContext;
import io.debezium.pipeline.txmetadata.TransactionContext; import io.debezium.pipeline.txmetadata.TransactionContext;
import io.debezium.relational.CommonOffsetContext;
import io.debezium.schema.DataCollectionId; import io.debezium.schema.DataCollectionId;
/** /**
@ -32,7 +33,7 @@
* @author Chris Cranford * @author Chris Cranford
*/ */
@ThreadSafe @ThreadSafe
public class ReplicaSetOffsetContext implements OffsetContext { public class ReplicaSetOffsetContext extends CommonOffsetContext {
private final MongoDbOffsetContext offsetContext; private final MongoDbOffsetContext offsetContext;
private final String replicaSetName; private final String replicaSetName;

View File

@ -14,17 +14,17 @@
import org.apache.kafka.connect.data.Struct; import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.errors.ConnectException; import org.apache.kafka.connect.errors.ConnectException;
import io.debezium.connector.common.BaseSourceInfo;
import io.debezium.connector.SnapshotRecord; import io.debezium.connector.SnapshotRecord;
import io.debezium.connector.common.BaseSourceInfo;
import io.debezium.pipeline.source.snapshot.incremental.IncrementalSnapshotContext; import io.debezium.pipeline.source.snapshot.incremental.IncrementalSnapshotContext;
import io.debezium.pipeline.source.snapshot.incremental.SignalBasedIncrementalSnapshotContext; import io.debezium.pipeline.source.snapshot.incremental.SignalBasedIncrementalSnapshotContext;
import io.debezium.pipeline.spi.OffsetContext; import io.debezium.pipeline.spi.OffsetContext;
import io.debezium.pipeline.txmetadata.TransactionContext; import io.debezium.pipeline.txmetadata.TransactionContext;
import io.debezium.relational.RelationalOffsetContext; import io.debezium.relational.CommonOffsetContext;
import io.debezium.relational.TableId; import io.debezium.relational.TableId;
import io.debezium.schema.DataCollectionId; import io.debezium.schema.DataCollectionId;
public class MySqlOffsetContext extends RelationalOffsetContext { public class MySqlOffsetContext extends CommonOffsetContext {
private static final String SNAPSHOT_COMPLETED_KEY = "snapshot_completed"; private static final String SNAPSHOT_COMPLETED_KEY = "snapshot_completed";
public static final String EVENTS_TO_SKIP_OFFSET_KEY = "event"; public static final String EVENTS_TO_SKIP_OFFSET_KEY = "event";

View File

@ -17,13 +17,12 @@
import io.debezium.connector.SnapshotRecord; import io.debezium.connector.SnapshotRecord;
import io.debezium.pipeline.source.snapshot.incremental.IncrementalSnapshotContext; import io.debezium.pipeline.source.snapshot.incremental.IncrementalSnapshotContext;
import io.debezium.pipeline.spi.OffsetContext;
import io.debezium.pipeline.txmetadata.TransactionContext; import io.debezium.pipeline.txmetadata.TransactionContext;
import io.debezium.relational.RelationalOffsetContext; import io.debezium.relational.CommonOffsetContext;
import io.debezium.relational.TableId; import io.debezium.relational.TableId;
import io.debezium.schema.DataCollectionId; import io.debezium.schema.DataCollectionId;
public class OracleOffsetContext extends RelationalOffsetContext { public class OracleOffsetContext extends CommonOffsetContext {
public static final String SNAPSHOT_COMPLETED_KEY = "snapshot_completed"; public static final String SNAPSHOT_COMPLETED_KEY = "snapshot_completed";
public static final String SNAPSHOT_PENDING_TRANSACTIONS_KEY = "snapshot_pending_tx"; public static final String SNAPSHOT_PENDING_TRANSACTIONS_KEY = "snapshot_pending_tx";

View File

@ -25,13 +25,13 @@
import io.debezium.pipeline.source.snapshot.incremental.SignalBasedIncrementalSnapshotContext; import io.debezium.pipeline.source.snapshot.incremental.SignalBasedIncrementalSnapshotContext;
import io.debezium.pipeline.spi.OffsetContext; import io.debezium.pipeline.spi.OffsetContext;
import io.debezium.pipeline.txmetadata.TransactionContext; import io.debezium.pipeline.txmetadata.TransactionContext;
import io.debezium.relational.RelationalOffsetContext; import io.debezium.relational.CommonOffsetContext;
import io.debezium.relational.TableId; import io.debezium.relational.TableId;
import io.debezium.schema.DataCollectionId; import io.debezium.schema.DataCollectionId;
import io.debezium.time.Conversions; import io.debezium.time.Conversions;
import io.debezium.util.Clock; import io.debezium.util.Clock;
public class PostgresOffsetContext extends RelationalOffsetContext { public class PostgresOffsetContext extends CommonOffsetContext {
private static final Logger LOGGER = LoggerFactory.getLogger(PostgresSnapshotChangeEventSource.class); private static final Logger LOGGER = LoggerFactory.getLogger(PostgresSnapshotChangeEventSource.class);
public static final String LAST_COMPLETELY_PROCESSED_LSN_KEY = "lsn_proc"; public static final String LAST_COMPLETELY_PROCESSED_LSN_KEY = "lsn_proc";

View File

@ -8,21 +8,21 @@
import java.time.Instant; import java.time.Instant;
import java.util.Map; import java.util.Map;
import io.debezium.connector.common.BaseSourceInfo;
import io.debezium.relational.RelationalOffsetContext;
import org.apache.kafka.connect.data.Schema; import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.Struct; import org.apache.kafka.connect.data.Struct;
import io.debezium.connector.SnapshotRecord; import io.debezium.connector.SnapshotRecord;
import io.debezium.connector.common.BaseSourceInfo;
import io.debezium.pipeline.source.snapshot.incremental.IncrementalSnapshotContext; import io.debezium.pipeline.source.snapshot.incremental.IncrementalSnapshotContext;
import io.debezium.pipeline.source.snapshot.incremental.SignalBasedIncrementalSnapshotContext; import io.debezium.pipeline.source.snapshot.incremental.SignalBasedIncrementalSnapshotContext;
import io.debezium.pipeline.spi.OffsetContext; import io.debezium.pipeline.spi.OffsetContext;
import io.debezium.pipeline.txmetadata.TransactionContext; import io.debezium.pipeline.txmetadata.TransactionContext;
import io.debezium.relational.CommonOffsetContext;
import io.debezium.relational.TableId; import io.debezium.relational.TableId;
import io.debezium.schema.DataCollectionId; import io.debezium.schema.DataCollectionId;
import io.debezium.util.Collect; import io.debezium.util.Collect;
public class SqlServerOffsetContext extends RelationalOffsetContext { public class SqlServerOffsetContext extends CommonOffsetContext {
private static final String SNAPSHOT_COMPLETED_KEY = "snapshot_completed"; private static final String SNAPSHOT_COMPLETED_KEY = "snapshot_completed";

View File

@ -9,7 +9,7 @@
import io.debezium.connector.common.BaseSourceInfo; import io.debezium.connector.common.BaseSourceInfo;
import io.debezium.pipeline.spi.OffsetContext; import io.debezium.pipeline.spi.OffsetContext;
public abstract class RelationalOffsetContext implements OffsetContext { public abstract class CommonOffsetContext implements OffsetContext {
public abstract BaseSourceInfo getSourceInfoObject(); public abstract BaseSourceInfo getSourceInfoObject();

View File

@ -52,7 +52,7 @@
* *
* @author Gunnar Morling * @author Gunnar Morling
*/ */
public abstract class RelationalSnapshotChangeEventSource<P extends Partition, O extends RelationalOffsetContext> extends AbstractSnapshotChangeEventSource<P, O> { public abstract class RelationalSnapshotChangeEventSource<P extends Partition, O extends CommonOffsetContext> extends AbstractSnapshotChangeEventSource<P, O> {
private static final Logger LOGGER = LoggerFactory.getLogger(RelationalSnapshotChangeEventSource.class); private static final Logger LOGGER = LoggerFactory.getLogger(RelationalSnapshotChangeEventSource.class);
@ -408,13 +408,16 @@ private void setSnapshotMarker(RelationalSnapshotContext<P, O> snapshotContext)
else { else {
snapshotContext.offset.markLastRecordInDataCollection(); snapshotContext.offset.markLastRecordInDataCollection();
} }
} else if (snapshotContext.firstRecordInTable) { }
else if (snapshotContext.firstRecordInTable) {
if (snapshotContext.firstTable) { if (snapshotContext.firstTable) {
snapshotContext.offset.markFirstSnapshotRecord(); snapshotContext.offset.markFirstSnapshotRecord();
} else { }
else {
snapshotContext.offset.markFirstRecordInDataCollection(); snapshotContext.offset.markFirstRecordInDataCollection();
} }
} else { }
else {
snapshotContext.offset.markSnapshotRecord(); snapshotContext.offset.markSnapshotRecord();
} }
} }