DBZ-1879 Introduce JMX tests

This commit is contained in:
Chris Cranford 2020-03-16 15:08:04 -04:00 committed by Gunnar Morling
parent 1a31d6c372
commit d6114bb99f
3 changed files with 193 additions and 0 deletions

View File

@ -340,6 +340,7 @@ private void createDataEventsForReplicaSet(ChangeEventSourceContext sourceContex
LOGGER.info("Beginning snapshot of '{}' at {}", rsName, rsOffsetContext.getOffset());
final List<CollectionId> collections = primaryClient.collections();
snapshotProgressListener.monitoredDataCollectionsDetermined(collections);
if (connectionContext.maxNumberOfCopyThreads() > 1) {
// Since multiple copy threads are to be used, create a thread pool and initiate the copy.
// The current thread will wait until the copy threads either have completed or an error occurred.

View File

@ -8,6 +8,10 @@
import static org.fest.assertions.Assertions.assertThat;
import static org.junit.Assert.fail;
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiConsumer;
@ -23,6 +27,7 @@
import io.debezium.connector.mongodb.ConnectionContext.MongoPrimary;
import io.debezium.embedded.AbstractConnectorTest;
import io.debezium.junit.logging.LogInterceptor;
import io.debezium.util.IoUtil;
import io.debezium.util.Testing;
/**
@ -164,4 +169,39 @@ protected BiConsumer<String, Throwable> connectionErrorHandler(int numErrorsBefo
logger.error("Error while attempting to {}: {}", desc, error.getMessage(), error);
};
}
protected void storeDocuments(String dbName, String collectionName, String pathOnClasspath) {
primary().execute("storing documents", mongo -> {
Testing.debug("Storing in '" + dbName + "." + collectionName + "' documents loaded from from '" + pathOnClasspath + "'");
MongoDatabase db1 = mongo.getDatabase(dbName);
MongoCollection<Document> coll = db1.getCollection(collectionName);
coll.drop();
storeDocuments(coll, pathOnClasspath);
});
}
protected void storeDocuments(MongoCollection<Document> collection, String pathOnClasspath) {
InsertOneOptions insertOptions = new InsertOneOptions().bypassDocumentValidation(true);
loadTestDocuments(pathOnClasspath).forEach(doc -> {
assertThat(doc).isNotNull();
assertThat(doc.size()).isGreaterThan(0);
collection.insertOne(doc, insertOptions);
});
}
protected List<Document> loadTestDocuments(String pathOnClasspath) {
List<Document> results = new ArrayList<>();
try (InputStream stream = Testing.Files.readResourceAsStream(pathOnClasspath)) {
assertThat(stream).isNotNull();
IoUtil.readLines(stream, line -> {
Document doc = Document.parse(line);
assertThat(doc.size()).isGreaterThan(0);
results.add(doc);
});
}
catch (IOException e) {
fail("Unable to find or read file '" + pathOnClasspath + "': " + e.getMessage());
}
return results;
}
}

View File

@ -0,0 +1,152 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.connector.mongodb;
import static org.fest.assertions.Assertions.assertThat;
import static org.junit.Assert.fail;
import java.lang.management.ManagementFactory;
import javax.management.InstanceNotFoundException;
import javax.management.MBeanAttributeInfo;
import javax.management.MBeanServer;
import javax.management.ObjectName;
import org.junit.Test;
/**
* @author Chris Cranford
*/
public class MongoMetricsIT extends AbstractMongoConnectorIT {
@Test
public void testLifecycle() throws Exception {
// Setup
this.config = TestHelper.getConfiguration()
.edit()
.with(MongoDbConnectorConfig.SNAPSHOT_MODE, MongoDbConnectorConfig.SnapshotMode.INITIAL)
.with(MongoDbConnectorConfig.COLLECTION_WHITELIST, "dbit.*")
.build();
this.context = new MongoDbTaskContext(config);
// start connector
start(MongoDbConnector.class, config);
assertConnectorIsRunning();
// These methods use the JMX metrics, this simply checks they're available as expected
waitForSnapshotToBeCompleted("mongodb", "mongo1");
waitForStreamingRunning("mongodb", "mongo1");
// Stop the connector
stopConnector();
// Verify snapshot metrics no longer exist
final MBeanServer mBeanServer = ManagementFactory.getPlatformMBeanServer();
try {
mBeanServer.getMBeanInfo(getSnapshotMetricsObjectName("mongodb", "mongo1"));
fail("Expected Snapshot Metrics no longer to exist");
}
catch (InstanceNotFoundException e) {
// expected
}
// Verify streaming metrics no longer exist
try {
mBeanServer.getMBeanInfo(getStreamingMetricsObjectName("mongodb", "mongo1"));
fail("Expected Streaming Metrics no longer to exist");
}
catch (InstanceNotFoundException e) {
// expected
}
}
@Test
public void testSnapshotOnlyMetrics() throws Exception {
// Setup
this.config = TestHelper.getConfiguration()
.edit()
.with(MongoDbConnectorConfig.SNAPSHOT_MODE, MongoDbConnectorConfig.SnapshotMode.INITIAL)
.with(MongoDbConnectorConfig.COLLECTION_WHITELIST, "dbit.*")
.build();
this.context = new MongoDbTaskContext(config);
TestHelper.cleanDatabase(primary(), "dbit");
storeDocuments("dbit", "restaurants", "restaurants1.json");
// start connector
start(MongoDbConnector.class, config);
assertConnectorIsRunning();
// wait for snapshot to have finished
waitForSnapshotToBeCompleted("mongodb", "mongo1");
SourceRecords records = consumeRecordsByTopic(6);
assertThat(records.topics().size()).isEqualTo(1);
assertThat(records.recordsForTopic("mongo1.dbit.restaurants").size()).isEqualTo(6);
final MBeanServer mBeanServer = ManagementFactory.getPlatformMBeanServer();
final ObjectName objectName = getSnapshotMetricsObjectName("mongodb", "mongo1");
for (MBeanAttributeInfo info : mBeanServer.getMBeanInfo(objectName).getAttributes()) {
System.out.println(info.getName() + " => " + mBeanServer.getAttribute(objectName, info.getName()));
}
assertThat(mBeanServer.getAttribute(objectName, "TotalTableCount")).isEqualTo(1);
assertThat(mBeanServer.getAttribute(objectName, "RemainingTableCount")).isEqualTo(0);
assertThat(mBeanServer.getAttribute(objectName, "SnapshotRunning")).isEqualTo(false);
assertThat(mBeanServer.getAttribute(objectName, "SnapshotAborted")).isEqualTo(false);
assertThat(mBeanServer.getAttribute(objectName, "SnapshotCompleted")).isEqualTo(true);
assertThat(mBeanServer.getAttribute(objectName, "TotalNumberOfEventsSeen")).isEqualTo(6L);
assertThat(mBeanServer.getAttribute(objectName, "NumberOfEventsFiltered")).isEqualTo(0L);
assertThat(mBeanServer.getAttribute(objectName, "NumberOfErroneousEvents")).isEqualTo(0L);
assertThat(mBeanServer.getAttribute(objectName, "MonitoredTables")).isEqualTo(new String[]{ "rs0.dbit.restaurants" });
assertThat(mBeanServer.getAttribute(objectName, "LastEvent")).isNotNull();
}
@Test
public void testStreamingOnlyMetrics() throws Exception {
// Setup
this.config = TestHelper.getConfiguration()
.edit()
.with(MongoDbConnectorConfig.SNAPSHOT_MODE, MongoDbConnectorConfig.SnapshotMode.NEVER)
.with(MongoDbConnectorConfig.COLLECTION_WHITELIST, "dbit.*")
.build();
this.context = new MongoDbTaskContext(config);
TestHelper.cleanDatabase(primary(), "dbit");
// start connector
start(MongoDbConnector.class, config);
assertConnectorIsRunning();
// wait for streaming to have started
waitForStreamingRunning("mongodb", "mongo1");
storeDocuments("dbit", "restaurants", "restaurants1.json");
SourceRecords records = consumeRecordsByTopic(6);
assertThat(records.topics().size()).isEqualTo(1);
assertThat(records.recordsForTopic("mongo1.dbit.restaurants").size()).isEqualTo(6);
final MBeanServer mBeanServer = ManagementFactory.getPlatformMBeanServer();
final ObjectName objectName = getStreamingMetricsObjectName("mongodb", "mongo1");
for (MBeanAttributeInfo info : mBeanServer.getMBeanInfo(objectName).getAttributes()) {
System.out.println(info.getName() + " => " + mBeanServer.getAttribute(objectName, info.getName()));
}
assertThat(mBeanServer.getAttribute(objectName, "SourceEventPosition")).isNotNull();
assertThat(mBeanServer.getAttribute(objectName, "NumberOfCommittedTransactions")).isEqualTo(0L);
assertThat(mBeanServer.getAttribute(objectName, "LastTransactionId")).isNull();
assertThat(mBeanServer.getAttribute(objectName, "Connected")).isEqualTo(true);
assertThat(mBeanServer.getAttribute(objectName, "MonitoredTables")).isEqualTo(new String[]{});
assertThat(mBeanServer.getAttribute(objectName, "LastEvent")).isNotNull();
assertThat(mBeanServer.getAttribute(objectName, "TotalNumberOfEventsSeen")).isEqualTo(6L);
assertThat(mBeanServer.getAttribute(objectName, "NumberOfEventsFiltered")).isEqualTo(0L);
assertThat(mBeanServer.getAttribute(objectName, "NumberOfErroneousEvents")).isEqualTo(0L);
assertThat((Long) mBeanServer.getAttribute(objectName, "MilliSecondsSinceLastEvent")).isGreaterThanOrEqualTo(0);
assertThat((Long) mBeanServer.getAttribute(objectName, "MilliSecondsBehindSource")).isGreaterThanOrEqualTo(0);
}
}