DBZ-4951 Remove oplog support

Jiri Pechanec 2022-04-08 14:53:45 +02:00
parent ff6d7007aa
commit 707eeab3d8
34 changed files with 125 additions and 1335 deletions

View File

@ -257,19 +257,7 @@ jobs:
runs-on: ubuntu-latest
strategy:
matrix:
# This matrix creates a cross product across versions and capture modes
# There are certain combinations we want to exclude from the cross product, see exclude below
version-mongo-server: ["3.6", "4.0", "4.4", "5.0"]
capture-mode: ["oplog", "changestreams"]
exclude:
# Do not perform change streams capture mode with server 3.6
- capture-mode: "changestreams"
version-mongo-server: "3.6"
# Do not perform oplog capture mode with server 4.4 and 5.0
- capture-mode: "oplog"
version-mongo-server: "4.4"
- capture-mode: "oplog"
version-mongo-server: "5.0"
version-mongo-server: ["4.0", "4.4", "5.0"]
steps:
- name: Checkout Action
uses: actions/checkout@v2
@ -299,7 +287,6 @@ jobs:
-Dcheckstyle.skip=true
-Dformat.skip=true
-Drevapi.skip
-Dcapture.mode=${{ matrix.capture-mode }}
-Dversion.mongo.server=${{ matrix.version-mongo-server }}
-Dorg.slf4j.simpleLogger.log.org.apache.maven.cli.transfer.Slf4jMavenTransferListener=warn
-Dmaven.wagon.http.pool=false

View File

@ -94,8 +94,6 @@
-->
<docker.skip>false</docker.skip>
<docker.exposeContainerInfo>docker.container</docker.exposeContainerInfo>
<capture.mode>changestreams</capture.mode>
</properties>
<build>
<plugins>
@ -208,7 +206,6 @@
<connector.mongodb.members.auto.discover>false</connector.mongodb.members.auto.discover>
<connector.mongodb.name>mongo1</connector.mongodb.name>
<skipLongRunningTests>${skipLongRunningTests}</skipLongRunningTests>
<debezium.test.capture.mode>${capture.mode}</debezium.test.capture.mode>
</systemPropertyVariables>
</configuration>
</plugin>

View File

@ -1,56 +0,0 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.connector.mongodb;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;
import io.debezium.config.CommonConnectorConfig;
import io.debezium.connector.LegacyV1AbstractSourceInfoStructMaker;
import io.debezium.util.SchemaNameAdjuster;
public class LegacyV1MongoDbSourceInfoStructMaker extends LegacyV1AbstractSourceInfoStructMaker<SourceInfo> {
private final Schema schema;
private final String serverName;
public LegacyV1MongoDbSourceInfoStructMaker(String connector, String version, CommonConnectorConfig connectorConfig) {
super(connector, version, connectorConfig);
schema = commonSchemaBuilder()
.name(SchemaNameAdjuster.avroAdjuster().adjust("io.debezium.connector.mongo.Source"))
.version(SourceInfo.SCHEMA_VERSION)
.field(SourceInfo.SERVER_NAME_KEY, Schema.STRING_SCHEMA)
.field(SourceInfo.REPLICA_SET_NAME, Schema.STRING_SCHEMA)
.field(SourceInfo.NAMESPACE, Schema.STRING_SCHEMA)
.field(SourceInfo.TIMESTAMP, Schema.INT32_SCHEMA)
.field(SourceInfo.ORDER, Schema.INT32_SCHEMA)
.field(SourceInfo.OPERATION_ID, Schema.OPTIONAL_INT64_SCHEMA)
.field(SourceInfo.INITIAL_SYNC, SchemaBuilder.bool().optional().defaultValue(false).build())
.build();
this.serverName = connectorConfig.getLogicalName();
}
@Override
public Schema schema() {
return schema;
}
@Override
public Struct struct(SourceInfo sourceInfo) {
final Struct result = super.commonStruct()
.put(SourceInfo.SERVER_NAME_KEY, serverName)
.put(SourceInfo.REPLICA_SET_NAME, sourceInfo.replicaSetName())
.put(SourceInfo.NAMESPACE, sourceInfo.collectionId().namespace())
.put(SourceInfo.TIMESTAMP, sourceInfo.position().getTime())
.put(SourceInfo.ORDER, sourceInfo.position().getInc())
.put(SourceInfo.OPERATION_ID, sourceInfo.position().getOperationId());
if (sourceInfo.isInitialSyncOngoing(sourceInfo.replicaSetName())) {
result.put(SourceInfo.INITIAL_SYNC, true);
}
return result;
}
}

View File

@ -27,7 +27,7 @@
*
* @author Jiri Pechanec
*/
public class MongoDbChangeStreamChangeRecordEmitter extends AbstractChangeRecordEmitter<MongoDbPartition, MongoDbCollectionSchema> {
public class MongoDbChangeRecordEmitter extends AbstractChangeRecordEmitter<MongoDbPartition, MongoDbCollectionSchema> {
private final ChangeStreamDocument<Document> changeStreamEvent;
@ -45,8 +45,8 @@ public class MongoDbChangeStreamChangeRecordEmitter extends AbstractChangeRecord
OPERATION_LITERALS = Collections.unmodifiableMap(literals);
}
public MongoDbChangeStreamChangeRecordEmitter(MongoDbPartition partition, OffsetContext offsetContext, Clock clock,
ChangeStreamDocument<Document> changeStreamEvent) {
public MongoDbChangeRecordEmitter(MongoDbPartition partition, OffsetContext offsetContext, Clock clock,
ChangeStreamDocument<Document> changeStreamEvent) {
super(partition, offsetContext, clock);
this.changeStreamEvent = changeStreamEvent;
}

View File

@ -18,16 +18,13 @@
import org.apache.kafka.connect.connector.Task;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.source.SourceConnector;
import org.bson.Document;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.mongodb.MongoException;
import com.mongodb.client.MongoClient;
import com.mongodb.client.MongoIterable;
import io.debezium.config.Configuration;
import io.debezium.connector.mongodb.MongoDbConnectorConfig.CaptureMode;
import io.debezium.util.Clock;
import io.debezium.util.LoggingContext.PreviousContext;
import io.debezium.util.Threads;
@ -225,38 +222,7 @@ public Config validate(Map<String, String> connectorConfigs) {
// Try to connect to the database ...
try (ConnectionContext connContext = new ConnectionContext(config)) {
try (MongoClient client = connContext.clientFor(connContext.hosts())) {
final MongoIterable<String> databaseNames = client.listDatabaseNames();
// Can't use 'local' database through mongos
final String databaseName = MongoUtil.contains(databaseNames, ReplicaSetDiscovery.CONFIG_DATABASE_NAME)
? ReplicaSetDiscovery.CONFIG_DATABASE_NAME
: "local";
// Oplog mode is not supported for MongoDB 5+
// The version string format is not guaranteed so defensive measures are in place
final Document versionDocument = client.getDatabase(databaseName)
.runCommand(new Document("buildInfo", 1));
if (versionDocument != null) {
final String versionString = versionDocument.getString("version");
if (versionString != null) {
final String[] versionComponents = versionString.split("\\.");
if (versionComponents.length > 0) {
try {
final int majorVersion = Integer.parseInt(versionComponents[0]);
final ConfigValue captureModeValue = results
.get(MongoDbConnectorConfig.CAPTURE_MODE.name());
final MongoDbConnectorConfig connectorConfig = new MongoDbConnectorConfig(config);
final CaptureMode captureMode = connectorConfig.getCaptureMode();
if (majorVersion >= 5
&& captureMode == CaptureMode.OPLOG) {
captureModeValue.addErrorMessage(
"The 'oplog' capture mode is not supported for MongoDB 5 and newer; Please use 'change_streams' or 'change_streams_update_full' instead");
}
}
catch (NumberFormatException e) {
// Ignore the exception
}
}
}
}
client.listDatabaseNames();
}
}
catch (MongoException e) {

View File

@ -123,11 +123,6 @@ public static SnapshotMode parse(String value, String defaultValue) {
*/
public static enum CaptureMode implements EnumeratedValue {
/**
* The classic oplog based capturing.
*/
OPLOG("oplog", false, false),
/**
* Change capture based on MongoDB Change Streams support.
*/
@ -192,10 +187,6 @@ public static CaptureMode parse(String value, String defaultValue) {
return mode;
}
public boolean isChangeStreams() {
return changeStreams;
}
public boolean isFullUpdate() {
return fullUpdate;
}
@ -525,7 +516,6 @@ public boolean isFullUpdate() {
.withImportance(Importance.MEDIUM)
.withDescription("The method used to capture changes from MongoDB server. "
+ "Options include: "
+ "'oplog' to capture changes from the oplog; "
+ "'change_streams' to capture changes via MongoDB Change Streams, update events do not contain full documents; "
+ "'change_streams_update_full' (the default) to capture changes via MongoDB Change Streams, update events contain full documents");
@ -758,12 +748,7 @@ public int getSnapshotMaxThreads() {
@Override
protected SourceInfoStructMaker<? extends AbstractSourceInfo> getSourceInfoStructMaker(Version version) {
switch (version) {
case V1:
return new LegacyV1MongoDbSourceInfoStructMaker(Module.name(), Module.version(), this);
default:
return new MongoDbSourceInfoStructMaker(Module.name(), Module.version(), this);
}
return new MongoDbSourceInfoStructMaker(Module.name(), Module.version(), this);
}
public Optional<String> getSnapshotFilterQueryForCollection(CollectionId collectionId) {
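For reference (not part of the commit), selecting one of the two remaining capture modes follows the same pattern the integration tests in this change set use. A minimal sketch, assuming the hypothetical class name below and a placeholder logical name; a real connector configuration would also carry connection settings such as the MongoDB hosts:

import io.debezium.config.Configuration;
import io.debezium.connector.mongodb.MongoDbConnectorConfig;
import io.debezium.connector.mongodb.MongoDbConnectorConfig.CaptureMode;

public class CaptureModeConfigSketch {
    public static void main(String[] args) {
        // Build a configuration with one of the two capture modes that remain after DBZ-4951.
        Configuration config = Configuration.create()
                .with(MongoDbConnectorConfig.LOGICAL_NAME, "mongo1")
                .with(MongoDbConnectorConfig.CAPTURE_MODE, CaptureMode.CHANGE_STREAMS_UPDATE_FULL)
                .build();

        // The connector now resolves the capture mode purely from the configuration;
        // there is no oplog option and no override based on stored offsets.
        MongoDbConnectorConfig connectorConfig = new MongoDbConnectorConfig(config);
        System.out.println(connectorConfig.getCaptureMode()); // CHANGE_STREAMS_UPDATE_FULL
    }
}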

View File

@ -5,7 +5,6 @@
*/
package io.debezium.connector.mongodb;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
@ -24,7 +23,6 @@
import io.debezium.config.Field;
import io.debezium.connector.base.ChangeEventQueue;
import io.debezium.connector.common.BaseSourceTask;
import io.debezium.connector.mongodb.MongoDbConnectorConfig.CaptureMode;
import io.debezium.connector.mongodb.metrics.MongoDbChangeEventSourceMetricsFactory;
import io.debezium.pipeline.ChangeEventSourceCoordinator;
import io.debezium.pipeline.DataChangeEvent;
@ -83,36 +81,6 @@ public ChangeEventSourceCoordinator<MongoDbPartition, MongoDbOffsetContext> star
final MongoDbOffsetContext previousOffset = getPreviousOffset(connectorConfig, replicaSets);
final Clock clock = Clock.system();
if (previousOffset != null) {
final List<ReplicaSetOffsetContext> oplogBasedOffsets = new ArrayList<>();
final List<ReplicaSetOffsetContext> changeStreamBasedOffsets = new ArrayList<>();
replicaSets.all().forEach(rs -> {
final ReplicaSetOffsetContext offset = previousOffset.getReplicaSetOffsetContext(rs);
if (rs == null) {
return;
}
if (offset.isFromChangeStream()) {
changeStreamBasedOffsets.add(offset);
}
if (offset.isFromOplog()) {
oplogBasedOffsets.add(offset);
}
});
if (!oplogBasedOffsets.isEmpty() && !changeStreamBasedOffsets.isEmpty()) {
LOGGER.error(
"Replica set offsets are partially from oplog and partially from change streams. This is not supported situation and can lead to unpredicable behaviour.");
}
else if (!oplogBasedOffsets.isEmpty() && taskContext.getCaptureMode().isChangeStreams()) {
LOGGER.info("Stored offsets were created using oplog capturing, trying to switch to change streams.");
}
else if (!changeStreamBasedOffsets.isEmpty() && !taskContext.getCaptureMode().isChangeStreams()) {
LOGGER.warn("Stored offsets were created using change streams capturing. Connector configuration expects oplog capturing.");
LOGGER.warn("Switching configuration to '{}'", CaptureMode.CHANGE_STREAMS_UPDATE_FULL);
LOGGER.warn("Either reconfigure the connector or remove the old offsets");
taskContext.overrideCaptureMode(CaptureMode.CHANGE_STREAMS_UPDATE_FULL);
}
}
PreviousContext previousLogContext = taskContext.configureLoggingContext(taskName);
try {

View File

@ -63,12 +63,6 @@ public String getTransactionId(DataCollectionId source, OffsetContext offset, Ob
if (source == null) {
return null;
}
if (sourceInfo.schema().field(SourceInfo.SESSION_TXN_ID) != null) {
final String sessionTxnId = sourceInfo.getString(SourceInfo.SESSION_TXN_ID);
if (sessionTxnId != null) {
return sessionTxnId;
}
}
// Both components were always present in the testing but the documentation claims they are optional
// so it is better to code this defensively
if (sourceInfo.schema().field(SourceInfo.LSID) != null && (sourceInfo.getString(SourceInfo.LSID) != null
@ -83,10 +77,6 @@ public String getTransactionId(DataCollectionId source, OffsetContext offset, Ob
}
return lsid + ":" + txnNumber;
}
final Long operationId = sourceInfo.getInt64(SourceInfo.OPERATION_ID);
if (operationId == null) {
return null;
}
return Long.toString(operationId);
return null;
}
}

View File

@ -44,7 +44,7 @@ public MongoDbOffsetContext(SourceInfo sourceInfo, TransactionContext transactio
public MongoDbOffsetContext(SourceInfo sourceInfo, TransactionContext transactionContext,
IncrementalSnapshotContext<CollectionId> incrementalSnapshotContext, Map<ReplicaSet, Document> offsets) {
this(sourceInfo, transactionContext, incrementalSnapshotContext);
offsets.forEach((replicaSet, document) -> sourceInfo.opLogEvent(replicaSet.replicaSetName(), document, document, 0));
offsets.forEach((replicaSet, document) -> sourceInfo.initialPosition(replicaSet.replicaSetName(), document));
}
void startReplicaSetSnapshot(String replicaSetName) {

View File

@ -22,9 +22,6 @@ public MongoDbSourceInfoStructMaker(String connector, String version, CommonConn
.field(SourceInfo.REPLICA_SET_NAME, Schema.STRING_SCHEMA)
.field(SourceInfo.COLLECTION, Schema.STRING_SCHEMA)
.field(SourceInfo.ORDER, Schema.INT32_SCHEMA)
.field(SourceInfo.OPERATION_ID, Schema.OPTIONAL_INT64_SCHEMA)
.field(SourceInfo.TX_ORD, Schema.OPTIONAL_INT64_SCHEMA)
.field(SourceInfo.SESSION_TXN_ID, Schema.OPTIONAL_STRING_SCHEMA)
.field(SourceInfo.LSID, Schema.OPTIONAL_STRING_SCHEMA)
.field(SourceInfo.TXN_NUMBER, Schema.OPTIONAL_INT64_SCHEMA)
.build();
@ -40,17 +37,13 @@ public Struct struct(SourceInfo sourceInfo) {
Struct struct = super.commonStruct(sourceInfo)
.put(SourceInfo.REPLICA_SET_NAME, sourceInfo.replicaSetName())
.put(SourceInfo.COLLECTION, sourceInfo.collectionId().name())
.put(SourceInfo.ORDER, sourceInfo.position().getInc())
.put(SourceInfo.OPERATION_ID, sourceInfo.position().getOperationId())
.put(SourceInfo.SESSION_TXN_ID, sourceInfo.position().getOplogSessionTxnId());
.put(SourceInfo.ORDER, sourceInfo.position().getInc());
if (sourceInfo.position().getChangeStreamSessionTxnId() != null) {
struct.put(SourceInfo.LSID, sourceInfo.position().getChangeStreamSessionTxnId().lsid)
.put(SourceInfo.TXN_NUMBER, sourceInfo.position().getChangeStreamSessionTxnId().txnNumber);
}
sourceInfo.transactionPosition().ifPresent(transactionPosition -> struct.put(SourceInfo.TX_ORD, transactionPosition));
return struct;
}
}

View File

@ -8,12 +8,9 @@
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.OptionalLong;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
@ -29,10 +26,8 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.mongodb.CursorType;
import com.mongodb.ServerAddress;
import com.mongodb.client.ChangeStreamIterable;
import com.mongodb.client.FindIterable;
import com.mongodb.client.MongoClient;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.MongoCursor;
@ -41,7 +36,6 @@
import com.mongodb.client.model.changestream.ChangeStreamDocument;
import com.mongodb.client.model.changestream.FullDocument;
import io.debezium.DebeziumException;
import io.debezium.connector.mongodb.ConnectionContext.MongoPrimary;
import io.debezium.data.Envelope.Operation;
import io.debezium.pipeline.ErrorHandler;
@ -119,13 +113,8 @@ private void streamChangesForReplicaSet(ChangeEventSourceContext context, MongoD
primaryClient = establishConnectionToPrimary(partition, replicaSet);
if (primaryClient != null) {
final AtomicReference<MongoPrimary> primaryReference = new AtomicReference<>(primaryClient);
primaryClient.execute("read from oplog on '" + replicaSet + "'", primary -> {
if (taskContext.getCaptureMode().isChangeStreams()) {
readChangeStream(primary, primaryReference.get(), replicaSet, context, offsetContext);
}
else {
readOplog(primary, primaryReference.get(), replicaSet, context, offsetContext);
}
primaryClient.execute("read from change stream on '" + replicaSet + "'", primary -> {
readChangeStream(primary, primaryReference.get(), replicaSet, context, offsetContext);
});
}
}
@ -184,92 +173,6 @@ private MongoPrimary establishConnectionToPrimary(MongoDbPartition partition, Re
});
}
private void readOplog(MongoClient primary, MongoPrimary primaryClient, ReplicaSet replicaSet, ChangeEventSourceContext context,
MongoDbOffsetContext offsetContext) {
final ReplicaSetPartition rsPartition = offsetContext.getReplicaSetPartition(replicaSet);
final ReplicaSetOffsetContext rsOffsetContext = offsetContext.getReplicaSetOffsetContext(replicaSet);
final BsonTimestamp oplogStart = rsOffsetContext.lastOffsetTimestamp();
final OptionalLong txOrder = rsOffsetContext.lastOffsetTxOrder();
final ServerAddress primaryAddress = MongoUtil.getPrimaryAddress(primary);
LOGGER.info("Reading oplog for '{}' primary {} starting at {}", replicaSet, primaryAddress, oplogStart);
// Include none of the cluster-internal operations and only those events since the previous timestamp
MongoCollection<Document> oplog = primary.getDatabase("local").getCollection("oplog.rs");
// DBZ-3331 Verify that the start position is in the oplog; throw exception if not.
if (!isStartPositionInOplog(oplogStart, oplog)) {
throw new DebeziumException("Failed to find starting position '" + oplogStart + "' in oplog");
}
ReplicaSetOplogContext oplogContext = new ReplicaSetOplogContext(rsPartition, rsOffsetContext, primaryClient, replicaSet);
Bson filter = null;
if (!txOrder.isPresent()) {
LOGGER.info("The last event processed was not transactional, resuming at the oplog event after '{}'", oplogStart);
filter = Filters.and(Filters.gt("ts", oplogStart), // start just after our last position
Filters.exists("fromMigrate", false)); // skip internal movements across shards
}
else {
LOGGER.info("The last event processed was transactional, resuming at the oplog event '{}', expecting to skip '{}' events",
oplogStart, txOrder.getAsLong());
filter = Filters.and(Filters.gte("ts", oplogStart), Filters.exists("fromMigrate", false));
oplogContext.setIncompleteEventTimestamp(oplogStart);
oplogContext.setIncompleteTxOrder(txOrder.getAsLong());
}
Bson operationFilter = getOplogSkippedOperationsFilter();
if (operationFilter != null) {
filter = Filters.and(filter, operationFilter);
}
FindIterable<Document> results = oplog.find(filter)
.sort(new Document("$natural", 1))
.oplogReplay(true)
.cursorType(CursorType.TailableAwait)
.noCursorTimeout(true);
if (connectorConfig.getCursorMaxAwaitTime() > 0) {
results = results.maxAwaitTime(connectorConfig.getCursorMaxAwaitTime(), TimeUnit.MILLISECONDS);
}
try (MongoCursor<Document> cursor = results.iterator()) {
// In Replicator, this used cursor.hasNext() but this is a blocking call and I observed that this can
// delay the shutdown of the connector by up to 15 seconds or longer. By introducing a Metronome, we
// can respond to the stop request much faster and without much overhead.
Metronome pause = Metronome.sleeper(Duration.ofMillis(500), clock);
while (context.isRunning()) {
// Use tryNext which will return null if no document is yet available from the cursor.
// In this situation, if no document is available, we'll pause.
final Document event = cursor.tryNext();
if (event != null) {
if (!handleOplogEvent(primaryAddress, event, event, 0, oplogContext)) {
// Something happened and we are supposed to stop reading
return;
}
try {
dispatcher.dispatchHeartbeatEvent(oplogContext.getPartition(), oplogContext.getOffset());
}
catch (InterruptedException e) {
LOGGER.info("Replicator thread is interrupted");
Thread.currentThread().interrupt();
return;
}
}
else {
try {
pause.pause();
}
catch (InterruptedException e) {
break;
}
}
}
}
}
private List<String> getChangeStreamSkippedOperationsFilter() {
final Set<Operation> skippedOperations = taskContext.getConnectorConfig().getSkippedOperations();
final List<String> includedOperations = new ArrayList<>();
@ -295,9 +198,8 @@ private void readChangeStream(MongoClient primary, MongoPrimary primaryClient, R
final ReplicaSetOffsetContext rsOffsetContext = offsetContext.getReplicaSetOffsetContext(replicaSet);
final BsonTimestamp oplogStart = rsOffsetContext.lastOffsetTimestamp();
final OptionalLong txOrder = rsOffsetContext.lastOffsetTxOrder();
ReplicaSetOplogContext oplogContext = new ReplicaSetOplogContext(rsPartition, rsOffsetContext, primaryClient, replicaSet);
ReplicaSetChangeStreamsContext oplogContext = new ReplicaSetChangeStreamsContext(rsPartition, rsOffsetContext, primaryClient, replicaSet);
final ServerAddress primaryAddress = MongoUtil.getPrimaryAddress(primary);
LOGGER.info("Reading change stream for '{}' primary {} starting at {}", replicaSet, primaryAddress, oplogStart);
@ -345,7 +247,7 @@ private void readChangeStream(MongoClient primary, MongoPrimary primaryClient, R
LOGGER.debug("Skipping the event for database '{}' based on database include/exclude list", event.getDatabaseName());
}
else {
oplogContext.getOffset().changeStreamEvent(event, txOrder);
oplogContext.getOffset().changeStreamEvent(event);
oplogContext.getOffset().getOffset();
CollectionId collectionId = new CollectionId(
replicaSet.replicaSetName(),
@ -357,7 +259,7 @@ private void readChangeStream(MongoClient primary, MongoPrimary primaryClient, R
dispatcher.dispatchDataChangeEvent(
oplogContext.getPartition(),
collectionId,
new MongoDbChangeStreamChangeRecordEmitter(
new MongoDbChangeRecordEmitter(
oplogContext.getPartition(),
oplogContext.getOffset(),
clock,
@ -391,184 +293,6 @@ private void readChangeStream(MongoClient primary, MongoPrimary primaryClient, R
}
}
private boolean isStartPositionInOplog(BsonTimestamp startTime, MongoCollection<Document> oplog) {
final MongoCursor<Document> iterator = oplog.find().iterator();
if (!iterator.hasNext()) {
return false;
}
final BsonTimestamp timestamp = iterator.next().get("ts", BsonTimestamp.class);
if (timestamp == null) {
return false;
}
return timestamp.compareTo(startTime) <= 0;
}
private Bson getOplogSkippedOperationsFilter() {
Set<Operation> skippedOperations = taskContext.getConnectorConfig().getSkippedOperations();
if (skippedOperations.isEmpty()) {
return null;
}
Bson skippedOperationsFilter = null;
for (Operation operation : skippedOperations) {
Bson skippedOperationFilter = Filters.ne("op", operation.code());
if (skippedOperationsFilter == null) {
skippedOperationsFilter = skippedOperationFilter;
}
else {
skippedOperationsFilter = Filters.or(skippedOperationsFilter, skippedOperationFilter);
}
}
return skippedOperationsFilter;
}
private boolean handleOplogEvent(ServerAddress primaryAddress, Document event, Document masterEvent, long txOrder,
ReplicaSetOplogContext oplogContext) {
String ns = event.getString("ns");
Document object = event.get(OBJECT_FIELD, Document.class);
if (Objects.isNull(object)) {
if (LOGGER.isWarnEnabled()) {
LOGGER.warn("Missing 'o' field in event, so skipping {}", event.toJson());
}
return true;
}
if (Objects.isNull(ns) || ns.isEmpty()) {
// These are considered replica set events
String msg = object.getString("msg");
if ("new primary".equals(msg)) {
AtomicReference<ServerAddress> address = new AtomicReference<>();
try {
oplogContext.getPrimary().executeBlocking("conn", mongoClient -> {
ServerAddress currentPrimary = MongoUtil.getPrimaryAddress(mongoClient);
address.set(currentPrimary);
});
}
catch (InterruptedException e) {
LOGGER.error("Get current primary executeBlocking", e);
}
ServerAddress serverAddress = address.get();
if (Objects.nonNull(serverAddress) && !serverAddress.equals(primaryAddress)) {
LOGGER.info("Found new primary event in oplog, so stopping use of {} to continue with new primary {}",
primaryAddress, serverAddress);
}
else {
LOGGER.info("Found new primary event in oplog, current {} is new primary. " +
"Continue to process oplog event.", primaryAddress);
}
dispatcher.dispatchConnectorEvent(oplogContext.getPartition(), new PrimaryElectionEvent(serverAddress));
}
// Otherwise ignore
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("Skipping event with no namespace: {}", event.toJson());
}
return true;
}
final List<Document> txChanges = transactionChanges(event);
if (!txChanges.isEmpty()) {
if (Objects.nonNull(oplogContext.getIncompleteEventTimestamp())) {
if (oplogContext.getIncompleteEventTimestamp().equals(SourceInfo.extractEventTimestamp(event))) {
for (Document change : txChanges) {
txOrder++;
if (txOrder <= oplogContext.getIncompleteTxOrder()) {
LOGGER.debug("Skipping record as it is expected to be already processed: {}", change);
continue;
}
final boolean r = handleOplogEvent(primaryAddress, change, event, txOrder, oplogContext);
if (!r) {
return false;
}
}
}
oplogContext.setIncompleteEventTimestamp(null);
return true;
}
try {
dispatcher.dispatchTransactionStartedEvent(oplogContext.getPartition(), getTransactionId(event), oplogContext.getOffset());
for (Document change : txChanges) {
final boolean r = handleOplogEvent(primaryAddress, change, event, ++txOrder, oplogContext);
if (!r) {
return false;
}
}
dispatcher.dispatchTransactionCommittedEvent(oplogContext.getPartition(), oplogContext.getOffset());
}
catch (InterruptedException e) {
LOGGER.error("Streaming transaction changes for replica set '{}' was interrupted", oplogContext.getReplicaSetName());
throw new ConnectException("Streaming of transaction changes was interrupted for replica set " + oplogContext.getReplicaSetName(), e);
}
return true;
}
final String operation = event.getString(OPERATION_FIELD);
if (!MongoDbChangeSnapshotOplogRecordEmitter.isValidOperation(operation)) {
LOGGER.debug("Skipping event with \"op={}\"", operation);
return true;
}
int delimIndex = ns.indexOf('.');
if (delimIndex > 0) {
assert (delimIndex + 1) < ns.length();
final String dbName = ns.substring(0, delimIndex);
final String collectionName = ns.substring(delimIndex + 1);
if ("$cmd".equals(collectionName)) {
// This is a command on the database
// TODO: Probably want to handle some of these when we track creation/removal of collections
LOGGER.debug("Skipping database command event: {}", event.toJson());
return true;
}
// Otherwise it is an event on a document in a collection
if (!taskContext.filters().databaseFilter().test(dbName)) {
LOGGER.debug("Skipping the event for database '{}' based on database include/exclude list", dbName);
return true;
}
oplogContext.getOffset().oplogEvent(event, masterEvent, txOrder);
oplogContext.getOffset().getOffset();
CollectionId collectionId = new CollectionId(oplogContext.getReplicaSetName(), dbName, collectionName);
if (taskContext.filters().collectionFilter().test(collectionId)) {
try {
return dispatcher.dispatchDataChangeEvent(
oplogContext.getPartition(),
collectionId,
new MongoDbChangeSnapshotOplogRecordEmitter(
oplogContext.getPartition(),
oplogContext.getOffset(),
clock,
event,
false));
}
catch (Exception e) {
errorHandler.setProducerThrowable(e);
return false;
}
}
}
return true;
}
private List<Document> transactionChanges(Document event) {
final String op = event.getString(OPERATION_FIELD);
final Document o = event.get(OBJECT_FIELD, Document.class);
if (!(OPERATION_CONTROL.equals(op) && Objects.nonNull(o) && o.containsKey(TX_OPS))) {
return Collections.emptyList();
}
return o.get(TX_OPS, List.class);
}
protected MongoDbOffsetContext initializeOffsets(MongoDbConnectorConfig connectorConfig, MongoDbPartition partition,
ReplicaSets replicaSets) {
final Map<ReplicaSet, Document> positions = new LinkedHashMap<>();
@ -594,29 +318,17 @@ protected MongoDbOffsetContext initializeOffsets(MongoDbConnectorConfig connecto
new MongoDbIncrementalSnapshotContext<>(false), positions);
}
private static String getTransactionId(Document event) {
final Long operationId = event.getLong(SourceInfo.OPERATION_ID);
if (operationId != null && operationId != 0L) {
return Long.toString(operationId);
}
return MongoUtil.getOplogSessionTransactionId(event);
}
/**
* A context associated with a given replica set oplog read operation.
*/
private class ReplicaSetOplogContext {
private class ReplicaSetChangeStreamsContext {
private final ReplicaSetPartition partition;
private final ReplicaSetOffsetContext offset;
private final MongoPrimary primary;
private final ReplicaSet replicaSet;
private BsonTimestamp incompleteEventTimestamp;
private long incompleteTxOrder = 0;
ReplicaSetOplogContext(ReplicaSetPartition partition, ReplicaSetOffsetContext offsetContext,
MongoPrimary primary, ReplicaSet replicaSet) {
ReplicaSetChangeStreamsContext(ReplicaSetPartition partition, ReplicaSetOffsetContext offsetContext,
MongoPrimary primary, ReplicaSet replicaSet) {
this.partition = partition;
this.offset = offsetContext;
this.primary = primary;
@ -638,21 +350,5 @@ MongoPrimary getPrimary() {
String getReplicaSetName() {
return replicaSet.replicaSetName();
}
BsonTimestamp getIncompleteEventTimestamp() {
return incompleteEventTimestamp;
}
public void setIncompleteEventTimestamp(BsonTimestamp incompleteEventTimestamp) {
this.incompleteEventTimestamp = incompleteEventTimestamp;
}
public long getIncompleteTxOrder() {
return incompleteTxOrder;
}
public void setIncompleteTxOrder(long incompleteTxOrder) {
this.incompleteTxOrder = incompleteTxOrder;
}
}
}

View File

@ -23,7 +23,6 @@ public class MongoDbTaskContext extends CdcSourceTaskContext {
private final String serverName;
private final ConnectionContext connectionContext;
private final MongoDbConnectorConfig connectorConfig;
private CaptureMode captureMode;
/**
* @param config the configuration
@ -38,7 +37,6 @@ public MongoDbTaskContext(Configuration config) {
this.topicSelector = MongoDbTopicSelector.defaultSelector(serverName, connectorConfig.getHeartbeatTopicsPrefix());
this.serverName = config.getString(MongoDbConnectorConfig.LOGICAL_NAME);
this.connectionContext = new ConnectionContext(config);
this.overrideCaptureMode(connectorConfig.getCaptureMode());
}
public TopicSelector<CollectionId> topicSelector() {
@ -73,10 +71,6 @@ public MongoDbConnectorConfig getConnectorConfig() {
* @return effectively used capture mode
*/
public CaptureMode getCaptureMode() {
return captureMode;
}
public void overrideCaptureMode(CaptureMode captureModeUsed) {
this.captureMode = captureModeUsed;
return connectorConfig.getCaptureMode();
}
}

View File

@ -7,7 +7,6 @@
import java.time.Instant;
import java.util.Map;
import java.util.OptionalLong;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.Struct;
@ -118,36 +117,18 @@ public void readEvent(CollectionId collectionId, Instant timestamp) {
sourceInfo.lastOffset(replicaSetName);
}
public void oplogEvent(Document oplogEvent, Document masterEvent, Long txOrder) {
sourceInfo.opLogEvent(replicaSetName, oplogEvent, masterEvent, txOrder);
}
public void changeStreamEvent(ChangeStreamDocument<Document> changeStreamEvent, OptionalLong txOrder) {
sourceInfo.changeStreamEvent(replicaSetName, changeStreamEvent, txOrder.orElse(0));
public void changeStreamEvent(ChangeStreamDocument<Document> changeStreamEvent) {
sourceInfo.changeStreamEvent(replicaSetName, changeStreamEvent);
}
public BsonTimestamp lastOffsetTimestamp() {
return sourceInfo.lastOffsetTimestamp(replicaSetName);
}
public OptionalLong lastOffsetTxOrder() {
return sourceInfo.lastOffsetTxOrder(replicaSetName);
}
public String lastResumeToken() {
return sourceInfo.lastResumeToken(replicaSetName);
}
public boolean isFromOplog() {
return sourceInfo != null && sourceInfo.lastPosition(replicaSetName) != null
&& sourceInfo.lastPosition(replicaSetName).getOperationId() != null
&& sourceInfo.lastResumeToken(replicaSetName) == null;
}
public boolean isFromChangeStream() {
return sourceInfo != null && sourceInfo.lastResumeToken(replicaSetName) != null;
}
@Override
public void incrementalSnapshotEvents() {
offsetContext.incrementalSnapshotEvents();

View File

@ -9,7 +9,6 @@
import java.util.Collections;
import java.util.Map;
import java.util.Optional;
import java.util.OptionalLong;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
@ -78,9 +77,6 @@ public final class SourceInfo extends BaseSourceInfo {
public static final String NAMESPACE = "ns";
public static final String TIMESTAMP = "sec";
public static final String ORDER = "ord";
public static final String OPERATION_ID = "h";
public static final String TX_ORD = "tord";
public static final String SESSION_TXN_ID = "stxnid";
public static final String INITIAL_SYNC = "initsync";
public static final String COLLECTION = "collection";
public static final String LSID = "lsid";
@ -89,7 +85,7 @@ public final class SourceInfo extends BaseSourceInfo {
// Change Stream fields
private static final BsonTimestamp INITIAL_TIMESTAMP = new BsonTimestamp();
private static final Position INITIAL_POSITION = new Position(INITIAL_TIMESTAMP, null, 0, null, null, null);
private static final Position INITIAL_POSITION = new Position(INITIAL_TIMESTAMP, null, null);
private final ConcurrentMap<String, Map<String, String>> sourcePartitionsByReplicaSetName = new ConcurrentHashMap<>();
private final ConcurrentMap<String, Position> positionsByReplicaSetName = new ConcurrentHashMap<>();
@ -102,40 +98,26 @@ public final class SourceInfo extends BaseSourceInfo {
* after which the recorded offset may be retrieved but not the source struct.
*/
private CollectionId collectionId;
private Position position;
private Position position = new Position(INITIAL_TIMESTAMP, null, null);
@Immutable
protected static final class Position {
private final Long opId;
private final BsonTimestamp ts;
private final long txOrder;
private final String oplogSessionTxnId;
private final SessionTransactionId changeStreamSessionTxnId;
private final String resumeToken;
public Position(int ts, int order, Long opId, long txOrder, String oplogSessionTxnId,
SessionTransactionId changeStreamsSessionTxnId, String resumeToken) {
this(new BsonTimestamp(ts, order), opId, txOrder, oplogSessionTxnId, changeStreamsSessionTxnId,
resumeToken);
}
private Position(BsonTimestamp ts, Long opId, long txOrder, String oplogSessionTxnId,
SessionTransactionId changeStreamSessionTxnId, String resumeToken) {
public Position(BsonTimestamp ts, SessionTransactionId changeStreamsSessionTxnId, String resumeToken) {
this.ts = ts;
this.opId = opId;
this.txOrder = txOrder;
this.oplogSessionTxnId = oplogSessionTxnId;
this.changeStreamSessionTxnId = changeStreamSessionTxnId;
this.changeStreamSessionTxnId = changeStreamsSessionTxnId;
this.resumeToken = resumeToken;
assert this.ts != null;
}
public static Position oplogPosition(BsonTimestamp ts, Long opId, long txOrder, String sessionTxnId) {
return new Position(ts, opId, txOrder, sessionTxnId, null, null);
public static Position snapshotPosition(BsonTimestamp ts) {
return new Position(ts, null, null);
}
public static Position changeStreamPosition(BsonTimestamp ts, String resumeToken, SessionTransactionId sessionTxnId) {
return new Position(ts, null, 0, null, sessionTxnId, resumeToken);
return new Position(ts, sessionTxnId, resumeToken);
}
public BsonTimestamp getTimestamp() {
@ -150,25 +132,19 @@ public int getInc() {
return this.ts.getInc();
}
public Long getOperationId() {
return this.opId;
}
public String getOplogSessionTxnId() {
return oplogSessionTxnId;
}
public SessionTransactionId getChangeStreamSessionTxnId() {
return changeStreamSessionTxnId;
}
public OptionalLong getTxOrder() {
return txOrder == 0 ? OptionalLong.empty() : OptionalLong.of(txOrder);
}
public Optional<String> getResumeToken() {
return Optional.ofNullable(resumeToken);
}
@Override
public String toString() {
return "Position [ts=" + ts + ", changeStreamSessionTxnId=" + changeStreamSessionTxnId + ", resumeToken="
+ resumeToken + "]";
}
}
static final class SessionTransactionId {
@ -240,17 +216,6 @@ public BsonTimestamp lastOffsetTimestamp(String replicaSetName) {
return existing != null ? existing.ts : INITIAL_TIMESTAMP;
}
/**
* Get the MongoDB transaction order of the last offset position for the replica set.
*
* @param replicaSetName the name of the replica set for which the new offset is to be obtained; may not be null
* @return the tx order of the transaction in progress or 0 in case of non-transactional event
*/
public OptionalLong lastOffsetTxOrder(String replicaSetName) {
Position existing = positionsByReplicaSetName.get(replicaSetName);
return existing != null ? existing.getTxOrder() : OptionalLong.empty();
}
public String lastResumeToken(String replicaSetName) {
Position existing = positionsByReplicaSetName.get(replicaSetName);
return existing != null ? existing.resumeToken : null;
@ -276,23 +241,17 @@ public Position lastPosition(String replicaSetName) {
if (isInitialSyncOngoing(replicaSetName)) {
return addSessionTxnIdToOffset(existing, Collect.hashMapOf(TIMESTAMP, Integer.valueOf(existing.getTime()),
ORDER, Integer.valueOf(existing.getInc()),
OPERATION_ID, existing.getOperationId(),
INITIAL_SYNC, true));
}
Map<String, Object> offset = Collect.hashMapOf(TIMESTAMP, Integer.valueOf(existing.getTime()),
ORDER, Integer.valueOf(existing.getInc()),
OPERATION_ID, existing.getOperationId());
ORDER, Integer.valueOf(existing.getInc()));
existing.getTxOrder().ifPresent(txOrder -> offset.put(TX_ORD, txOrder));
existing.getResumeToken().ifPresent(resumeToken -> offset.put(RESUME_TOKEN, resumeToken));
return addSessionTxnIdToOffset(existing, offset);
}
private Map<String, ?> addSessionTxnIdToOffset(Position position, Map<String, Object> offset) {
if (position.getOplogSessionTxnId() != null) {
offset.put(SESSION_TXN_ID, position.getOplogSessionTxnId());
}
if (position.getChangeStreamSessionTxnId() != null) {
offset.put(LSID, position.getChangeStreamSessionTxnId().lsid);
offset.put(TXN_NUMBER, position.getChangeStreamSessionTxnId().txnNumber);
@ -314,24 +273,21 @@ public void collectionEvent(String replicaSetName, CollectionId collectionId) {
}
/**
* Get a {@link Struct} representation of the source {@link #partition(String) partition} and {@link #lastOffset(String)
* Initializes a {@link Struct} representation of the source {@link #partition(String) partition} and {@link #lastOffset(String)
* offset} information. The Struct complies with the {@link #schema} for the MongoDB connector.
* The method usually sets the position in the oplog after which the capturing should start.
*
* @param replicaSetName the name of the replica set for which the new offset is to be obtained; may not be null
* @param oplogEvent the replica set oplog event that was last read; may be null if the position is the start of
* the oplog
* @param masterEvent the replica set oplog event that contains event metadata; same as oplogEvent for non-transactional changes
* @param orderInTx order in transaction batch, 0 for non-transactional events
* @see #schema()
*/
public void opLogEvent(String replicaSetName, Document oplogEvent, Document masterEvent, long orderInTx) {
public void initialPosition(String replicaSetName, Document oplogEvent) {
Position position = INITIAL_POSITION;
String namespace = "";
if (oplogEvent != null) {
BsonTimestamp ts = extractEventTimestamp(masterEvent);
Long opId = masterEvent.getLong("h");
String sessionTxnId = extractSessionTxnId(masterEvent);
position = Position.oplogPosition(ts, opId, orderInTx, sessionTxnId);
BsonTimestamp ts = extractEventTimestamp(oplogEvent);
position = Position.snapshotPosition(ts);
namespace = oplogEvent.getString("ns");
}
positionsByReplicaSetName.put(replicaSetName, position);
@ -339,7 +295,7 @@ public void opLogEvent(String replicaSetName, Document oplogEvent, Document mast
onEvent(replicaSetName, CollectionId.parse(replicaSetName, namespace), position);
}
public void changeStreamEvent(String replicaSetName, ChangeStreamDocument<Document> changeStreamEvent, long orderInTx) {
public void changeStreamEvent(String replicaSetName, ChangeStreamDocument<Document> changeStreamEvent) {
Position position = INITIAL_POSITION;
String namespace = "";
if (changeStreamEvent != null) {
@ -353,19 +309,6 @@ public void changeStreamEvent(String replicaSetName, ChangeStreamDocument<Docume
onEvent(replicaSetName, CollectionId.parse(replicaSetName, namespace), position);
}
/**
* Get a {@link Struct} representation of the source {@link #partition(String) partition} and {@link #lastOffset(String)
* offset} information. The Struct complies with the {@link #schema} for the MongoDB connector.
*
* @param replicaSetName the name of the replica set for which the new offset is to be obtained; may not be null
* @param oplogEvent the replica set oplog event that was last read; may be null if the position is the start of
* the oplog
* @see #schema()
*/
public void opLogEvent(String replicaSetName, Document oplogEvent) {
opLogEvent(replicaSetName, oplogEvent, oplogEvent, 0);
}
/**
* Utility to extract the {@link BsonTimestamp timestamp} value from the event.
*
@ -376,27 +319,6 @@ protected static BsonTimestamp extractEventTimestamp(Document oplogEvent) {
return oplogEvent != null ? oplogEvent.get("ts", BsonTimestamp.class) : null;
}
/**
* Utility to extract the {@link String unique transaction id} value from the event.
*
* @param oplogEvent the event
* @return the session transaction id or null
*/
protected static String extractSessionTxnId(Document oplogEvent) {
// In MongoDB prior to 4.2, the h field is populated.
// For backward compatibility, if h is not present or contains a zero value, then proceed to extract
// the session transaction unique identifier value.
Long opId = oplogEvent.getLong("h");
if (opId == null || opId == 0L) {
// For MongoDB 4.2+, the h field no longer has a non-zero value.
// In this case, the lsid and the associated txnNumber fields must be extracted and combined to
// represent a unique identifier for the individual operation. Therefore, the return value will
// carry the same semantics as h did for MongoDB platforms prior to 4.2.
return MongoUtil.getOplogSessionTransactionId(oplogEvent);
}
return null;
}
private void onEvent(String replicaSetName, CollectionId collectionId, Position position) {
this.replicaSetName = replicaSetName;
this.position = (position == null) ? INITIAL_POSITION : position;
@ -437,9 +359,6 @@ public boolean setOffsetFor(String replicaSetName, Map<String, ?> sourceOffset)
}
int time = intOffsetValue(sourceOffset, TIMESTAMP);
int order = intOffsetValue(sourceOffset, ORDER);
long operationId = longOffsetValue(sourceOffset, OPERATION_ID);
long txOrder = longOffsetValue(sourceOffset, TX_ORD);
String oplogSessionTxnId = stringOffsetValue(sourceOffset, SESSION_TXN_ID);
String changeStreamLsid = stringOffsetValue(sourceOffset, LSID);
Long changeStreamTxnNumber = longOffsetValue(sourceOffset, TXN_NUMBER);
SessionTransactionId changeStreamTxnId = null;
@ -448,7 +367,7 @@ public boolean setOffsetFor(String replicaSetName, Map<String, ?> sourceOffset)
}
String resumeToken = stringOffsetValue(sourceOffset, RESUME_TOKEN);
positionsByReplicaSetName.put(replicaSetName,
new Position(time, order, operationId, txOrder, oplogSessionTxnId, changeStreamTxnId, resumeToken));
new Position(new BsonTimestamp(time, order), changeStreamTxnId, resumeToken));
return true;
}
@ -570,7 +489,11 @@ String replicaSetName() {
return replicaSetName;
}
protected OptionalLong transactionPosition() {
return position.getTxOrder();
@Override
public String toString() {
return "SourceInfo [sourcePartitionsByReplicaSetName=" + sourcePartitionsByReplicaSetName
+ ", positionsByReplicaSetName=" + positionsByReplicaSetName + ", initialSyncReplicaSets="
+ initialSyncReplicaSets + ", replicaSetName=" + replicaSetName + ", collectionId=" + collectionId
+ ", position=" + position + "]";
}
}

View File

@ -24,6 +24,7 @@ public MongoDbCloudEventsMaker(RecordParser parser, SerializerType contentType,
@Override
public String ceId() {
return "name:" + recordParser.getMetadata(AbstractSourceInfo.SERVER_NAME_KEY)
+ ";h:" + recordParser.getMetadata(MongoDbRecordParser.OPERATION_ID);
+ ";ts:" + recordParser.getMetadata(AbstractSourceInfo.TIMESTAMP_KEY)
+ ";ord:" + recordParser.getMetadata(MongoDbRecordParser.ORDER);
}
}
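For illustration only (not part of the commit), the CloudEvents id produced by ceId() after this change is composed of the logical server name plus the source timestamp and order, instead of the removed oplog operation id ("h"). A minimal sketch with placeholder values standing in for the recordParser.getMetadata(...) lookups:

public class CloudEventsIdSketch {
    public static void main(String[] args) {
        String serverName = "mongo1";        // AbstractSourceInfo.SERVER_NAME_KEY
        long timestampMs = 1649424825000L;   // AbstractSourceInfo.TIMESTAMP_KEY
        int order = 1;                       // MongoDbRecordParser.ORDER
        String ceId = "name:" + serverName + ";ts:" + timestampMs + ";ord:" + order;
        System.out.println(ceId); // name:mongo1;ts:1649424825000;ord:1
    }
}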

View File

@ -25,13 +25,11 @@ public class MongoDbRecordParser extends RecordParser {
static final String REPLICA_SET_NAME = "rs";
static final String ORDER = "ord";
static final String OPERATION_ID = "h";
static final String COLLECTION = "collection";
static final Set<String> MONGODB_SOURCE_FIELD = Collect.unmodifiableSet(
REPLICA_SET_NAME,
ORDER,
OPERATION_ID,
COLLECTION);
public MongoDbRecordParser(Schema schema, Struct record) {

View File

@ -72,20 +72,11 @@ public void testCorrectFormat() throws Exception {
}
CloudEventsConverterTest.shouldConvertToCloudEventsInJson(deleteRecord, false);
if (TestHelper.isOplogCaptureMode()) {
CloudEventsConverterTest.shouldConvertToCloudEventsInJsonWithDataAsAvro(deleteRecord, "filter", false);
}
CloudEventsConverterTest.shouldConvertToCloudEventsInAvro(deleteRecord, "mongodb", "mongo1", false);
CloudEventsConverterTest.shouldConvertToCloudEventsInJson(updateRecord, false);
if (TestHelper.isOplogCaptureMode()) {
CloudEventsConverterTest.shouldConvertToCloudEventsInJsonWithDataAsAvro(updateRecord, "filter", false);
CloudEventsConverterTest.shouldConvertToCloudEventsInJsonWithDataAsAvro(updateRecord, "patch", false);
}
else {
CloudEventsConverterTest.shouldConvertToCloudEventsInJsonWithDataAsAvro(updateRecord, MongoDbFieldName.UPDATE_DESCRIPTION, false);
CloudEventsConverterTest.shouldConvertToCloudEventsInJsonWithDataAsAvro(updateRecord, Envelope.FieldName.AFTER, false);
}
CloudEventsConverterTest.shouldConvertToCloudEventsInJsonWithDataAsAvro(updateRecord, MongoDbFieldName.UPDATE_DESCRIPTION, false);
CloudEventsConverterTest.shouldConvertToCloudEventsInJsonWithDataAsAvro(updateRecord, Envelope.FieldName.AFTER, false);
CloudEventsConverterTest.shouldConvertToCloudEventsInAvro(updateRecord, "mongodb", "mongo1", false);
stopConnector();

View File

@ -1618,19 +1618,11 @@ private void assertUpdateRecord(String blackList, ObjectId objectId, Document sn
SourceRecord record = updateRecords.allRecordsInOrder().get(0);
Struct value = getValue(record);
if (TestHelper.isOplogCaptureMode()) {
Document expectedDoc = TestHelper
.getDocumentWithoutLanguageVersion(expected.patch);
Document actualDoc = TestHelper.getDocumentWithoutLanguageVersion(value.getString(field));
assertThat(actualDoc).isEqualTo(expectedDoc);
}
else {
TestHelper.assertChangeStreamUpdateAsDocs(objectId, value, expected.full, expected.removedFields,
expected.updatedFields);
}
TestHelper.assertChangeStreamUpdateAsDocs(objectId, value, expected.full, expected.removedFields,
expected.updatedFields);
}
private String updateField() {
return TestHelper.isOplogCaptureMode() ? PATCH : AFTER;
return AFTER;
}
}

View File

@ -1602,18 +1602,11 @@ private void assertUpdateRecord(String blackList, ObjectId objectId, Document sn
SourceRecord record = updateRecords.allRecordsInOrder().get(0);
Struct value = getValue(record);
if (TestHelper.isOplogCaptureMode()) {
Document expectedDoc = TestHelper.getDocumentWithoutLanguageVersion(expected.patch);
Document actualDoc = TestHelper.getDocumentWithoutLanguageVersion(value.getString(field));
assertThat(actualDoc).isEqualTo(expectedDoc);
}
else {
TestHelper.assertChangeStreamUpdateAsDocs(objectId, value, expected.full, expected.removedFields,
expected.updatedFields);
}
TestHelper.assertChangeStreamUpdateAsDocs(objectId, value, expected.full, expected.removedFields,
expected.updatedFields);
}
private String updateField() {
return TestHelper.isOplogCaptureMode() ? PATCH : AFTER;
return AFTER;
}
}

View File

@ -137,13 +137,8 @@ public void shouldNotRenameNestedMissingFieldsForUpdateEventWithEmbeddedDocument
SourceRecord record = getUpdateRecord("*.c1.address.missing:new_missing", obj, updateObj);
Struct value = (Struct) record.value();
if (TestHelper.isOplogCaptureMode()) {
assertThat(getDocumentFromUpdateRecord(value)).isEqualTo(updateObj);
}
else {
final Document fullObj = ((Document) updateObj.get("$set")).append(ID, objId);
assertThat(getDocumentFromUpdateRecord(value)).isEqualTo(fullObj);
}
final Document fullObj = ((Document) updateObj.get("$set")).append(ID, objId);
assertThat(getDocumentFromUpdateRecord(value)).isEqualTo(fullObj);
}
@Test
@ -1155,8 +1150,7 @@ public void shouldNotRenameNestedFieldsToExistingNamesForSetNestedFieldUpdateEve
.append("addresses.0.street", "Claude Debussylaan")
.append("addresses.0.city", "Amsterdam");
assertShouldNotRenameDuringUpdate("*.c1.addresses.street:city", obj, updateObj, false,
TestHelper.isOplogCaptureMode() ? "addresses.0.city" : "city");
assertShouldNotRenameDuringUpdate("*.c1.addresses.street:city", obj, updateObj, false, "city");
}
@Test
@ -1466,12 +1460,6 @@ public void shouldNotRenameNestedFieldsToExistingNamesForUnsetNestedFieldUpdateE
.append("addresses.0.number", "")
.append("addresses.0.street", "")
.append("addresses.0.city", "");
if (TestHelper.isOplogCaptureMode()) {
// Change Stream does not send unset fields in both full and change document
// so the error would not be thrown
assertShouldNotRenameDuringUpdate("*.c1.addresses.street:city", obj, updateObj, true, "addresses.0.city");
}
}
@Test
@ -1680,13 +1668,13 @@ private static Document getFilterFromId(ObjectId id) {
private static Document getDocumentFromUpdateRecord(Struct value) {
assertThat(value).isNotNull();
final String patch = value.getString(TestHelper.isOplogCaptureMode() ? PATCH : AFTER);
assertThat(patch).isNotNull();
final String after = value.getString(AFTER);
assertThat(after).isNotNull();
// By parsing the patch string, we can remove the $v internal key added by the driver that specifies the
// language version used to manipulate the document. The goal of removing this key is that the original
// document used to update the database entry can be compared directly.
Document parsed = Document.parse(patch);
Document parsed = Document.parse(after);
parsed.remove("$v");
return parsed;
}
@ -1841,15 +1829,7 @@ private void assertShouldNotRenameDuringUpdate(String renamesList, Document snap
private void assertUpdateRecord(ObjectId objectId, SourceRecord record, ExpectedUpdate expected) throws InterruptedException {
Struct value = (Struct) record.value();
if (TestHelper.isOplogCaptureMode()) {
final Document expectedDoc = TestHelper
.getDocumentWithoutLanguageVersion(expected.patch);
final Document actualDoc = TestHelper.getDocumentWithoutLanguageVersion(value.getString(PATCH));
assertThat(actualDoc).isEqualTo(expectedDoc);
}
else {
TestHelper.assertChangeStreamUpdateAsDocs(objectId, value, expected.full, expected.removedFields,
expected.updatedFields);
}
TestHelper.assertChangeStreamUpdateAsDocs(objectId, value, expected.full, expected.removedFields,
expected.updatedFields);
}
}

View File

@ -28,9 +28,7 @@
import org.fest.assertions.MapAssert;
import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TestRule;
import io.debezium.config.CommonConnectorConfig;
import io.debezium.config.Configuration;
@ -58,9 +56,6 @@ public class IncrementalSnapshotIT extends AbstractMongoConnectorIT {
protected static final Path DB_HISTORY_PATH = Testing.Files.createTestingPath("file-db-history-is.txt")
.toAbsolutePath();
@Rule
public TestRule skipForOplog = new SkipForOplogTestRule();
@Before
public void before() {
// Set up the replication context for connections ...

View File

@ -1,109 +0,0 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.connector.mongodb;
import static org.fest.assertions.Assertions.assertThat;
import java.util.List;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;
import org.bson.BsonDocument;
import org.bson.Document;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TestRule;
import io.debezium.connector.mongodb.MongoDbConnectorConfig.CaptureMode;
import io.debezium.connector.mongodb.MongoDbConnectorConfig.SnapshotMode;
import io.debezium.junit.logging.LogInterceptor;
/**
* Integration test that verifies that if the capture mode is reconfigured it stays the same
* as the one stored in offsets.
*
* @author Jiri Pechanec
*/
public class KeepCaptureModeAfterRestartIT extends AbstractMongoConnectorIT {
@Rule
public final TestRule rule = new SkipForOplogTestRule();
@Test
public void changeStreamsToOplog() throws Exception {
// Testing.Print.enable();
final LogInterceptor logInterceptor = new LogInterceptor(MongoDbConnectorTask.class);
testSwitch(CaptureMode.CHANGE_STREAMS_UPDATE_FULL, CaptureMode.OPLOG);
stopConnector(value -> assertThat(
logInterceptor.containsWarnMessage("Stored offsets were created using change streams capturing.")
&& logInterceptor.containsWarnMessage("Switching configuration to 'CHANGE_STREAMS_UPDATE_FULL'")
&& logInterceptor.containsWarnMessage("Either reconfigure the connector or remove the old offsets"))
.isTrue());
}
@Test
public void oplogToChangeStreams() throws Exception {
// Testing.Print.enable();
final LogInterceptor logInterceptor = new LogInterceptor(MongoDbConnectorTask.class);
testSwitch(CaptureMode.OPLOG, CaptureMode.CHANGE_STREAMS);
stopConnector(value -> assertThat(logInterceptor.containsMessage(
"Stored offsets were created using oplog capturing, trying to switch to change streams.")).isTrue());
}
public void testSwitch(CaptureMode from, CaptureMode to) throws InterruptedException {
config = TestHelper.getConfiguration()
.edit()
.with(MongoDbConnectorConfig.COLLECTION_INCLUDE_LIST, "dbA.c1")
.with(MongoDbConnectorConfig.SNAPSHOT_MODE, SnapshotMode.INITIAL)
.with(MongoDbConnectorConfig.CAPTURE_MODE, from)
.build();
context = new MongoDbTaskContext(config);
TestHelper.cleanDatabase(primary(), "dbA");
start(MongoDbConnector.class, config);
assertConnectorIsRunning();
// Wait for snapshot completion
waitForSnapshotToBeCompleted("mongodb", "mongo1");
List<Document> documentsToInsert = loadTestDocuments("restaurants1.json");
insertDocumentsInTx("dbA", "c1", documentsToInsert.toArray(new Document[0]));
final SourceRecords records = consumeRecordsByTopic(6);
final List<SourceRecord> c1s = records.recordsForTopic("mongo1.dbA.c1");
assertThat(c1s).hasSize(6);
stopConnector();
List<Document> documentsToInsert2 = loadTestDocuments("restaurants6.json");
insertDocuments("dbA", "c1", documentsToInsert2.toArray(new Document[0]));
config = TestHelper.getConfiguration()
.edit()
.with(MongoDbConnectorConfig.COLLECTION_INCLUDE_LIST, "dbA.c1")
.with(MongoDbConnectorConfig.SNAPSHOT_MODE, SnapshotMode.INITIAL)
.with(MongoDbConnectorConfig.CAPTURE_MODE, to)
.build();
start(MongoDbConnector.class, config);
assertConnectorIsRunning();
final SourceRecords records2 = consumeRecordsByTopic(1);
final List<SourceRecord> c1s2 = records2.recordsForTopic("mongo1.dbA.c1");
assertThat(c1s2).hasSize(1);
final BsonDocument first = BsonDocument.parse(((Struct) c1s2.get(0).value()).getString("after"));
assertThat(first.getString("restaurant_id").getValue()).isEqualTo("80364347");
}
}

View File

@ -1,264 +0,0 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.connector.mongodb;
import static org.fest.assertions.Assertions.assertThat;
import java.util.Map;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;
import org.bson.BsonTimestamp;
import org.bson.Document;
import org.junit.Before;
import org.junit.Test;
import io.debezium.config.CommonConnectorConfig.Version;
import io.debezium.config.Configuration;
/**
* @author Randall Hauch
*
*/
public class LegacyV1SourceInfoTest {
private static String REPLICA_SET_NAME = "myReplicaSet";
private SourceInfo source;
private Map<String, String> partition;
@Before
public void beforeEach() {
source = new SourceInfo(new MongoDbConnectorConfig(
Configuration.create()
.with(MongoDbConnectorConfig.LOGICAL_NAME, "serverX")
.with(MongoDbConnectorConfig.SOURCE_STRUCT_MAKER_VERSION, Version.V1)
.build()));
}
@Test
public void shouldHaveSchemaForSource() {
Schema schema = source.schema();
assertThat(schema.name()).isNotEmpty();
assertThat(schema.version()).isNotNull();
assertThat(schema.field(SourceInfo.SERVER_NAME_KEY).schema()).isEqualTo(Schema.STRING_SCHEMA);
assertThat(schema.field(SourceInfo.REPLICA_SET_NAME).schema()).isEqualTo(Schema.STRING_SCHEMA);
assertThat(schema.field(SourceInfo.NAMESPACE).schema()).isEqualTo(Schema.STRING_SCHEMA);
assertThat(schema.field(SourceInfo.TIMESTAMP).schema()).isEqualTo(Schema.INT32_SCHEMA);
assertThat(schema.field(SourceInfo.ORDER).schema()).isEqualTo(Schema.INT32_SCHEMA);
assertThat(schema.field(SourceInfo.OPERATION_ID).schema()).isEqualTo(Schema.OPTIONAL_INT64_SCHEMA);
assertThat(schema.field(SourceInfo.INITIAL_SYNC).schema()).isEqualTo(SchemaBuilder.bool().optional().defaultValue(false).build());
}
@Test
public void shouldProducePartitionMap() {
partition = source.partition(REPLICA_SET_NAME);
assertThat(partition.get(SourceInfo.REPLICA_SET_NAME)).isEqualTo(REPLICA_SET_NAME);
assertThat(partition.get(SourceInfo.SERVER_ID_KEY)).isEqualTo("serverX");
assertThat(partition.size()).isEqualTo(2);
}
@Test
public void shouldReturnSamePartitionMapForSameReplicaName() {
partition = source.partition(REPLICA_SET_NAME);
assertThat(partition).isSameAs(source.partition(REPLICA_SET_NAME));
}
@Test
public void shouldSetAndReturnRecordedOffset() {
Document event = new Document().append("ts", new BsonTimestamp(100, 2))
.append("h", Long.valueOf(1987654321))
.append("ns", "dbA.collectA");
assertThat(source.hasOffset(REPLICA_SET_NAME)).isEqualTo(false);
source.opLogEvent(REPLICA_SET_NAME, event);
assertThat(source.hasOffset(REPLICA_SET_NAME)).isEqualTo(true);
Map<String, ?> offset = source.lastOffset(REPLICA_SET_NAME);
assertThat(offset.get(SourceInfo.TIMESTAMP)).isEqualTo(100);
assertThat(offset.get(SourceInfo.ORDER)).isEqualTo(2);
assertThat(offset.get(SourceInfo.OPERATION_ID)).isEqualTo(1987654321L);
// Create a new source info and set the offset ...
Map<String, String> partition = source.partition(REPLICA_SET_NAME);
source = new SourceInfo(new MongoDbConnectorConfig(
Configuration.create()
.with(MongoDbConnectorConfig.LOGICAL_NAME, "serverX")
.with(MongoDbConnectorConfig.SOURCE_STRUCT_MAKER_VERSION, Version.V1)
.build()));
source.setOffsetFor(partition, offset);
offset = source.lastOffset(REPLICA_SET_NAME);
assertThat(offset.get(SourceInfo.TIMESTAMP)).isEqualTo(100);
assertThat(offset.get(SourceInfo.ORDER)).isEqualTo(2);
assertThat(offset.get(SourceInfo.OPERATION_ID)).isEqualTo(1987654321L);
BsonTimestamp ts = source.lastOffsetTimestamp(REPLICA_SET_NAME);
assertThat(ts.getTime()).isEqualTo(100);
assertThat(ts.getInc()).isEqualTo(2);
source.collectionEvent(REPLICA_SET_NAME, new CollectionId(REPLICA_SET_NAME, "dbA", "collectA"));
Struct struct = source.struct();
assertThat(struct.getInt32(SourceInfo.TIMESTAMP)).isEqualTo(100);
assertThat(struct.getInt32(SourceInfo.ORDER)).isEqualTo(2);
assertThat(struct.getInt64(SourceInfo.OPERATION_ID)).isEqualTo(1987654321L);
assertThat(struct.getString(SourceInfo.NAMESPACE)).isEqualTo("dbA.collectA");
assertThat(struct.getString(SourceInfo.REPLICA_SET_NAME)).isEqualTo(REPLICA_SET_NAME);
assertThat(struct.getString(SourceInfo.SERVER_NAME_KEY)).isEqualTo("serverX");
assertThat(struct.getBoolean(SourceInfo.INITIAL_SYNC)).isNull();
}
@Test
public void shouldReturnOffsetForUnusedReplicaName() {
assertThat(source.hasOffset(REPLICA_SET_NAME)).isEqualTo(false);
Map<String, ?> offset = source.lastOffset(REPLICA_SET_NAME);
assertThat(offset.get(SourceInfo.TIMESTAMP)).isEqualTo(0);
assertThat(offset.get(SourceInfo.ORDER)).isEqualTo(0);
assertThat(offset.get(SourceInfo.OPERATION_ID)).isNull();
BsonTimestamp ts = source.lastOffsetTimestamp(REPLICA_SET_NAME);
assertThat(ts.getTime()).isEqualTo(0);
assertThat(ts.getInc()).isEqualTo(0);
source.collectionEvent(REPLICA_SET_NAME, new CollectionId(REPLICA_SET_NAME, "dbA", "collectA"));
Struct struct = source.struct();
assertThat(struct.getInt32(SourceInfo.TIMESTAMP)).isEqualTo(0);
assertThat(struct.getInt32(SourceInfo.ORDER)).isEqualTo(0);
assertThat(struct.getInt64(SourceInfo.OPERATION_ID)).isNull();
assertThat(struct.getString(SourceInfo.NAMESPACE)).isEqualTo("dbA.collectA");
assertThat(struct.getString(SourceInfo.REPLICA_SET_NAME)).isEqualTo(REPLICA_SET_NAME);
assertThat(struct.getString(SourceInfo.SERVER_NAME_KEY)).isEqualTo("serverX");
assertThat(struct.getBoolean(SourceInfo.INITIAL_SYNC)).isNull();
assertThat(source.hasOffset(REPLICA_SET_NAME)).isEqualTo(false);
}
@Test
public void shouldReturnRecordedOffsetForUsedReplicaName() {
Document event = new Document().append("ts", new BsonTimestamp(100, 2))
.append("h", Long.valueOf(1987654321))
.append("ns", "dbA.collectA");
assertThat(source.hasOffset(REPLICA_SET_NAME)).isEqualTo(false);
source.opLogEvent(REPLICA_SET_NAME, event);
assertThat(source.hasOffset(REPLICA_SET_NAME)).isEqualTo(true);
Map<String, ?> offset = source.lastOffset(REPLICA_SET_NAME);
assertThat(offset.get(SourceInfo.TIMESTAMP)).isEqualTo(100);
assertThat(offset.get(SourceInfo.ORDER)).isEqualTo(2);
assertThat(offset.get(SourceInfo.OPERATION_ID)).isEqualTo(1987654321L);
BsonTimestamp ts = source.lastOffsetTimestamp(REPLICA_SET_NAME);
assertThat(ts.getTime()).isEqualTo(100);
assertThat(ts.getInc()).isEqualTo(2);
source.collectionEvent(REPLICA_SET_NAME, new CollectionId(REPLICA_SET_NAME, "dbA", "collectA"));
Struct struct = source.struct();
assertThat(struct.getInt32(SourceInfo.TIMESTAMP)).isEqualTo(100);
assertThat(struct.getInt32(SourceInfo.ORDER)).isEqualTo(2);
assertThat(struct.getInt64(SourceInfo.OPERATION_ID)).isEqualTo(1987654321L);
assertThat(struct.getString(SourceInfo.NAMESPACE)).isEqualTo("dbA.collectA");
assertThat(struct.getString(SourceInfo.REPLICA_SET_NAME)).isEqualTo(REPLICA_SET_NAME);
assertThat(struct.getString(SourceInfo.SERVER_NAME_KEY)).isEqualTo("serverX");
assertThat(struct.getBoolean(SourceInfo.INITIAL_SYNC)).isNull();
}
@Test
public void shouldReturnOffsetForUnusedReplicaNameDuringInitialSync() {
source.startInitialSync(REPLICA_SET_NAME);
assertThat(source.hasOffset(REPLICA_SET_NAME)).isEqualTo(false);
Map<String, ?> offset = source.lastOffset(REPLICA_SET_NAME);
assertThat(offset.get(SourceInfo.TIMESTAMP)).isEqualTo(0);
assertThat(offset.get(SourceInfo.ORDER)).isEqualTo(0);
assertThat(offset.get(SourceInfo.OPERATION_ID)).isNull();
BsonTimestamp ts = source.lastOffsetTimestamp(REPLICA_SET_NAME);
assertThat(ts.getTime()).isEqualTo(0);
assertThat(ts.getInc()).isEqualTo(0);
source.collectionEvent(REPLICA_SET_NAME, new CollectionId(REPLICA_SET_NAME, "dbA", "collectA"));
Struct struct = source.struct();
assertThat(struct.getInt32(SourceInfo.TIMESTAMP)).isEqualTo(0);
assertThat(struct.getInt32(SourceInfo.ORDER)).isEqualTo(0);
assertThat(struct.getInt64(SourceInfo.OPERATION_ID)).isNull();
assertThat(struct.getString(SourceInfo.NAMESPACE)).isEqualTo("dbA.collectA");
assertThat(struct.getString(SourceInfo.REPLICA_SET_NAME)).isEqualTo(REPLICA_SET_NAME);
assertThat(struct.getString(SourceInfo.SERVER_NAME_KEY)).isEqualTo("serverX");
assertThat(struct.getBoolean(SourceInfo.INITIAL_SYNC)).isEqualTo(true);
assertThat(source.hasOffset(REPLICA_SET_NAME)).isEqualTo(false);
}
@Test
public void shouldReturnRecordedOffsetForUsedReplicaNameDuringInitialSync() {
source.startInitialSync(REPLICA_SET_NAME);
Document event = new Document().append("ts", new BsonTimestamp(100, 2))
.append("h", Long.valueOf(1987654321))
.append("ns", "dbA.collectA");
assertThat(source.hasOffset(REPLICA_SET_NAME)).isEqualTo(false);
source.opLogEvent(REPLICA_SET_NAME, event);
assertThat(source.hasOffset(REPLICA_SET_NAME)).isEqualTo(true);
Map<String, ?> offset = source.lastOffset(REPLICA_SET_NAME);
assertThat(offset.get(SourceInfo.TIMESTAMP)).isEqualTo(100);
assertThat(offset.get(SourceInfo.ORDER)).isEqualTo(2);
assertThat(offset.get(SourceInfo.OPERATION_ID)).isEqualTo(1987654321L);
BsonTimestamp ts = source.lastOffsetTimestamp(REPLICA_SET_NAME);
assertThat(ts.getTime()).isEqualTo(100);
assertThat(ts.getInc()).isEqualTo(2);
source.collectionEvent(REPLICA_SET_NAME, new CollectionId(REPLICA_SET_NAME, "dbA", "collectA"));
Struct struct = source.struct();
assertThat(struct.getInt32(SourceInfo.TIMESTAMP)).isEqualTo(100);
assertThat(struct.getInt32(SourceInfo.ORDER)).isEqualTo(2);
assertThat(struct.getInt64(SourceInfo.OPERATION_ID)).isEqualTo(1987654321L);
assertThat(struct.getString(SourceInfo.NAMESPACE)).isEqualTo("dbA.collectA");
assertThat(struct.getString(SourceInfo.REPLICA_SET_NAME)).isEqualTo(REPLICA_SET_NAME);
assertThat(struct.getString(SourceInfo.SERVER_NAME_KEY)).isEqualTo("serverX");
assertThat(struct.getBoolean(SourceInfo.INITIAL_SYNC)).isEqualTo(true);
}
@Test
public void versionIsPresent() {
final Document event = new Document().append("ts", new BsonTimestamp(100, 2))
.append("h", Long.valueOf(1987654321))
.append("ns", "dbA.collectA");
source.opLogEvent("rs", event);
assertThat(source.struct().getString(SourceInfo.DEBEZIUM_VERSION_KEY)).isEqualTo(Module.version());
}
@Test
public void connectorIsPresent() {
final Document event = new Document().append("ts", new BsonTimestamp(100, 2))
.append("h", Long.valueOf(1987654321))
.append("ns", "dbA.collectA");
source.opLogEvent("rs", event);
assertThat(source.struct().getString(SourceInfo.DEBEZIUM_CONNECTOR_KEY)).isEqualTo(Module.name());
}
@Test
public void schemaIsCorrect() {
final Schema schema = SchemaBuilder.struct()
.name("io.debezium.connector.mongo.Source")
.version(1)
.field("version", Schema.OPTIONAL_STRING_SCHEMA)
.field("connector", Schema.OPTIONAL_STRING_SCHEMA)
.field("name", Schema.STRING_SCHEMA)
.field("rs", Schema.STRING_SCHEMA)
.field("ns", Schema.STRING_SCHEMA)
.field("sec", Schema.INT32_SCHEMA)
.field("ord", Schema.INT32_SCHEMA)
.field("h", Schema.OPTIONAL_INT64_SCHEMA)
.field("initsync", SchemaBuilder.bool().optional().defaultValue(false).build())
.build();
assertThat(source.schema()).isEqualTo(schema);
}
}

View File

@ -329,7 +329,6 @@ public void shouldConsumeAllEventsFromDatabase() throws InterruptedException, IO
validate(record);
verifyNotFromInitialSync(record);
verifyCreateOperation(record);
verifyNotFromTransaction(record);
});
// ---------------------------------------------------------------------------------------------------------------
@ -631,7 +630,6 @@ public void shouldConsumeAllEventsFromDatabaseWithCustomAuthSource() throws Inte
validate(record);
verifyNotFromInitialSync(record);
verifyCreateOperation(record);
verifyNotFromTransaction(record);
});
// ---------------------------------------------------------------------------------------------------------------
@ -700,7 +698,6 @@ public void shouldConsumeEventsOnlyFromIncludedDatabases() throws InterruptedExc
validate(record);
verifyNotFromInitialSync(record);
verifyCreateOperation(record);
verifyNotFromTransaction(record);
});
// ---------------------------------------------------------------------------------------------------------------
// Stop the connector
@ -767,7 +764,6 @@ public void shouldSupportDbRef() throws InterruptedException, IOException {
validate(record);
verifyNotFromInitialSync(record);
verifyCreateOperation(record);
verifyNotFromTransaction(record);
});
// ---------------------------------------------------------------------------------------------------------------
@ -880,7 +876,6 @@ protected void verifyFromInitialSync(SourceRecord record, AtomicBoolean foundLas
Struct value = (Struct) record.value();
assertThat(value.getStruct(Envelope.FieldName.SOURCE).getString(SourceInfo.SNAPSHOT_KEY)).isEqualTo("last");
}
verifyNotFromTransaction(record);
}
@Test
@ -945,9 +940,6 @@ public void shouldConsumeTransaction() throws InterruptedException, IOException
validate(record);
verifyNotFromInitialSync(record);
verifyCreateOperation(record);
if (TestHelper.isOplogCaptureMode()) {
verifyFromTransaction(record, txOrder.incrementAndGet());
}
});
// ---------------------------------------------------------------------------------------------------------------
@ -975,9 +967,6 @@ public void shouldConsumeTransaction() throws InterruptedException, IOException
validate(record);
verifyNotFromInitialSync(record);
verifyCreateOperation(record);
if (TestHelper.isOplogCaptureMode()) {
verifyFromTransaction(record, txOrder.incrementAndGet());
}
});
}
@ -1009,10 +998,8 @@ public void shouldResumeTransactionInMiddle() throws InterruptedException, IOExc
// Start the connector and terminate it when third event from transaction arrives
startAndConsumeTillEnd(MongoDbConnector.class, config, record -> {
final Struct struct = (Struct) record.value();
final Long txOrder = struct.getStruct("source").getInt64("tord");
final String name = struct.getString("after");
return (txOrder != null && txOrder.equals(3L))
|| (name != null && "Taste The Tropics Ice Cream".contains(name));
return "Taste The Tropics Ice Cream".contains(name);
});
// ---------------------------------------------------------------------------------------------------------------
@ -1050,9 +1037,6 @@ public void shouldResumeTransactionInMiddle() throws InterruptedException, IOExc
validate(record);
verifyNotFromInitialSync(record);
verifyCreateOperation(record);
if (TestHelper.isOplogCaptureMode()) {
verifyFromTransaction(record, txOrder.incrementAndGet());
}
});
// ---------------------------------------------------------------------------------------------------------------
@ -1082,9 +1066,6 @@ public void shouldResumeTransactionInMiddle() throws InterruptedException, IOExc
validate(record);
verifyNotFromInitialSync(record);
verifyCreateOperation(record);
if (TestHelper.isOplogCaptureMode()) {
verifyFromTransaction(record, expectedTxOrd.remove(0));
}
});
}
@ -1238,16 +1219,6 @@ protected void verifyNotFromInitialSync(SourceRecord record) {
assertThat(value.getStruct(Envelope.FieldName.SOURCE).getString(SourceInfo.SNAPSHOT_KEY)).isNull();
}
protected void verifyFromTransaction(SourceRecord record, long order) {
assertThat(record.sourceOffset().containsKey(SourceInfo.TX_ORD)).isTrue();
final Struct value = (Struct) record.value();
assertThat(value.getStruct(Envelope.FieldName.SOURCE).getInt64(SourceInfo.TX_ORD)).isEqualTo(order);
}
protected void verifyNotFromTransaction(SourceRecord record) {
assertThat(record.sourceOffset().containsKey(SourceInfo.TX_ORD)).isFalse();
}
protected void verifyCreateOperation(SourceRecord record) {
verifyOperation(record, Operation.CREATE);
}
@ -1441,9 +1412,8 @@ public void shouldEmitHeartbeatMessages() throws InterruptedException, IOExcepti
coll.insertOne(doc, insertOptions);
});
// Heartbeat created by non-monitored collection event and
// heartbeat created by MongoDB heartbeat event (oplog only)
final int heartbeatRecordCount = TestHelper.isOplogCaptureMode() ? 2 : 1;
// Heartbeat created by non-monitored collection event
final int heartbeatRecordCount = 1;
records = consumeRecordsByTopic(heartbeatRecordCount);
final List<SourceRecord> heartbeatRecords = records.recordsForTopic("__debezium-heartbeat.mongo");
assertThat(heartbeatRecords.size()).isGreaterThanOrEqualTo(1);
@ -1580,20 +1550,12 @@ public void shouldGenerateRecordForUpdateEvent() throws Exception {
assertThat(key.schema()).isSameAs(deleteRecord.keySchema());
assertThat(key.get("id")).isEqualTo(formatObjectId(objId));
if (TestHelper.isOplogCaptureMode()) {
Document patchObj = Document.parse(value.getString(MongoDbFieldName.PATCH));
patchObj.remove("$v");
assertThat(patchObj.toJson(COMPACT_JSON_SETTINGS)).isEqualTo(updateObj.toJson(COMPACT_JSON_SETTINGS));
assertThat(value.getString(Envelope.FieldName.AFTER)).isNull();
}
else {
TestHelper.assertChangeStreamUpdate(
objId,
value,
"{\"_id\": {\"$oid\": \"<OID>\"},\"name\": \"Sally\"}",
null,
"{\"name\": \"Sally\"}");
}
TestHelper.assertChangeStreamUpdate(
objId,
value,
"{\"_id\": {\"$oid\": \"<OID>\"},\"name\": \"Sally\"}",
null,
"{\"name\": \"Sally\"}");
assertThat(value.schema()).isSameAs(deleteRecord.valueSchema());
assertThat(value.getString(Envelope.FieldName.OPERATION)).isEqualTo(Operation.UPDATE.code());
@ -2085,20 +2047,12 @@ public void shouldGenerateRecordForUpdateEventUsingLegacyV1SourceInfo() throws E
assertThat(key.schema()).isSameAs(deleteRecord.keySchema());
assertThat(key.get("id")).isEqualTo(formatObjectId(objId));
if (TestHelper.isOplogCaptureMode()) {
Document patchObj = Document.parse(value.getString(MongoDbFieldName.PATCH));
patchObj.remove("$v");
assertThat(patchObj.toJson(COMPACT_JSON_SETTINGS)).isEqualTo(updateObj.toJson(COMPACT_JSON_SETTINGS));
assertThat(value.getString(Envelope.FieldName.AFTER)).isNull();
}
else {
TestHelper.assertChangeStreamUpdate(
objId,
value,
"{\"_id\": {\"$oid\": \"<OID>\"},\"name\": \"Sally\"}",
null,
"{\"name\": \"Sally\"}");
}
TestHelper.assertChangeStreamUpdate(
objId,
value,
"{\"_id\": {\"$oid\": \"<OID>\"},\"name\": \"Sally\"}",
null,
"{\"name\": \"Sally\"}");
assertThat(value.schema()).isSameAs(deleteRecord.valueSchema());
assertThat(value.getString(Envelope.FieldName.OPERATION)).isEqualTo(Operation.UPDATE.code());

View File

@ -130,10 +130,6 @@ public void testStreamingOnlyMetrics() throws Exception {
final ObjectName objectName = getStreamingMetricsObjectName("mongodb", "mongo1");
assertThat(mBeanServer.getAttribute(objectName, "SourceEventPosition")).isNotNull();
if (TestHelper.isOplogCaptureMode()) {
assertThat(mBeanServer.getAttribute(objectName, "NumberOfCommittedTransactions")).isEqualTo(6L);
assertThat(mBeanServer.getAttribute(objectName, "LastTransactionId")).isNotNull();
}
assertThat(mBeanServer.getAttribute(objectName, "Connected")).isEqualTo(true);
assertThat(mBeanServer.getAttribute(objectName, "MonitoredTables")).isEqualTo(new String[]{});
assertThat(mBeanServer.getAttribute(objectName, "LastEvent")).isNotNull();

View File

@ -1,37 +0,0 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.connector.mongodb;
import org.junit.rules.TestRule;
import org.junit.runner.Description;
import org.junit.runners.model.Statement;
/**
* JUnit rule that skips tests that are not supported in oplog capture mode.
*
* @author Jiri Pechanec
* @author Horia Chiorean
*/
public class SkipForOplogTestRule implements TestRule {
@Override
public Statement apply(Statement base, Description description) {
if (TestHelper.isOplogCaptureMode()) {
return new Statement() {
@Override
public void evaluate() throws Throwable {
System.out.println("Skipped as test not supported for oplog capture mode");
}
};
}
return base;
}
}
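For reference, a rule like the one above was attached to a test class through JUnit's @Rule mechanism; a minimal, assumed usage sketch (now obsolete since oplog capture mode no longer exists):
import org.junit.Rule;
import org.junit.Test;

public class SomeMongoDbIT {
    // Hypothetical wiring: JUnit evaluates the rule around every test method,
    // replacing the body with a no-op when oplog capture mode was selected.
    @Rule
    public SkipForOplogTestRule skipForOplog = new SkipForOplogTestRule();

    @Test
    public void worksOnlyWithChangeStreams() throws Exception {
        // executed only when TestHelper.isOplogCaptureMode() returned false
    }
}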

View File

@ -50,7 +50,6 @@ public void shouldHaveSchemaForSource() {
assertThat(schema.field(SourceInfo.COLLECTION).schema()).isEqualTo(Schema.STRING_SCHEMA);
assertThat(schema.field(SourceInfo.TIMESTAMP_KEY).schema()).isEqualTo(Schema.INT64_SCHEMA);
assertThat(schema.field(SourceInfo.ORDER).schema()).isEqualTo(Schema.INT32_SCHEMA);
assertThat(schema.field(SourceInfo.OPERATION_ID).schema()).isEqualTo(Schema.OPTIONAL_INT64_SCHEMA);
assertThat(schema.field(SourceInfo.SNAPSHOT_KEY).schema()).isEqualTo(AbstractSourceInfoStructMaker.SNAPSHOT_RECORD_SCHEMA);
}
@ -75,13 +74,12 @@ public void shouldSetAndReturnRecordedOffset() {
.append("ns", "dbA.collectA");
assertThat(source.hasOffset(REPLICA_SET_NAME)).isEqualTo(false);
source.opLogEvent(REPLICA_SET_NAME, event);
source.initialPosition(REPLICA_SET_NAME, event);
assertThat(source.hasOffset(REPLICA_SET_NAME)).isEqualTo(true);
Map<String, ?> offset = source.lastOffset(REPLICA_SET_NAME);
assertThat(offset.get(SourceInfo.TIMESTAMP)).isEqualTo(100);
assertThat(offset.get(SourceInfo.ORDER)).isEqualTo(2);
assertThat(offset.get(SourceInfo.OPERATION_ID)).isEqualTo(1987654321L);
// Create a new source info and set the offset ...
Map<String, String> partition = source.partition(REPLICA_SET_NAME);
@ -94,7 +92,6 @@ public void shouldSetAndReturnRecordedOffset() {
offset = source.lastOffset(REPLICA_SET_NAME);
assertThat(offset.get(SourceInfo.TIMESTAMP)).isEqualTo(100);
assertThat(offset.get(SourceInfo.ORDER)).isEqualTo(2);
assertThat(offset.get(SourceInfo.OPERATION_ID)).isEqualTo(1987654321L);
BsonTimestamp ts = source.lastOffsetTimestamp(REPLICA_SET_NAME);
assertThat(ts.getTime()).isEqualTo(100);
@ -104,7 +101,6 @@ public void shouldSetAndReturnRecordedOffset() {
Struct struct = source.struct();
assertThat(struct.getInt64(SourceInfo.TIMESTAMP_KEY)).isEqualTo(100_000);
assertThat(struct.getInt32(SourceInfo.ORDER)).isEqualTo(2);
assertThat(struct.getInt64(SourceInfo.OPERATION_ID)).isEqualTo(1987654321L);
assertThat(struct.getString(SourceInfo.DATABASE_NAME_KEY)).isEqualTo("dbA");
assertThat(struct.getString(SourceInfo.COLLECTION)).isEqualTo("collectA");
assertThat(struct.getString(SourceInfo.REPLICA_SET_NAME)).isEqualTo(REPLICA_SET_NAME);
@ -119,7 +115,6 @@ public void shouldReturnOffsetForUnusedReplicaName() {
Map<String, ?> offset = source.lastOffset(REPLICA_SET_NAME);
assertThat(offset.get(SourceInfo.TIMESTAMP)).isEqualTo(0);
assertThat(offset.get(SourceInfo.ORDER)).isEqualTo(0);
assertThat(offset.get(SourceInfo.OPERATION_ID)).isNull();
BsonTimestamp ts = source.lastOffsetTimestamp(REPLICA_SET_NAME);
assertThat(ts.getTime()).isEqualTo(0);
@ -129,7 +124,6 @@ public void shouldReturnOffsetForUnusedReplicaName() {
Struct struct = source.struct();
assertThat(struct.getInt64(SourceInfo.TIMESTAMP_KEY)).isEqualTo(0);
assertThat(struct.getInt32(SourceInfo.ORDER)).isEqualTo(0);
assertThat(struct.getInt64(SourceInfo.OPERATION_ID)).isNull();
assertThat(struct.getString(SourceInfo.DATABASE_NAME_KEY)).isEqualTo("dbA");
assertThat(struct.getString(SourceInfo.COLLECTION)).isEqualTo("collectA");
assertThat(struct.getString(SourceInfo.REPLICA_SET_NAME)).isEqualTo(REPLICA_SET_NAME);
@ -146,13 +140,12 @@ public void shouldReturnRecordedOffsetForUsedReplicaName() {
.append("ns", "dbA.collectA");
assertThat(source.hasOffset(REPLICA_SET_NAME)).isEqualTo(false);
source.opLogEvent(REPLICA_SET_NAME, event);
source.initialPosition(REPLICA_SET_NAME, event);
assertThat(source.hasOffset(REPLICA_SET_NAME)).isEqualTo(true);
Map<String, ?> offset = source.lastOffset(REPLICA_SET_NAME);
assertThat(offset.get(SourceInfo.TIMESTAMP)).isEqualTo(100);
assertThat(offset.get(SourceInfo.ORDER)).isEqualTo(2);
assertThat(offset.get(SourceInfo.OPERATION_ID)).isEqualTo(1987654321L);
BsonTimestamp ts = source.lastOffsetTimestamp(REPLICA_SET_NAME);
assertThat(ts.getTime()).isEqualTo(100);
@ -162,7 +155,6 @@ public void shouldReturnRecordedOffsetForUsedReplicaName() {
Struct struct = source.struct();
assertThat(struct.getInt64(SourceInfo.TIMESTAMP_KEY)).isEqualTo(100_000);
assertThat(struct.getInt32(SourceInfo.ORDER)).isEqualTo(2);
assertThat(struct.getInt64(SourceInfo.OPERATION_ID)).isEqualTo(1987654321L);
assertThat(struct.getString(SourceInfo.DATABASE_NAME_KEY)).isEqualTo("dbA");
assertThat(struct.getString(SourceInfo.COLLECTION)).isEqualTo("collectA");
assertThat(struct.getString(SourceInfo.REPLICA_SET_NAME)).isEqualTo(REPLICA_SET_NAME);
@ -178,7 +170,6 @@ public void shouldReturnOffsetForUnusedReplicaNameDuringInitialSync() {
Map<String, ?> offset = source.lastOffset(REPLICA_SET_NAME);
assertThat(offset.get(SourceInfo.TIMESTAMP)).isEqualTo(0);
assertThat(offset.get(SourceInfo.ORDER)).isEqualTo(0);
assertThat(offset.get(SourceInfo.OPERATION_ID)).isNull();
BsonTimestamp ts = source.lastOffsetTimestamp(REPLICA_SET_NAME);
assertThat(ts.getTime()).isEqualTo(0);
@ -188,7 +179,6 @@ public void shouldReturnOffsetForUnusedReplicaNameDuringInitialSync() {
Struct struct = source.struct();
assertThat(struct.getInt64(SourceInfo.TIMESTAMP_KEY)).isEqualTo(0);
assertThat(struct.getInt32(SourceInfo.ORDER)).isEqualTo(0);
assertThat(struct.getInt64(SourceInfo.OPERATION_ID)).isNull();
assertThat(struct.getString(SourceInfo.DATABASE_NAME_KEY)).isEqualTo("dbA");
assertThat(struct.getString(SourceInfo.COLLECTION)).isEqualTo("collectA");
assertThat(struct.getString(SourceInfo.REPLICA_SET_NAME)).isEqualTo(REPLICA_SET_NAME);
@ -207,13 +197,12 @@ public void shouldReturnRecordedOffsetForUsedReplicaNameDuringInitialSync() {
.append("ns", "dbA.collectA");
assertThat(source.hasOffset(REPLICA_SET_NAME)).isEqualTo(false);
source.opLogEvent(REPLICA_SET_NAME, event);
source.initialPosition(REPLICA_SET_NAME, event);
assertThat(source.hasOffset(REPLICA_SET_NAME)).isEqualTo(true);
Map<String, ?> offset = source.lastOffset(REPLICA_SET_NAME);
assertThat(offset.get(SourceInfo.TIMESTAMP)).isEqualTo(100);
assertThat(offset.get(SourceInfo.ORDER)).isEqualTo(2);
assertThat(offset.get(SourceInfo.OPERATION_ID)).isEqualTo(1987654321L);
BsonTimestamp ts = source.lastOffsetTimestamp(REPLICA_SET_NAME);
assertThat(ts.getTime()).isEqualTo(100);
@ -223,7 +212,6 @@ public void shouldReturnRecordedOffsetForUsedReplicaNameDuringInitialSync() {
Struct struct = source.struct();
assertThat(struct.getInt64(SourceInfo.TIMESTAMP_KEY)).isEqualTo(100_000);
assertThat(struct.getInt32(SourceInfo.ORDER)).isEqualTo(2);
assertThat(struct.getInt64(SourceInfo.OPERATION_ID)).isEqualTo(1987654321L);
assertThat(struct.getString(SourceInfo.DATABASE_NAME_KEY)).isEqualTo("dbA");
assertThat(struct.getString(SourceInfo.COLLECTION)).isEqualTo("collectA");
assertThat(struct.getString(SourceInfo.REPLICA_SET_NAME)).isEqualTo(REPLICA_SET_NAME);
@ -236,7 +224,7 @@ public void versionIsPresent() {
final Document event = new Document().append("ts", new BsonTimestamp(100, 2))
.append("h", Long.valueOf(1987654321))
.append("ns", "dbA.collectA");
source.opLogEvent("rs", event);
source.initialPosition("rs", event);
assertThat(source.struct().getString(SourceInfo.DEBEZIUM_VERSION_KEY)).isEqualTo(Module.version());
}
@ -245,7 +233,7 @@ public void connectorIsPresent() {
final Document event = new Document().append("ts", new BsonTimestamp(100, 2))
.append("h", Long.valueOf(1987654321))
.append("ns", "dbA.collectA");
source.opLogEvent("rs", event);
source.initialPosition("rs", event);
assertThat(source.struct().getString(SourceInfo.DEBEZIUM_CONNECTOR_KEY)).isEqualTo(Module.name());
}
@ -263,9 +251,6 @@ public void schemaIsCorrect() {
.field("rs", Schema.STRING_SCHEMA)
.field("collection", Schema.STRING_SCHEMA)
.field("ord", Schema.INT32_SCHEMA)
.field("h", Schema.OPTIONAL_INT64_SCHEMA)
.field("tord", Schema.OPTIONAL_INT64_SCHEMA)
.field("stxnid", Schema.OPTIONAL_STRING_SCHEMA)
.field("lsid", Schema.OPTIONAL_STRING_SCHEMA)
.field("txnNumber", Schema.OPTIONAL_INT64_SCHEMA)
.build();

View File

@ -28,7 +28,6 @@
import io.debezium.config.Configuration;
import io.debezium.config.Configuration.Builder;
import io.debezium.connector.mongodb.ConnectionContext.MongoPrimary;
import io.debezium.connector.mongodb.MongoDbConnectorConfig.CaptureMode;
/**
* Common test configuration options
@ -47,9 +46,6 @@ public static Configuration getConfiguration() {
.withDefault(MongoDbConnectorConfig.HOSTS, "rs0/localhost:27017")
.withDefault(MongoDbConnectorConfig.AUTO_DISCOVER_MEMBERS, false)
.withDefault(MongoDbConnectorConfig.LOGICAL_NAME, "mongo1");
if (isOplogCaptureMode()) {
cfgBuilder.withDefault(MongoDbConnectorConfig.CAPTURE_MODE, CaptureMode.OPLOG);
}
return cfgBuilder.build();
}
@ -110,10 +106,6 @@ public static String captureMode() {
return System.getProperty(TEST_PROPERTY_PREFIX + "capture.mode", "changestreams");
}
public static final boolean isOplogCaptureMode() {
return "oplog".equals(captureMode());
}
public static void assertChangeStreamUpdate(ObjectId oid, Struct value, String after, List<String> removedFields,
String updatedFields) {
Assertions.assertThat(value.getString("after")).isEqualTo(after.replace("<OID>", oid.toHexString()));
@ -149,4 +141,4 @@ public static void assertChangeStreamUpdateAsDocs(ObjectId oid, Struct value, St
Assertions.assertThat(removedFields).isNull();
}
}
}
}

View File

@ -55,12 +55,12 @@ public void transactionMetadata() throws Exception {
List<Document> documentsToInsert2 = loadTestDocuments("restaurants6.json");
insertDocuments("dbA", "c1", documentsToInsert2.toArray(new Document[0]));
// BEGIN, data, END, BEGIN data for oplog, BEGIN, data, END, data for change stream
final SourceRecords records = consumeRecordsByTopic(1 + 6 + 1 + (TestHelper.isOplogCaptureMode() ? 1 : 0) + 1);
// BEGIN, data, END, data for change stream
final SourceRecords records = consumeRecordsByTopic(1 + 6 + 1 + 1);
final List<SourceRecord> c1s = records.recordsForTopic("mongo1.dbA.c1");
final List<SourceRecord> txs = records.recordsForTopic("mongo1.transaction");
assertThat(c1s).hasSize(7);
assertThat(txs).hasSize((TestHelper.isOplogCaptureMode() ? 3 : 2));
assertThat(txs).hasSize(2);
final List<SourceRecord> all = records.allRecordsInOrder();
final String txId1 = assertBeginTransaction(all.get(0));
@ -73,11 +73,6 @@ public void transactionMetadata() throws Exception {
assertEndTransaction(all.get(7), txId1, 6, Collect.hashMapOf("rs0.dbA.c1", 6));
if (TestHelper.isOplogCaptureMode()) {
final String txId2 = assertBeginTransaction(all.get(8));
assertRecordTransactionMetadata(all.get(9), txId2, 1, 1);
}
stopConnector();
}
@ -113,12 +108,12 @@ public void transactionMetadataWithCustomTopicName() throws Exception {
List<Document> documentsToInsert2 = loadTestDocuments("restaurants6.json");
insertDocuments("dbA", "c1", documentsToInsert2.toArray(new Document[0]));
// BEGIN, data, END, BEGIN data for oplog, BEGIN, data, END, data for change stream
final SourceRecords records = consumeRecordsByTopic(1 + 6 + 1 + (TestHelper.isOplogCaptureMode() ? 1 : 0) + 1);
// BEGIN, data, END, data
final SourceRecords records = consumeRecordsByTopic(1 + 6 + 1 + 1);
final List<SourceRecord> c1s = records.recordsForTopic("mongo1.dbA.c1");
final List<SourceRecord> txs = records.recordsForTopic("tx.of.mongo1");
assertThat(c1s).hasSize(7);
assertThat(txs).hasSize((TestHelper.isOplogCaptureMode() ? 3 : 2));
assertThat(txs).hasSize(2);
final List<SourceRecord> all = records.allRecordsInOrder();
final String txId1 = assertBeginTransaction(all.get(0));
@ -131,11 +126,6 @@ public void transactionMetadataWithCustomTopicName() throws Exception {
assertEndTransaction(all.get(7), txId1, 6, Collect.hashMapOf("rs0.dbA.c1", 6));
if (TestHelper.isOplogCaptureMode()) {
final String txId2 = assertBeginTransaction(all.get(8));
assertRecordTransactionMetadata(all.get(9), txId2, 1, 1);
}
stopConnector();
}
}

View File

@ -35,7 +35,6 @@
import org.junit.Test;
import io.debezium.connector.mongodb.MongoDbFieldName;
import io.debezium.connector.mongodb.TestHelper;
import io.debezium.data.Envelope;
import io.debezium.data.Envelope.Operation;
import io.debezium.data.SchemaUtil;
@ -206,12 +205,7 @@ public void shouldTransformEvents() throws InterruptedException, IOException {
assertThat(transformedUnsetUpdate.valueSchema().field("_id").schema()).isEqualTo(Schema.OPTIONAL_INT32_SCHEMA);
assertThat(transformedUnsetUpdateValue.get("_id")).isEqualTo(1);
if (TestHelper.isOplogCaptureMode()) {
assertThat(transformedUnsetUpdate.valueSchema().field("newStr").schema()).isEqualTo(Schema.OPTIONAL_STRING_SCHEMA);
}
else {
assertThat(transformedUnsetUpdateValue.schema().field("newStr")).isNull();
}
assertThat(transformedUnsetUpdateValue.schema().field("newStr")).isNull();
// Test FullUpdate
primary().execute("update", client -> {
@ -328,7 +322,7 @@ public void shouldAddSourceFields() throws InterruptedException {
waitForStreamingRunning();
final Map<String, String> props = new HashMap<>();
props.put(ADD_SOURCE_FIELDS, "h,ts_ms,ord , db,rs");
props.put(ADD_SOURCE_FIELDS, "ts_ms,ord , db,rs");
transformation.configure(props);
// insert
@ -362,7 +356,6 @@ public void shouldAddSourceFields() throws InterruptedException {
// assert source fields' values
final Struct value = (Struct) transformed.value();
assertThat(value.get("__h")).isEqualTo(source.getInt64("h"));
assertThat(value.get("__ts_ms")).isEqualTo(source.getInt64("ts_ms"));
assertThat(value.get("__ord")).isEqualTo(source.getInt32("ord"));
assertThat(value.get("__db")).isEqualTo(source.getString("db"));
@ -377,7 +370,7 @@ public void shouldAddSourceFieldsForRewriteDeleteEvent() throws InterruptedExcep
waitForStreamingRunning();
final Map<String, String> props = new HashMap<>();
props.put(ADD_SOURCE_FIELDS, "h,ts_ms,ord,db,rs");
props.put(ADD_SOURCE_FIELDS, "ts_ms,ord,db,rs");
props.put(HANDLE_DELETES, "rewrite");
transformation.configure(props);
@ -411,7 +404,6 @@ public void shouldAddSourceFieldsForRewriteDeleteEvent() throws InterruptedExcep
// assert source fields' values
final Struct value = (Struct) transformed.value();
assertThat(value.get("__h")).isEqualTo(source.getInt64("h"));
assertThat(value.get("__ts_ms")).isEqualTo(source.getInt64("ts_ms"));
assertThat(value.get("__ord")).isEqualTo(source.getInt32("ord"));
assertThat(value.get("__db")).isEqualTo(source.getString("db"));
@ -642,15 +634,8 @@ public void shouldGenerateRecordForUpdateEventWithUnset() throws InterruptedExce
// and then assert value and its schema
assertThat(value.schema()).isSameAs(transformed.valueSchema());
assertThat(value.get("name")).isEqualTo("Sally");
if (TestHelper.isOplogCaptureMode()) {
assertThat(value.get("phone")).isNull();
assertThat(value.schema().field("phone").schema()).isEqualTo(SchemaBuilder.OPTIONAL_STRING_SCHEMA);
assertThat(value.schema().fields()).hasSize(4);
}
else {
assertThat(value.schema().field("phone")).isNull();
assertThat(value.schema().fields()).hasSize(2);
}
assertThat(value.schema().field("phone")).isNull();
assertThat(value.schema().fields()).hasSize(2);
}
@Test
@ -698,15 +683,8 @@ public void shouldGenerateRecordForUnsetOnlyUpdateEvent() throws InterruptedExce
// and then assert value and its schema
assertThat(value.schema()).isSameAs(transformed.valueSchema());
if (TestHelper.isOplogCaptureMode()) {
assertThat(value.get("phone")).isNull();
assertThat(value.schema().field("phone").schema()).isEqualTo(SchemaBuilder.OPTIONAL_STRING_SCHEMA);
assertThat(value.schema().fields()).hasSize(3);
}
else {
assertThat(value.schema().field("phone")).isNull();
assertThat(value.schema().fields()).hasSize(2);
}
assertThat(value.schema().field("phone")).isNull();
assertThat(value.schema().fields()).hasSize(2);
}
@Test
@ -1256,7 +1234,7 @@ public void shouldFlattenWithDelimiterTransformRecordForUpdateEvent() throws Int
assertThat(value.schema().field("address-city").schema()).isEqualTo(SchemaBuilder.OPTIONAL_STRING_SCHEMA);
assertThat(value.schema().field("address-name").schema()).isEqualTo(SchemaBuilder.OPTIONAL_STRING_SCHEMA);
assertThat(value.schema().field("address-city2-part").schema()).isEqualTo(SchemaBuilder.OPTIONAL_INT32_SCHEMA);
assertThat(value.schema().fields()).hasSize(TestHelper.isOplogCaptureMode() ? 4 : 7);
assertThat(value.schema().fields()).hasSize(7);
}
@Test
@ -1524,25 +1502,17 @@ public void testAddPatchFieldAfterUpdate() throws Exception {
assertThat(value.schema().field("_id").schema()).isEqualTo(SchemaBuilder.OPTIONAL_STRING_SCHEMA);
assertThat(value.schema().field("a").schema()).isEqualTo(SchemaBuilder.OPTIONAL_INT32_SCHEMA);
if (TestHelper.isOplogCaptureMode()) {
String valueJson = TestHelper.getDocumentWithoutLanguageVersion(value.getString("__patch")).toJson();
assertThat(valueJson).isEqualTo("{\"$set\": {\"a\": 22}}");
assertThat(value.schema().field("__patch").schema()).isEqualTo(io.debezium.data.Json.builder().optional().build());
assertThat(value.schema().fields()).hasSize(3);
}
else {
// 4 data fields + 1 __patch
assertThat(value.schema().fields()).hasSize(4 + 1);
// 4 data fields + 1 __patch
assertThat(value.schema().fields()).hasSize(4 + 1);
assertThat(value.schema().field("__patch").schema()).isEqualTo(io.debezium.data.Json.builder().optional().build());
assertThat(value.get("__patch")).isNull();
assertThat(value.schema().field("__patch").schema()).isEqualTo(io.debezium.data.Json.builder().optional().build());
assertThat(value.get("__patch")).isNull();
assertThat(value.get("b")).isEqualTo(2);
assertThat(value.schema().field("b").schema()).isEqualTo(SchemaBuilder.OPTIONAL_INT32_SCHEMA);
assertThat(value.get("b")).isEqualTo(2);
assertThat(value.schema().field("b").schema()).isEqualTo(SchemaBuilder.OPTIONAL_INT32_SCHEMA);
assertThat(value.get("c")).isEqualTo(3);
assertThat(value.schema().field("c").schema()).isEqualTo(SchemaBuilder.OPTIONAL_INT32_SCHEMA);
}
assertThat(value.get("c")).isEqualTo(3);
assertThat(value.schema().field("c").schema()).isEqualTo(SchemaBuilder.OPTIONAL_INT32_SCHEMA);
}

View File

@ -15,7 +15,6 @@
import org.apache.kafka.connect.source.SourceRecord;
import org.junit.Test;
import io.debezium.connector.mongodb.TestHelper;
import io.debezium.connector.mongodb.transforms.ExtractNewDocumentState;
import io.debezium.data.VerifyRecord;
@ -120,14 +119,8 @@ public void shouldTransformOperationPush() throws InterruptedException {
// Operations which include items to arrays result in a new field where the structure looks like "FIELD_NAME.ARRAY_INDEX"
VerifyRecord.assertConnectSchemasAreEqual("_id", valueSchema.field("_id").schema(), Schema.OPTIONAL_INT32_SCHEMA);
assertThat(transformedUpdateValue.get("_id")).isEqualTo(1);
if (TestHelper.isOplogCaptureMode()) {
VerifyRecord.assertConnectSchemasAreEqual("dataArrayOfStr.3", valueSchema.field("dataArrayOfStr.3").schema(), Schema.OPTIONAL_STRING_SCHEMA);
assertThat(transformedUpdateValue.get("dataArrayOfStr.3")).isEqualTo("g");
}
else {
VerifyRecord.assertConnectSchemasAreEqual("dataArrayOfStr", valueSchema.field("dataArrayOfStr").schema(),
SchemaBuilder.array(Schema.OPTIONAL_STRING_SCHEMA).optional().build());
assertThat(transformedUpdateValue.get("dataArrayOfStr")).isEqualTo(Arrays.asList("a", "c", "e", "g"));
}
VerifyRecord.assertConnectSchemasAreEqual("dataArrayOfStr", valueSchema.field("dataArrayOfStr").schema(),
SchemaBuilder.array(Schema.OPTIONAL_STRING_SCHEMA).optional().build());
assertThat(transformedUpdateValue.get("dataArrayOfStr")).isEqualTo(Arrays.asList("a", "c", "e", "g"));
}
}

View File

@ -19,7 +19,6 @@
import com.mongodb.client.MongoClient;
import com.mongodb.client.model.UpdateOptions;
import io.debezium.connector.mongodb.TestHelper;
import io.debezium.connector.mongodb.transforms.ExtractNewDocumentState;
import io.debezium.data.VerifyRecord;
@ -49,15 +48,9 @@ public void shouldTransformOperationInc() throws InterruptedException {
VerifyRecord.assertConnectSchemasAreEqual("dataInt", valueSchema.field("dataInt").schema(), Schema.OPTIONAL_INT32_SCHEMA);
assertThat(transformedUpdateValue.get("_id")).isEqualTo(1);
assertThat(transformedUpdateValue.get("dataInt")).isEqualTo(246);
if (TestHelper.isOplogCaptureMode()) {
VerifyRecord.assertConnectSchemasAreEqual("nested.dataInt", valueSchema.field("nested.dataInt").schema(), Schema.OPTIONAL_INT32_SCHEMA);
assertThat(transformedUpdateValue.get("nested.dataInt")).isEqualTo(100);
}
else {
VerifyRecord.assertConnectSchemasAreEqual("nested.dataInt",
valueSchema.field("nested").schema().field("dataInt").schema(), Schema.OPTIONAL_INT32_SCHEMA);
assertThat(transformedUpdateValue.getStruct("nested").get("dataInt")).isEqualTo(100);
}
VerifyRecord.assertConnectSchemasAreEqual("nested.dataInt",
valueSchema.field("nested").schema().field("dataInt").schema(), Schema.OPTIONAL_INT32_SCHEMA);
assertThat(transformedUpdateValue.getStruct("nested").get("dataInt")).isEqualTo(100);
}
/**
@ -76,13 +69,7 @@ public void shouldTransformOperationMin() throws InterruptedException {
VerifyRecord.assertConnectSchemasAreEqual("dataInt", valueSchema.field("dataInt").schema(), Schema.OPTIONAL_INT32_SCHEMA);
assertThat(transformedUpdateValue.get("_id")).isEqualTo(1);
assertThat(transformedUpdateValue.get("dataInt")).isEqualTo(122);
if (TestHelper.isOplogCaptureMode()) {
// Since 124 > 123 we should expect "nested.dataInt" to not be present
assertThat(valueSchema.field("nested.dataInt")).isNull();
}
else {
assertThat(transformedUpdateValue.getStruct("nested").get("dataInt")).isEqualTo(123);
}
assertThat(transformedUpdateValue.getStruct("nested").get("dataInt")).isEqualTo(123);
}
/**
@ -99,16 +86,8 @@ public void shouldTransformOperationMax() throws InterruptedException {
VerifyRecord.assertConnectSchemasAreEqual("_id", valueSchema.field("_id").schema(), Schema.OPTIONAL_INT32_SCHEMA);
assertThat(transformedUpdateValue.get("_id")).isEqualTo(1);
if (TestHelper.isOplogCaptureMode()) {
// Since 122 < 123 we should expect "dataInt" to not be present
assertThat(valueSchema.field("dataInt")).isNull();
VerifyRecord.assertConnectSchemasAreEqual("nested.dataInt", valueSchema.field("nested.dataInt").schema(), Schema.OPTIONAL_INT32_SCHEMA);
assertThat(transformedUpdateValue.get("nested.dataInt")).isEqualTo(124);
}
else {
assertThat(transformedUpdateValue.get("dataInt")).isEqualTo(123);
assertThat(transformedUpdateValue.getStruct("nested").get("dataInt")).isEqualTo(124);
}
assertThat(transformedUpdateValue.get("dataInt")).isEqualTo(123);
assertThat(transformedUpdateValue.getStruct("nested").get("dataInt")).isEqualTo(124);
}
/**
@ -129,15 +108,9 @@ public void shouldTransformOperationMul() throws InterruptedException {
assertThat(transformedUpdateValue.get("_id")).isEqualTo(1);
assertThat(transformedUpdateValue.get("dataInt")).isEqualTo(369);
assertThat(transformedUpdateValue.get("nonExistentField")).isEqualTo(0);
if (TestHelper.isOplogCaptureMode()) {
VerifyRecord.assertConnectSchemasAreEqual("nested.dataInt", valueSchema.field("nested.dataInt").schema(), Schema.OPTIONAL_INT32_SCHEMA);
assertThat(transformedUpdateValue.get("nested.dataInt")).isEqualTo(246);
}
else {
VerifyRecord.assertConnectSchemasAreEqual("nested.dataInt",
valueSchema.field("nested").schema().field("dataInt").schema(), Schema.OPTIONAL_INT32_SCHEMA);
assertThat(transformedUpdateValue.getStruct("nested").get("dataInt")).isEqualTo(246);
}
VerifyRecord.assertConnectSchemasAreEqual("nested.dataInt",
valueSchema.field("nested").schema().field("dataInt").schema(), Schema.OPTIONAL_INT32_SCHEMA);
assertThat(transformedUpdateValue.getStruct("nested").get("dataInt")).isEqualTo(246);
}
/**
@ -157,14 +130,7 @@ public void shouldTransformOperationRename() throws InterruptedException {
assertThat(transformedUpdateValue.get("_id")).isEqualTo(1);
assertThat(transformedUpdateValue.get("dataIntNewName")).isEqualTo(123);
if (TestHelper.isOplogCaptureMode()) {
// Ensure the rename causes the old field value to be set to null
VerifyRecord.assertConnectSchemasAreEqual("dataInt", valueSchema.field("dataInt").schema(), Schema.OPTIONAL_STRING_SCHEMA);
assertThat(transformedUpdateValue.get("dataInt")).isNull();
}
else {
assertThat(valueSchema.field("dataInt")).isNull();
}
assertThat(valueSchema.field("dataInt")).isNull();
}
/**
@ -229,13 +195,7 @@ public void shouldTransformOperationSetOnInsert() throws InterruptedException {
VerifyRecord.assertConnectSchemasAreEqual("newField", updateValueSchema.field("newField").schema(), Schema.OPTIONAL_INT32_SCHEMA);
assertThat(transformedUpdateValue.get("_id")).isEqualTo(2);
assertThat(transformedUpdateValue.get("newField")).isEqualTo(456);
if (TestHelper.isOplogCaptureMode()) {
// Ensure on the second update the field is not set
assertThat(updateValueSchema.field("onlySetIfInsertDataInt")).isNull();
}
else {
assertThat(transformedUpdateValue.get("onlySetIfInsertDataInt")).isEqualTo(789);
}
assertThat(transformedUpdateValue.get("onlySetIfInsertDataInt")).isEqualTo(789);
}
/**
@ -255,14 +215,7 @@ public void shouldTransformOperationUnset() throws InterruptedException {
VerifyRecord.assertConnectSchemasAreEqual("_id", valueSchema.field("_id").schema(), Schema.OPTIONAL_INT32_SCHEMA);
assertThat(transformedUpdateValue.get("_id")).isEqualTo(1);
if (TestHelper.isOplogCaptureMode()) {
// Unset fields come as null value
VerifyRecord.assertConnectSchemasAreEqual("dataStr", valueSchema.field("dataStr").schema(), Schema.OPTIONAL_STRING_SCHEMA);
assertThat(transformedUpdateValue.get("dataStr")).isNull();
}
else {
assertThat(valueSchema.field("dataStr")).isNull();
}
assertThat(valueSchema.field("dataStr")).isNull();
// Since the field "nonExistentField" doesn't exist ensure it's not present in the schema
assertThat(valueSchema.field("nonExistentField")).isNull();
}
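As context for the assertions above, the $unset branch is driven by a plain driver-side update; a hedged sketch using the MongoDB Java client (database, collection, and field names are illustrative, not taken from this diff):
import org.bson.Document;
import com.mongodb.client.MongoClient;
import com.mongodb.client.MongoClients;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.model.Filters;
import com.mongodb.client.model.Updates;

public class UnsetExample {
    public static void main(String[] args) {
        // Illustrative only: with change streams the flattened record simply omits
        // the unset field, so the transformed value schema no longer contains "dataStr".
        try (MongoClient client = MongoClients.create("mongodb://localhost:27017")) {
            MongoCollection<Document> coll = client.getDatabase("dbA").getCollection("c1");
            coll.insertOne(new Document("_id", 1).append("dataStr", "hello").append("dataInt", 123));
            coll.updateOne(Filters.eq("_id", 1), Updates.unset("dataStr"));
        }
    }
}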

View File

@ -6,14 +6,8 @@ matrixJob('connector-debezium-mongodb-matrix-test') {
description('Executes tests for MongoDB Connector with MongoDB matrix')
label('Slave')
combinationFilter('''
(CAPTURE_MODE == 'oplog' && (MONGODB_VERSION.startsWith('3.') || MONGODB_VERSION == '4.0')) ||
(CAPTURE_MODE == 'change_streams_update_full' && !MONGODB_VERSION.startsWith('3.'))
''')
axes {
text('MONGODB_VERSION', '3.2', '3.4', '3.6', '4.0', '4.2', '4.4', '5.0')
text('CAPTURE_MODE', 'oplog', 'change_streams_update_full')
text('MONGODB_VERSION', '4.0', '4.2', '4.4', '5.0')
label("Node", "Slave")
}
@ -72,7 +66,6 @@ fi
mvn clean install -U -s $HOME/.m2/settings-snapshots.xml -pl debezium-bom,debezium-connector-mongodb -am -fae \
-Dmaven.test.failure.ignore=true \
-Dversion.mongo.server=$MONGODB_VERSION \
-Dcapture.mode=$CAPTURE_MODE \
-Dinsecure.repositories=WARN \
$PROFILE_PROD
''')

View File

@ -129,7 +129,7 @@
<!-- Databases, should align with database drivers -->
<version.mysql.server>5.7</version.mysql.server>
<version.mongo.server>3.6</version.mongo.server>
<version.mongo.server>4.0</version.mongo.server>
<version.cassandra3>3.11.12</version.cassandra3>
<version.cassandra4>4.0.2</version.cassandra4>