From d0759461a482f59aea7cf730bf12a41b06654eee Mon Sep 17 00:00:00 2001 From: Andrew Walker Date: Thu, 26 May 2022 11:32:40 +1000 Subject: [PATCH] DBZ-5136: Convert RelationalOffsetContext to CommonOffsetContext --- .../mongodb/MongoDbOffsetContext.java | 31 ++----------------- .../mongodb/ReplicaSetOffsetContext.java | 5 +-- .../connector/mysql/MySqlOffsetContext.java | 6 ++-- .../connector/oracle/OracleOffsetContext.java | 5 ++- .../postgresql/PostgresOffsetContext.java | 4 +-- .../sqlserver/SqlServerOffsetContext.java | 6 ++-- ...tContext.java => CommonOffsetContext.java} | 2 +- .../RelationalSnapshotChangeEventSource.java | 11 ++++--- 8 files changed, 24 insertions(+), 46 deletions(-) rename debezium-core/src/main/java/io/debezium/relational/{RelationalOffsetContext.java => CommonOffsetContext.java} (94%) diff --git a/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/MongoDbOffsetContext.java b/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/MongoDbOffsetContext.java index 471562ee4..ffd5bcfe8 100644 --- a/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/MongoDbOffsetContext.java +++ b/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/MongoDbOffsetContext.java @@ -11,15 +11,15 @@ import java.util.Map; import java.util.concurrent.ConcurrentHashMap; -import io.debezium.connector.common.BaseSourceInfo; import org.apache.kafka.connect.data.Schema; import org.apache.kafka.connect.data.Struct; import org.bson.BsonDocument; import io.debezium.connector.SnapshotRecord; +import io.debezium.connector.common.BaseSourceInfo; import io.debezium.pipeline.source.snapshot.incremental.IncrementalSnapshotContext; -import io.debezium.pipeline.spi.OffsetContext; import io.debezium.pipeline.txmetadata.TransactionContext; +import io.debezium.relational.CommonOffsetContext; import io.debezium.schema.DataCollectionId; /** @@ -27,7 +27,7 @@ * * @author Chris Cranford */ -public class MongoDbOffsetContext implements OffsetContext { +public class MongoDbOffsetContext extends CommonOffsetContext { private final SourceInfo sourceInfo; private final TransactionContext transactionContext; @@ -95,31 +95,6 @@ public void postSnapshotCompletion() { sourceInfo.setSnapshot(SnapshotRecord.FALSE); } - @Override - public void markSnapshotRecord() { - sourceInfo.setSnapshot(SnapshotRecord.TRUE); - } - - @Override - public void markFirstRecordInDataCollection() { - sourceInfo.setSnapshot(SnapshotRecord.FIRST_IN_DATA_COLLECTION); - } - - @Override - public void markFirstSnapshotRecord() { - sourceInfo.setSnapshot(SnapshotRecord.FIRST); - } - - @Override - public void markLastRecordInDataCollection() { - sourceInfo.setSnapshot(SnapshotRecord.LAST_IN_DATA_COLLECTION); - } - - @Override - public void markLastSnapshotRecord() { - sourceInfo.setSnapshot(SnapshotRecord.LAST); - } - @Override public TransactionContext getTransactionContext() { return transactionContext; diff --git a/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/ReplicaSetOffsetContext.java b/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/ReplicaSetOffsetContext.java index db026457d..7f11f8f7b 100644 --- a/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/ReplicaSetOffsetContext.java +++ b/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/ReplicaSetOffsetContext.java @@ -8,7 +8,6 @@ import java.time.Instant; import java.util.Map; -import io.debezium.connector.common.BaseSourceInfo; import org.apache.kafka.connect.data.Schema; import org.apache.kafka.connect.data.Struct; import org.bson.BsonDocument; @@ -17,9 +16,11 @@ import com.mongodb.client.model.changestream.ChangeStreamDocument; import io.debezium.annotation.ThreadSafe; +import io.debezium.connector.common.BaseSourceInfo; import io.debezium.pipeline.source.snapshot.incremental.IncrementalSnapshotContext; import io.debezium.pipeline.spi.OffsetContext; import io.debezium.pipeline.txmetadata.TransactionContext; +import io.debezium.relational.CommonOffsetContext; import io.debezium.schema.DataCollectionId; /** @@ -32,7 +33,7 @@ * @author Chris Cranford */ @ThreadSafe -public class ReplicaSetOffsetContext implements OffsetContext { +public class ReplicaSetOffsetContext extends CommonOffsetContext { private final MongoDbOffsetContext offsetContext; private final String replicaSetName; diff --git a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlOffsetContext.java b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlOffsetContext.java index e720e27e0..6786bf5b4 100644 --- a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlOffsetContext.java +++ b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlOffsetContext.java @@ -14,17 +14,17 @@ import org.apache.kafka.connect.data.Struct; import org.apache.kafka.connect.errors.ConnectException; -import io.debezium.connector.common.BaseSourceInfo; import io.debezium.connector.SnapshotRecord; +import io.debezium.connector.common.BaseSourceInfo; import io.debezium.pipeline.source.snapshot.incremental.IncrementalSnapshotContext; import io.debezium.pipeline.source.snapshot.incremental.SignalBasedIncrementalSnapshotContext; import io.debezium.pipeline.spi.OffsetContext; import io.debezium.pipeline.txmetadata.TransactionContext; -import io.debezium.relational.RelationalOffsetContext; +import io.debezium.relational.CommonOffsetContext; import io.debezium.relational.TableId; import io.debezium.schema.DataCollectionId; -public class MySqlOffsetContext extends RelationalOffsetContext { +public class MySqlOffsetContext extends CommonOffsetContext { private static final String SNAPSHOT_COMPLETED_KEY = "snapshot_completed"; public static final String EVENTS_TO_SKIP_OFFSET_KEY = "event"; diff --git a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/OracleOffsetContext.java b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/OracleOffsetContext.java index f9de936bc..e6898b43e 100644 --- a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/OracleOffsetContext.java +++ b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/OracleOffsetContext.java @@ -17,13 +17,12 @@ import io.debezium.connector.SnapshotRecord; import io.debezium.pipeline.source.snapshot.incremental.IncrementalSnapshotContext; -import io.debezium.pipeline.spi.OffsetContext; import io.debezium.pipeline.txmetadata.TransactionContext; -import io.debezium.relational.RelationalOffsetContext; +import io.debezium.relational.CommonOffsetContext; import io.debezium.relational.TableId; import io.debezium.schema.DataCollectionId; -public class OracleOffsetContext extends RelationalOffsetContext { +public class OracleOffsetContext extends CommonOffsetContext { public static final String SNAPSHOT_COMPLETED_KEY = "snapshot_completed"; public static final String SNAPSHOT_PENDING_TRANSACTIONS_KEY = "snapshot_pending_tx"; diff --git a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresOffsetContext.java b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresOffsetContext.java index 5994f1ea7..3b948e6d9 100644 --- a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresOffsetContext.java +++ b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresOffsetContext.java @@ -25,13 +25,13 @@ import io.debezium.pipeline.source.snapshot.incremental.SignalBasedIncrementalSnapshotContext; import io.debezium.pipeline.spi.OffsetContext; import io.debezium.pipeline.txmetadata.TransactionContext; -import io.debezium.relational.RelationalOffsetContext; +import io.debezium.relational.CommonOffsetContext; import io.debezium.relational.TableId; import io.debezium.schema.DataCollectionId; import io.debezium.time.Conversions; import io.debezium.util.Clock; -public class PostgresOffsetContext extends RelationalOffsetContext { +public class PostgresOffsetContext extends CommonOffsetContext { private static final Logger LOGGER = LoggerFactory.getLogger(PostgresSnapshotChangeEventSource.class); public static final String LAST_COMPLETELY_PROCESSED_LSN_KEY = "lsn_proc"; diff --git a/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerOffsetContext.java b/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerOffsetContext.java index 9ea840666..16a19558e 100644 --- a/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerOffsetContext.java +++ b/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerOffsetContext.java @@ -8,21 +8,21 @@ import java.time.Instant; import java.util.Map; -import io.debezium.connector.common.BaseSourceInfo; -import io.debezium.relational.RelationalOffsetContext; import org.apache.kafka.connect.data.Schema; import org.apache.kafka.connect.data.Struct; import io.debezium.connector.SnapshotRecord; +import io.debezium.connector.common.BaseSourceInfo; import io.debezium.pipeline.source.snapshot.incremental.IncrementalSnapshotContext; import io.debezium.pipeline.source.snapshot.incremental.SignalBasedIncrementalSnapshotContext; import io.debezium.pipeline.spi.OffsetContext; import io.debezium.pipeline.txmetadata.TransactionContext; +import io.debezium.relational.CommonOffsetContext; import io.debezium.relational.TableId; import io.debezium.schema.DataCollectionId; import io.debezium.util.Collect; -public class SqlServerOffsetContext extends RelationalOffsetContext { +public class SqlServerOffsetContext extends CommonOffsetContext { private static final String SNAPSHOT_COMPLETED_KEY = "snapshot_completed"; diff --git a/debezium-core/src/main/java/io/debezium/relational/RelationalOffsetContext.java b/debezium-core/src/main/java/io/debezium/relational/CommonOffsetContext.java similarity index 94% rename from debezium-core/src/main/java/io/debezium/relational/RelationalOffsetContext.java rename to debezium-core/src/main/java/io/debezium/relational/CommonOffsetContext.java index fd25c5a00..7054d4270 100644 --- a/debezium-core/src/main/java/io/debezium/relational/RelationalOffsetContext.java +++ b/debezium-core/src/main/java/io/debezium/relational/CommonOffsetContext.java @@ -9,7 +9,7 @@ import io.debezium.connector.common.BaseSourceInfo; import io.debezium.pipeline.spi.OffsetContext; -public abstract class RelationalOffsetContext implements OffsetContext { +public abstract class CommonOffsetContext implements OffsetContext { public abstract BaseSourceInfo getSourceInfoObject(); diff --git a/debezium-core/src/main/java/io/debezium/relational/RelationalSnapshotChangeEventSource.java b/debezium-core/src/main/java/io/debezium/relational/RelationalSnapshotChangeEventSource.java index bf6d37814..12fe3d1d9 100644 --- a/debezium-core/src/main/java/io/debezium/relational/RelationalSnapshotChangeEventSource.java +++ b/debezium-core/src/main/java/io/debezium/relational/RelationalSnapshotChangeEventSource.java @@ -52,7 +52,7 @@ * * @author Gunnar Morling */ -public abstract class RelationalSnapshotChangeEventSource

extends AbstractSnapshotChangeEventSource { +public abstract class RelationalSnapshotChangeEventSource

extends AbstractSnapshotChangeEventSource { private static final Logger LOGGER = LoggerFactory.getLogger(RelationalSnapshotChangeEventSource.class); @@ -408,13 +408,16 @@ private void setSnapshotMarker(RelationalSnapshotContext snapshotContext) else { snapshotContext.offset.markLastRecordInDataCollection(); } - } else if (snapshotContext.firstRecordInTable) { + } + else if (snapshotContext.firstRecordInTable) { if (snapshotContext.firstTable) { snapshotContext.offset.markFirstSnapshotRecord(); - } else { + } + else { snapshotContext.offset.markFirstRecordInDataCollection(); } - } else { + } + else { snapshotContext.offset.markSnapshotRecord(); } }