From b0f663c4c6c68ae5f185225cbda112db530b2351 Mon Sep 17 00:00:00 2001 From: Martin Medek Date: Mon, 8 Apr 2024 15:18:27 +0200 Subject: [PATCH] DBZ-7605 added ssl option for mongo tests --- .../system/tools/ConfigProperties.java | 1 - .../mongodb/sharded/OcpMongoReplicaSet.java | 16 +++++----- .../sharded/OcpMongoShardedCluster.java | 27 ++++++++-------- .../sharded/OcpMongoShardedConstants.java | 2 +- .../sharded/OcpMongoShardedDeployer.java | 2 +- .../tools/kafka/ConnectorConfigBuilder.java | 22 +++++++++++++ .../connectors/ShardedMongoConnector.java | 1 - .../ShardedReplicaMongoConnector.java | 8 ++++- .../system/resources/ConnectorFactories.java | 31 +++++++++++-------- .../mongodb/sharded/ShardedMongoTests.java | 5 ++- 10 files changed, 73 insertions(+), 42 deletions(-) diff --git a/debezium-testing/debezium-testing-system/src/main/java/io/debezium/testing/system/tools/ConfigProperties.java b/debezium-testing/debezium-testing-system/src/main/java/io/debezium/testing/system/tools/ConfigProperties.java index 83a483043..edf4cbf40 100644 --- a/debezium-testing/debezium-testing-system/src/main/java/io/debezium/testing/system/tools/ConfigProperties.java +++ b/debezium-testing/debezium-testing-system/src/main/java/io/debezium/testing/system/tools/ConfigProperties.java @@ -104,7 +104,6 @@ private ConfigProperties() { public static final boolean DATABASE_MONGO_USE_KEYFILE = Boolean.parseBoolean(System.getProperty("test.database.mongo.use.keyfile")); public static final boolean DATABASE_MONGO_USE_TLS = Boolean.parseBoolean(System.getProperty("test.database.mongo.use.tls")); - public static final String DATABASE_MONGO_DOCKER_DESKTOP_PORTS = System.getProperty("database.mongo.docker.desktop.ports", "27017:27117"); public static final int DATABASE_MONGO_DOCKER_REPLICA_SIZE = Integer.parseInt(System.getProperty("database.mongo.docker.replica.size", "1")); // DB2 Configuration diff --git a/debezium-testing/debezium-testing-system/src/main/java/io/debezium/testing/system/tools/databases/mongodb/sharded/OcpMongoReplicaSet.java b/debezium-testing/debezium-testing-system/src/main/java/io/debezium/testing/system/tools/databases/mongodb/sharded/OcpMongoReplicaSet.java index f2de4227c..2881dd774 100644 --- a/debezium-testing/debezium-testing-system/src/main/java/io/debezium/testing/system/tools/databases/mongodb/sharded/OcpMongoReplicaSet.java +++ b/debezium-testing/debezium-testing-system/src/main/java/io/debezium/testing/system/tools/databases/mongodb/sharded/OcpMongoReplicaSet.java @@ -49,16 +49,16 @@ public class OcpMongoReplicaSet implements Startable { private final OpenShiftUtils ocpUtil; private final String project; private final boolean useKeyfile; - private final boolean useTsl; + private final boolean useTls; private final int shardNum; private final List members; public OcpMongoReplicaSet(String name, boolean configServer, int memberCount, String rootUserName, String rootPassword, OpenShiftClient ocp, String project, - boolean useKeyfile, boolean useTsl, int shardNum) { + boolean useKeyfile, boolean useTls, int shardNum) { this.name = name; this.configServer = configServer; this.memberCount = memberCount; - this.useTsl = useTsl; + this.useTls = useTls; this.authRequired = false; this.rootUserName = rootUserName; this.rootPassword = rootPassword; @@ -111,7 +111,7 @@ public void start() { if (useKeyfile) { members.forEach(m -> MongoShardedUtil.addKeyFileToDeployment(m.getDeployment())); } - if (ConfigProperties.DATABASE_MONGO_USE_TLS) { + if (useTls) { members.forEach(m -> MongoShardedUtil.addCertificatesToDeployment(m.getDeployment())); } @@ -229,7 +229,7 @@ public static final class OcpMongoReplicaSetBuilder { private String project; private boolean useKeyfile; private int shardNum; - private boolean useTsl; + private boolean useTls; private OcpMongoReplicaSetBuilder() { } @@ -274,8 +274,8 @@ public OcpMongoReplicaSetBuilder withUseKeyfile(boolean useKeyfile) { return this; } - public OcpMongoReplicaSetBuilder withUseTsl(boolean useTsl) { - this.useTsl = useTsl; + public OcpMongoReplicaSetBuilder withUseTls(boolean useTls) { + this.useTls = useTls; return this; } @@ -285,7 +285,7 @@ public OcpMongoReplicaSetBuilder withShardNum(int shardNum) { } public OcpMongoReplicaSet build() { - return new OcpMongoReplicaSet(name, configServer, memberCount, rootUserName, rootPassword, ocp, project, useKeyfile, useTsl , shardNum); + return new OcpMongoReplicaSet(name, configServer, memberCount, rootUserName, rootPassword, ocp, project, useKeyfile, useTls, shardNum); } } } diff --git a/debezium-testing/debezium-testing-system/src/main/java/io/debezium/testing/system/tools/databases/mongodb/sharded/OcpMongoShardedCluster.java b/debezium-testing/debezium-testing-system/src/main/java/io/debezium/testing/system/tools/databases/mongodb/sharded/OcpMongoShardedCluster.java index c2d0e7b76..f2cbe107f 100644 --- a/debezium-testing/debezium-testing-system/src/main/java/io/debezium/testing/system/tools/databases/mongodb/sharded/OcpMongoShardedCluster.java +++ b/debezium-testing/debezium-testing-system/src/main/java/io/debezium/testing/system/tools/databases/mongodb/sharded/OcpMongoShardedCluster.java @@ -42,7 +42,7 @@ public class OcpMongoShardedCluster implements Startable { private final String rootUserName; private final String rootPassword; private final boolean useInternalAuth; - private final boolean useTsl; + private final boolean useTls; private final OpenShiftClient ocp; private final OpenShiftUtils ocpUtils; private final int initialShardCount; @@ -63,7 +63,7 @@ public void start() { return; } - if (useTsl && useInternalAuth) { + if (useTls && useInternalAuth) { throw new IllegalStateException("Cannot deploy mongo with both tls and keyfile internal auth"); } @@ -130,10 +130,11 @@ public void removeShard() { /** * deploy new shard and initialize it. Requires running initialized sharded mongo cluster */ - public void addShard(@Nullable Map rangeMap) { + public OcpMongoReplicaSet addShard(@Nullable Map rangeMap) { int shardNum = shardReplicaSets.size(); var rs = deployNewShard(shardNum); registerShardInMongos(rangeMap, rs); + return rs; } /** @@ -193,7 +194,7 @@ private OcpMongoReplicaSet deployNewShard(int shardNum) { .withRootPassword(rootPassword) .withMemberCount(replicaCount) .withUseKeyfile(useInternalAuth) - .withUseTsl(useTsl) + .withUseTls(useTls) .withOcp(ocp) .withProject(project) .build(); @@ -222,7 +223,7 @@ private void deployConfigServers() { .withRootPassword(rootPassword) .withMemberCount(configServerCount) .withUseKeyfile(useInternalAuth) - .withUseTsl(useTsl) + .withUseTls(useTls) .withOcp(ocp) .withProject(project) .build(); @@ -237,7 +238,7 @@ private void deployMongos() { MongoShardedUtil.addKeyFileToDeployment(mongosRouter.getDeployment()); } - if (useTsl) { + if (useTls) { MongoShardedUtil.addCertificatesToDeployment(mongosRouter.getDeployment()); } @@ -286,14 +287,14 @@ private String createKeyRangeCommand(ShardKeyRange range, MongoShardKey key) { } public OcpMongoShardedCluster(int initialShardCount, int replicaCount, int configServerCount, @Nullable String rootUserName, @Nullable String rootPassword, - boolean useInternalAuth, boolean useTsl, OpenShiftClient ocp, String project, List shardKeys) { + boolean useInternalAuth, boolean useTls, OpenShiftClient ocp, String project, List shardKeys) { this.initialShardCount = initialShardCount; this.replicaCount = replicaCount; this.configServerCount = configServerCount; this.rootUserName = StringUtils.isNotEmpty(rootUserName) ? rootUserName : ConfigProperties.DATABASE_MONGO_USERNAME; this.rootPassword = StringUtils.isNotEmpty(rootPassword) ? rootPassword : ConfigProperties.DATABASE_MONGO_SA_PASSWORD; this.useInternalAuth = useInternalAuth; - this.useTsl = useTsl; + this.useTls = useTls; this.ocp = ocp; this.project = project; this.ocpUtils = new OpenShiftUtils(ocp); @@ -305,7 +306,7 @@ public static OcpMongoShardedClusterBuilder builder() { } public boolean getUseTls() { - return useTsl; + return useTls; } public static final class OcpMongoShardedClusterBuilder { @@ -318,7 +319,7 @@ public static final class OcpMongoShardedClusterBuilder { private int initialShardCount; private String project; private List shardKeys; - private boolean useTsl; + private boolean useTls; private OcpMongoShardedClusterBuilder() { } @@ -344,8 +345,8 @@ public OcpMongoShardedClusterBuilder withUseInternalAuth(boolean useInternalAuth return this; } - public OcpMongoShardedClusterBuilder withUseTsl(boolean useTsl) { - this.useTsl = useTsl; + public OcpMongoShardedClusterBuilder withUseTls(boolean useTls) { + this.useTls = useTls; return this; } @@ -370,7 +371,7 @@ public OcpMongoShardedClusterBuilder withShardKeys(List shardKeys } public OcpMongoShardedCluster build() { - return new OcpMongoShardedCluster(initialShardCount, replicaCount, configServerCount, rootUserName, rootPassword, useInternalAuth, useTsl, ocp, project, + return new OcpMongoShardedCluster(initialShardCount, replicaCount, configServerCount, rootUserName, rootPassword, useInternalAuth, useTls, ocp, project, shardKeys); } } diff --git a/debezium-testing/debezium-testing-system/src/main/java/io/debezium/testing/system/tools/databases/mongodb/sharded/OcpMongoShardedConstants.java b/debezium-testing/debezium-testing-system/src/main/java/io/debezium/testing/system/tools/databases/mongodb/sharded/OcpMongoShardedConstants.java index 519222ee7..80fc3fca6 100644 --- a/debezium-testing/debezium-testing-system/src/main/java/io/debezium/testing/system/tools/databases/mongodb/sharded/OcpMongoShardedConstants.java +++ b/debezium-testing/debezium-testing-system/src/main/java/io/debezium/testing/system/tools/databases/mongodb/sharded/OcpMongoShardedConstants.java @@ -24,7 +24,7 @@ public class OcpMongoShardedConstants { public final static String INIT_RS_TEMPLATE = "init-rs.js"; public final static String CREATE_CERT_USER_TEMPLATE = "create-dbz-user-x509.js"; - public final static String CREATE_DBZ_USER_TEMPLATE = "create-dbz-user-x509.js"; + public final static String CREATE_DBZ_USER_TEMPLATE = "create-dbz-user.js"; public final static String INSERT_MONGOS_DATA_SCRIPT_LOC = "/database-resources/mongodb/sharded/insert-mongos-data.js"; public final static String KEYFILE_PATH_IN_CONTAINER = "/etc/mongodb.keyfile"; } diff --git a/debezium-testing/debezium-testing-system/src/main/java/io/debezium/testing/system/tools/databases/mongodb/sharded/OcpMongoShardedDeployer.java b/debezium-testing/debezium-testing-system/src/main/java/io/debezium/testing/system/tools/databases/mongodb/sharded/OcpMongoShardedDeployer.java index a392a0da2..29d7ddddc 100644 --- a/debezium-testing/debezium-testing-system/src/main/java/io/debezium/testing/system/tools/databases/mongodb/sharded/OcpMongoShardedDeployer.java +++ b/debezium-testing/debezium-testing-system/src/main/java/io/debezium/testing/system/tools/databases/mongodb/sharded/OcpMongoShardedDeployer.java @@ -58,7 +58,7 @@ public OcpMongoShardedController deploy() throws Exception { .withReplicaCount(replicaCount) .withShardKeys(shardKeys) .withUseInternalAuth(useKeyfile) - .withUseTsl(useTls) + .withUseTls(useTls) .withRootUser(rootUserName, rootPassword) .withShardKeys(shardKeys) .build(); diff --git a/debezium-testing/debezium-testing-system/src/main/java/io/debezium/testing/system/tools/kafka/ConnectorConfigBuilder.java b/debezium-testing/debezium-testing-system/src/main/java/io/debezium/testing/system/tools/kafka/ConnectorConfigBuilder.java index 0604dcc89..e58473a2e 100644 --- a/debezium-testing/debezium-testing-system/src/main/java/io/debezium/testing/system/tools/kafka/ConnectorConfigBuilder.java +++ b/debezium-testing/debezium-testing-system/src/main/java/io/debezium/testing/system/tools/kafka/ConnectorConfigBuilder.java @@ -11,6 +11,9 @@ import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; +import io.debezium.testing.system.tools.ConfigProperties; +import io.debezium.testing.system.tools.certificateutil.CertUtil; +import io.debezium.testing.system.tools.databases.mongodb.sharded.OcpMongoCertGenerator; import io.strimzi.api.kafka.model.connector.KafkaConnector; import io.strimzi.api.kafka.model.connector.KafkaConnectorBuilder; @@ -119,6 +122,25 @@ public ConnectorConfigBuilder addOperationRouterForTable(String op, String table return addOperationRouter(op, targetTopicName, serverName + ".*\\." + tableName); } + public ConnectorConfigBuilder addMongoTlsParams() { + this + .put("mongodb.ssl.enabled", true) + .put("mongodb.ssl.keystore", + "/opt/kafka/external-configuration/" + OcpMongoCertGenerator.KEYSTORE_CONFIGMAP + "/" + OcpMongoCertGenerator.KEYSTORE_SUBPATH) + .put("mongodb.ssl.keystore.password", CertUtil.KEYSTORE_PASSWORD) + .put("mongodb.ssl.truststore", + "/opt/kafka/external-configuration/" + OcpMongoCertGenerator.TRUSTSTORE_CONFIGMAP + "/" + OcpMongoCertGenerator.TRUSTSTORE_SUBPATH) + .put("mongodb.ssl.truststore.password", CertUtil.KEYSTORE_PASSWORD); + return this; + } + + public ConnectorConfigBuilder addMongoPasswordAuthParams() { + this + .put("mongodb.user", ConfigProperties.DATABASE_MONGO_DBZ_USERNAME) + .put("mongodb.password", ConfigProperties.DATABASE_MONGO_DBZ_PASSWORD); + return this; + } + /** * Get configuration as JSON string * @return JSON string of connector config diff --git a/debezium-testing/debezium-testing-system/src/test/java/io/debezium/testing/system/fixtures/connectors/ShardedMongoConnector.java b/debezium-testing/debezium-testing-system/src/test/java/io/debezium/testing/system/fixtures/connectors/ShardedMongoConnector.java index f5ba480d8..92aa6655f 100644 --- a/debezium-testing/debezium-testing-system/src/test/java/io/debezium/testing/system/fixtures/connectors/ShardedMongoConnector.java +++ b/debezium-testing/debezium-testing-system/src/test/java/io/debezium/testing/system/fixtures/connectors/ShardedMongoConnector.java @@ -31,7 +31,6 @@ public ConnectorConfigBuilder connectorConfig(String connectorName) { } else { return new ConnectorFactories(kafkaController).shardedMongo(dbController, connectorName); - } } } diff --git a/debezium-testing/debezium-testing-system/src/test/java/io/debezium/testing/system/fixtures/connectors/ShardedReplicaMongoConnector.java b/debezium-testing/debezium-testing-system/src/test/java/io/debezium/testing/system/fixtures/connectors/ShardedReplicaMongoConnector.java index 364439f7d..d66161b7c 100644 --- a/debezium-testing/debezium-testing-system/src/test/java/io/debezium/testing/system/fixtures/connectors/ShardedReplicaMongoConnector.java +++ b/debezium-testing/debezium-testing-system/src/test/java/io/debezium/testing/system/fixtures/connectors/ShardedReplicaMongoConnector.java @@ -8,6 +8,7 @@ import org.junit.jupiter.api.extension.ExtensionContext; import io.debezium.testing.system.resources.ConnectorFactories; +import io.debezium.testing.system.tools.ConfigProperties; import io.debezium.testing.system.tools.databases.mongodb.sharded.OcpMongoShardedController; import io.debezium.testing.system.tools.kafka.ConnectorConfigBuilder; import io.debezium.testing.system.tools.kafka.KafkaConnectController; @@ -25,6 +26,11 @@ public ShardedReplicaMongoConnector(ExtensionContext.Store store) { @Override public ConnectorConfigBuilder connectorConfig(String connectorName) { - return new ConnectorFactories(kafkaController).shardedReplicaMongo(dbController, connectorName); + if (ConfigProperties.DATABASE_MONGO_USE_TLS) { + return new ConnectorFactories(kafkaController).shardedReplicaMongoWithTls(dbController, connectorName); + } + else { + return new ConnectorFactories(kafkaController).shardedReplicaMongo(dbController, connectorName); + } } } diff --git a/debezium-testing/debezium-testing-system/src/test/java/io/debezium/testing/system/resources/ConnectorFactories.java b/debezium-testing/debezium-testing-system/src/test/java/io/debezium/testing/system/resources/ConnectorFactories.java index 1118005f2..97f37c87a 100644 --- a/debezium-testing/debezium-testing-system/src/test/java/io/debezium/testing/system/resources/ConnectorFactories.java +++ b/debezium-testing/debezium-testing-system/src/test/java/io/debezium/testing/system/resources/ConnectorFactories.java @@ -8,10 +8,8 @@ import java.util.Random; import io.debezium.testing.system.tools.ConfigProperties; -import io.debezium.testing.system.tools.certificateutil.CertUtil; import io.debezium.testing.system.tools.databases.SqlDatabaseController; import io.debezium.testing.system.tools.databases.mongodb.MongoDatabaseController; -import io.debezium.testing.system.tools.databases.mongodb.sharded.OcpMongoCertGenerator; import io.debezium.testing.system.tools.kafka.ConnectorConfigBuilder; import io.debezium.testing.system.tools.kafka.KafkaController; @@ -108,8 +106,7 @@ public ConnectorConfigBuilder shardedMongo(MongoDatabaseController controller, S .put("task.max", 1) .put("mongodb.connection.string", controller.getPublicDatabaseUrl()) .put("mongodb.connection.mode", "sharded") - .put("mongodb.user", ConfigProperties.DATABASE_MONGO_DBZ_USERNAME) - .put("mongodb.password", ConfigProperties.DATABASE_MONGO_DBZ_PASSWORD) + .addMongoPasswordAuthParams() .addOperationRouterForTable("u", "customers"); return cb; } @@ -122,13 +119,7 @@ public ConnectorConfigBuilder shardedMongoWithTls(MongoDatabaseController contro .put("task.max", 1) .put("mongodb.connection.string", controller.getPublicDatabaseUrl()) .put("mongodb.connection.mode", "sharded") - .put("mongodb.ssl.enabled", true) - .put("mongodb.ssl.keystore", - "/opt/kafka/external-configuration/" + OcpMongoCertGenerator.KEYSTORE_CONFIGMAP + "/" + OcpMongoCertGenerator.KEYSTORE_SUBPATH) - .put("mongodb.ssl.keystore.password", CertUtil.KEYSTORE_PASSWORD) - .put("mongodb.ssl.truststore", - "/opt/kafka/external-configuration/" + OcpMongoCertGenerator.TRUSTSTORE_CONFIGMAP + "/" + OcpMongoCertGenerator.TRUSTSTORE_SUBPATH) - .put("mongodb.ssl.truststore.password", CertUtil.KEYSTORE_PASSWORD) + .addMongoTlsParams() .addOperationRouterForTable("u", "customers"); return cb; } @@ -141,10 +132,24 @@ public ConnectorConfigBuilder shardedReplicaMongo(MongoDatabaseController contro .put("topic.prefix", connectorName) .put("connector.class", "io.debezium.connector.mongodb.MongoDbConnector") .put("task.max", 4) - .put("mongodb.user", ConfigProperties.DATABASE_MONGO_DBZ_USERNAME) - .put("mongodb.password", ConfigProperties.DATABASE_MONGO_DBZ_PASSWORD) .put("mongodb.connection.string", controller.getPublicDatabaseUrl()) .put("mongodb.connection.mode", "replica_set") + .addMongoPasswordAuthParams() + .addOperationRouterForTable("u", "customers"); + return cb; + } + + public ConnectorConfigBuilder shardedReplicaMongoWithTls(MongoDatabaseController controller, String connectorName) { + + // String connectionUrl =; + ConnectorConfigBuilder cb = new ConnectorConfigBuilder(connectorName); + cb + .put("topic.prefix", connectorName) + .put("connector.class", "io.debezium.connector.mongodb.MongoDbConnector") + .put("task.max", 4) + .put("mongodb.connection.string", controller.getPublicDatabaseUrl()) + .put("mongodb.connection.mode", "replica_set") + .addMongoTlsParams() .addOperationRouterForTable("u", "customers"); return cb; } diff --git a/debezium-testing/debezium-testing-system/src/test/java/io/debezium/testing/system/tests/mongodb/sharded/ShardedMongoTests.java b/debezium-testing/debezium-testing-system/src/test/java/io/debezium/testing/system/tests/mongodb/sharded/ShardedMongoTests.java index b977de93f..917f71831 100644 --- a/debezium-testing/debezium-testing-system/src/test/java/io/debezium/testing/system/tests/mongodb/sharded/ShardedMongoTests.java +++ b/debezium-testing/debezium-testing-system/src/test/java/io/debezium/testing/system/tests/mongodb/sharded/ShardedMongoTests.java @@ -92,9 +92,8 @@ protected void addAndRemoveShardTest(OcpMongoShardedController dbController, Str // add shard, restart connector, insert to that shard and verify that insert was captured by debezium var key = dbController.getMongo().getShardKey("inventory.customers"); var keyRange = new ShardKeyRange(OcpShardModelProvider.getShardReplicaSetName(3), "1100", "1105"); - dbController.getMongo().addShard(Map.of(key, keyRange)); - var sets = dbController.getMongo().getShardReplicaSets(); - sets.get(sets.size() - 1).executeMongosh(dbController.createDbzUserCommand(), true); + var newShard = dbController.getMongo().addShard(Map.of(key, keyRange)); + newShard.executeMongosh(dbController.createDbzUserCommand(), false); connectController.undeployConnector(connectorName); connectController.deployConnector(connectorConfig);