DBZ-2114 OpenShift tests for MongoDB

This commit is contained in:
jcechace 2020-06-01 19:14:17 +02:00 committed by Jiri Pechanec
parent b025108176
commit 21773945fb
7 changed files with 437 additions and 0 deletions

View File

@ -0,0 +1,90 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.testing.openshift.tools.databases.mongodb;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import io.debezium.testing.openshift.tools.databases.DatabaseController;
import io.fabric8.kubernetes.api.model.Pod;
import io.fabric8.kubernetes.api.model.Service;
import io.fabric8.kubernetes.api.model.apps.Deployment;
import io.fabric8.kubernetes.client.dsl.ExecListener;
import io.fabric8.kubernetes.client.dsl.ExecWatch;
import io.fabric8.openshift.client.OpenShiftClient;
import okhttp3.Response;
/**
*
* @author Jakub Cechacek
*/
/**
 * OpenShift controller for a deployed MongoDB database. Provides the MongoDB
 * connection URL scheme, a one-shot initialization hook that runs the inventory
 * init script inside the database pod, and client factory methods.
 *
 * @author Jakub Cechacek
 */
public class MongoController extends DatabaseController<MongoDatabaseClient> {
    private static final Logger LOGGER = LoggerFactory.getLogger(MongoController.class);

    /** Path of the inventory initialization script inside the mongo container. */
    private static final String DB_INIT_SCRIPT_PATH_CONTAINER = "/usr/local/bin/init-inventory.sh";

    /** Maximum time to wait for the init script's exec session to finish. */
    private static final long INIT_TIMEOUT_MINUTES = 1;

    /**
     * Exec listener that releases the given latch once the init script's exec
     * session terminates — either normally ({@link #onClose}) or with a
     * transport failure ({@link #onFailure}).
     */
    private static class MongoInitListener implements ExecListener {
        private final CountDownLatch latch;

        public MongoInitListener(CountDownLatch latch) {
            this.latch = latch;
        }

        @Override
        public void onOpen(Response response) {
            LOGGER.info("Initializing MongoDB database");
        }

        @Override
        public void onFailure(Throwable t, Response response) {
            LOGGER.error("Error initializing MongoDB database", t);
            LOGGER.error(response.message());
            latch.countDown();
        }

        @Override
        public void onClose(int code, String reason) {
            // Parameterized logging avoids eager string concatenation
            LOGGER.info("MongoDb init executor close: [{}] {}", code, reason);
            latch.countDown();
        }
    }

    public MongoController(Deployment deployment, List<Service> services, String dbType, OpenShiftClient ocp) {
        super(deployment, services, dbType, ocp);
    }

    /**
     * Builds a MongoDB connection string for the given host and port.
     */
    @Override
    protected String constructDatabaseUrl(String hostname, int port) {
        return "mongodb://" + hostname + ":" + port;
    }

    /**
     * Runs the inventory init script inside the database pod and waits (up to
     * {@link #INIT_TIMEOUT_MINUTES} minute(s)) for the exec session to close.
     *
     * @throws InterruptedException if interrupted while waiting for initialization
     */
    public void initialize() throws InterruptedException {
        Pod pod = ocp.pods().inNamespace(project).withLabel("deployment", name).list().getItems().get(0);
        CountDownLatch latch = new CountDownLatch(1);
        try (ExecWatch exec = ocp.pods().inNamespace(project).withName(pod.getMetadata().getName())
                .inContainer("mongo")
                .writingOutput(System.out) // CHECKSTYLE IGNORE RegexpSinglelineJava FOR NEXT 2 LINES
                .writingError(System.err)
                .usingListener(new MongoInitListener(latch))
                .exec("bash", "-c", DB_INIT_SCRIPT_PATH_CONTAINER)) {
            LOGGER.info("Waiting until database is initialized");
            // Surface a timeout instead of silently proceeding with a
            // possibly-uninitialized database.
            if (!latch.await(INIT_TIMEOUT_MINUTES, TimeUnit.MINUTES)) {
                LOGGER.warn("Timed out after {} minute(s) waiting for MongoDB initialization to complete", INIT_TIMEOUT_MINUTES);
            }
        }
    }

    /**
     * Creates a database client authenticating against the default {@code admin} database.
     */
    public MongoDatabaseClient getDatabaseClient(String username, String password) {
        return getDatabaseClient(username, password, "admin");
    }

    /**
     * Creates a database client authenticating against the given auth source database.
     */
    public MongoDatabaseClient getDatabaseClient(String username, String password, String authSource) {
        return new MongoDatabaseClient(getDatabaseUrl(), username, password, authSource);
    }
}

View File

@ -0,0 +1,66 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.testing.openshift.tools.databases.mongodb;
import org.bson.Document;
import com.mongodb.ConnectionString;
import com.mongodb.MongoClientSettings;
import com.mongodb.MongoCredential;
import com.mongodb.client.MongoClient;
import com.mongodb.client.MongoClients;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.MongoDatabase;
import io.debezium.testing.openshift.tools.databases.Commands;
import io.debezium.testing.openshift.tools.databases.DatabaseClient;
/**
*
* @author Jakub Cechacek
*/
/**
 * Simple MongoDB client wrapper that opens a connection per invocation,
 * runs the supplied commands, and closes the connection afterwards.
 *
 * @author Jakub Cechacek
 */
public class MongoDatabaseClient implements DatabaseClient<MongoClient, RuntimeException> {
    private final String url;
    private final String username;
    private final String password;
    private final String authSource;

    /**
     * @param url        MongoDB connection string (e.g. {@code mongodb://host:port})
     * @param username   user to authenticate as
     * @param password   user's password
     * @param authSource name of the database holding the user's credentials
     */
    public MongoDatabaseClient(String url, String username, String password, String authSource) {
        this.url = url;
        this.username = username;
        this.password = password;
        this.authSource = authSource;
    }

    /**
     * Runs the given commands against a freshly created client.
     * The client is always closed when the commands finish (or throw).
     */
    public void execute(Commands<MongoClient, RuntimeException> commands) throws RuntimeException {
        MongoCredential credential = MongoCredential.createCredential(username, authSource, password.toCharArray());
        ConnectionString connString = new ConnectionString(url);
        MongoClientSettings settings = MongoClientSettings.builder()
                .applyConnectionString(connString)
                .credential(credential)
                .retryWrites(true)
                .build();
        // try-with-resources: MongoClient is Closeable; the original leaked it
        try (MongoClient client = MongoClients.create(settings)) {
            commands.execute(client);
        }
    }

    /**
     * Runs the given commands against the named database.
     */
    public void execute(String database, Commands<MongoDatabase, RuntimeException> commands) {
        execute(con -> {
            MongoDatabase db = con.getDatabase(database);
            commands.execute(db);
        });
    }

    /**
     * Runs the given commands against the named collection of the named database.
     */
    public void execute(String database, String collection, Commands<MongoCollection<Document>, RuntimeException> commands) {
        execute(database, db -> {
            // Parameterized type instead of the raw MongoCollection
            MongoCollection<Document> col = db.getCollection(collection);
            commands.execute(col);
        });
    }
}

View File

@ -0,0 +1,36 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.testing.openshift.tools.databases.mongodb;
import java.util.List;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import io.debezium.testing.openshift.tools.databases.DatabaseDeployer;
import io.fabric8.kubernetes.api.model.Service;
import io.fabric8.kubernetes.api.model.apps.Deployment;
import io.fabric8.openshift.client.OpenShiftClient;
/**
*
* @author Jakub Cechacek
*/
public class MongoDeployer extends DatabaseDeployer<MongoDeployer, MongoController> {
private static final Logger LOGGER = LoggerFactory.getLogger(MongoDeployer.class);
public MongoDeployer(OpenShiftClient ocp) {
super("mongo", ocp);
}
public MongoDeployer getThis() {
return this;
}
public MongoController getController(Deployment deployment, List<Service> services, String dbType, OpenShiftClient ocp) {
return new MongoController(deployment, services, dbType, ocp);
}
}

View File

@ -0,0 +1,167 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.testing.openshift.mongodb;
import static io.debezium.testing.openshift.resources.ConfigProperties.DATABASE_MONGO_DBZ_DBNAME;
import static io.debezium.testing.openshift.resources.ConfigProperties.DATABASE_MONGO_DBZ_LOGIN_DBNAME;
import static io.debezium.testing.openshift.resources.ConfigProperties.DATABASE_MONGO_DBZ_PASSWORD;
import static io.debezium.testing.openshift.resources.ConfigProperties.DATABASE_MONGO_DBZ_USERNAME;
import static org.assertj.core.api.Assertions.assertThat;
import static org.awaitility.Awaitility.await;
import java.io.IOException;
import java.util.concurrent.TimeUnit;
import org.bson.Document;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.MethodOrderer;
import org.junit.jupiter.api.Order;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestMethodOrder;
import io.debezium.testing.openshift.ConnectorTestBase;
import io.debezium.testing.openshift.resources.ConfigProperties;
import io.debezium.testing.openshift.resources.ConnectorFactories;
import io.debezium.testing.openshift.tools.databases.mongodb.MongoController;
import io.debezium.testing.openshift.tools.databases.mongodb.MongoDatabaseClient;
import io.debezium.testing.openshift.tools.databases.mongodb.MongoDeployer;
import io.debezium.testing.openshift.tools.kafka.ConnectorConfigBuilder;
import okhttp3.OkHttpClient;
import okhttp3.Request;
import okhttp3.Response;
/**
* @author Jakub Cechacek
*/
@TestMethodOrder(MethodOrderer.OrderAnnotation.class)
@Tag("acceptance")
@Tag("mongo")
public class MongoConnectorIT extends ConnectorTestBase {
public static final String DB_DEPLOYMENT_PATH = "/database-resources/mongodb/deployment.yaml";
public static final String DB_SERVICE_PATH_LB = "/database-resources/mongodb/service-lb.yaml";
public static final String DB_SERVICE_PATH = "/database-resources/mongodb/service.yaml";
public static final String CONNECTOR_NAME = "inventory-connector-mongo";
private static MongoDeployer dbDeployer;
private static MongoController dbController;
private static OkHttpClient httpClient = new OkHttpClient();
private static ConnectorFactories connectorFactories = new ConnectorFactories();
private static ConnectorConfigBuilder connectorConfig;
private static String connectorName;
@BeforeAll
public static void setupDatabase() throws IOException, InterruptedException {
if (!ConfigProperties.DATABASE_MONGO_HOST.isPresent()) {
dbDeployer = new MongoDeployer(ocp)
.withProject(ConfigProperties.OCP_PROJECT_MONGO)
.withDeployment(DB_DEPLOYMENT_PATH)
.withServices(DB_SERVICE_PATH_LB, DB_SERVICE_PATH);
dbController = dbDeployer.deploy();
dbController.initialize();
}
String id = testUtils.getUniqueId();
connectorName = CONNECTOR_NAME + "-" + id;
connectorConfig = connectorFactories.mongo()
.put("mongodb.name", connectorName);
kafkaConnectController.deployConnector(connectorName, connectorConfig);
}
@AfterAll
public static void tearDownDatabase() throws IOException, InterruptedException {
kafkaConnectController.undeployConnector(connectorName);
dbController.reload();
}
private void insertCustomer(String firstName, String lastName, String email) {
MongoDatabaseClient client = dbController.getDatabaseClient(
DATABASE_MONGO_DBZ_USERNAME, DATABASE_MONGO_DBZ_PASSWORD, DATABASE_MONGO_DBZ_LOGIN_DBNAME);
client.execute(DATABASE_MONGO_DBZ_DBNAME, "customers", col -> {
Document doc = new Document()
.append("first_name", firstName)
.append("last_name", lastName)
.append("email", email);
col.insertOne(doc);
});
}
@Test
@Order(1)
public void shouldHaveRegisteredConnector() {
Request r = new Request.Builder()
.url(kafkaConnectController.getApiURL().resolve("/connectors"))
.build();
await().atMost(1, TimeUnit.MINUTES).untilAsserted(() -> {
try (Response res = httpClient.newCall(r).execute()) {
assertThat(res.body().string()).contains(connectorName);
}
});
}
@Test
@Order(2)
public void shouldCreateKafkaTopics() {
assertTopicsExist(
connectorName + ".inventory.customers",
connectorName + ".inventory.orders",
connectorName + ".inventory.products");
}
@Test
@Order(3)
public void shouldContainRecordsInCustomersTopic() throws IOException {
kafkaConnectController.waitForMongoSnapshot(connectorName);
awaitAssert(() -> assertRecordsCount(connectorName + ".inventory.customers", 4));
}
@Test
@Order(4)
public void shouldStreamChanges() {
insertCustomer("Tom", "Tester", "tom@test.com");
awaitAssert(() -> assertRecordsCount(connectorName + ".inventory.customers", 5));
awaitAssert(() -> assertRecordsContain(connectorName + ".inventory.customers", "tom@test.com"));
}
@Test
@Order(5)
public void shouldBeDown() throws IOException {
kafkaConnectController.undeployConnector(connectorName);
insertCustomer("Jerry", "Tester", "jerry@test.com");
awaitAssert(() -> assertRecordsCount(connectorName + ".inventory.customers", 5));
}
@Test
@Order(6)
public void shouldResumeStreamingAfterRedeployment() throws IOException, InterruptedException {
kafkaConnectController.deployConnector(connectorName, connectorConfig);
awaitAssert(() -> assertRecordsCount(connectorName + ".inventory.customers", 6));
awaitAssert(() -> assertRecordsContain(connectorName + ".inventory.customers", "jerry@test.com"));
}
@Test
@Order(7)
public void shouldBeDownAfterCrash() {
operatorController.disable();
kafkaConnectController.destroy();
insertCustomer("Nibbles", "Tester", "nibbles@test.com");
awaitAssert(() -> assertRecordsCount(connectorName + ".inventory.customers", 6));
}
@Test
@Order(8)
public void shouldResumeStreamingAfterCrash() throws InterruptedException {
operatorController.enable();
kafkaConnectController.waitForConnectCluster();
awaitAssert(() -> assertMinimalRecordsCount(connectorName + ".inventory.customers", 7));
awaitAssert(() -> assertRecordsContain(connectorName + ".inventory.customers", "nibbles@test.com"));
}
}

View File

@ -0,0 +1,53 @@
# Deployment of a single-replica MongoDB instance for Debezium OpenShift tests.
kind: Deployment
apiVersion: apps/v1
metadata:
  name: mongo
  labels:
    app: mongo
spec:
  replicas: 1
  selector:
    matchLabels:
      app: mongo
      deployment: mongo
  template:
    metadata:
      labels:
        app: mongo
        deployment: mongo
    spec:
      volumes:
        # Ephemeral storage — data does not survive pod restarts (test-only setup)
        - name: mongo-volume-1
          emptyDir: {}
        - name: mongo-volume-2
          emptyDir: {}
      containers:
        - resources: {}
          name: mongo
          ports:
            - containerPort: 27017
              protocol: TCP
          imagePullPolicy: Always
          volumeMounts:
            - name: mongo-volume-1
              mountPath: /data/configdb
            - name: mongo-volume-2
              mountPath: /data/db
          livenessProbe:
            initialDelaySeconds: 30
            tcpSocket:
              port: 27017
            timeoutSeconds: 1
          readinessProbe:
            initialDelaySeconds: 30
            tcpSocket:
              port: 27017
            timeoutSeconds: 1
          terminationMessagePolicy: File
          terminationMessagePath: /dev/termination-log
          image: 'quay.io/debezium/example-mongodb-ocp:latest'
      restartPolicy: Always
      terminationGracePeriodSeconds: 30
      dnsPolicy: ClusterFirst
  # Recreate (not RollingUpdate): old pod must release the emptyDir volumes first
  strategy:
    type: Recreate

View File

@ -0,0 +1,13 @@
# LoadBalancer service exposing MongoDB outside the cluster for test clients.
apiVersion: v1
kind: Service
metadata:
  name: mongo-lb
spec:
  selector:
    app: mongo
    deployment: mongo
  type: LoadBalancer
  ports:
    - name: db
      port: 27017
      targetPort: 27017

View File

@ -0,0 +1,12 @@
# ClusterIP service for in-cluster access to MongoDB (e.g. from Kafka Connect).
apiVersion: v1
kind: Service
metadata:
  name: mongo
spec:
  selector:
    app: mongo
    deployment: mongo
  ports:
    - name: db
      port: 27017
      targetPort: 27017