DBZ-7221 sharded mongo tests - fixed Freemarker Configuration creation, removed redundant methods, refactor

This commit is contained in:
Martin Medek 2024-02-20 08:55:39 +01:00 committed by Ondrej Babec
parent ec90242567
commit 28105c108f
16 changed files with 92 additions and 111 deletions

View File

@ -11,8 +11,6 @@
import static org.awaitility.Awaitility.await;
import java.io.ByteArrayOutputStream;
import java.io.OutputStream;
import java.io.PipedInputStream;
import java.io.PrintStream;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
@ -31,7 +29,6 @@
import org.slf4j.LoggerFactory;
import io.debezium.testing.system.tools.databases.DatabaseExecListener;
import io.debezium.testing.system.tools.databases.DatabaseInitListener;
import io.debezium.testing.system.tools.operatorutil.OpenshiftOperatorEnum;
import io.fabric8.kubernetes.api.model.Container;
import io.fabric8.kubernetes.api.model.EnvVar;
@ -50,9 +47,7 @@
import io.fabric8.kubernetes.api.model.networking.v1.NetworkPolicyBuilder;
import io.fabric8.kubernetes.api.model.networking.v1.NetworkPolicyPort;
import io.fabric8.kubernetes.client.ConfigBuilder;
import io.fabric8.kubernetes.client.dsl.ExecWatch;
import io.fabric8.kubernetes.client.dsl.PodResource;
import io.fabric8.kubernetes.client.dsl.TtyExecErrorChannelable;
import io.fabric8.openshift.api.model.Route;
import io.fabric8.openshift.api.model.RouteBuilder;
import io.fabric8.openshift.api.model.operatorhub.v1.OperatorGroup;
@ -346,19 +341,6 @@ public static OpenShiftClient createOcpClient() {
return new DefaultOpenShiftClient(configBuilder.build());
}
public TtyExecErrorChannelable<String, OutputStream, PipedInputStream, ExecWatch> prepareExec(Deployment deployment, String project, PrintStream commandOutput,
PrintStream errorOutput) {
var pods = podsForDeployment(deployment);
if (pods.size() > 1) {
throw new IllegalArgumentException("Executing command on deployment scaled to more than 1");
}
Pod pod = pods.get(0);
return getPodResource(pod, project)
.inContainer(pod.getMetadata().getLabels().get("app"))
.writingOutput(commandOutput)
.writingError(errorOutput);
}
private PodResource<Pod> getPodResource(Pod pod, String project) {
return client.pods().inNamespace(project).withName(pod.getMetadata().getName());
}
@ -370,7 +352,15 @@ public CommandOutputs executeCommand(Deployment deployment, String project, bool
PrintStream pse = new PrintStream(captureErr);
CountDownLatch latch = new CountDownLatch(1);
try (var ignored = prepareExec(deployment, project, pso, pse)
var pods = podsForDeployment(deployment);
if (pods.size() > 1) {
throw new IllegalArgumentException("Executing command on deployment scaled to more than 1");
}
Pod pod = pods.get(0);
try (var ignored = getPodResource(pod, project)
.inContainer(pod.getMetadata().getLabels().get("app"))
.writingOutput(pso)
.writingError(pse)
.usingListener(new DatabaseExecListener(deployment.getMetadata().getName(), latch))
.exec(commands)) {
if (debugLogs) {
@ -404,15 +394,4 @@ public String toString() {
'}';
}
}
public void executeInitCommand(Deployment deployment, String project, String... commands) throws InterruptedException {
CountDownLatch latch = new CountDownLatch(1);
String containerName = deployment.getMetadata().getLabels().get("app");
try (var ignored = prepareExec(deployment, project, null, null)
.usingListener(new DatabaseInitListener(containerName, latch))
.exec(commands)) {
LOGGER.info("Waiting until database is initialized");
latch.await(scaled(1), MINUTES);
}
}
}

View File

@ -5,17 +5,12 @@
*/
package io.debezium.testing.system.tools.databases;
import java.io.ByteArrayOutputStream;
import java.io.PrintStream;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import io.debezium.testing.system.tools.OpenShiftUtils;
import io.debezium.testing.system.tools.WaitConditions;
import io.fabric8.kubernetes.api.model.Service;
import io.fabric8.kubernetes.api.model.apps.Deployment;
import io.fabric8.openshift.client.OpenShiftClient;
@ -81,22 +76,6 @@ public int getPublicDatabasePort() {
return getDatabasePort();
}
protected void executeInitCommand(Deployment deployment, String... commands) throws InterruptedException {
ByteArrayOutputStream captureOut = new ByteArrayOutputStream();
ByteArrayOutputStream captureErr = new ByteArrayOutputStream();
PrintStream pso = new PrintStream(captureOut);
PrintStream pse = new PrintStream(captureErr);
CountDownLatch latch = new CountDownLatch(1);
String containerName = deployment.getMetadata().getLabels().get("app");
try (var ignored = ocpUtils.prepareExec(deployment, project, pso, pse)
.usingListener(new DatabaseInitListener(containerName, latch))
.exec(commands)) {
LOGGER.info("Waiting until database is initialized");
latch.await(WaitConditions.scaled(1), TimeUnit.MINUTES);
}
}
private int getOriginalDatabasePort() {
return getService().getSpec().getPorts().stream()
.filter(p -> p.getName().equals("db"))

View File

@ -5,7 +5,6 @@
*/
package io.debezium.testing.system.tools.databases.mongodb.sharded;
import java.io.File;
import java.io.IOException;
import java.io.StringWriter;
import java.util.List;
@ -13,29 +12,15 @@
import java.util.stream.IntStream;
import io.debezium.testing.system.tools.OpenShiftUtils;
import io.debezium.testing.system.tools.databases.mongodb.sharded.componentfactories.OcpShardModelFactory;
import io.debezium.testing.system.tools.databases.mongodb.sharded.componentfactories.OcpShardModelProvider;
import io.debezium.testing.system.tools.databases.mongodb.sharded.freemarkermodels.CreateUserModel;
import io.debezium.testing.system.tools.databases.mongodb.sharded.freemarkermodels.FreemarkerConfiguration;
import io.fabric8.kubernetes.api.model.apps.Deployment;
import freemarker.template.Configuration;
import freemarker.template.Template;
import freemarker.template.TemplateException;
public class MongoShardedUtil {
private static Configuration configuration;
public static Configuration getFreemarkerConfiguration() {
if (configuration == null) {
configuration = new Configuration(Configuration.VERSION_2_3_32);
try {
configuration.setDirectoryForTemplateLoading(new File("src/test/resources/database-resources/mongodb/sharded/command-templates"));
}
catch (IOException e) {
throw new RuntimeException(e);
}
}
return configuration;
}
public static List<Integer> intRange(int count) {
return IntStream.rangeClosed(0, count - 1).boxed().collect(Collectors.toList());
@ -55,31 +40,17 @@ public static OpenShiftUtils.CommandOutputs executeMongoShOnPod(OpenShiftUtils o
}
}
public static OpenShiftUtils.CommandOutputs executeMongoShOnPod(OpenShiftUtils ocpUtils, String project, Deployment deployment, int port, String command,
boolean debugLogs) {
try {
return ocpUtils.executeCommand(deployment, project, debugLogs,
"mongosh",
String.format("localhost:%d/admin", port),
"--eval",
command);
}
catch (InterruptedException e) {
throw new RuntimeException("Failed executing mongosh command", e);
}
}
public static String createDebeziumUserCommand(String userName, String password) throws IOException, TemplateException {
var writer = new StringWriter();
Template template = getFreemarkerConfiguration().getTemplate(OcpMongoShardedConstants.CREATE_DBZ_USER_TEMPLATE);
Template template = new FreemarkerConfiguration().getFreemarkerConfiguration().getTemplate(OcpMongoShardedConstants.CREATE_DBZ_USER_TEMPLATE);
template.process(new CreateUserModel(userName, password), writer);
return writer.toString();
}
public static List<MongoShardKey> getTestShardKeys() {
MongoShardKey customersKey = new MongoShardKey("inventory.customers", "_id", MongoShardKey.ShardingType.NORMAL);
customersKey.getKeyRanges().add(new ShardKeyRange(OcpShardModelFactory.getShardReplicaSetName(1), "1000", "1003"));
customersKey.getKeyRanges().add(new ShardKeyRange(OcpShardModelFactory.getShardReplicaSetName(2), "1003", "1004"));
customersKey.getKeyRanges().add(new ShardKeyRange(OcpShardModelProvider.getShardReplicaSetName(1), "1000", "1003"));
customersKey.getKeyRanges().add(new ShardKeyRange(OcpShardModelProvider.getShardReplicaSetName(2), "1003", "1004"));
MongoShardKey productsKey = new MongoShardKey("inventory.products", "_id", MongoShardKey.ShardingType.HASHED);
return List.of(customersKey, productsKey);

View File

@ -26,8 +26,8 @@
import io.debezium.testing.system.tools.ConfigProperties;
import io.debezium.testing.system.tools.OpenShiftUtils;
import io.debezium.testing.system.tools.databases.mongodb.sharded.componentfactories.OcpMongosModelFactory;
import io.debezium.testing.system.tools.databases.mongodb.sharded.componentfactories.OcpShardModelFactory;
import io.debezium.testing.system.tools.databases.mongodb.sharded.componentfactories.OcpMongosModelProvider;
import io.debezium.testing.system.tools.databases.mongodb.sharded.componentfactories.OcpShardModelProvider;
import io.fabric8.openshift.client.OpenShiftClient;
import freemarker.template.TemplateException;
@ -158,7 +158,7 @@ private OcpMongoShardedReplicaSet deployNewShard(int shardNum) {
LOGGER.info("Deploying shard number " + shardNum);
OcpMongoShardedReplicaSet replicaSet = OcpMongoShardedReplicaSet.builder()
.withShardNum(shardNum)
.withName(OcpShardModelFactory.getShardReplicaSetName(shardNum))
.withName(OcpShardModelProvider.getShardReplicaSetName(shardNum))
.withConfigServer(false)
.withRootUserName(rootUserName)
.withRootPassword(rootPassword)
@ -200,8 +200,8 @@ private void deployConfigServers() {
}
private void deployMongos() {
mongosRouter = new OcpMongoShardedNode(OcpMongosModelFactory.mongosDeployment(configServerReplicaSet.getReplicaSetFullName()),
OcpMongosModelFactory.mongosService(), null, ocp, project);
mongosRouter = new OcpMongoShardedNode(OcpMongosModelProvider.mongosDeployment(configServerReplicaSet.getReplicaSetFullName()),
OcpMongosModelProvider.mongosService(), null, ocp, project);
if (useInternalAuth) {
MongoShardedUtil.addKeyFileToDeployment(mongosRouter.getDeployment());
}

View File

@ -18,7 +18,7 @@
import io.debezium.testing.system.tools.ConfigProperties;
import io.debezium.testing.system.tools.databases.mongodb.MongoDatabaseClient;
import io.debezium.testing.system.tools.databases.mongodb.MongoDatabaseController;
import io.debezium.testing.system.tools.databases.mongodb.sharded.componentfactories.OcpMongosModelFactory;
import io.debezium.testing.system.tools.databases.mongodb.sharded.componentfactories.OcpMongosModelProvider;
import io.fabric8.kubernetes.api.model.Service;
import io.fabric8.openshift.client.OpenShiftClient;
@ -124,7 +124,7 @@ private Service getService() {
return ocp
.services()
.inNamespace(project)
.withName(OcpMongosModelFactory.DEPLOYMENT_NAME)
.withName(OcpMongosModelProvider.DEPLOYMENT_NAME)
.get();
}

View File

@ -7,7 +7,6 @@
import static io.debezium.testing.system.tools.databases.mongodb.sharded.MongoShardedUtil.createRootUserCommand;
import static io.debezium.testing.system.tools.databases.mongodb.sharded.MongoShardedUtil.executeMongoShOnPod;
import static io.debezium.testing.system.tools.databases.mongodb.sharded.MongoShardedUtil.getFreemarkerConfiguration;
import static io.debezium.testing.system.tools.databases.mongodb.sharded.MongoShardedUtil.intRange;
import java.io.IOException;
@ -24,8 +23,9 @@
import io.debezium.testing.system.tools.ConfigProperties;
import io.debezium.testing.system.tools.OpenShiftUtils;
import io.debezium.testing.system.tools.databases.mongodb.sharded.componentfactories.OcpConfigServerModelFactory;
import io.debezium.testing.system.tools.databases.mongodb.sharded.componentfactories.OcpShardModelFactory;
import io.debezium.testing.system.tools.databases.mongodb.sharded.componentfactories.OcpConfigServerModelProvider;
import io.debezium.testing.system.tools.databases.mongodb.sharded.componentfactories.OcpShardModelProvider;
import io.debezium.testing.system.tools.databases.mongodb.sharded.freemarkermodels.FreemarkerConfiguration;
import io.debezium.testing.system.tools.databases.mongodb.sharded.freemarkermodels.InitReplicaSetModel;
import io.fabric8.openshift.client.OpenShiftClient;
@ -70,11 +70,28 @@ public OcpMongoShardedReplicaSet(String name, boolean configServer, int memberCo
this.members = intRange(memberCount)
.stream()
.map(i -> configServer
? new OcpShardedMongoReplica(OcpConfigServerModelFactory.configServerDeployment(i), OcpConfigServerModelFactory.configServerService(i),
getConfigServerServiceName(i), ocp, project, i)
: new OcpShardedMongoReplica(OcpShardModelFactory.shardDeployment(shardNum, i), OcpShardModelFactory.shardService(shardNum, i),
getShardReplicaServiceName(i), ocp, project, i))
.map(i -> {
if (configServer) {
return OcpShardedMongoReplica.builder()
.withDeployment(OcpConfigServerModelProvider.configServerDeployment(i))
.withService(OcpConfigServerModelProvider.configServerService(i))
.withServiceUrl(getConfigServerServiceName(i))
.withOcp(ocp)
.withProject(project)
.withReplicaNum(i)
.build();
}
else {
return OcpShardedMongoReplica.builder()
.withDeployment(OcpShardModelProvider.shardDeployment(shardNum, i))
.withService(OcpShardModelProvider.shardService(shardNum, i))
.withServiceUrl(getShardReplicaServiceName(i))
.withOcp(ocp)
.withProject(project)
.withReplicaNum(i)
.build();
}
})
.collect(Collectors.toList());
}
@ -166,7 +183,7 @@ public OpenShiftUtils.CommandOutputs executeMongosh(String command, boolean debu
private String getInitRsCommand() throws IOException, TemplateException {
var writer = new StringWriter();
Template template = getFreemarkerConfiguration().getTemplate(OcpMongoShardedConstants.INIT_RS_TEMPLATE);
Template template = new FreemarkerConfiguration().getFreemarkerConfiguration().getTemplate(OcpMongoShardedConstants.INIT_RS_TEMPLATE);
template.process(new InitReplicaSetModel(members, name, configServer), writer);
return writer.toString();
}
@ -177,7 +194,7 @@ private String getShardReplicaServiceName(int replicaNum) {
}
private String getConfigServerServiceName(int replicaNum) {
return String.format("%s.%s.svc.cluster.local:%d", OcpConfigServerModelFactory.getConfigServerName(replicaNum), ConfigProperties.OCP_PROJECT_MONGO,
return String.format("%s.%s.svc.cluster.local:%d", OcpConfigServerModelProvider.getConfigServerName(replicaNum), ConfigProperties.OCP_PROJECT_MONGO,
OcpMongoShardedConstants.MONGO_CONFIG_PORT);
}

View File

@ -9,12 +9,14 @@
import io.fabric8.kubernetes.api.model.apps.Deployment;
import io.fabric8.openshift.client.OpenShiftClient;
import lombok.Builder;
import lombok.Getter;
@Getter
public class OcpShardedMongoReplica extends OcpMongoShardedNode {
private final int replicaNum;
@Builder(setterPrefix = "with")
public OcpShardedMongoReplica(Deployment deployment, Service service, String serviceUrl, OpenShiftClient ocp, String project, int replicaNum) {
super(deployment, service, serviceUrl, ocp, project);
this.replicaNum = replicaNum;

View File

@ -30,7 +30,7 @@
import io.fabric8.kubernetes.api.model.apps.DeploymentSpecBuilder;
import io.fabric8.kubernetes.api.model.apps.DeploymentStrategyBuilder;
public class OcpConfigServerModelFactory {
public class OcpConfigServerModelProvider {
public static String getConfigServerName(int num) {
return OcpMongoShardedConstants.MONGO_CONFIG_DEPLOYMENT_NAME + num;

View File

@ -28,7 +28,7 @@
import io.fabric8.kubernetes.api.model.apps.DeploymentSpecBuilder;
import io.fabric8.kubernetes.api.model.apps.DeploymentStrategyBuilder;
public class OcpMongosModelFactory {
public class OcpMongosModelProvider {
public static final String DEPLOYMENT_NAME = "mongo-mongos";
public static Deployment mongosDeployment(String configServersRs) {

View File

@ -31,7 +31,7 @@
import io.fabric8.kubernetes.api.model.apps.DeploymentSpecBuilder;
import io.fabric8.kubernetes.api.model.apps.DeploymentStrategyBuilder;
public class OcpShardModelFactory {
public class OcpShardModelProvider {
public static Deployment shardDeployment(int shardNum, int replicaNum) {
String name = getShardNodeName(shardNum, replicaNum);

View File

@ -0,0 +1,31 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.testing.system.tools.databases.mongodb.sharded.freemarkermodels;
import java.io.IOException;
import java.net.URISyntaxException;
import java.nio.file.Path;
import java.nio.file.Paths;
import freemarker.template.Configuration;
public class FreemarkerConfiguration {
private static Configuration configuration;
public Configuration getFreemarkerConfiguration() {
if (configuration == null) {
configuration = new Configuration(Configuration.VERSION_2_3_32);
try {
Path templatePath = Paths.get(getClass().getResource("/database-resources/mongodb/sharded/command-templates").toURI());
configuration.setDirectoryForTemplateLoading(templatePath.toFile());
}
catch (IOException | URISyntaxException e) {
throw new RuntimeException(e);
}
}
return configuration;
}
}

View File

@ -48,7 +48,7 @@ public void initialize() throws InterruptedException {
.file(DB_INIT_SCRIPT_PATH_CONTAINER)
.upload(initScript);
executeInitCommand(deployment, "sqlplus", "-S",
ocpUtils.executeCommand(deployment, project, true, "sqlplus", "-S",
DATABASE_ORACLE_USERNAME + "/" + DATABASE_ORACLE_PASSWORD + "@//localhost:1521/ORCLPDB1", "@" + DB_INIT_SCRIPT_PATH_CONTAINER);
}

View File

@ -48,6 +48,7 @@ public void initialize() throws InterruptedException {
ocp.pods().inNamespace(project).withName(pod.getMetadata().getName())
.file(INIT_SCRIPT_PATH_CONTAINER)
.upload(initScript);
executeInitCommand(deployment, "psql", "-U", DATABASE_POSTGRESQL_USERNAME, "-d", DATABASE_POSTGRESQL_DBZ_DBNAME, "-f", INIT_SCRIPT_PATH_CONTAINER);
ocpUtils.executeCommand(deployment, project, true, "psql", "-U", DATABASE_POSTGRESQL_USERNAME, "-d", DATABASE_POSTGRESQL_DBZ_DBNAME, "-f",
INIT_SCRIPT_PATH_CONTAINER);
}
}

View File

@ -54,6 +54,7 @@ public void initialize() throws InterruptedException {
ocp.pods().inNamespace(project).withName(pod.getMetadata().getName())
.file(DB_INIT_SCRIPT_PATH_CONTAINER)
.upload(initScript);
executeInitCommand(deployment, "/opt/mssql-tools/bin/sqlcmd", "-U", "sa", "-P", DATABASE_SQLSERVER_SA_PASSWORD, "-i", DB_INIT_SCRIPT_PATH_CONTAINER);
ocpUtils.executeCommand(deployment, project, true, "/opt/mssql-tools/bin/sqlcmd", "-U", "sa", "-P", DATABASE_SQLSERVER_SA_PASSWORD, "-i",
DB_INIT_SCRIPT_PATH_CONTAINER);
}
}

View File

@ -26,7 +26,7 @@
import io.debezium.testing.system.tools.databases.mongodb.sharded.MongoShardedUtil;
import io.debezium.testing.system.tools.databases.mongodb.sharded.OcpMongoShardedController;
import io.debezium.testing.system.tools.databases.mongodb.sharded.ShardKeyRange;
import io.debezium.testing.system.tools.databases.mongodb.sharded.componentfactories.OcpShardModelFactory;
import io.debezium.testing.system.tools.databases.mongodb.sharded.componentfactories.OcpShardModelProvider;
import io.debezium.testing.system.tools.kafka.ConnectorConfigBuilder;
import io.debezium.testing.system.tools.kafka.KafkaConnectController;
import io.debezium.testing.system.tools.kafka.KafkaController;
@ -93,7 +93,7 @@ protected void addAndRemoveShardTest(OcpMongoShardedController dbController, Str
// add shard, restart connector, insert to that shard and verify that insert was captured by debezium
var key = dbController.getMongo().getShardKey("inventory.customers");
var keyRange = new ShardKeyRange(OcpShardModelFactory.getShardReplicaSetName(3), "1100", "1105");
var keyRange = new ShardKeyRange(OcpShardModelProvider.getShardReplicaSetName(3), "1100", "1105");
dbController.addShard(Map.of(key, keyRange));
var sets = dbController.getMongo().getShardReplicaSets();
sets.get(sets.size() - 1).executeMongosh(

View File

@ -3,7 +3,7 @@ rs.initiate({ _id: "${rsId}", configsvr: ${configServer?c}, members: [${members}
let isPrimary = false;
let count = 0;
// wait for primary node election
while(isPrimary == false && count < 60) {
while(isPrimary == false && count < 120) {
const rplStatus = db.adminCommand({ replSetGetStatus : 1 });
isPrimary = rplStatus.members[0].stateStr === "PRIMARY";
print("is primary result: ", isPrimary);