DBZ-7260 Further elimination of RS references, mainly from SourceInfo

This commit is contained in:
Jakub Cechacek 2023-12-16 23:05:38 +01:00
parent 15d9b8a791
commit 7ad4701253
7 changed files with 108 additions and 188 deletions

View File

@ -38,29 +38,28 @@ public class MongoDbOffsetContext extends CommonOffsetContext<SourceInfo> {
private final TransactionContext transactionContext;
private final IncrementalSnapshotContext<CollectionId> incrementalSnapshotContext;
private final String replicaSetName;
public MongoDbOffsetContext(MongoDbTaskContext taskContext, SourceInfo sourceInfo, TransactionContext transactionContext,
IncrementalSnapshotContext<CollectionId> incrementalSnapshotContext) {
super(sourceInfo);
this.replicaSetName = ConnectionStrings.replicaSetName(taskContext.getConnectionString());
this.transactionContext = transactionContext;
this.incrementalSnapshotContext = incrementalSnapshotContext;
}
void startReplicaSetSnapshot() {
sourceInfo.startInitialSync(replicaSetName);
void startInitialSync() {
sourceInfo.startInitialSync();
}
void stopReplicaSetSnapshot() {
sourceInfo.stopInitialSync(replicaSetName);
void stopInitialSync() {
sourceInfo.stopInitialSync();
}
@Override
@SuppressWarnings("unchecked")
public Map<String, ?> getOffset() {
Map<String, Object> offsets = (Map<String, Object>) sourceInfo.lastOffset(replicaSetName);
return isSnapshotOngoing() ? offsets : incrementalSnapshotContext.store(transactionContext.store(offsets));
Map<String, Object> offsets = (Map<String, Object>) sourceInfo.lastOffset();
return isSnapshotRunning() ? offsets : incrementalSnapshotContext.store(transactionContext.store(offsets));
}
@Override
@ -74,7 +73,7 @@ public boolean isSnapshotRunning() {
}
public boolean isSnapshotOngoing() {
return sourceInfo.isInitialSyncOngoing(replicaSetName);
return sourceInfo.isSnapshotRunning();
}
@Override
@ -103,45 +102,45 @@ public void event(DataCollectionId collectionId, Instant timestamp) {
}
public void readEvent(CollectionId collectionId, Instant timestamp) {
sourceInfo.collectionEvent(replicaSetName, collectionId, 0L);
sourceInfo.lastOffset(replicaSetName);
sourceInfo.collectionEvent(collectionId, 0L);
sourceInfo.lastOffset();
}
public void initEvent(MongoChangeStreamCursor<ChangeStreamDocument<BsonDocument>> cursor) {
sourceInfo.initEvent(replicaSetName, cursor);
sourceInfo.initEvent(cursor);
}
public void initFromOpTimeIfNeeded(BsonTimestamp timestamp) {
if (lastResumeToken() != null) {
return;
}
LOGGER.info("Initializing offset for replica-set {} from operation time", replicaSetName);
sourceInfo.noEvent(replicaSetName, timestamp);
LOGGER.info("Initializing offset from operation time");
sourceInfo.noEvent(timestamp);
}
public void noEvent(BufferingChangeStreamCursor.ResumableChangeStreamEvent<BsonDocument> event) {
sourceInfo.noEvent(replicaSetName, event);
sourceInfo.noEvent(event);
}
public void changeStreamEvent(ChangeStreamDocument<BsonDocument> changeStreamEvent) {
sourceInfo.changeStreamEvent(replicaSetName, changeStreamEvent);
sourceInfo.changeStreamEvent(changeStreamEvent);
}
public String lastResumeToken() {
return sourceInfo.lastResumeToken(replicaSetName);
return sourceInfo.lastResumeToken();
}
public BsonDocument lastResumeTokenDoc() {
final String data = sourceInfo.lastResumeToken(replicaSetName);
final String data = sourceInfo.lastResumeToken();
return (data == null) ? null : ResumeTokens.fromData(data);
}
public BsonTimestamp lastTimestamp() {
return sourceInfo.lastTimestamp(replicaSetName);
return sourceInfo.lastTimestamp();
}
public boolean hasOffset() {
return sourceInfo.hasOffset(replicaSetName);
return sourceInfo.hasOffset();
}
public static class Loader implements OffsetContext.Loader<MongoDbOffsetContext> {
@ -151,14 +150,14 @@ public static class Loader implements OffsetContext.Loader<MongoDbOffsetContext>
private final String replicaSetName;
public Loader(MongoDbTaskContext taskContext) {
this.sourceInfo = new SourceInfo(taskContext.getConnectorConfig());
this.replicaSetName = ConnectionStrings.replicaSetName(taskContext.getConnectionString());
this.sourceInfo = new SourceInfo(taskContext.getConnectorConfig(), replicaSetName);
this.taskContext = taskContext;
}
@Override
public MongoDbOffsetContext load(Map<String, ?> offset) {
sourceInfo.setOffsetFor(replicaSetName, offset);
sourceInfo.setOffset(offset);
return new MongoDbOffsetContext(taskContext, sourceInfo, new TransactionContext(), MongoDbIncrementalSnapshotContext.load(offset, false));
}
}

View File

@ -37,6 +37,7 @@
import io.debezium.DebeziumException;
import io.debezium.connector.SnapshotRecord;
import io.debezium.connector.mongodb.connection.ConnectionStrings;
import io.debezium.connector.mongodb.connection.MongoDbConnection;
import io.debezium.connector.mongodb.recordemitter.MongoDbSnapshotRecordEmitter;
import io.debezium.connector.mongodb.snapshot.MongoDbIncrementalSnapshotContext;
@ -176,7 +177,7 @@ private boolean isSnapshotExpected(MongoDbPartition partition, MongoDbOffsetCont
return true;
}
if (offsetContext.isSnapshotOngoing()) {
if (offsetContext.isSnapshotRunning()) {
// The latest snapshot was not completed, so restart it
LOGGER.info("The previous snapshot was incomplete, so restarting the snapshot");
return true;
@ -217,7 +218,7 @@ private void initSnapshotStartOffsets(MongoDbSnapshotContext snapshotCtx) {
LOGGER.info("Initializing empty Offset context");
snapshotCtx.offset = new MongoDbOffsetContext(
taskContext,
new SourceInfo(connectorConfig),
new SourceInfo(connectorConfig, ConnectionStrings.replicaSetName(taskContext.getConnectionString())),
new TransactionContext(),
new MongoDbIncrementalSnapshotContext<>(false));
}
@ -243,7 +244,7 @@ private void createDataEvents(ChangeEventSourceContext sourceContext,
SnapshottingTask snapshottingTask)
throws InterruptedException {
snapshotContext.lastCollection = false;
snapshotContext.offset.startReplicaSetSnapshot();
snapshotContext.offset.startInitialSync();
LOGGER.info("Beginning snapshot at {}", snapshotContext.offset.getOffset());
@ -314,7 +315,7 @@ private void createDataEvents(ChangeEventSourceContext sourceContext,
executorService.shutdown();
}
snapshotContext.offset.stopReplicaSetSnapshot();
snapshotContext.offset.stopInitialSync();
}
@Override

View File

@ -217,7 +217,7 @@ protected MongoDbOffsetContext emptyOffsets(MongoDbConnectorConfig connectorConf
LOGGER.info("Initializing empty Offset context");
return new MongoDbOffsetContext(
taskContext,
new SourceInfo(connectorConfig),
new SourceInfo(connectorConfig, ConnectionStrings.replicaSetName(taskContext.getConnectionString())),
new TransactionContext(),
new MongoDbIncrementalSnapshotContext<>(false));
}

View File

@ -14,6 +14,7 @@
import io.debezium.connector.common.CdcSourceTaskContext;
import io.debezium.connector.mongodb.MongoDbConnectorConfig.CaptureMode;
import io.debezium.connector.mongodb.connection.ConnectionContext;
import io.debezium.connector.mongodb.connection.ConnectionStrings;
import io.debezium.connector.mongodb.connection.MongoDbConnection;
import io.debezium.spi.topic.TopicNamingStrategy;
@ -39,12 +40,12 @@ public MongoDbTaskContext(Configuration config) {
new MongoDbConnectorConfig(config).getCustomMetricTags(),
Collections::emptySet);
this.connectionContext = new ConnectionContext(config);
this.filters = new Filters(config);
this.connectorConfig = new MongoDbConnectorConfig(config);
this.source = new SourceInfo(connectorConfig);
this.source = new SourceInfo(connectorConfig, ConnectionStrings.replicaSetName(getConnectionString()));
this.topicNamingStrategy = connectorConfig.getTopicNamingStrategy(MongoDbConnectorConfig.TOPIC_NAMING_STRATEGY);
this.serverName = config.getString(CommonConnectorConfig.TOPIC_PREFIX);
this.connectionContext = new ConnectionContext(config);
}
public TopicNamingStrategy<CollectionId> topicNamingStrategy() {
@ -83,7 +84,8 @@ public CaptureMode getCaptureMode() {
}
public ConnectionString getConnectionString() {
return connectorConfig.getTaskConnectionString().orElseThrow();
return connectorConfig.getTaskConnectionString()
.orElse(connectionContext.connectionString());
}
public MongoDbConnection connect(MongoDbConnection.ErrorHandler errorHandler) {

View File

@ -6,12 +6,8 @@
package io.debezium.connector.mongodb;
import java.time.Instant;
import java.util.Collections;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import org.apache.kafka.connect.data.Struct;
import org.bson.BsonDocument;
@ -36,7 +32,7 @@
* finds a particular record. As the connector uses the Kafka Connect framework to process each record in a partition, Kafka
* Connect keeps track of the most recent offset for that partition.
* <p>
* The {@link #partition(String) source partition} information identifies the particular MongoDB replica set and the connector's
* The {@link #partition() source partition} information identifies the particular MongoDB replica set and the connector's
* logical name of the MongoDB server. A JSON-like representation of the source partition for a database named "customers" hosted
* in a MongoDB replica set named "myMongoServer" is as follows:
*
@ -48,7 +44,7 @@
* </pre>
*
* <p>
* The {@link #lastOffset(String) source offset} information describes the position within a particular partition of each record.
* The {@link #lastOffset() source offset} information describes the position within a particular partition of each record.
* Since each event in MongoDB's oplog is identified by a {@link BSONTimestamp} that tracks the time and the order of the
* event for that particular time (e.g., multiple events that occur at the same time will have unique orders), the offset
* includes the BSONTimetamp representation. (The event's {@code h} field is the unique ID for the operation, so this is also
@ -72,7 +68,6 @@ public final class SourceInfo extends BaseSourceInfo {
private static final String RESUME_TOKEN = "resume_token";
public static final String SERVER_ID_KEY = "server_id";
public static final String REPLICA_SET_NAME = "rs";
public static final String TIMESTAMP = "sec";
@ -85,14 +80,10 @@ public final class SourceInfo extends BaseSourceInfo {
public static final String WALL_TIME = "wallTime";
// Change Stream fields
private static final BsonTimestamp INITIAL_TIMESTAMP = new BsonTimestamp();
private static final Position INITIAL_POSITION = new Position(INITIAL_TIMESTAMP, null, null);
private final ConcurrentMap<String, Map<String, String>> sourcePartitionsByReplicaSetName = new ConcurrentHashMap<>();
private final ConcurrentMap<String, Position> positionsByReplicaSetName = new ConcurrentHashMap<>();
private final Set<String> initialSyncReplicaSets = Collections.newSetFromMap(new ConcurrentHashMap<>());
private Position rPosition = null;
public boolean initialSync = false;
private String replicaSetName;
/**
@ -159,8 +150,9 @@ static final class SessionTransactionId {
}
}
public SourceInfo(MongoDbConnectorConfig connectorConfig) {
public SourceInfo(MongoDbConnectorConfig connectorConfig, String replicaSetName) {
super(connectorConfig);
this.replicaSetName = replicaSetName;
}
CollectionId collectionId() {
@ -171,35 +163,12 @@ Position position() {
return position;
}
/**
* @return server id
*/
public String serverId() {
return serverName();
public String lastResumeToken() {
return position != null ? position.resumeToken : null;
}
/**
* Get the Kafka Connect detail about the source "partition" for the given database in the replica set. If the database is
* not known, this method records the new partition.
*
* @param replicaSetName the name of the replica set name for which the partition is to be obtained; may not be null
* @return the source partition information; never null
*/
public Map<String, String> partition(String replicaSetName) {
if (replicaSetName == null) {
throw new IllegalArgumentException("Replica set name may not be null");
}
return sourcePartitionsByReplicaSetName.computeIfAbsent(replicaSetName, rsName -> Collect.hashMapOf(SERVER_ID_KEY, serverName(), REPLICA_SET_NAME, rsName));
}
public String lastResumeToken(String replicaSetName) {
Position existing = positionsByReplicaSetName.get(replicaSetName);
return existing != null ? existing.resumeToken : null;
}
public BsonTimestamp lastTimestamp(String replicaSetName) {
Position existing = positionsByReplicaSetName.get(replicaSetName);
return existing != null ? existing.getTimestamp() : null;
public BsonTimestamp lastTimestamp() {
return position != null ? position.getTimestamp() : null;
}
/**
@ -207,29 +176,27 @@ public BsonTimestamp lastTimestamp(String replicaSetName) {
* database where we have last read. If the database has not yet been seen, this records the starting position
* for that database. However, if there is a position for the database, the offset representation is returned.
*
* @param replicaSetName the name of the replica set name for which the new offset is to be obtained; may not be null
* @return a copy of the current offset for the database; never null
*/
public Map<String, ?> lastOffset(String replicaSetName) {
Position existing = positionsByReplicaSetName.get(replicaSetName);
if (existing == null) {
existing = INITIAL_POSITION;
public Map<String, ?> lastOffset() {
if (position == null) {
position = INITIAL_POSITION;
}
if (isInitialSyncOngoing(replicaSetName)) {
if (isSnapshotRunning()) {
Map<String, Object> offset = Collect.hashMapOf(
TIMESTAMP, existing.getTime(),
ORDER, existing.getInc(),
TIMESTAMP, position.getTime(),
ORDER, position.getInc(),
INITIAL_SYNC, true);
return addSessionTxnIdToOffset(existing, offset);
return addSessionTxnIdToOffset(position, offset);
}
Map<String, Object> offset = Collect.hashMapOf(
TIMESTAMP, existing.getTime(),
ORDER, existing.getInc());
existing.getResumeToken().ifPresent(resumeToken -> offset.put(RESUME_TOKEN, resumeToken));
TIMESTAMP, position.getTime(),
ORDER, position.getInc());
position.getResumeToken().ifPresent(resumeToken -> offset.put(RESUME_TOKEN, resumeToken));
return addSessionTxnIdToOffset(existing, offset);
return addSessionTxnIdToOffset(position, offset);
}
private Map<String, ?> addSessionTxnIdToOffset(Position position, Map<String, Object> offset) {
@ -241,71 +208,68 @@ public BsonTimestamp lastTimestamp(String replicaSetName) {
}
/**
* Get a {@link Struct} representation of the source {@link #partition(String) partition} and {@link #lastOffset(String)
* Get a {@link Struct} representation of the source {@link #partition() partition} and {@link #lastOffset()
* offset} information where we have last read. The Struct complies with the {@link #schema} for the MongoDB connector.
*
* @param replicaSetName the name of the replica set name for which the new offset is to be obtained; may not be null
* @param collectionId the event's collection identifier; may not be null
* @return the source partition and offset {@link Struct}; never null
* @see #schema()
*/
public void collectionEvent(String replicaSetName, CollectionId collectionId, long wallTime) {
onEvent(replicaSetName, collectionId, positionsByReplicaSetName.get(replicaSetName), wallTime);
public void collectionEvent(CollectionId collectionId, long wallTime) {
onEvent(collectionId, rPosition, wallTime);
}
public void initEvent(String replicaSetName, MongoChangeStreamCursor<ChangeStreamDocument<BsonDocument>> cursor) {
public void initEvent(MongoChangeStreamCursor<ChangeStreamDocument<BsonDocument>> cursor) {
if (cursor == null) {
return;
}
ChangeStreamDocument<BsonDocument> result = cursor.tryNext();
if (result == null) {
noEvent(replicaSetName, cursor);
noEvent(cursor);
}
else {
changeStreamEvent(replicaSetName, result);
changeStreamEvent(result);
}
}
public void noEvent(String replicaSetName, ResumableChangeStreamEvent<BsonDocument> event) {
public void noEvent(ResumableChangeStreamEvent<BsonDocument> event) {
if (event.resumeToken == null || event.hasDocument()) {
return;
}
noEvent(replicaSetName, ResumeTokens.getDataString(event.resumeToken));
noEvent(ResumeTokens.getDataString(event.resumeToken));
}
public void noEvent(String replicaSetName, MongoChangeStreamCursor<?> cursor) {
public void noEvent(MongoChangeStreamCursor<?> cursor) {
if (cursor == null || cursor.getResumeToken() == null) {
return;
}
noEvent(replicaSetName, ResumeTokens.getDataString(cursor.getResumeToken()));
noEvent(ResumeTokens.getDataString(cursor.getResumeToken()));
}
public void noEvent(String replicaSetName, BsonTimestamp timestamp) {
public void noEvent(BsonTimestamp timestamp) {
if (timestamp == null) {
return;
}
Position position = Position.changeStreamPosition(timestamp, null, null);
noEvent(replicaSetName, position);
noEvent(position);
}
private void noEvent(String replicaSetName, String resumeToken) {
private void noEvent(String resumeToken) {
if (resumeToken == null) {
return;
}
Position position = Position.changeStreamPosition(null, resumeToken, null);
noEvent(replicaSetName, position);
noEvent(position);
}
private void noEvent(String replicaSetName, Position position) {
private void noEvent(Position position) {
String namespace = "";
long wallTime = 0L;
positionsByReplicaSetName.put(replicaSetName, position);
onEvent(replicaSetName, CollectionId.parse(replicaSetName, namespace), position, wallTime);
onEvent(CollectionId.parse(replicaSetName, namespace), position, wallTime);
}
public void changeStreamEvent(String replicaSetName, ChangeStreamDocument<BsonDocument> changeStreamEvent) {
public void changeStreamEvent(ChangeStreamDocument<BsonDocument> changeStreamEvent) {
Position position = INITIAL_POSITION;
String namespace = "";
long wallTime = 0L;
@ -318,13 +282,11 @@ public void changeStreamEvent(String replicaSetName, ChangeStreamDocument<BsonDo
wallTime = changeStreamEvent.getWallTime().getValue();
}
}
positionsByReplicaSetName.put(replicaSetName, position);
onEvent(replicaSetName, CollectionId.parse(replicaSetName, namespace), position, wallTime);
onEvent(CollectionId.parse(replicaSetName, namespace), position, wallTime);
}
private void onEvent(String replicaSetName, CollectionId collectionId, Position position, long wallTime) {
this.replicaSetName = replicaSetName;
private void onEvent(CollectionId collectionId, Position position, long wallTime) {
this.position = (position == null) ? INITIAL_POSITION : position;
this.collectionId = collectionId;
this.wallTime = wallTime;
@ -333,27 +295,22 @@ private void onEvent(String replicaSetName, CollectionId collectionId, Position
/**
* Determine whether we have previously recorded a MongoDB timestamp for the replica set.
*
* @param replicaSetName the name of the replica set name; may not be null
* @return {@code true} if an offset has been recorded for the replica set, or {@code false} if the replica set has not
* yet been seen
*/
public boolean hasOffset(String replicaSetName) {
return positionsByReplicaSetName.containsKey(replicaSetName);
public boolean hasOffset() {
return rPosition != null;
}
/**
* Set the source offset, as read from Kafka Connect, for the given replica set. This method does nothing if the supplied map
* is null.
*
* @param replicaSetName the name of the replica set name for which the new offset is to be obtained; may not be null
* @param sourceOffset the previously-recorded Kafka Connect source offset; may be null
* @return {@code true} if the offset was recorded, or {@code false} if the source offset is null
* @throws DebeziumException if any offset parameter values are missing, invalid, or of the wrong type
*/
public boolean setOffsetFor(String replicaSetName, Map<String, ?> sourceOffset) {
if (replicaSetName == null) {
throw new IllegalArgumentException("The replica set name may not be null");
}
public boolean setOffset(Map<String, ?> sourceOffset) {
if (sourceOffset == null) {
return false;
}
@ -371,60 +328,40 @@ public boolean setOffsetFor(String replicaSetName, Map<String, ?> sourceOffset)
changeStreamTxnId = new SessionTransactionId(changeStreamLsid, changeStreamTxnNumber);
}
String resumeToken = stringOffsetValue(sourceOffset, RESUME_TOKEN);
Position position = new Position(new BsonTimestamp(time, order), changeStreamTxnId, resumeToken);
positionsByReplicaSetName.put(replicaSetName, position);
rPosition = new Position(new BsonTimestamp(time, order), changeStreamTxnId, resumeToken);
return true;
}
/**
* Set the source offset, as read from Kafka Connect, for the given replica set. This method does nothing if the supplied map
* is null.
*
* @param partition the partition information; may not be null
* @param sourceOffset the previously-recorded Kafka Connect source offset; may be null
* @return {@code true} if the offset was recorded, or {@code false} if the source offset is null
* @throws DebeziumException if any offset parameter values are missing, invalid, or of the wrong type
*/
public boolean setOffsetFor(Map<String, String> partition, Map<String, ?> sourceOffset) {
String replicaSetName = partition.get(REPLICA_SET_NAME);
return setOffsetFor(replicaSetName, sourceOffset);
}
/**
* Record that an initial sync has started for the given replica set.
*
* @param replicaSetName the name of the replica set; never null
*/
public void startInitialSync(String replicaSetName) {
initialSyncReplicaSets.add(replicaSetName);
public void startInitialSync() {
this.initialSync = true;
}
/**
* Record that an initial sync has stopped for the given replica set.
*
* @param replicaSetName the name of the replica set; never null
*/
public void stopInitialSync(String replicaSetName) {
initialSyncReplicaSets.remove(replicaSetName);
public void stopInitialSync() {
this.initialSync = false;
}
/**
* Determine if the initial sync for the given replica set is still ongoing.
*
* @param replicaSetName the name of the replica set; never null
* @return {@code true} if the initial sync for this replica is still ongoing or was not completed before restarting, or
* {@code false} if there is currently no initial sync operation for this replica set
*/
public boolean isInitialSyncOngoing(String replicaSetName) {
return initialSyncReplicaSets.contains(replicaSetName);
public boolean isInitialSyncOngoing() {
return initialSync;
}
/**
* Returns whether any replica sets are still running a snapshot.
*/
public boolean isSnapshotRunning() {
return !initialSyncReplicaSets.isEmpty();
return initialSync;
}
private static int intOffsetValue(Map<String, ?> values, String key) {
@ -482,7 +419,7 @@ protected Instant timestamp() {
@Override
public SnapshotRecord snapshot() {
return isInitialSyncOngoing(replicaSetName) ? SnapshotRecord.TRUE
return isSnapshotRunning() ? SnapshotRecord.TRUE
: snapshotRecord == SnapshotRecord.INCREMENTAL ? SnapshotRecord.INCREMENTAL : SnapshotRecord.FALSE;
}
@ -501,9 +438,6 @@ long wallTime() {
@Override
public String toString() {
return "SourceInfo [sourcePartitionsByReplicaSetName=" + sourcePartitionsByReplicaSetName
+ ", positionsByReplicaSetName=" + positionsByReplicaSetName + ", initialSyncReplicaSets="
+ initialSyncReplicaSets + ", replicaSetName=" + replicaSetName + ", collectionId=" + collectionId
+ ", position=" + position + "]";
return "SourceInfo [initialSyncReplicaSets=" + initialSync + ", collectionId=" + collectionId + ", position=" + position + "]";
}
}

View File

@ -91,24 +91,23 @@ protected void reuseConfiguration(Configuration config) {
*/
private void initialize(boolean restartFromBeginning) {
// Record the partition offsets (if there are some) ...
Map<String, String> partition = null;
Map<String, ?> offsetForPartition = null;
var rsName = ConnectionStrings.replicaSetName(mongo.getConnectionString());
if (!restartFromBeginning && context != null && mongo != null && context.source().hasOffset(rsName)) {
partition = context.source().partition(rsName);
offsetForPartition = context.source().lastOffset(rsName);
if (!restartFromBeginning && context != null && mongo != null && context.source().hasOffset()) {
offsetForPartition = context.source().lastOffset();
}
context = new MongoDbTaskContext(config);
assertThat(context.getConnectionContext().connectionSeed()).isNotEmpty();
// Restore Source position (if there are some) ...
if (partition != null) {
context.source().setOffsetFor(partition, offsetForPartition);
if (offsetForPartition != null) {
context.source().setOffset(offsetForPartition);
}
// Get a connection to the primary ...
connection = context.getConnectionContext().connect(new ConnectionString(mongo.getConnectionString()), context.filters(), TestHelper.connectionErrorHandler(3));
var connectionString = new ConnectionString(TestHelper.connectionString(mongo));
connection = context.getConnectionContext().connect(connectionString, context.filters(), TestHelper.connectionErrorHandler(3));
}
}

View File

@ -63,7 +63,7 @@ private SourceInfo createSourceInfo() {
.with(CommonConnectorConfig.TOPIC_PREFIX, "serverX")
.build());
return new SourceInfo(config);
return new SourceInfo(config, REPLICA_SET_NAME);
}
@SuppressWarnings("unchecked")
@ -107,8 +107,8 @@ public void assertSourceInfoContents(SourceInfo source,
BsonTimestamp timestamp,
String snapshot) {
if (cursor != null) {
assertThat(source.hasOffset(REPLICA_SET_NAME)).isEqualTo(false);
source.initEvent(REPLICA_SET_NAME, cursor);
assertThat(source.hasOffset()).isEqualTo(false);
source.initEvent(cursor);
}
assertSourceInfoContents(source, cursor != null, resumeTokenData, timestamp, snapshot);
@ -119,16 +119,16 @@ public void assertSourceInfoContents(SourceInfo source,
String resumeTokenData,
BsonTimestamp timestamp,
String snapshot) {
assertThat(source.hasOffset(REPLICA_SET_NAME)).isEqualTo(hasOffset);
assertThat(source.hasOffset()).isEqualTo(hasOffset);
Map<String, ?> offset = source.lastOffset(REPLICA_SET_NAME);
Map<String, ?> offset = source.lastOffset();
assertThat(offset.get(SourceInfo.TIMESTAMP)).isEqualTo((timestamp != null) ? timestamp.getTime() : 0);
assertThat(offset.get(SourceInfo.ORDER)).isEqualTo((timestamp != null) ? timestamp.getInc() : -1);
String resumeToken = source.lastResumeToken(REPLICA_SET_NAME);
String resumeToken = source.lastResumeToken();
assertThat(resumeToken).isEqualTo(resumeTokenData);
source.collectionEvent(REPLICA_SET_NAME, new CollectionId(REPLICA_SET_NAME, "test", "names"), 0L);
source.collectionEvent(new CollectionId(REPLICA_SET_NAME, "test", "names"), 0L);
Struct struct = source.struct();
assertThat(struct.getInt64(SourceInfo.TIMESTAMP_KEY)).isEqualTo((timestamp != null) ? timestamp.getTime() * 1000L : 0L);
assertThat(struct.getInt32(SourceInfo.ORDER)).isEqualTo((timestamp != null) ? timestamp.getInc() : -1);
@ -145,17 +145,16 @@ public void shouldSetAndReturnRecordedOffset() {
assertSourceInfoContents(source, cursor, CHANGE_RESUME_TOKEN_DATA, CHANGE_TIMESTAMP, null);
// Create a new source info and set the offset ...
Map<String, ?> offset = source.lastOffset(REPLICA_SET_NAME);
Map<String, String> partition = source.partition(REPLICA_SET_NAME);
Map<String, ?> offset = source.lastOffset();
SourceInfo newSource = createSourceInfo();
newSource.setOffsetFor(partition, offset);
newSource.setOffset(offset);
assertSourceInfoContents(newSource, true, CHANGE_RESUME_TOKEN_DATA, CHANGE_TIMESTAMP, null);
}
@Test
public void shouldReturnOffsetForUnusedReplicaName() {
assertThat(source.hasOffset(REPLICA_SET_NAME)).isEqualTo(false);
assertThat(source.hasOffset()).isEqualTo(false);
assertSourceInfoContents(source, false, null, new BsonTimestamp(0), null);
}
@ -173,13 +172,13 @@ public void shouldReturnRecordedOffsetForUsedReplicaNameWithoutEvent() {
@Test
public void shouldReturnOffsetForUnusedReplicaNameDuringInitialSync() {
source.startInitialSync(REPLICA_SET_NAME);
source.startInitialSync();
assertSourceInfoContents(source, false, null, new BsonTimestamp(0), "true");
}
@Test
public void shouldReturnRecordedOffsetForUsedReplicaNameDuringInitialSync() {
source.startInitialSync(REPLICA_SET_NAME);
source.startInitialSync();
var cursor = mockEventChangeStreamCursor();
assertSourceInfoContents(source, cursor, CHANGE_RESUME_TOKEN_DATA, CHANGE_TIMESTAMP, "true");
@ -187,46 +186,32 @@ public void shouldReturnRecordedOffsetForUsedReplicaNameDuringInitialSync() {
@Test
public void shouldReturnRecordedOffsetForUsedReplicaNameDuringInitialSyncWithoutEvent() {
source.startInitialSync(REPLICA_SET_NAME);
source.startInitialSync();
var cursor = mockNoEventChangeStreamCursor();
assertSourceInfoContents(source, cursor, CURSOR_RESUME_TOKEN_DATA, null, "true");
}
@Test
public void shouldProducePartitionMap() {
partition = source.partition(REPLICA_SET_NAME);
assertThat(partition.get(SourceInfo.REPLICA_SET_NAME)).isEqualTo(REPLICA_SET_NAME);
assertThat(partition.get(SourceInfo.SERVER_ID_KEY)).isEqualTo("serverX");
assertThat(partition.size()).isEqualTo(2);
}
@Test
public void shouldReturnSamePartitionMapForSameReplicaName() {
partition = source.partition(REPLICA_SET_NAME);
assertThat(partition).isSameAs(source.partition(REPLICA_SET_NAME));
}
@Test
public void versionIsPresent() {
var cursor = mockEventChangeStreamCursor();
source.initEvent(REPLICA_SET_NAME, cursor);
source.initEvent(cursor);
assertThat(source.struct().getString(SourceInfo.DEBEZIUM_VERSION_KEY)).isEqualTo(Module.version());
}
@Test
public void connectorIsPresent() {
var cursor = mockEventChangeStreamCursor();
source.initEvent(REPLICA_SET_NAME, cursor);
source.initEvent(cursor);
assertThat(source.struct().getString(SourceInfo.DEBEZIUM_CONNECTOR_KEY)).isEqualTo(Module.name());
}
@Test
public void wallTimeIsPresent() {
var cursor = mockEventChangeStreamCursor();
source.initEvent(REPLICA_SET_NAME, cursor);
source.initEvent(cursor);
assertThat(source.struct().getInt64(SourceInfo.WALL_TIME)).isNull();
source.collectionEvent(REPLICA_SET_NAME, new CollectionId(REPLICA_SET_NAME, "test", "names"), 10L);
source.collectionEvent(new CollectionId(REPLICA_SET_NAME, "test", "names"), 10L);
assertThat(source.struct().getInt64(SourceInfo.WALL_TIME)).isEqualTo(10L);
}