DBZ-7260 Removed RS name from CollectionId
This commit is contained in:
parent
4c64e28832
commit
99439c3cfa
@ -21,57 +21,33 @@
|
||||
public final class CollectionId implements DataCollectionId {
|
||||
|
||||
/**
|
||||
* Parse the supplied {@code <replicaset_name>.<database_name>.<collection_name>} string.
|
||||
* Parse the supplied {@code <database_name>.<collection_name>} string.
|
||||
* The {@code collection_name} can also contain dots in its value.
|
||||
*
|
||||
* @param str the string representation of the collection identifier; may not be null
|
||||
* @return the collection ID, or null if it could not be parsed
|
||||
*/
|
||||
public static CollectionId parse(String str) {
|
||||
final int rsDotPosition = str.indexOf('.');
|
||||
if (rsDotPosition == -1 || (rsDotPosition + 1) == str.length() || rsDotPosition == 0) {
|
||||
return null;
|
||||
}
|
||||
final int dbDotPosition = str.indexOf('.', rsDotPosition + 1);
|
||||
if (dbDotPosition == -1 || (dbDotPosition + 1) == str.length() || dbDotPosition == rsDotPosition + 1) {
|
||||
return null;
|
||||
}
|
||||
return new CollectionId(str.substring(0, rsDotPosition), str.substring(rsDotPosition + 1, dbDotPosition),
|
||||
str.substring(dbDotPosition + 1));
|
||||
}
|
||||
|
||||
/**
|
||||
* Parse the supplied {@code <database_name>.<collection_name>} string.
|
||||
* The {@code collection_name} can also contain dots in its value.
|
||||
*
|
||||
* @param replicaSetName the name of replica; may not be null
|
||||
* @param str the string representation of the collection identifier; may not be null
|
||||
* @return the collection ID, or null if it could not be parsed
|
||||
*/
|
||||
public static CollectionId parse(String replicaSetName, String str) {
|
||||
final int dotPosition = str.indexOf('.');
|
||||
if (dotPosition == -1 || (dotPosition + 1) == str.length() || dotPosition == 0) {
|
||||
return null;
|
||||
}
|
||||
return new CollectionId(replicaSetName, str.substring(0, dotPosition), str.substring(dotPosition + 1));
|
||||
|
||||
return new CollectionId(str.substring(0, dotPosition), str.substring(dotPosition + 1));
|
||||
}
|
||||
|
||||
private final String replicaSetName;
|
||||
private final String dbName;
|
||||
private final String name;
|
||||
|
||||
/**
|
||||
* Create a new collection identifier.
|
||||
*
|
||||
* @param replicaSetName the name of the replica set; may not be null
|
||||
* @param dbName the name of the database; may not be null
|
||||
* @param collectionName the name of the collection; may not be null
|
||||
*/
|
||||
public CollectionId(String replicaSetName, String dbName, String collectionName) {
|
||||
this.replicaSetName = replicaSetName;
|
||||
public CollectionId(String dbName, String collectionName) {
|
||||
this.dbName = dbName;
|
||||
this.name = collectionName;
|
||||
assert this.replicaSetName != null;
|
||||
assert this.dbName != null;
|
||||
assert this.name != null;
|
||||
}
|
||||
@ -94,23 +70,14 @@ public String dbName() {
|
||||
return dbName;
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the name of the replica set in which the collection (and database) exist.
|
||||
*
|
||||
* @return the replica set name; never null
|
||||
*/
|
||||
public String replicaSetName() {
|
||||
return replicaSetName;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String identifier() {
|
||||
return replicaSetName + "." + dbName + "." + name;
|
||||
return dbName + "." + name;
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<String> parts() {
|
||||
return Collect.arrayListOf(replicaSetName, dbName, name);
|
||||
return Collect.arrayListOf(dbName, name);
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -135,20 +102,19 @@ public boolean equals(Object obj) {
|
||||
}
|
||||
if (obj instanceof CollectionId) {
|
||||
CollectionId that = (CollectionId) obj;
|
||||
return this.replicaSetName.equals(that.replicaSetName) &&
|
||||
this.dbName.equals(that.dbName) && this.name.equals(that.name);
|
||||
return this.dbName.equals(that.dbName) && this.name.equals(that.name);
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the namespace of this collection, which is comprised of the {@link #dbName database name} and {@link #name collection
|
||||
* Get the namespace of this collection, which is composed of the {@link #dbName database name} and {@link #name collection
|
||||
* name}.
|
||||
*
|
||||
* @return the namespace for this collection; never null
|
||||
*/
|
||||
public String namespace() {
|
||||
return dbName + "." + name;
|
||||
return identifier();
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -63,7 +63,7 @@ private Predicate<CollectionId> createCollectionFilter() {
|
||||
|
||||
// Create signal collection filter if specified and not included
|
||||
Optional<Predicate<CollectionId>> signalCollectionFilter = config.getSignalDataCollection()
|
||||
.map(id -> CollectionId.parse("UNUSED", id))
|
||||
.map(CollectionId::parse)
|
||||
.filter(id -> !collectionFilter.test(id))
|
||||
.map(id -> Predicates.includes(id.namespace(), CollectionId::namespace));
|
||||
|
||||
|
@ -99,7 +99,7 @@ public void start(Map<String, String> props) {
|
||||
this.config = config;
|
||||
this.connectionContext = new ConnectionContext(config);
|
||||
|
||||
logger.info("Successfully started MongoDB connector, and continuing to discover changes in replica set(s) at {}", connectionContext.maskedConnectionSeed());
|
||||
logger.info("Successfully started MongoDB connector, and continuing to discover at {}", connectionContext.maskedConnectionSeed());
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -18,7 +18,6 @@
|
||||
import com.mongodb.client.model.changestream.FullDocument;
|
||||
import com.mongodb.client.model.changestream.FullDocumentBeforeChange;
|
||||
|
||||
import io.debezium.connector.mongodb.connection.ConnectionStrings;
|
||||
import io.debezium.connector.mongodb.connection.MongoDbConnection;
|
||||
import io.debezium.connector.mongodb.events.BufferingChangeStreamCursor;
|
||||
import io.debezium.connector.mongodb.events.BufferingChangeStreamCursor.ResumableChangeStreamEvent;
|
||||
@ -148,7 +147,6 @@ private void dispatchChangeEvent(
|
||||
MongoDbOffsetContext offsetContext)
|
||||
throws InterruptedException {
|
||||
var collectionId = new CollectionId(
|
||||
ConnectionStrings.replicaSetName(taskContext.getConnectionString()),
|
||||
event.getNamespace().getDatabaseName(),
|
||||
event.getNamespace().getCollectionName());
|
||||
|
||||
|
@ -223,7 +223,7 @@ private void noEvent(String resumeToken) {
|
||||
private void noEvent(Position position) {
|
||||
String namespace = "";
|
||||
long wallTime = 0L;
|
||||
onEvent(CollectionId.parse(connectorConfig.getReplicaSetName(), namespace), position, wallTime);
|
||||
onEvent(CollectionId.parse(namespace), position, wallTime);
|
||||
}
|
||||
|
||||
public void changeStreamEvent(ChangeStreamDocument<BsonDocument> changeStreamEvent) {
|
||||
@ -240,7 +240,7 @@ public void changeStreamEvent(ChangeStreamDocument<BsonDocument> changeStreamEve
|
||||
}
|
||||
}
|
||||
|
||||
onEvent(CollectionId.parse(connectorConfig.getReplicaSetName(), namespace), position, wallTime);
|
||||
onEvent(CollectionId.parse(namespace), position, wallTime);
|
||||
}
|
||||
|
||||
private void onEvent(CollectionId collectionId, Position position, long wallTime) {
|
||||
|
@ -162,7 +162,7 @@ public List<CollectionId> collections() throws InterruptedException {
|
||||
|
||||
for (String dbName : databaseNames) {
|
||||
MongoUtil.forEachCollectionNameInDatabase(client, dbName, collectionName -> {
|
||||
CollectionId collectionId = new CollectionId(name, dbName, collectionName);
|
||||
CollectionId collectionId = new CollectionId(dbName, collectionName);
|
||||
|
||||
if (filters.collectionFilter().test(collectionId)) {
|
||||
collections.add(collectionId);
|
||||
|
@ -72,7 +72,7 @@ public String getSchemaFilePath() {
|
||||
@Override
|
||||
public List<DataCollection> getMatchingCollections(Configuration configuration) {
|
||||
return getConnector().getMatchingCollections(configuration).stream()
|
||||
.map(collectionId -> new DataCollection(collectionId.replicaSetName(), collectionId.dbName(), collectionId.name()))
|
||||
.map(collectionId -> new DataCollection(collectionId.dbName(), collectionId.name()))
|
||||
.collect(Collectors.toList());
|
||||
}
|
||||
}
|
||||
|
@ -43,7 +43,6 @@
|
||||
import io.debezium.connector.mongodb.MongoDbPartition;
|
||||
import io.debezium.connector.mongodb.MongoDbSchema;
|
||||
import io.debezium.connector.mongodb.MongoDbTaskContext;
|
||||
import io.debezium.connector.mongodb.connection.ConnectionStrings;
|
||||
import io.debezium.connector.mongodb.connection.MongoDbConnection;
|
||||
import io.debezium.connector.mongodb.recordemitter.MongoDbSnapshotRecordEmitter;
|
||||
import io.debezium.pipeline.EventDispatcher;
|
||||
@ -116,7 +115,7 @@ public MongoDbIncrementalSnapshotChangeEventSource(MongoDbConnectorConfig config
|
||||
this.progressListener = progressListener;
|
||||
this.dataListener = dataChangeEventListener;
|
||||
this.signallingCollectionId = connectorConfig.getSignalingDataCollectionId() == null ? null
|
||||
: CollectionId.parse("UNUSED", connectorConfig.getSignalingDataCollectionId());
|
||||
: CollectionId.parse(connectorConfig.getSignalingDataCollectionId());
|
||||
this.notificationService = notificationService;
|
||||
this.incrementalSnapshotThreadPool = Threads.newFixedThreadPool(MongoDbConnector.class, config.getConnectorName(),
|
||||
"incremental-snapshot", connectorConfig.getSnapshotMaxThreads());
|
||||
@ -250,8 +249,6 @@ private WatermarkWindowCloser getWatermarkWindowCloser(CommonConnectorConfig con
|
||||
@Override
|
||||
@SuppressWarnings("unchecked")
|
||||
public void init(MongoDbPartition partition, OffsetContext offsetContext) {
|
||||
// Only ReplicaSet deployments are supported by incremental snapshot
|
||||
// Thus assume replicaSets.size() == 1
|
||||
mongo = connections.get(partition);
|
||||
|
||||
if (offsetContext == null) {
|
||||
@ -398,12 +395,7 @@ public void addDataCollectionNamesToSnapshot(SignalPayload<MongoDbPartition> sig
|
||||
|
||||
context = (IncrementalSnapshotContext<CollectionId>) offsetContext.getIncrementalSnapshotContext();
|
||||
final boolean shouldReadChunk = !context.snapshotRunning();
|
||||
final String rsName = ConnectionStrings.replicaSetName(taskContext.getConnectionString());
|
||||
List<String> dataCollectionIds = snapshotConfiguration.getDataCollections()
|
||||
.stream()
|
||||
.map(x -> rsName + "." + x.toString())
|
||||
.collect(Collectors.toList());
|
||||
|
||||
List<String> dataCollectionIds = snapshotConfiguration.getDataCollections();
|
||||
final List<DataCollection<CollectionId>> newDataCollectionIds = context.addDataCollectionNamesToSnapshot(correlationId, dataCollectionIds, List.of(), "");
|
||||
|
||||
if (shouldReadChunk) {
|
||||
@ -449,10 +441,7 @@ public void stopSnapshot(MongoDbPartition partition, OffsetContext offsetContext
|
||||
}
|
||||
else {
|
||||
LOGGER.info("Removing '{}' collections from incremental snapshot", dataCollectionPatterns);
|
||||
final String rsName = ConnectionStrings.replicaSetName(taskContext.getConnectionString());
|
||||
final List<String> dataCollectionIds = dataCollectionPatterns.stream().map(x -> rsName + "." + x.toString()).collect(Collectors.toList());
|
||||
|
||||
for (String dataCollectionId : dataCollectionIds) {
|
||||
for (String dataCollectionId : dataCollectionPatterns) {
|
||||
final CollectionId collectionId = CollectionId.parse(dataCollectionId);
|
||||
if (currentCollection != null && currentCollection.id().equals(collectionId)) {
|
||||
window.clear();
|
||||
@ -473,7 +462,7 @@ public void stopSnapshot(MongoDbPartition partition, OffsetContext offsetContext
|
||||
|
||||
notificationService
|
||||
.incrementalSnapshotNotificationService()
|
||||
.notifyAborted(context, partition, offsetContext, dataCollectionIds);
|
||||
.notifyAborted(context, partition, offsetContext, dataCollectionPatterns);
|
||||
}
|
||||
}
|
||||
else {
|
||||
|
@ -34,7 +34,6 @@
|
||||
import org.apache.kafka.connect.source.SourceRecord;
|
||||
import org.awaitility.Awaitility;
|
||||
import org.bson.Document;
|
||||
import org.jetbrains.annotations.NotNull;
|
||||
import org.junit.After;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
@ -118,7 +117,7 @@ public void executeBlockingSnapshotWhileStreaming() throws Exception {
|
||||
|
||||
waitForStreamingRunning("mongodb", "mongo1", getStreamingNamespace(), "0");
|
||||
|
||||
Long totalSnapshotRecords = getTotalSnapshotRecords(replicaSetFullDataCollectionName(), "mongodb", "mongo1", "0", null);
|
||||
Long totalSnapshotRecords = getTotalSnapshotRecords(fullDataCollectionName(), "mongodb", "mongo1", "0", null);
|
||||
|
||||
batchInserts.get(120, TimeUnit.SECONDS);
|
||||
|
||||
@ -360,10 +359,4 @@ protected void startConnector(Function<Configuration.Builder, Configuration.Buil
|
||||
|
||||
waitForAvailableRecords(5, TimeUnit.SECONDS);
|
||||
}
|
||||
|
||||
@NotNull
|
||||
private String replicaSetFullDataCollectionName() {
|
||||
return "rs0." + fullDataCollectionName();
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -19,33 +19,32 @@ public class CollectionIdTest {
|
||||
|
||||
@Test
|
||||
public void shouldParseString() {
|
||||
assertParseable("a", "b", "c");
|
||||
assertParseable("b", "c");
|
||||
}
|
||||
|
||||
@Test
|
||||
public void shouldParseStringWithDottedCollection() {
|
||||
assertParseable("a", "b", "c.d");
|
||||
assertParseable("b", "c.d");
|
||||
}
|
||||
|
||||
@Test
|
||||
public void shouldNotParseStringWithDotAtStart() {
|
||||
assertThat(CollectionId.parse("rs0", ".a.b")).isNull();
|
||||
assertThat(CollectionId.parse(".a.b")).isNull();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void shouldNotParseStringWithDotAtEnd() {
|
||||
assertThat(CollectionId.parse("rs0", "a.")).isNull();
|
||||
assertThat(CollectionId.parse("a.")).isNull();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void shouldNotParseStringWithOneSegment() {
|
||||
assertThat(CollectionId.parse("rs0", "a")).isNull();
|
||||
assertThat(CollectionId.parse("a")).isNull();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void shouldNotFullParseStringWithDot() {
|
||||
final CollectionId collectionId = CollectionId.parse("rs0.a.b.c");
|
||||
assertThat(collectionId.replicaSetName()).isEqualTo("rs0");
|
||||
final CollectionId collectionId = CollectionId.parse("a.b.c");
|
||||
assertThat(collectionId.dbName()).isEqualTo("a");
|
||||
assertThat(collectionId.name()).isEqualTo("b.c");
|
||||
}
|
||||
@ -57,21 +56,17 @@ public void shouldNotFullParseStringWithDotAtStart() {
|
||||
|
||||
@Test
|
||||
public void shouldNotParseFullStringWithDotAtEnd() {
|
||||
assertThat(CollectionId.parse("rs0.")).isNull();
|
||||
assertThat(CollectionId.parse("rs0.a.")).isNull();
|
||||
assertThat(CollectionId.parse("a.")).isNull();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void shouldNotParseFullStringWithMissingSegment() {
|
||||
assertThat(CollectionId.parse("rs0")).isNull();
|
||||
assertThat(CollectionId.parse("rs0.a")).isNull();
|
||||
assertThat(CollectionId.parse("rs0..a")).isNull();
|
||||
assertThat(CollectionId.parse("a")).isNull();
|
||||
}
|
||||
|
||||
protected void assertParseable(String replicaSetName, String dbName, String collectionName) {
|
||||
protected void assertParseable(String dbName, String collectionName) {
|
||||
String str = dbName + "." + collectionName;
|
||||
id = CollectionId.parse(replicaSetName, str);
|
||||
assertThat(id.replicaSetName()).isEqualTo(replicaSetName);
|
||||
id = CollectionId.parse(str);
|
||||
assertThat(id.dbName()).isEqualTo(dbName);
|
||||
assertThat(id.name()).isEqualTo(collectionName);
|
||||
}
|
||||
|
@ -28,7 +28,6 @@
|
||||
|
||||
import io.debezium.DebeziumException;
|
||||
import io.debezium.config.CommonConnectorConfig;
|
||||
import io.debezium.connector.mongodb.connection.ConnectionStrings;
|
||||
import io.debezium.util.Testing;
|
||||
|
||||
public class ConnectionIT extends AbstractMongoIT {
|
||||
@ -124,6 +123,6 @@ public void shouldCreateMovieDatabase() throws InterruptedException {
|
||||
|
||||
// Now that we've put at least one document into our collection, verify we can see the database and collection ...
|
||||
assertThat(connection.databaseNames()).containsOnly("dbA", "dbB");
|
||||
assertThat(connection.collections()).containsOnly(new CollectionId(ConnectionStrings.replicaSetName(mongo.getConnectionString()), "dbA", "moviesA"));
|
||||
assertThat(connection.collections()).containsOnly(new CollectionId("dbA", "moviesA"));
|
||||
}
|
||||
}
|
||||
|
@ -319,7 +319,7 @@ public void shouldIncludeSignalingCollectionCoveredByRegexInExcludeList() {
|
||||
public void excludeFilterShouldRemoveMatchingField() {
|
||||
filters = build.excludeFields("db1.collectionA.key1").createFilters();
|
||||
validateConfigFields();
|
||||
CollectionId id = CollectionId.parse("rs1.", "db1.collectionA");
|
||||
CollectionId id = CollectionId.parse("db1.collectionA");
|
||||
assertEquals(
|
||||
Document.parse(" { \"key2\" : \"value2\" }"),
|
||||
filters.fieldFilterFor(id).apply(Document.parse(" { \"key1\" : \"value1\", \"key2\" : \"value2\" }")));
|
||||
@ -329,7 +329,7 @@ public void excludeFilterShouldRemoveMatchingField() {
|
||||
public void excludeFilterShouldRemoveMatchingFieldWithLeadingWhiteSpaces() {
|
||||
filters = build.excludeFields(" *.collectionA.key1").createFilters();
|
||||
validateConfigFields();
|
||||
CollectionId id = CollectionId.parse("rs1.", " *.collectionA");
|
||||
CollectionId id = CollectionId.parse(" *.collectionA");
|
||||
assertEquals(
|
||||
Document.parse(" { \"key2\" : \"value2\" }"),
|
||||
filters.fieldFilterFor(id).apply(Document.parse(" { \"key1\" : \"value1\", \"key2\" : \"value2\" }")));
|
||||
@ -340,7 +340,7 @@ public void excludeFilterShouldRemoveMatchingFieldWithLeadingWhiteSpaces() {
|
||||
public void excludeFilterShouldRemoveMatchingFieldWithLeadingMultipleAsterisks() {
|
||||
filters = build.excludeFields(" *.*.key1").createFilters();
|
||||
validateConfigFields();
|
||||
CollectionId id = CollectionId.parse("rs1.", " *.collectionA");
|
||||
CollectionId id = CollectionId.parse(" *.collectionA");
|
||||
assertEquals(
|
||||
Document.parse(" { \"key2\" : \"value2\" }"),
|
||||
filters.fieldFilterFor(id).apply(Document.parse(" { \"key1\" : \"value1\", \"key2\" : \"value2\" }")));
|
||||
@ -350,7 +350,7 @@ public void excludeFilterShouldRemoveMatchingFieldWithLeadingMultipleAsterisks()
|
||||
public void excludeFilterShouldRemoveMatchingFieldWithTrailingWhiteSpaces() {
|
||||
filters = build.excludeFields("db.collectionA.key1 ,db.collectionA.key2 ").createFilters();
|
||||
validateConfigFields();
|
||||
CollectionId id = CollectionId.parse("rs1.", "db.collectionA");
|
||||
CollectionId id = CollectionId.parse("db.collectionA");
|
||||
assertEquals(
|
||||
Document.parse(" { \"key3\" : \"value3\" }"),
|
||||
filters.fieldFilterFor(id).apply(Document.parse(" { \"key1\" : \"value1\", \"key2\" : \"value2\", \"key3\" : \"value3\" }")));
|
||||
@ -360,7 +360,7 @@ public void excludeFilterShouldRemoveMatchingFieldWithTrailingWhiteSpaces() {
|
||||
public void renameFilterShouldRenameMatchingField() {
|
||||
filters = build.renameFields("db1.collectionA.key1:key2").createFilters();
|
||||
validateConfigFields();
|
||||
CollectionId id = CollectionId.parse("rs1.", "db1.collectionA");
|
||||
CollectionId id = CollectionId.parse("db1.collectionA");
|
||||
assertEquals(
|
||||
Document.parse(" { \"key2\" : \"value1\" }"),
|
||||
filters.fieldFilterFor(id).apply(Document.parse(" { \"key1\" : \"value1\" }")));
|
||||
@ -370,7 +370,7 @@ public void renameFilterShouldRenameMatchingField() {
|
||||
public void renameFilterShouldRenameMatchingFieldWithLeadingWhiteSpaces() {
|
||||
filters = build.renameFields(" *.collectionA.key2:key3").createFilters();
|
||||
validateConfigFields();
|
||||
CollectionId id = CollectionId.parse("rs1.", " *.collectionA");
|
||||
CollectionId id = CollectionId.parse(" *.collectionA");
|
||||
assertEquals(
|
||||
Document.parse(" { \"key1\" : \"valueA\", \"key3\" : \"valueB\" }"),
|
||||
filters.fieldFilterFor(id).apply(Document.parse(" { \"key1\" : \"valueA\", \"key2\" : \"valueB\" }")));
|
||||
@ -381,7 +381,7 @@ public void renameFilterShouldRenameMatchingFieldWithLeadingWhiteSpaces() {
|
||||
public void renameFilterShouldRenameMatchingFieldWithLeadingMultipleAsterisks() {
|
||||
filters = build.renameFields(" *.*.key2:key3").createFilters();
|
||||
validateConfigFields();
|
||||
CollectionId id = CollectionId.parse("rs1.", " *.collectionA");
|
||||
CollectionId id = CollectionId.parse(" *.collectionA");
|
||||
assertEquals(
|
||||
Document.parse(" { \"key1\" : \"valueA\", \"key3\" : \"valueB\" }"),
|
||||
filters.fieldFilterFor(id).apply(Document.parse(" { \"key1\" : \"valueA\", \"key2\" : \"valueB\" }")));
|
||||
@ -391,20 +391,20 @@ public void renameFilterShouldRenameMatchingFieldWithLeadingMultipleAsterisks()
|
||||
public void renameFilterShouldRenameMatchingFieldWithTrailingWhiteSpaces() {
|
||||
filters = build.renameFields("db2.collectionA.key1:key2 ,db2.collectionA.key3:key4 ").createFilters();
|
||||
validateConfigFields();
|
||||
CollectionId id = CollectionId.parse("rs1.", "db2.collectionA");
|
||||
CollectionId id = CollectionId.parse("db2.collectionA");
|
||||
assertEquals(
|
||||
Document.parse(" { \"key2\" : \"valueA\", \"key4\" : \"valueB\" }"),
|
||||
filters.fieldFilterFor(id).apply(Document.parse(" { \"key1\" : \"valueA\", \"key3\" : \"valueB\" }")));
|
||||
}
|
||||
|
||||
protected void assertCollectionIncluded(String fullyQualifiedCollectionName) {
|
||||
CollectionId id = CollectionId.parse("rs1.", fullyQualifiedCollectionName);
|
||||
CollectionId id = CollectionId.parse(fullyQualifiedCollectionName);
|
||||
assertThat(id).isNotNull();
|
||||
assertThat(filters.collectionFilter().test(id)).isTrue();
|
||||
}
|
||||
|
||||
protected void assertCollectionExcluded(String fullyQualifiedCollectionName) {
|
||||
CollectionId id = CollectionId.parse("rs1.", fullyQualifiedCollectionName);
|
||||
CollectionId id = CollectionId.parse(fullyQualifiedCollectionName);
|
||||
assertThat(id).isNotNull();
|
||||
assertThat(filters.collectionFilter().test(id)).isFalse();
|
||||
}
|
||||
|
@ -451,7 +451,7 @@ public void snapshotOnlyWithRestart() throws Exception {
|
||||
|
||||
Awaitility.await().atMost(60, TimeUnit.SECONDS)
|
||||
.until(() -> interceptor
|
||||
.containsMessage("No data returned by the query, incremental snapshotting of table '" + "rs0." + fullDataCollectionName() + "' finished"));
|
||||
.containsMessage("No data returned by the query, incremental snapshotting of table '" + fullDataCollectionName() + "' finished"));
|
||||
|
||||
final int expectedRecordCount = ROW_COUNT;
|
||||
final AtomicInteger recordCounter = new AtomicInteger();
|
||||
|
@ -27,7 +27,7 @@ public void shouldAlwaysProduceCollectionSchema() {
|
||||
|
||||
final MongoDbSchema schema = getSchema(config, taskContext);
|
||||
for (int i = 0; i != 100; ++i) {
|
||||
CollectionId id = new CollectionId("rs0", "dbA", "c" + i);
|
||||
CollectionId id = new CollectionId("dbA", "c" + i);
|
||||
DataCollectionSchema collectionSchema = schema.schemaFor(id);
|
||||
assertThat(collectionSchema).isNotNull();
|
||||
assertThat(collectionSchema.id()).isSameAs(id);
|
||||
|
@ -102,7 +102,7 @@ public void testSnapshotOnlyMetrics() throws Exception {
|
||||
assertThat(mBeanServer.getAttribute(objectName, "TotalNumberOfEventsSeen")).isEqualTo(6L);
|
||||
assertThat(mBeanServer.getAttribute(objectName, "NumberOfEventsFiltered")).isEqualTo(0L);
|
||||
assertThat(mBeanServer.getAttribute(objectName, "NumberOfErroneousEvents")).isEqualTo(0L);
|
||||
assertThat(mBeanServer.getAttribute(objectName, "CapturedTables")).isEqualTo(new String[]{ "rs0.dbit.restaurants" });
|
||||
assertThat(mBeanServer.getAttribute(objectName, "CapturedTables")).isEqualTo(new String[]{ "dbit.restaurants" });
|
||||
assertThat(mBeanServer.getAttribute(objectName, "LastEvent")).isNotNull();
|
||||
assertThat(mBeanServer.getAttribute(objectName, "NumberOfDisconnects")).isEqualTo(0L);
|
||||
assertThat(mBeanServer.getAttribute(objectName, "SnapshotPaused")).isEqualTo(false);
|
||||
|
@ -138,7 +138,7 @@ public void assertSourceInfoContents(SourceInfo source,
|
||||
String resumeToken = source.lastResumeToken();
|
||||
assertThat(resumeToken).isEqualTo(resumeTokenData);
|
||||
|
||||
source.collectionEvent(new CollectionId(REPLICA_SET_NAME, "test", "names"), 0L);
|
||||
source.collectionEvent(new CollectionId("test", "names"), 0L);
|
||||
Struct struct = source.struct();
|
||||
assertThat(struct.getInt64(SourceInfo.TIMESTAMP_KEY)).isEqualTo((timestamp != null) ? timestamp.getTime() * 1000L : 0L);
|
||||
assertThat(struct.getInt32(SourceInfo.ORDER)).isEqualTo((timestamp != null) ? timestamp.getInc() : -1);
|
||||
@ -221,7 +221,7 @@ public void wallTimeIsPresent() {
|
||||
var cursor = mockEventChangeStreamCursor();
|
||||
source.initEvent(cursor);
|
||||
assertThat(source.struct().getInt64(SourceInfo.WALL_TIME)).isNull();
|
||||
source.collectionEvent(new CollectionId(REPLICA_SET_NAME, "test", "names"), 10L);
|
||||
source.collectionEvent(new CollectionId("test", "names"), 10L);
|
||||
assertThat(source.struct().getInt64(SourceInfo.WALL_TIME)).isEqualTo(10L);
|
||||
}
|
||||
|
||||
|
@ -48,7 +48,7 @@ protected AbstractStringAssert<?> assertTopic(TopicSelector<CollectionId> select
|
||||
}
|
||||
|
||||
protected CollectionId dbAndCollection(String dbName, String collectionName) {
|
||||
return new CollectionId("rs0", dbName, collectionName);
|
||||
return new CollectionId(dbName, collectionName);
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -72,7 +72,7 @@ public void transactionMetadata() throws Exception {
|
||||
counter++;
|
||||
}
|
||||
|
||||
assertEndTransaction(all.get(7), txId1, 6, Collect.hashMapOf("rs0.dbA.c1", 6));
|
||||
assertEndTransaction(all.get(7), txId1, 6, Collect.hashMapOf("dbA.c1", 6));
|
||||
|
||||
stopConnector();
|
||||
}
|
||||
@ -125,7 +125,7 @@ public void transactionMetadataWithCustomTopicName() throws Exception {
|
||||
counter++;
|
||||
}
|
||||
|
||||
assertEndTransaction(all.get(7), txId1, 6, Collect.hashMapOf("rs0.dbA.c1", 6));
|
||||
assertEndTransaction(all.get(7), txId1, 6, Collect.hashMapOf("dbA.c1", 6));
|
||||
|
||||
stopConnector();
|
||||
}
|
||||
|
@ -121,9 +121,9 @@ public void testFiltersWithEmptyFilters() {
|
||||
.body("matchingCollections.size()", is(3))
|
||||
.body("matchingCollections",
|
||||
hasItems(
|
||||
Map.of("realm", "rs0", "namespace", "inventory", "name", "customers", "identifier", "rs0.inventory.customers"),
|
||||
Map.of("realm", "rs0", "namespace", "inventory", "name", "orders", "identifier", "rs0.inventory.orders"),
|
||||
Map.of("realm", "rs0", "namespace", "inventory", "name", "products", "identifier", "rs0.inventory.products")));
|
||||
Map.of("namespace", "inventory", "name", "customers", "identifier", "inventory.customers"),
|
||||
Map.of("namespace", "inventory", "name", "orders", "identifier", "inventory.orders"),
|
||||
Map.of("namespace", "inventory", "name", "products", "identifier", "inventory.products")));
|
||||
}
|
||||
|
||||
@Test
|
||||
@ -141,7 +141,7 @@ public void testFiltersWithValidCollectionIncludeList() {
|
||||
.body("validationResults.size()", is(0))
|
||||
.body("matchingCollections.size()", is(1))
|
||||
.body("matchingCollections",
|
||||
hasItems(Map.of("realm", "rs0", "namespace", "inventory", "name", "products", "identifier", "rs0.inventory.products")));
|
||||
hasItems(Map.of("namespace", "inventory", "name", "products", "identifier", "inventory.products")));
|
||||
}
|
||||
|
||||
@Test
|
||||
@ -160,9 +160,9 @@ public void testFiltersWithValidDatabaseIncludeList() {
|
||||
.body("matchingCollections.size()", is(3))
|
||||
.body("matchingCollections",
|
||||
hasItems(
|
||||
Map.of("realm", "rs0", "namespace", "inventory", "name", "customers", "identifier", "rs0.inventory.customers"),
|
||||
Map.of("realm", "rs0", "namespace", "inventory", "name", "orders", "identifier", "rs0.inventory.orders"),
|
||||
Map.of("realm", "rs0", "namespace", "inventory", "name", "products", "identifier", "rs0.inventory.products")));
|
||||
Map.of("namespace", "inventory", "name", "customers", "identifier", "inventory.customers"),
|
||||
Map.of("namespace", "inventory", "name", "orders", "identifier", "inventory.orders"),
|
||||
Map.of("namespace", "inventory", "name", "products", "identifier", "inventory.products")));
|
||||
}
|
||||
|
||||
@Test
|
||||
|
Loading…
Reference in New Issue
Block a user