DBZ-7605 added ssl option for mongo tests
This commit is contained in:
parent
56eef4eef9
commit
b0f663c4c6
@ -104,7 +104,6 @@ private ConfigProperties() {
|
|||||||
public static final boolean DATABASE_MONGO_USE_KEYFILE = Boolean.parseBoolean(System.getProperty("test.database.mongo.use.keyfile"));
|
public static final boolean DATABASE_MONGO_USE_KEYFILE = Boolean.parseBoolean(System.getProperty("test.database.mongo.use.keyfile"));
|
||||||
public static final boolean DATABASE_MONGO_USE_TLS = Boolean.parseBoolean(System.getProperty("test.database.mongo.use.tls"));
|
public static final boolean DATABASE_MONGO_USE_TLS = Boolean.parseBoolean(System.getProperty("test.database.mongo.use.tls"));
|
||||||
|
|
||||||
|
|
||||||
public static final String DATABASE_MONGO_DOCKER_DESKTOP_PORTS = System.getProperty("database.mongo.docker.desktop.ports", "27017:27117");
|
public static final String DATABASE_MONGO_DOCKER_DESKTOP_PORTS = System.getProperty("database.mongo.docker.desktop.ports", "27017:27117");
|
||||||
public static final int DATABASE_MONGO_DOCKER_REPLICA_SIZE = Integer.parseInt(System.getProperty("database.mongo.docker.replica.size", "1"));
|
public static final int DATABASE_MONGO_DOCKER_REPLICA_SIZE = Integer.parseInt(System.getProperty("database.mongo.docker.replica.size", "1"));
|
||||||
// DB2 Configuration
|
// DB2 Configuration
|
||||||
|
@ -49,16 +49,16 @@ public class OcpMongoReplicaSet implements Startable {
|
|||||||
private final OpenShiftUtils ocpUtil;
|
private final OpenShiftUtils ocpUtil;
|
||||||
private final String project;
|
private final String project;
|
||||||
private final boolean useKeyfile;
|
private final boolean useKeyfile;
|
||||||
private final boolean useTsl;
|
private final boolean useTls;
|
||||||
private final int shardNum;
|
private final int shardNum;
|
||||||
private final List<OcpMongoReplicaSetMember> members;
|
private final List<OcpMongoReplicaSetMember> members;
|
||||||
|
|
||||||
public OcpMongoReplicaSet(String name, boolean configServer, int memberCount, String rootUserName, String rootPassword, OpenShiftClient ocp, String project,
|
public OcpMongoReplicaSet(String name, boolean configServer, int memberCount, String rootUserName, String rootPassword, OpenShiftClient ocp, String project,
|
||||||
boolean useKeyfile, boolean useTsl, int shardNum) {
|
boolean useKeyfile, boolean useTls, int shardNum) {
|
||||||
this.name = name;
|
this.name = name;
|
||||||
this.configServer = configServer;
|
this.configServer = configServer;
|
||||||
this.memberCount = memberCount;
|
this.memberCount = memberCount;
|
||||||
this.useTsl = useTsl;
|
this.useTls = useTls;
|
||||||
this.authRequired = false;
|
this.authRequired = false;
|
||||||
this.rootUserName = rootUserName;
|
this.rootUserName = rootUserName;
|
||||||
this.rootPassword = rootPassword;
|
this.rootPassword = rootPassword;
|
||||||
@ -111,7 +111,7 @@ public void start() {
|
|||||||
if (useKeyfile) {
|
if (useKeyfile) {
|
||||||
members.forEach(m -> MongoShardedUtil.addKeyFileToDeployment(m.getDeployment()));
|
members.forEach(m -> MongoShardedUtil.addKeyFileToDeployment(m.getDeployment()));
|
||||||
}
|
}
|
||||||
if (ConfigProperties.DATABASE_MONGO_USE_TLS) {
|
if (useTls) {
|
||||||
members.forEach(m -> MongoShardedUtil.addCertificatesToDeployment(m.getDeployment()));
|
members.forEach(m -> MongoShardedUtil.addCertificatesToDeployment(m.getDeployment()));
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -229,7 +229,7 @@ public static final class OcpMongoReplicaSetBuilder {
|
|||||||
private String project;
|
private String project;
|
||||||
private boolean useKeyfile;
|
private boolean useKeyfile;
|
||||||
private int shardNum;
|
private int shardNum;
|
||||||
private boolean useTsl;
|
private boolean useTls;
|
||||||
|
|
||||||
private OcpMongoReplicaSetBuilder() {
|
private OcpMongoReplicaSetBuilder() {
|
||||||
}
|
}
|
||||||
@ -274,8 +274,8 @@ public OcpMongoReplicaSetBuilder withUseKeyfile(boolean useKeyfile) {
|
|||||||
return this;
|
return this;
|
||||||
}
|
}
|
||||||
|
|
||||||
public OcpMongoReplicaSetBuilder withUseTsl(boolean useTsl) {
|
public OcpMongoReplicaSetBuilder withUseTls(boolean useTls) {
|
||||||
this.useTsl = useTsl;
|
this.useTls = useTls;
|
||||||
return this;
|
return this;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -285,7 +285,7 @@ public OcpMongoReplicaSetBuilder withShardNum(int shardNum) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
public OcpMongoReplicaSet build() {
|
public OcpMongoReplicaSet build() {
|
||||||
return new OcpMongoReplicaSet(name, configServer, memberCount, rootUserName, rootPassword, ocp, project, useKeyfile, useTsl , shardNum);
|
return new OcpMongoReplicaSet(name, configServer, memberCount, rootUserName, rootPassword, ocp, project, useKeyfile, useTls, shardNum);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -42,7 +42,7 @@ public class OcpMongoShardedCluster implements Startable {
|
|||||||
private final String rootUserName;
|
private final String rootUserName;
|
||||||
private final String rootPassword;
|
private final String rootPassword;
|
||||||
private final boolean useInternalAuth;
|
private final boolean useInternalAuth;
|
||||||
private final boolean useTsl;
|
private final boolean useTls;
|
||||||
private final OpenShiftClient ocp;
|
private final OpenShiftClient ocp;
|
||||||
private final OpenShiftUtils ocpUtils;
|
private final OpenShiftUtils ocpUtils;
|
||||||
private final int initialShardCount;
|
private final int initialShardCount;
|
||||||
@ -63,7 +63,7 @@ public void start() {
|
|||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (useTsl && useInternalAuth) {
|
if (useTls && useInternalAuth) {
|
||||||
throw new IllegalStateException("Cannot deploy mongo with both tls and keyfile internal auth");
|
throw new IllegalStateException("Cannot deploy mongo with both tls and keyfile internal auth");
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -130,10 +130,11 @@ public void removeShard() {
|
|||||||
/**
|
/**
|
||||||
* deploy new shard and initialize it. Requires running initialized sharded mongo cluster
|
* deploy new shard and initialize it. Requires running initialized sharded mongo cluster
|
||||||
*/
|
*/
|
||||||
public void addShard(@Nullable Map<MongoShardKey, ShardKeyRange> rangeMap) {
|
public OcpMongoReplicaSet addShard(@Nullable Map<MongoShardKey, ShardKeyRange> rangeMap) {
|
||||||
int shardNum = shardReplicaSets.size();
|
int shardNum = shardReplicaSets.size();
|
||||||
var rs = deployNewShard(shardNum);
|
var rs = deployNewShard(shardNum);
|
||||||
registerShardInMongos(rangeMap, rs);
|
registerShardInMongos(rangeMap, rs);
|
||||||
|
return rs;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -193,7 +194,7 @@ private OcpMongoReplicaSet deployNewShard(int shardNum) {
|
|||||||
.withRootPassword(rootPassword)
|
.withRootPassword(rootPassword)
|
||||||
.withMemberCount(replicaCount)
|
.withMemberCount(replicaCount)
|
||||||
.withUseKeyfile(useInternalAuth)
|
.withUseKeyfile(useInternalAuth)
|
||||||
.withUseTsl(useTsl)
|
.withUseTls(useTls)
|
||||||
.withOcp(ocp)
|
.withOcp(ocp)
|
||||||
.withProject(project)
|
.withProject(project)
|
||||||
.build();
|
.build();
|
||||||
@ -222,7 +223,7 @@ private void deployConfigServers() {
|
|||||||
.withRootPassword(rootPassword)
|
.withRootPassword(rootPassword)
|
||||||
.withMemberCount(configServerCount)
|
.withMemberCount(configServerCount)
|
||||||
.withUseKeyfile(useInternalAuth)
|
.withUseKeyfile(useInternalAuth)
|
||||||
.withUseTsl(useTsl)
|
.withUseTls(useTls)
|
||||||
.withOcp(ocp)
|
.withOcp(ocp)
|
||||||
.withProject(project)
|
.withProject(project)
|
||||||
.build();
|
.build();
|
||||||
@ -237,7 +238,7 @@ private void deployMongos() {
|
|||||||
MongoShardedUtil.addKeyFileToDeployment(mongosRouter.getDeployment());
|
MongoShardedUtil.addKeyFileToDeployment(mongosRouter.getDeployment());
|
||||||
}
|
}
|
||||||
|
|
||||||
if (useTsl) {
|
if (useTls) {
|
||||||
MongoShardedUtil.addCertificatesToDeployment(mongosRouter.getDeployment());
|
MongoShardedUtil.addCertificatesToDeployment(mongosRouter.getDeployment());
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -286,14 +287,14 @@ private String createKeyRangeCommand(ShardKeyRange range, MongoShardKey key) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
public OcpMongoShardedCluster(int initialShardCount, int replicaCount, int configServerCount, @Nullable String rootUserName, @Nullable String rootPassword,
|
public OcpMongoShardedCluster(int initialShardCount, int replicaCount, int configServerCount, @Nullable String rootUserName, @Nullable String rootPassword,
|
||||||
boolean useInternalAuth, boolean useTsl, OpenShiftClient ocp, String project, List<MongoShardKey> shardKeys) {
|
boolean useInternalAuth, boolean useTls, OpenShiftClient ocp, String project, List<MongoShardKey> shardKeys) {
|
||||||
this.initialShardCount = initialShardCount;
|
this.initialShardCount = initialShardCount;
|
||||||
this.replicaCount = replicaCount;
|
this.replicaCount = replicaCount;
|
||||||
this.configServerCount = configServerCount;
|
this.configServerCount = configServerCount;
|
||||||
this.rootUserName = StringUtils.isNotEmpty(rootUserName) ? rootUserName : ConfigProperties.DATABASE_MONGO_USERNAME;
|
this.rootUserName = StringUtils.isNotEmpty(rootUserName) ? rootUserName : ConfigProperties.DATABASE_MONGO_USERNAME;
|
||||||
this.rootPassword = StringUtils.isNotEmpty(rootPassword) ? rootPassword : ConfigProperties.DATABASE_MONGO_SA_PASSWORD;
|
this.rootPassword = StringUtils.isNotEmpty(rootPassword) ? rootPassword : ConfigProperties.DATABASE_MONGO_SA_PASSWORD;
|
||||||
this.useInternalAuth = useInternalAuth;
|
this.useInternalAuth = useInternalAuth;
|
||||||
this.useTsl = useTsl;
|
this.useTls = useTls;
|
||||||
this.ocp = ocp;
|
this.ocp = ocp;
|
||||||
this.project = project;
|
this.project = project;
|
||||||
this.ocpUtils = new OpenShiftUtils(ocp);
|
this.ocpUtils = new OpenShiftUtils(ocp);
|
||||||
@ -305,7 +306,7 @@ public static OcpMongoShardedClusterBuilder builder() {
|
|||||||
}
|
}
|
||||||
|
|
||||||
public boolean getUseTls() {
|
public boolean getUseTls() {
|
||||||
return useTsl;
|
return useTls;
|
||||||
}
|
}
|
||||||
|
|
||||||
public static final class OcpMongoShardedClusterBuilder {
|
public static final class OcpMongoShardedClusterBuilder {
|
||||||
@ -318,7 +319,7 @@ public static final class OcpMongoShardedClusterBuilder {
|
|||||||
private int initialShardCount;
|
private int initialShardCount;
|
||||||
private String project;
|
private String project;
|
||||||
private List<MongoShardKey> shardKeys;
|
private List<MongoShardKey> shardKeys;
|
||||||
private boolean useTsl;
|
private boolean useTls;
|
||||||
|
|
||||||
private OcpMongoShardedClusterBuilder() {
|
private OcpMongoShardedClusterBuilder() {
|
||||||
}
|
}
|
||||||
@ -344,8 +345,8 @@ public OcpMongoShardedClusterBuilder withUseInternalAuth(boolean useInternalAuth
|
|||||||
return this;
|
return this;
|
||||||
}
|
}
|
||||||
|
|
||||||
public OcpMongoShardedClusterBuilder withUseTsl(boolean useTsl) {
|
public OcpMongoShardedClusterBuilder withUseTls(boolean useTls) {
|
||||||
this.useTsl = useTsl;
|
this.useTls = useTls;
|
||||||
return this;
|
return this;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -370,7 +371,7 @@ public OcpMongoShardedClusterBuilder withShardKeys(List<MongoShardKey> shardKeys
|
|||||||
}
|
}
|
||||||
|
|
||||||
public OcpMongoShardedCluster build() {
|
public OcpMongoShardedCluster build() {
|
||||||
return new OcpMongoShardedCluster(initialShardCount, replicaCount, configServerCount, rootUserName, rootPassword, useInternalAuth, useTsl, ocp, project,
|
return new OcpMongoShardedCluster(initialShardCount, replicaCount, configServerCount, rootUserName, rootPassword, useInternalAuth, useTls, ocp, project,
|
||||||
shardKeys);
|
shardKeys);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -24,7 +24,7 @@ public class OcpMongoShardedConstants {
|
|||||||
|
|
||||||
public final static String INIT_RS_TEMPLATE = "init-rs.js";
|
public final static String INIT_RS_TEMPLATE = "init-rs.js";
|
||||||
public final static String CREATE_CERT_USER_TEMPLATE = "create-dbz-user-x509.js";
|
public final static String CREATE_CERT_USER_TEMPLATE = "create-dbz-user-x509.js";
|
||||||
public final static String CREATE_DBZ_USER_TEMPLATE = "create-dbz-user-x509.js";
|
public final static String CREATE_DBZ_USER_TEMPLATE = "create-dbz-user.js";
|
||||||
public final static String INSERT_MONGOS_DATA_SCRIPT_LOC = "/database-resources/mongodb/sharded/insert-mongos-data.js";
|
public final static String INSERT_MONGOS_DATA_SCRIPT_LOC = "/database-resources/mongodb/sharded/insert-mongos-data.js";
|
||||||
public final static String KEYFILE_PATH_IN_CONTAINER = "/etc/mongodb.keyfile";
|
public final static String KEYFILE_PATH_IN_CONTAINER = "/etc/mongodb.keyfile";
|
||||||
}
|
}
|
||||||
|
@ -58,7 +58,7 @@ public OcpMongoShardedController deploy() throws Exception {
|
|||||||
.withReplicaCount(replicaCount)
|
.withReplicaCount(replicaCount)
|
||||||
.withShardKeys(shardKeys)
|
.withShardKeys(shardKeys)
|
||||||
.withUseInternalAuth(useKeyfile)
|
.withUseInternalAuth(useKeyfile)
|
||||||
.withUseTsl(useTls)
|
.withUseTls(useTls)
|
||||||
.withRootUser(rootUserName, rootPassword)
|
.withRootUser(rootUserName, rootPassword)
|
||||||
.withShardKeys(shardKeys)
|
.withShardKeys(shardKeys)
|
||||||
.build();
|
.build();
|
||||||
|
@ -11,6 +11,9 @@
|
|||||||
import com.fasterxml.jackson.core.JsonProcessingException;
|
import com.fasterxml.jackson.core.JsonProcessingException;
|
||||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||||
|
|
||||||
|
import io.debezium.testing.system.tools.ConfigProperties;
|
||||||
|
import io.debezium.testing.system.tools.certificateutil.CertUtil;
|
||||||
|
import io.debezium.testing.system.tools.databases.mongodb.sharded.OcpMongoCertGenerator;
|
||||||
import io.strimzi.api.kafka.model.connector.KafkaConnector;
|
import io.strimzi.api.kafka.model.connector.KafkaConnector;
|
||||||
import io.strimzi.api.kafka.model.connector.KafkaConnectorBuilder;
|
import io.strimzi.api.kafka.model.connector.KafkaConnectorBuilder;
|
||||||
|
|
||||||
@ -119,6 +122,25 @@ public ConnectorConfigBuilder addOperationRouterForTable(String op, String table
|
|||||||
return addOperationRouter(op, targetTopicName, serverName + ".*\\." + tableName);
|
return addOperationRouter(op, targetTopicName, serverName + ".*\\." + tableName);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public ConnectorConfigBuilder addMongoTlsParams() {
|
||||||
|
this
|
||||||
|
.put("mongodb.ssl.enabled", true)
|
||||||
|
.put("mongodb.ssl.keystore",
|
||||||
|
"/opt/kafka/external-configuration/" + OcpMongoCertGenerator.KEYSTORE_CONFIGMAP + "/" + OcpMongoCertGenerator.KEYSTORE_SUBPATH)
|
||||||
|
.put("mongodb.ssl.keystore.password", CertUtil.KEYSTORE_PASSWORD)
|
||||||
|
.put("mongodb.ssl.truststore",
|
||||||
|
"/opt/kafka/external-configuration/" + OcpMongoCertGenerator.TRUSTSTORE_CONFIGMAP + "/" + OcpMongoCertGenerator.TRUSTSTORE_SUBPATH)
|
||||||
|
.put("mongodb.ssl.truststore.password", CertUtil.KEYSTORE_PASSWORD);
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
|
public ConnectorConfigBuilder addMongoPasswordAuthParams() {
|
||||||
|
this
|
||||||
|
.put("mongodb.user", ConfigProperties.DATABASE_MONGO_DBZ_USERNAME)
|
||||||
|
.put("mongodb.password", ConfigProperties.DATABASE_MONGO_DBZ_PASSWORD);
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Get configuration as JSON string
|
* Get configuration as JSON string
|
||||||
* @return JSON string of connector config
|
* @return JSON string of connector config
|
||||||
|
@ -31,7 +31,6 @@ public ConnectorConfigBuilder connectorConfig(String connectorName) {
|
|||||||
}
|
}
|
||||||
else {
|
else {
|
||||||
return new ConnectorFactories(kafkaController).shardedMongo(dbController, connectorName);
|
return new ConnectorFactories(kafkaController).shardedMongo(dbController, connectorName);
|
||||||
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -8,6 +8,7 @@
|
|||||||
import org.junit.jupiter.api.extension.ExtensionContext;
|
import org.junit.jupiter.api.extension.ExtensionContext;
|
||||||
|
|
||||||
import io.debezium.testing.system.resources.ConnectorFactories;
|
import io.debezium.testing.system.resources.ConnectorFactories;
|
||||||
|
import io.debezium.testing.system.tools.ConfigProperties;
|
||||||
import io.debezium.testing.system.tools.databases.mongodb.sharded.OcpMongoShardedController;
|
import io.debezium.testing.system.tools.databases.mongodb.sharded.OcpMongoShardedController;
|
||||||
import io.debezium.testing.system.tools.kafka.ConnectorConfigBuilder;
|
import io.debezium.testing.system.tools.kafka.ConnectorConfigBuilder;
|
||||||
import io.debezium.testing.system.tools.kafka.KafkaConnectController;
|
import io.debezium.testing.system.tools.kafka.KafkaConnectController;
|
||||||
@ -25,6 +26,11 @@ public ShardedReplicaMongoConnector(ExtensionContext.Store store) {
|
|||||||
|
|
||||||
@Override
|
@Override
|
||||||
public ConnectorConfigBuilder connectorConfig(String connectorName) {
|
public ConnectorConfigBuilder connectorConfig(String connectorName) {
|
||||||
return new ConnectorFactories(kafkaController).shardedReplicaMongo(dbController, connectorName);
|
if (ConfigProperties.DATABASE_MONGO_USE_TLS) {
|
||||||
|
return new ConnectorFactories(kafkaController).shardedReplicaMongoWithTls(dbController, connectorName);
|
||||||
|
}
|
||||||
|
else {
|
||||||
|
return new ConnectorFactories(kafkaController).shardedReplicaMongo(dbController, connectorName);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -8,10 +8,8 @@
|
|||||||
import java.util.Random;
|
import java.util.Random;
|
||||||
|
|
||||||
import io.debezium.testing.system.tools.ConfigProperties;
|
import io.debezium.testing.system.tools.ConfigProperties;
|
||||||
import io.debezium.testing.system.tools.certificateutil.CertUtil;
|
|
||||||
import io.debezium.testing.system.tools.databases.SqlDatabaseController;
|
import io.debezium.testing.system.tools.databases.SqlDatabaseController;
|
||||||
import io.debezium.testing.system.tools.databases.mongodb.MongoDatabaseController;
|
import io.debezium.testing.system.tools.databases.mongodb.MongoDatabaseController;
|
||||||
import io.debezium.testing.system.tools.databases.mongodb.sharded.OcpMongoCertGenerator;
|
|
||||||
import io.debezium.testing.system.tools.kafka.ConnectorConfigBuilder;
|
import io.debezium.testing.system.tools.kafka.ConnectorConfigBuilder;
|
||||||
import io.debezium.testing.system.tools.kafka.KafkaController;
|
import io.debezium.testing.system.tools.kafka.KafkaController;
|
||||||
|
|
||||||
@ -108,8 +106,7 @@ public ConnectorConfigBuilder shardedMongo(MongoDatabaseController controller, S
|
|||||||
.put("task.max", 1)
|
.put("task.max", 1)
|
||||||
.put("mongodb.connection.string", controller.getPublicDatabaseUrl())
|
.put("mongodb.connection.string", controller.getPublicDatabaseUrl())
|
||||||
.put("mongodb.connection.mode", "sharded")
|
.put("mongodb.connection.mode", "sharded")
|
||||||
.put("mongodb.user", ConfigProperties.DATABASE_MONGO_DBZ_USERNAME)
|
.addMongoPasswordAuthParams()
|
||||||
.put("mongodb.password", ConfigProperties.DATABASE_MONGO_DBZ_PASSWORD)
|
|
||||||
.addOperationRouterForTable("u", "customers");
|
.addOperationRouterForTable("u", "customers");
|
||||||
return cb;
|
return cb;
|
||||||
}
|
}
|
||||||
@ -122,13 +119,7 @@ public ConnectorConfigBuilder shardedMongoWithTls(MongoDatabaseController contro
|
|||||||
.put("task.max", 1)
|
.put("task.max", 1)
|
||||||
.put("mongodb.connection.string", controller.getPublicDatabaseUrl())
|
.put("mongodb.connection.string", controller.getPublicDatabaseUrl())
|
||||||
.put("mongodb.connection.mode", "sharded")
|
.put("mongodb.connection.mode", "sharded")
|
||||||
.put("mongodb.ssl.enabled", true)
|
.addMongoTlsParams()
|
||||||
.put("mongodb.ssl.keystore",
|
|
||||||
"/opt/kafka/external-configuration/" + OcpMongoCertGenerator.KEYSTORE_CONFIGMAP + "/" + OcpMongoCertGenerator.KEYSTORE_SUBPATH)
|
|
||||||
.put("mongodb.ssl.keystore.password", CertUtil.KEYSTORE_PASSWORD)
|
|
||||||
.put("mongodb.ssl.truststore",
|
|
||||||
"/opt/kafka/external-configuration/" + OcpMongoCertGenerator.TRUSTSTORE_CONFIGMAP + "/" + OcpMongoCertGenerator.TRUSTSTORE_SUBPATH)
|
|
||||||
.put("mongodb.ssl.truststore.password", CertUtil.KEYSTORE_PASSWORD)
|
|
||||||
.addOperationRouterForTable("u", "customers");
|
.addOperationRouterForTable("u", "customers");
|
||||||
return cb;
|
return cb;
|
||||||
}
|
}
|
||||||
@ -141,10 +132,24 @@ public ConnectorConfigBuilder shardedReplicaMongo(MongoDatabaseController contro
|
|||||||
.put("topic.prefix", connectorName)
|
.put("topic.prefix", connectorName)
|
||||||
.put("connector.class", "io.debezium.connector.mongodb.MongoDbConnector")
|
.put("connector.class", "io.debezium.connector.mongodb.MongoDbConnector")
|
||||||
.put("task.max", 4)
|
.put("task.max", 4)
|
||||||
.put("mongodb.user", ConfigProperties.DATABASE_MONGO_DBZ_USERNAME)
|
|
||||||
.put("mongodb.password", ConfigProperties.DATABASE_MONGO_DBZ_PASSWORD)
|
|
||||||
.put("mongodb.connection.string", controller.getPublicDatabaseUrl())
|
.put("mongodb.connection.string", controller.getPublicDatabaseUrl())
|
||||||
.put("mongodb.connection.mode", "replica_set")
|
.put("mongodb.connection.mode", "replica_set")
|
||||||
|
.addMongoPasswordAuthParams()
|
||||||
|
.addOperationRouterForTable("u", "customers");
|
||||||
|
return cb;
|
||||||
|
}
|
||||||
|
|
||||||
|
public ConnectorConfigBuilder shardedReplicaMongoWithTls(MongoDatabaseController controller, String connectorName) {
|
||||||
|
|
||||||
|
// String connectionUrl =;
|
||||||
|
ConnectorConfigBuilder cb = new ConnectorConfigBuilder(connectorName);
|
||||||
|
cb
|
||||||
|
.put("topic.prefix", connectorName)
|
||||||
|
.put("connector.class", "io.debezium.connector.mongodb.MongoDbConnector")
|
||||||
|
.put("task.max", 4)
|
||||||
|
.put("mongodb.connection.string", controller.getPublicDatabaseUrl())
|
||||||
|
.put("mongodb.connection.mode", "replica_set")
|
||||||
|
.addMongoTlsParams()
|
||||||
.addOperationRouterForTable("u", "customers");
|
.addOperationRouterForTable("u", "customers");
|
||||||
return cb;
|
return cb;
|
||||||
}
|
}
|
||||||
|
@ -92,9 +92,8 @@ protected void addAndRemoveShardTest(OcpMongoShardedController dbController, Str
|
|||||||
// add shard, restart connector, insert to that shard and verify that insert was captured by debezium
|
// add shard, restart connector, insert to that shard and verify that insert was captured by debezium
|
||||||
var key = dbController.getMongo().getShardKey("inventory.customers");
|
var key = dbController.getMongo().getShardKey("inventory.customers");
|
||||||
var keyRange = new ShardKeyRange(OcpShardModelProvider.getShardReplicaSetName(3), "1100", "1105");
|
var keyRange = new ShardKeyRange(OcpShardModelProvider.getShardReplicaSetName(3), "1100", "1105");
|
||||||
dbController.getMongo().addShard(Map.of(key, keyRange));
|
var newShard = dbController.getMongo().addShard(Map.of(key, keyRange));
|
||||||
var sets = dbController.getMongo().getShardReplicaSets();
|
newShard.executeMongosh(dbController.createDbzUserCommand(), false);
|
||||||
sets.get(sets.size() - 1).executeMongosh(dbController.createDbzUserCommand(), true);
|
|
||||||
|
|
||||||
connectController.undeployConnector(connectorName);
|
connectController.undeployConnector(connectorName);
|
||||||
connectController.deployConnector(connectorConfig);
|
connectController.deployConnector(connectorConfig);
|
||||||
|
Loading…
Reference in New Issue
Block a user