DBZ-7605 added ssl option for mongo tests

This commit is contained in:
Martin Medek 2024-02-14 11:00:39 +01:00 committed by Ondrej Babec
parent 9021004526
commit 56eef4eef9
26 changed files with 861 additions and 58 deletions

View File

@ -110,8 +110,8 @@
<database.mongo.docker.replica.size>1</database.mongo.docker.replica.size> <database.mongo.docker.replica.size>1</database.mongo.docker.replica.size>
<!--Mongodb sharded configuration--> <!--Mongodb sharded configuration-->
<database.mongo.use.keyfile>true</database.mongo.use.keyfile> <database.mongo.use.keyfile>false</database.mongo.use.keyfile>
<database.mongo.use.tls>false</database.mongo.use.tls>
<!--DB2 configuration --> <!--DB2 configuration -->
<database.db2.port>50000</database.db2.port> <database.db2.port>50000</database.db2.port>
@ -408,6 +408,7 @@
</dependency> </dependency>
<dependency> <dependency>
<groupId>org.postgresql</groupId> <groupId>org.postgresql</groupId>
<artifactId>postgresql</artifactId> <artifactId>postgresql</artifactId>
</dependency> </dependency>
@ -476,26 +477,30 @@
<artifactId>freemarker</artifactId> <artifactId>freemarker</artifactId>
<version>2.3.32</version> <version>2.3.32</version>
</dependency> </dependency>
<dependency>
<groupId>org.bouncycastle</groupId>
<artifactId>bcpkix-jdk15on</artifactId>
<version>1.70</version>
</dependency>
</dependencies> </dependencies>
<profiles> <profiles>
<profile> <profile>
<!-- Required to run tests against downstream --> <!-- Required to run tests against downstream -->
<id>product</id> <id>product</id>
<properties> <properties>
<as.apicurio.url>${as.url}/service-registry-${as.apicurio.version}-distro-connect-converter.zip</as.apicurio.url> <as.apicurio.url>${as.url}/service-registry-${as.apicurio.version}-distro-connect-converter.zip</as.apicurio.url>
<product.build>true</product.build> <product.build>true</product.build>
</properties> </properties>
</profile> </profile>
<profile> <profile>
<!-- Required to run Oracle tests --> <!-- Required to run Oracle tests -->
<id>oracleITs</id> <id>oracleITs</id>
<properties> <properties>
<database.oracle>true</database.oracle> <database.oracle>true</database.oracle>
</properties> </properties>
<dependencies> <dependencies>
<dependency> <dependency>
<groupId>com.oracle.database.jdbc</groupId> <groupId>com.oracle.database.jdbc</groupId>
@ -507,7 +512,6 @@
<profile> <profile>
<!-- Required to run build images with Oracle connector --> <!-- Required to run build images with Oracle connector -->
<id>oracle-image</id> <id>oracle-image</id>
<dependencies> <dependencies>
<dependency> <dependency>
<groupId>io.debezium</groupId> <groupId>io.debezium</groupId>
@ -618,6 +622,7 @@
<!--Mongo sharded configuration--> <!--Mongo sharded configuration-->
<test.database.mongo.use.keyfile>${database.mongo.use.keyfile}</test.database.mongo.use.keyfile> <test.database.mongo.use.keyfile>${database.mongo.use.keyfile}</test.database.mongo.use.keyfile>
<test.database.mongo.use.tls>${database.mongo.use.tls}</test.database.mongo.use.tls>
<!--DB2 configuration--> <!--DB2 configuration-->
<test.database.db2.host>${database.db2.host}</test.database.db2.host> <test.database.db2.host>${database.db2.host}</test.database.db2.host>

View File

@ -102,7 +102,9 @@ private ConfigProperties() {
public static final Optional<String> DATABASE_MONGO_HOST = stringOptionalProperty("test.database.mongo.host"); public static final Optional<String> DATABASE_MONGO_HOST = stringOptionalProperty("test.database.mongo.host");
public static final int DATABASE_MONGO_PORT = Integer.parseInt(System.getProperty("test.database.mongo.port", "27017")); public static final int DATABASE_MONGO_PORT = Integer.parseInt(System.getProperty("test.database.mongo.port", "27017"));
public static final boolean DATABASE_MONGO_USE_KEYFILE = Boolean.parseBoolean(System.getProperty("test.database.mongo.use.keyfile")); public static final boolean DATABASE_MONGO_USE_KEYFILE = Boolean.parseBoolean(System.getProperty("test.database.mongo.use.keyfile"));
public static final boolean DATABASE_MONGO_USE_TLS = Boolean.parseBoolean(System.getProperty("test.database.mongo.use.tls"));
public static final String DATABASE_MONGO_DOCKER_DESKTOP_PORTS = System.getProperty("database.mongo.docker.desktop.ports", "27017:27117"); public static final String DATABASE_MONGO_DOCKER_DESKTOP_PORTS = System.getProperty("database.mongo.docker.desktop.ports", "27017:27117");
public static final int DATABASE_MONGO_DOCKER_REPLICA_SIZE = Integer.parseInt(System.getProperty("database.mongo.docker.replica.size", "1")); public static final int DATABASE_MONGO_DOCKER_REPLICA_SIZE = Integer.parseInt(System.getProperty("database.mongo.docker.replica.size", "1"));
// DB2 Configuration // DB2 Configuration

View File

@ -0,0 +1,119 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.testing.system.tools.certificateutil;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.StringWriter;
import java.security.KeyStore;
import java.security.KeyStoreException;
import java.security.NoSuchAlgorithmException;
import java.security.PrivateKey;
import java.security.cert.CertificateException;
import java.security.cert.X509Certificate;
import java.util.Base64;
import java.util.Map;
import org.bouncycastle.cert.X509CertificateHolder;
import org.bouncycastle.cert.jcajce.JcaX509CertificateConverter;
import org.bouncycastle.jce.provider.BouncyCastleProvider;
import org.bouncycastle.openssl.jcajce.JcaPEMWriter;
import io.fabric8.kubernetes.api.model.ConfigMapBuilder;
import io.fabric8.kubernetes.api.model.ObjectMetaBuilder;
import io.fabric8.openshift.client.OpenShiftClient;
public class CertUtil {
public static final String KEYSTORE_PASSWORD = "password";
/**
* Uploads a string data as configMap to ocp cluster
* @param project namespace, where to create the configmap
* @param data content of a file in configMap
* @param configMapName config map name
* @param fileNameInConfigMap filename in configmap
* @param ocp ocp client
*/
public static void stringToConfigMap(String project, String data, String configMapName, String fileNameInConfigMap, OpenShiftClient ocp) {
var configMap = new ConfigMapBuilder()
.withMetadata(new ObjectMetaBuilder()
.withName(configMapName)
.build())
.withData(Map.of(fileNameInConfigMap, data))
.build();
ocp.configMaps().inNamespace(project).createOrReplace(configMap);
}
/**
* Converts keystore to base64 string and uploads as configMap to ocp cluster
* @param project namespace, where to create the configmap
* @param keyStore keystore object to be saved in configmap
* @param configMapName config map name
* @param fileNameInConfigMap filename in configmap
* @param ocp ocp client
*/
public static void keystoreToConfigMap(String project, KeyStore keyStore, String configMapName, String fileNameInConfigMap, OpenShiftClient ocp)
throws IOException, CertificateException, KeyStoreException, NoSuchAlgorithmException {
char[] pwdArray = KEYSTORE_PASSWORD.toCharArray();
try (ByteArrayOutputStream fos = new ByteArrayOutputStream()) {
keyStore.store(fos, pwdArray);
var configMap = new ConfigMapBuilder()
.withMetadata(new ObjectMetaBuilder()
.withName(configMapName)
.build())
.withBinaryData(Map.of(fileNameInConfigMap, Base64.getEncoder().encodeToString(fos.toByteArray())))
.build();
ocp.configMaps().inNamespace(project).createOrReplace(configMap);
}
}
/**
* exports PK to base64 string
* @param privateKey
* @return
* @throws IOException
*/
public static String exportToBase64PEMString(PrivateKey privateKey) throws IOException {
StringWriter sw = new StringWriter();
try (JcaPEMWriter pw = new JcaPEMWriter(sw)) {
pw.writeObject(privateKey);
}
return sw.toString();
}
public static String exportToBase64PEMString(X509CertificateHolder holder) throws CertificateException, IOException {
return exportToBase64PEMString(convertHolderToCert(holder));
}
private static X509Certificate convertHolderToCert(X509CertificateHolder holder) throws CertificateException {
JcaX509CertificateConverter converter = new JcaX509CertificateConverter();
converter.setProvider(new BouncyCastleProvider());
return converter.getCertificate(holder);
}
private static String exportToBase64PEMString(X509Certificate x509Cert) throws IOException {
StringWriter sw = new StringWriter();
try (JcaPEMWriter pw = new JcaPEMWriter(sw)) {
pw.writeObject(x509Cert);
}
return sw.toString();
}
/**
* Creates a String to be passed to mongodb as server certificateKeyFile
* @param cert
* @param ca
* @return
* @throws IOException
* @throws CertificateException
*/
public static String exportCertificateToMongoCompatiblePem(CertificateWrapper cert, CertificateWrapper ca) throws IOException, CertificateException {
return exportToBase64PEMString(cert.getKeyPair().getPrivate()) +
exportToBase64PEMString(cert.getHolder()) +
exportToBase64PEMString(ca.getHolder());
}
}

View File

@ -0,0 +1,36 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.testing.system.tools.certificateutil;
import org.bouncycastle.asn1.ASN1Encodable;
import org.bouncycastle.asn1.ASN1ObjectIdentifier;
/**
* Simple abstraction for certificate extension for use in certificate builders
*/
public class CertificateExtensionWrapper {
private final ASN1ObjectIdentifier identifier;
private final boolean isCritical;
private final ASN1Encodable value;
public CertificateExtensionWrapper(ASN1ObjectIdentifier identifier, boolean isCritical, ASN1Encodable value) {
this.identifier = identifier;
this.isCritical = isCritical;
this.value = value;
}
public ASN1ObjectIdentifier getIdentifier() {
return identifier;
}
public ASN1Encodable getValue() {
return value;
}
public boolean isCritical() {
return isCritical;
}
}

View File

@ -0,0 +1,200 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.testing.system.tools.certificateutil;
import java.io.IOException;
import java.math.BigInteger;
import java.security.GeneralSecurityException;
import java.security.KeyPair;
import java.security.KeyPairGenerator;
import java.security.KeyStore;
import java.security.SecureRandom;
import java.security.Security;
import java.security.cert.CertificateException;
import java.security.cert.X509Certificate;
import java.util.Date;
import java.util.LinkedList;
import java.util.List;
import java.util.stream.Collectors;
import org.bouncycastle.asn1.x500.X500Name;
import org.bouncycastle.asn1.x509.BasicConstraints;
import org.bouncycastle.asn1.x509.Extension;
import org.bouncycastle.asn1.x509.KeyUsage;
import org.bouncycastle.cert.CertIOException;
import org.bouncycastle.cert.X509CertificateHolder;
import org.bouncycastle.cert.X509v3CertificateBuilder;
import org.bouncycastle.cert.jcajce.JcaX509CertificateConverter;
import org.bouncycastle.cert.jcajce.JcaX509v3CertificateBuilder;
import org.bouncycastle.jce.provider.BouncyCastleProvider;
import org.bouncycastle.operator.ContentSigner;
import org.bouncycastle.operator.OperatorCreationException;
import org.bouncycastle.operator.jcajce.JcaContentSignerBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Class for generating certs. Can generate a CA cert and list of leaf certificates signed by this CA (no intermediates)
*/
public class CertificateGenerator {
private static final Logger LOGGER = LoggerFactory.getLogger(CertificateGenerator.class);
private static final String SIGNATURE_ALGORITHM = "SHA384WITHRSA";
private final X500Name caSubject = new X500Name("cn=RootCA");
private final List<CertificateWrapperBuilder> certSpecs;
private CertificateWrapper ca;
private final List<CertificateWrapper> generatedCerts = new LinkedList<>();
private final int PRIVATE_KEY_SIZE = 3072;
private final String PRIVATE_KEY_ALGORITHM = "RSA";
public CertificateGenerator(List<CertificateWrapperBuilder> leafSpec) {
this.certSpecs = leafSpec;
}
/**
* generate CA certificate and all leaf certificates specified in certSpecs attribute
* @throws Exception
*/
public void generate() throws Exception {
// generate keys and certificates
ca = generateCa();
certSpecs.forEach(l -> {
try {
var cert = genLeafCert(l);
generatedCerts.add(cert);
}
catch (Exception e) {
throw new RuntimeException(e);
}
});
}
/**
* Generates keystore containing a leaf private key and its certificate chain. Usable for keystore/truststore generation
* keystore password set by constant in CertUtil class
* @param leafName
* @return
* @throws Exception
*/
public KeyStore generateKeyStore(String leafName) throws Exception {
var certWrapper = getLeafCertificateWrapper(leafName);
var keystore = KeyStore.getInstance("JKS");
keystore.load(null);
// Import private key
keystore.setKeyEntry(leafName,
certWrapper.getKeyPair().getPrivate(),
CertUtil.KEYSTORE_PASSWORD.toCharArray(),
new X509Certificate[]{ holderToCert(certWrapper.getHolder()), holderToCert(ca.getHolder()) });
return keystore;
}
public CertificateWrapper getLeafCertificateWrapper(String name) {
var spec = generatedCerts.stream().filter(l -> l.getName().equals(name)).collect(Collectors.toList());
if (spec.size() != 1) {
throw new IllegalArgumentException("Certificate not found in generated certs list");
}
return spec.get(0);
}
public CertificateWrapper getCa() {
return ca;
}
private CertificateWrapper generateCa() throws IOException {
Security.addProvider(new BouncyCastleProvider());
KeyPair keyPair = generateKeyPair();
long notBefore = System.currentTimeMillis();
long notAfter = notBefore + (1000L * 3600L * 24 * 365); // one year from now
X509v3CertificateBuilder certBuilder = new JcaX509v3CertificateBuilder(
caSubject,
BigInteger.ONE,
new Date(notBefore),
new Date(notAfter),
caSubject,
keyPair.getPublic());
X509CertificateHolder certHolder;
List<CertificateExtensionWrapper> extensions = List.of(
new CertificateExtensionWrapper(Extension.basicConstraints, true, new BasicConstraints(true)),
new CertificateExtensionWrapper(Extension.keyUsage, true, new KeyUsage(KeyUsage.keyCertSign))// ,
);
try {
extensions.forEach(e -> {
try {
certBuilder.addExtension(e.getIdentifier(), e.isCritical(), e.getValue());
}
catch (CertIOException ex) {
throw new RuntimeException(ex);
}
});
final ContentSigner signer = new JcaContentSignerBuilder(SIGNATURE_ALGORITHM).build(keyPair.getPrivate());
certHolder = certBuilder.build(signer);
}
catch (Exception e) {
throw new RuntimeException(e);
}
return CertificateWrapper.builder()
.withKeyPair(keyPair)
.withExtensions(extensions)
.withSubject(new String(caSubject.getEncoded()))
.withHolder(certHolder)
.build();
}
private CertificateWrapper genLeafCert(CertificateWrapperBuilder builder) throws OperatorCreationException {
KeyPair keyPair = generateKeyPair();
long notBefore = System.currentTimeMillis();
long notAfter = notBefore + (1000L * 3600L * 24 * 365);
X509v3CertificateBuilder certBuilder = new JcaX509v3CertificateBuilder(
caSubject,
new BigInteger(String.valueOf(System.currentTimeMillis())),
new Date(notBefore),
new Date(notAfter),
new X500Name(builder.getSubject()),
keyPair.getPublic());
builder.getExtensions().forEach(e -> {
try {
certBuilder.addExtension(e.getIdentifier(), e.isCritical(), e.getValue());
}
catch (CertIOException ex) {
throw new RuntimeException(ex);
}
});
ContentSigner signer = new JcaContentSignerBuilder(SIGNATURE_ALGORITHM).build(ca.getKeyPair().getPrivate());
var holder = certBuilder.build(signer);
return builder
.withKeyPair(keyPair)
.withHolder(holder)
.build();
}
private X509Certificate holderToCert(X509CertificateHolder holder) throws CertificateException {
JcaX509CertificateConverter converter = new JcaX509CertificateConverter();
converter.setProvider(new BouncyCastleProvider());
return converter.getCertificate(holder);
}
private KeyPair generateKeyPair() {
try {
KeyPairGenerator keyPairGenerator = KeyPairGenerator.getInstance(PRIVATE_KEY_ALGORITHM);
keyPairGenerator.initialize(PRIVATE_KEY_SIZE, new SecureRandom());
return keyPairGenerator.generateKeyPair();
}
catch (GeneralSecurityException var2) {
throw new AssertionError(var2);
}
}
}

View File

@ -0,0 +1,52 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.testing.system.tools.certificateutil;
import java.security.KeyPair;
import java.util.List;
import org.bouncycastle.cert.X509CertificateHolder;
public class CertificateWrapper {
private final String name;
private final KeyPair keyPair;
private final String subject;
private final List<CertificateExtensionWrapper> extensions;
private final X509CertificateHolder holder;
public static CertificateWrapperBuilder builder() {
return new CertificateWrapperBuilder();
}
public CertificateWrapper(String name, KeyPair keyPair, String subject, List<CertificateExtensionWrapper> extensions, X509CertificateHolder holder) {
this.name = name;
this.keyPair = keyPair;
this.subject = subject;
this.extensions = extensions;
this.holder = holder;
}
public String getName() {
return name;
}
public KeyPair getKeyPair() {
return keyPair;
}
public String getSubject() {
return subject;
}
public List<CertificateExtensionWrapper> getExtensions() {
return extensions;
}
public X509CertificateHolder getHolder() {
return holder;
}
}

View File

@ -0,0 +1,59 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.testing.system.tools.certificateutil;
import java.security.KeyPair;
import java.util.List;
import org.bouncycastle.cert.X509CertificateHolder;
public final class CertificateWrapperBuilder {
private KeyPair keyPair;
private String subject;
private List<CertificateExtensionWrapper> extensions;
private X509CertificateHolder holder;
private String name;
CertificateWrapperBuilder() {
}
public CertificateWrapperBuilder withKeyPair(KeyPair keyPair) {
this.keyPair = keyPair;
return this;
}
public CertificateWrapperBuilder withSubject(String subject) {
this.subject = subject;
return this;
}
public CertificateWrapperBuilder withName(String name) {
this.name = name;
return this;
}
public CertificateWrapperBuilder withExtensions(List<CertificateExtensionWrapper> extensions) {
this.extensions = extensions;
return this;
}
public CertificateWrapperBuilder withHolder(X509CertificateHolder holder) {
this.holder = holder;
return this;
}
public String getSubject() {
return subject;
}
public List<CertificateExtensionWrapper> getExtensions() {
return extensions;
}
public CertificateWrapper build() {
return new CertificateWrapper(name, keyPair, subject, extensions, holder);
}
}

View File

@ -15,6 +15,9 @@
import io.debezium.testing.system.tools.databases.mongodb.sharded.componentproviders.OcpShardModelProvider; import io.debezium.testing.system.tools.databases.mongodb.sharded.componentproviders.OcpShardModelProvider;
import io.debezium.testing.system.tools.databases.mongodb.sharded.freemarker.CreateUserModel; import io.debezium.testing.system.tools.databases.mongodb.sharded.freemarker.CreateUserModel;
import io.debezium.testing.system.tools.databases.mongodb.sharded.freemarker.FreemarkerConfiguration; import io.debezium.testing.system.tools.databases.mongodb.sharded.freemarker.FreemarkerConfiguration;
import io.fabric8.kubernetes.api.model.ConfigMapVolumeSourceBuilder;
import io.fabric8.kubernetes.api.model.VolumeBuilder;
import io.fabric8.kubernetes.api.model.VolumeMountBuilder;
import io.fabric8.kubernetes.api.model.apps.Deployment; import io.fabric8.kubernetes.api.model.apps.Deployment;
import freemarker.template.Template; import freemarker.template.Template;
@ -40,13 +43,27 @@ public static OpenShiftUtils.CommandOutputs executeMongoShOnPod(OpenShiftUtils o
} }
} }
public static String createDebeziumUserCommand(String userName, String password) throws IOException, TemplateException { public static String createPasswordUserCommand(String userName, String password) throws IOException, TemplateException {
var writer = new StringWriter(); var writer = new StringWriter();
Template template = new FreemarkerConfiguration().getFreemarkerConfiguration().getTemplate(OcpMongoShardedConstants.CREATE_DBZ_USER_TEMPLATE); Template template = new FreemarkerConfiguration().getFreemarkerConfiguration().getTemplate(OcpMongoShardedConstants.CREATE_DBZ_USER_TEMPLATE);
template.process(new CreateUserModel(userName, password), writer); template.process(new CreateUserModel(userName, password), writer);
return writer.toString(); return writer.toString();
} }
/**
* Get command that creates a user (with correct permissions for debezium usage) in mongodb for x509 client authentication.
* @param subjectName must match the subject of certificate used to authenticate user
* @return mongo command string
* @throws IOException
* @throws TemplateException
*/
public static String createCertUserCommand(String subjectName) throws IOException, TemplateException {
var writer = new StringWriter();
Template template = new FreemarkerConfiguration().getFreemarkerConfiguration().getTemplate(OcpMongoShardedConstants.CREATE_CERT_USER_TEMPLATE);
template.process(new CreateUserModel(subjectName, ""), writer);
return writer.toString();
}
public static List<MongoShardKey> getTestShardKeys() { public static List<MongoShardKey> getTestShardKeys() {
MongoShardKey customersKey = new MongoShardKey("inventory.customers", "_id", MongoShardKey.ShardingType.RANGED); MongoShardKey customersKey = new MongoShardKey("inventory.customers", "_id", MongoShardKey.ShardingType.RANGED);
customersKey.getKeyRanges().add(new ShardKeyRange(OcpShardModelProvider.getShardReplicaSetName(1), "1000", "1003")); customersKey.getKeyRanges().add(new ShardKeyRange(OcpShardModelProvider.getShardReplicaSetName(1), "1000", "1003"));
@ -60,6 +77,11 @@ public static String createRootUserCommand(String userName, String password) {
return "db.getSiblingDB('admin').createUser({user: '" + userName + "', pwd: '" + password + "', roles: [{role:\"root\",db:\"admin\"}] })"; return "db.getSiblingDB('admin').createUser({user: '" + userName + "', pwd: '" + password + "', roles: [{role:\"root\",db:\"admin\"}] })";
} }
/**
* Modify the mongodb deployment to use keyfile for internal authentication.
* Mutually exclusive with using x509 certificates for internal auth
* @param deployment
*/
public static void addKeyFileToDeployment(Deployment deployment) { public static void addKeyFileToDeployment(Deployment deployment) {
deployment deployment
.getSpec() .getSpec()
@ -68,6 +90,79 @@ public static void addKeyFileToDeployment(Deployment deployment) {
.getContainers() .getContainers()
.get(0) .get(0)
.getCommand() .getCommand()
.addAll(List.of("--keyFile", OcpMongoShardedConstants.KEYFILE_PATH_IN_CONTAINER)); .addAll(List.of("--clusterAuthMode", "keyFile",
"--keyFile", OcpMongoShardedConstants.KEYFILE_PATH_IN_CONTAINER));
}
/**
* Modify the mongodb deployment object to mount server and ca certificates (from configmap, name is set by constant) and use them
* in mongodb deployment for internal and client authentication.
* Mutually exclusive with using keyfile for internal auth
* @param deployment
*/
public static void addCertificatesToDeployment(Deployment deployment) {
String caCertVolume = "ca-cert-volume";
String serverCertVolume = "server-cert-volume";
String volumeMountRootPath = "/opt/";
// volumes
deployment.getSpec()
.getTemplate()
.getSpec()
.getVolumes()
.add(new VolumeBuilder()
.withName(serverCertVolume)
.withConfigMap(new ConfigMapVolumeSourceBuilder()
.withName(OcpMongoCertGenerator.SERVER_CERT_CONFIGMAP)
.build())
.build());
deployment.getSpec()
.getTemplate()
.getSpec()
.getVolumes()
.add(new VolumeBuilder()
.withName(caCertVolume)
.withConfigMap(new ConfigMapVolumeSourceBuilder()
.withName(OcpMongoCertGenerator.CA_CERT_CONFIGMAP)
.build())
.build());
// volume mounts
deployment
.getSpec()
.getTemplate()
.getSpec()
.getContainers()
.get(0)
.getVolumeMounts()
.add(new VolumeMountBuilder()
.withName(serverCertVolume)
.withMountPath(volumeMountRootPath + OcpMongoCertGenerator.SERVER_CERT_CONFIGMAP)
.build());
deployment
.getSpec()
.getTemplate()
.getSpec()
.getContainers()
.get(0)
.getVolumeMounts()
.add(new VolumeMountBuilder()
.withName(caCertVolume)
.withMountPath(volumeMountRootPath + OcpMongoCertGenerator.CA_CERT_CONFIGMAP)
.build());
// command
deployment
.getSpec()
.getTemplate()
.getSpec()
.getContainers()
.get(0)
.getCommand()
.addAll(List.of(
"--clusterAuthMode", "x509",
"--tlsMode", "preferTLS",
"--tlsCertificateKeyFile", volumeMountRootPath + OcpMongoCertGenerator.SERVER_CERT_CONFIGMAP + "/" + OcpMongoCertGenerator.SERVER_CERT_SUBPATH,
"--tlsCAFile", volumeMountRootPath + OcpMongoCertGenerator.CA_CERT_CONFIGMAP + "/" + OcpMongoCertGenerator.CA_CERT_SUBPATH));
} }
} }

View File

@ -0,0 +1,89 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.testing.system.tools.databases.mongodb.sharded;
import java.util.List;
import org.bouncycastle.asn1.ASN1Encodable;
import org.bouncycastle.asn1.DERSequence;
import org.bouncycastle.asn1.x509.ExtendedKeyUsage;
import org.bouncycastle.asn1.x509.Extension;
import org.bouncycastle.asn1.x509.GeneralName;
import org.bouncycastle.asn1.x509.KeyPurposeId;
import org.bouncycastle.asn1.x509.KeyUsage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import io.debezium.testing.system.tools.ConfigProperties;
import io.debezium.testing.system.tools.certificateutil.CertUtil;
import io.debezium.testing.system.tools.certificateutil.CertificateExtensionWrapper;
import io.debezium.testing.system.tools.certificateutil.CertificateGenerator;
import io.debezium.testing.system.tools.certificateutil.CertificateWrapper;
import io.debezium.testing.system.tools.certificateutil.CertificateWrapperBuilder;
import io.fabric8.openshift.client.OpenShiftClient;
public class OcpMongoCertGenerator {
private static final Logger LOGGER = LoggerFactory.getLogger(OcpMongoCertGenerator.class);
public static final String KEYSTORE_CONFIGMAP = "keystore";
public static final String KEYSTORE_SUBPATH = "keystore.jks";
public static final String TRUSTSTORE_CONFIGMAP = "truststore";
public static final String TRUSTSTORE_SUBPATH = "truststore.jks";
public static final String SERVER_CERT_CONFIGMAP = "server-cert";
public static final String SERVER_CERT_SUBPATH = "server-combined.pem";
public static final String CA_CERT_CONFIGMAP = "ca-cert";
public static final String CA_CERT_SUBPATH = "ca-cert.pem";
public static final String CLIENT_CERT_SUBJECT = "CN=client";
private static final String SERVER_CERT_SUBJECT = "O=Debezium, CN=mongo-server";
private static final String CLIENT_CERT_NAME = "client";
private static final String SERVER_CERT_NAME = "server";
public static void generateMongoTestCerts(OpenShiftClient ocp) throws Exception {
List<CertificateWrapperBuilder> specs = getLeafCertSpecs();
var certificateCreator = new CertificateGenerator(specs);
certificateCreator.generate();
LOGGER.info("Creating truststore/keystore configmaps for mongo connector");
CertUtil.keystoreToConfigMap(ConfigProperties.OCP_PROJECT_DBZ, certificateCreator.generateKeyStore(CLIENT_CERT_NAME), KEYSTORE_CONFIGMAP, KEYSTORE_SUBPATH, ocp);
CertUtil.keystoreToConfigMap(ConfigProperties.OCP_PROJECT_DBZ, certificateCreator.generateKeyStore(SERVER_CERT_NAME), TRUSTSTORE_CONFIGMAP, TRUSTSTORE_SUBPATH,
ocp);
LOGGER.info("Creating certificate configmaps for mongo database");
CertUtil.stringToConfigMap(ConfigProperties.OCP_PROJECT_MONGO,
CertUtil.exportCertificateToMongoCompatiblePem(certificateCreator.getLeafCertificateWrapper(SERVER_CERT_NAME), certificateCreator.getCa()),
SERVER_CERT_CONFIGMAP, SERVER_CERT_SUBPATH, ocp);
CertUtil.stringToConfigMap(ConfigProperties.OCP_PROJECT_MONGO, CertUtil.exportToBase64PEMString(certificateCreator.getCa().getHolder()), CA_CERT_CONFIGMAP,
CA_CERT_SUBPATH, ocp);
}
private static List<CertificateWrapperBuilder> getLeafCertSpecs() {
ASN1Encodable[] subjectAltNames = new ASN1Encodable[]{
new GeneralName(GeneralName.dNSName, "*." + ConfigProperties.OCP_PROJECT_MONGO + ".svc.cluster.local"),
new GeneralName(GeneralName.dNSName, "localhost"),
new GeneralName(GeneralName.iPAddress, "127.0.0.1")
};
return List.of(
CertificateWrapper.builder()
.withName(CLIENT_CERT_NAME)
.withSubject(CLIENT_CERT_SUBJECT)
.withExtensions(List.of(
new CertificateExtensionWrapper(Extension.keyUsage, true, new KeyUsage(KeyUsage.digitalSignature)),
new CertificateExtensionWrapper(Extension.extendedKeyUsage, true,
new ExtendedKeyUsage(new KeyPurposeId[]{ KeyPurposeId.id_kp_clientAuth })),
new CertificateExtensionWrapper(Extension.subjectAlternativeName, true, new DERSequence(subjectAltNames)))),
CertificateWrapper.builder()
.withName(SERVER_CERT_NAME)
.withSubject(SERVER_CERT_SUBJECT)
.withExtensions(List.of(
new CertificateExtensionWrapper(Extension.keyUsage, true, new KeyUsage(KeyUsage.digitalSignature)),
new CertificateExtensionWrapper(Extension.extendedKeyUsage, true,
new ExtendedKeyUsage(new KeyPurposeId[]{ KeyPurposeId.id_kp_clientAuth, KeyPurposeId.id_kp_serverAuth })),
new CertificateExtensionWrapper(Extension.subjectAlternativeName, true, new DERSequence(subjectAltNames)))));
}
}

View File

@ -48,21 +48,23 @@ public class OcpMongoReplicaSet implements Startable {
private final OpenShiftClient ocp; private final OpenShiftClient ocp;
private final OpenShiftUtils ocpUtil; private final OpenShiftUtils ocpUtil;
private final String project; private final String project;
private final boolean useInternalAuth; private final boolean useKeyfile;
private final boolean useTsl;
private final int shardNum; private final int shardNum;
private final List<OcpMongoReplicaSetMember> members; private final List<OcpMongoReplicaSetMember> members;
public OcpMongoReplicaSet(String name, boolean configServer, int memberCount, String rootUserName, String rootPassword, OpenShiftClient ocp, String project, public OcpMongoReplicaSet(String name, boolean configServer, int memberCount, String rootUserName, String rootPassword, OpenShiftClient ocp, String project,
boolean useInternalAuth, int shardNum) { boolean useKeyfile, boolean useTsl, int shardNum) {
this.name = name; this.name = name;
this.configServer = configServer; this.configServer = configServer;
this.memberCount = memberCount; this.memberCount = memberCount;
this.useTsl = useTsl;
this.authRequired = false; this.authRequired = false;
this.rootUserName = rootUserName; this.rootUserName = rootUserName;
this.rootPassword = rootPassword; this.rootPassword = rootPassword;
this.ocp = ocp; this.ocp = ocp;
this.project = project; this.project = project;
this.useInternalAuth = useInternalAuth; this.useKeyfile = useKeyfile;
this.shardNum = shardNum; this.shardNum = shardNum;
this.ocpUtil = new OpenShiftUtils(ocp); this.ocpUtil = new OpenShiftUtils(ocp);
@ -106,9 +108,12 @@ public void start() {
return; return;
} }
// Add keyfile to deployment // Add keyfile to deployment
if (useInternalAuth) { if (useKeyfile) {
members.forEach(m -> MongoShardedUtil.addKeyFileToDeployment(m.getDeployment())); members.forEach(m -> MongoShardedUtil.addKeyFileToDeployment(m.getDeployment()));
} }
if (ConfigProperties.DATABASE_MONGO_USE_TLS) {
members.forEach(m -> MongoShardedUtil.addCertificatesToDeployment(m.getDeployment()));
}
// Deploy all members in parallel // Deploy all members in parallel
LOGGER.info("[{}] Starting {} node replica set...", name, memberCount); LOGGER.info("[{}] Starting {} node replica set...", name, memberCount);
@ -222,8 +227,9 @@ public static final class OcpMongoReplicaSetBuilder {
private String rootPassword; private String rootPassword;
private OpenShiftClient ocp; private OpenShiftClient ocp;
private String project; private String project;
private boolean useInternalAuth; private boolean useKeyfile;
private int shardNum; private int shardNum;
private boolean useTsl;
private OcpMongoReplicaSetBuilder() { private OcpMongoReplicaSetBuilder() {
} }
@ -263,8 +269,13 @@ public OcpMongoReplicaSetBuilder withProject(String project) {
return this; return this;
} }
public OcpMongoReplicaSetBuilder withUseInternalAuth(boolean useInternalAuth) { public OcpMongoReplicaSetBuilder withUseKeyfile(boolean useKeyfile) {
this.useInternalAuth = useInternalAuth; this.useKeyfile = useKeyfile;
return this;
}
public OcpMongoReplicaSetBuilder withUseTsl(boolean useTsl) {
this.useTsl = useTsl;
return this; return this;
} }
@ -274,7 +285,7 @@ public OcpMongoReplicaSetBuilder withShardNum(int shardNum) {
} }
public OcpMongoReplicaSet build() { public OcpMongoReplicaSet build() {
return new OcpMongoReplicaSet(name, configServer, memberCount, rootUserName, rootPassword, ocp, project, useInternalAuth, shardNum); return new OcpMongoReplicaSet(name, configServer, memberCount, rootUserName, rootPassword, ocp, project, useKeyfile, useTsl , shardNum);
} }
} }
} }

View File

@ -42,6 +42,7 @@ public class OcpMongoShardedCluster implements Startable {
private final String rootUserName; private final String rootUserName;
private final String rootPassword; private final String rootPassword;
private final boolean useInternalAuth; private final boolean useInternalAuth;
private final boolean useTsl;
private final OpenShiftClient ocp; private final OpenShiftClient ocp;
private final OpenShiftUtils ocpUtils; private final OpenShiftUtils ocpUtils;
private final int initialShardCount; private final int initialShardCount;
@ -62,6 +63,10 @@ public void start() {
return; return;
} }
if (useTsl && useInternalAuth) {
throw new IllegalStateException("Cannot deploy mongo with both tls and keyfile internal auth");
}
// deploy mongo components // deploy mongo components
deployConfigServers(); deployConfigServers();
deployShards(); deployShards();
@ -187,7 +192,8 @@ private OcpMongoReplicaSet deployNewShard(int shardNum) {
.withRootUserName(rootUserName) .withRootUserName(rootUserName)
.withRootPassword(rootPassword) .withRootPassword(rootPassword)
.withMemberCount(replicaCount) .withMemberCount(replicaCount)
.withUseInternalAuth(useInternalAuth) .withUseKeyfile(useInternalAuth)
.withUseTsl(useTsl)
.withOcp(ocp) .withOcp(ocp)
.withProject(project) .withProject(project)
.build(); .build();
@ -215,7 +221,8 @@ private void deployConfigServers() {
.withRootUserName(rootUserName) .withRootUserName(rootUserName)
.withRootPassword(rootPassword) .withRootPassword(rootPassword)
.withMemberCount(configServerCount) .withMemberCount(configServerCount)
.withUseInternalAuth(useInternalAuth) .withUseKeyfile(useInternalAuth)
.withUseTsl(useTsl)
.withOcp(ocp) .withOcp(ocp)
.withProject(project) .withProject(project)
.build(); .build();
@ -229,6 +236,11 @@ private void deployMongos() {
if (useInternalAuth) { if (useInternalAuth) {
MongoShardedUtil.addKeyFileToDeployment(mongosRouter.getDeployment()); MongoShardedUtil.addKeyFileToDeployment(mongosRouter.getDeployment());
} }
if (useTsl) {
MongoShardedUtil.addCertificatesToDeployment(mongosRouter.getDeployment());
}
LOGGER.info("Deploying mongos"); LOGGER.info("Deploying mongos");
mongosRouter.start(); mongosRouter.start();
} }
@ -274,13 +286,14 @@ private String createKeyRangeCommand(ShardKeyRange range, MongoShardKey key) {
} }
public OcpMongoShardedCluster(int initialShardCount, int replicaCount, int configServerCount, @Nullable String rootUserName, @Nullable String rootPassword, public OcpMongoShardedCluster(int initialShardCount, int replicaCount, int configServerCount, @Nullable String rootUserName, @Nullable String rootPassword,
boolean useInternalAuth, OpenShiftClient ocp, String project, List<MongoShardKey> shardKeys) { boolean useInternalAuth, boolean useTsl, OpenShiftClient ocp, String project, List<MongoShardKey> shardKeys) {
this.initialShardCount = initialShardCount; this.initialShardCount = initialShardCount;
this.replicaCount = replicaCount; this.replicaCount = replicaCount;
this.configServerCount = configServerCount; this.configServerCount = configServerCount;
this.rootUserName = StringUtils.isNotEmpty(rootUserName) ? rootUserName : ConfigProperties.DATABASE_MONGO_USERNAME; this.rootUserName = StringUtils.isNotEmpty(rootUserName) ? rootUserName : ConfigProperties.DATABASE_MONGO_USERNAME;
this.rootPassword = StringUtils.isNotEmpty(rootPassword) ? rootPassword : ConfigProperties.DATABASE_MONGO_SA_PASSWORD; this.rootPassword = StringUtils.isNotEmpty(rootPassword) ? rootPassword : ConfigProperties.DATABASE_MONGO_SA_PASSWORD;
this.useInternalAuth = useInternalAuth; this.useInternalAuth = useInternalAuth;
this.useTsl = useTsl;
this.ocp = ocp; this.ocp = ocp;
this.project = project; this.project = project;
this.ocpUtils = new OpenShiftUtils(ocp); this.ocpUtils = new OpenShiftUtils(ocp);
@ -291,6 +304,10 @@ public static OcpMongoShardedClusterBuilder builder() {
return new OcpMongoShardedClusterBuilder(); return new OcpMongoShardedClusterBuilder();
} }
public boolean getUseTls() {
return useTsl;
}
public static final class OcpMongoShardedClusterBuilder { public static final class OcpMongoShardedClusterBuilder {
private int replicaCount; private int replicaCount;
private int configServerCount; private int configServerCount;
@ -301,6 +318,7 @@ public static final class OcpMongoShardedClusterBuilder {
private int initialShardCount; private int initialShardCount;
private String project; private String project;
private List<MongoShardKey> shardKeys; private List<MongoShardKey> shardKeys;
private boolean useTsl;
private OcpMongoShardedClusterBuilder() { private OcpMongoShardedClusterBuilder() {
} }
@ -326,6 +344,11 @@ public OcpMongoShardedClusterBuilder withUseInternalAuth(boolean useInternalAuth
return this; return this;
} }
public OcpMongoShardedClusterBuilder withUseTsl(boolean useTsl) {
this.useTsl = useTsl;
return this;
}
public OcpMongoShardedClusterBuilder withOcp(OpenShiftClient ocp) { public OcpMongoShardedClusterBuilder withOcp(OpenShiftClient ocp) {
this.ocp = ocp; this.ocp = ocp;
return this; return this;
@ -347,7 +370,8 @@ public OcpMongoShardedClusterBuilder withShardKeys(List<MongoShardKey> shardKeys
} }
public OcpMongoShardedCluster build() { public OcpMongoShardedCluster build() {
return new OcpMongoShardedCluster(initialShardCount, replicaCount, configServerCount, rootUserName, rootPassword, useInternalAuth, ocp, project, shardKeys); return new OcpMongoShardedCluster(initialShardCount, replicaCount, configServerCount, rootUserName, rootPassword, useInternalAuth, useTsl, ocp, project,
shardKeys);
} }
} }
} }

View File

@ -23,7 +23,8 @@ public class OcpMongoShardedConstants {
public static final String ADMIN_DB = "admin"; public static final String ADMIN_DB = "admin";
public final static String INIT_RS_TEMPLATE = "init-rs.js"; public final static String INIT_RS_TEMPLATE = "init-rs.js";
public final static String CREATE_DBZ_USER_TEMPLATE = "create-dbz-user.js"; public final static String CREATE_CERT_USER_TEMPLATE = "create-dbz-user-x509.js";
public final static String CREATE_DBZ_USER_TEMPLATE = "create-dbz-user-x509.js";
public final static String INSERT_MONGOS_DATA_SCRIPT_LOC = "/database-resources/mongodb/sharded/insert-mongos-data.js"; public final static String INSERT_MONGOS_DATA_SCRIPT_LOC = "/database-resources/mongodb/sharded/insert-mongos-data.js";
public final static String KEYFILE_PATH_IN_CONTAINER = "/etc/mongodb.keyfile"; public final static String KEYFILE_PATH_IN_CONTAINER = "/etc/mongodb.keyfile";
} }

View File

@ -10,7 +10,6 @@
import java.nio.file.Files; import java.nio.file.Files;
import java.nio.file.Path; import java.nio.file.Path;
import java.nio.file.Paths; import java.nio.file.Paths;
import java.util.Map;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -89,7 +88,7 @@ public void initialize() throws InterruptedException {
try { try {
// fill test data, create debezium user // fill test data, create debezium user
mongo.executeMongoSh(String.join("\n", Files.readAllLines(insertDataScript))); mongo.executeMongoSh(String.join("\n", Files.readAllLines(insertDataScript)));
mongo.executeMongoSh(MongoShardedUtil.createDebeziumUserCommand(ConfigProperties.DATABASE_MONGO_DBZ_USERNAME, ConfigProperties.DATABASE_MONGO_DBZ_PASSWORD)); mongo.executeMongoSh(createDbzUserCommand());
} }
catch (IOException | TemplateException e) { catch (IOException | TemplateException e) {
throw new RuntimeException(e); throw new RuntimeException(e);
@ -98,8 +97,7 @@ public void initialize() throws InterruptedException {
// each shard has to have debezium user created for replica_set connection type // each shard has to have debezium user created for replica_set connection type
mongo.getShardReplicaSets().forEach(rs -> { mongo.getShardReplicaSets().forEach(rs -> {
try { try {
rs.executeMongosh(MongoShardedUtil.createDebeziumUserCommand(ConfigProperties.DATABASE_MONGO_DBZ_USERNAME, ConfigProperties.DATABASE_MONGO_DBZ_PASSWORD), rs.executeMongosh(createDbzUserCommand(), false);
true);
} }
catch (IOException | TemplateException e) { catch (IOException | TemplateException e) {
throw new RuntimeException(e); throw new RuntimeException(e);
@ -107,15 +105,10 @@ public void initialize() throws InterruptedException {
}); });
} }
public void addShard(Map<MongoShardKey, ShardKeyRange> rangeMap) { public String createDbzUserCommand() throws TemplateException, IOException {
mongo.addShard(rangeMap); return mongo.getUseTls()
} ? MongoShardedUtil.createCertUserCommand(OcpMongoCertGenerator.CLIENT_CERT_SUBJECT)
: MongoShardedUtil.createPasswordUserCommand(ConfigProperties.DATABASE_MONGO_DBZ_USERNAME, ConfigProperties.DATABASE_MONGO_DBZ_PASSWORD);
/**
* removes last shard
*/
public void removeShard() {
mongo.removeShard();
} }
public OcpMongoShardedCluster getMongo() { public OcpMongoShardedCluster getMongo() {

View File

@ -23,22 +23,23 @@ public class OcpMongoShardedDeployer implements Deployer<OcpMongoShardedControll
private final int configServerCount; private final int configServerCount;
private final String rootUserName; private final String rootUserName;
private final String rootPassword; private final String rootPassword;
private final boolean useInternalAuth; private final boolean useKeyfile;
private final boolean useTls;
private List<MongoShardKey> shardKeys; private List<MongoShardKey> shardKeys;
private OcpMongoShardedCluster mongo; private OcpMongoShardedCluster mongo;
public OcpMongoShardedDeployer(int shardCount, int replicaCount, int configServerCount, String rootUserName, String rootPassword, boolean useInternalAuth, public OcpMongoShardedDeployer(int shardCount, int replicaCount, int configServerCount, String rootUserName, String rootPassword, boolean useKeyfile,
OpenShiftClient ocp, OpenShiftClient ocp,
String project, List<MongoShardKey> shardKeys) { String project, boolean useTls, List<MongoShardKey> shardKeys) {
this.shardCount = shardCount; this.shardCount = shardCount;
this.replicaCount = replicaCount; this.replicaCount = replicaCount;
this.configServerCount = configServerCount; this.configServerCount = configServerCount;
this.rootUserName = rootUserName; this.rootUserName = rootUserName;
this.rootPassword = rootPassword; this.rootPassword = rootPassword;
this.useInternalAuth = useInternalAuth; this.useKeyfile = useKeyfile;
this.ocp = ocp; this.ocp = ocp;
this.project = project; this.project = project;
this.useTls = useTls;
this.shardKeys = shardKeys; this.shardKeys = shardKeys;
} }
@ -56,7 +57,8 @@ public OcpMongoShardedController deploy() throws Exception {
.withInitialShardCount(shardCount) .withInitialShardCount(shardCount)
.withReplicaCount(replicaCount) .withReplicaCount(replicaCount)
.withShardKeys(shardKeys) .withShardKeys(shardKeys)
.withUseInternalAuth(useInternalAuth) .withUseInternalAuth(useKeyfile)
.withUseTsl(useTls)
.withRootUser(rootUserName, rootPassword) .withRootUser(rootUserName, rootPassword)
.withShardKeys(shardKeys) .withShardKeys(shardKeys)
.build(); .build();
@ -76,8 +78,9 @@ public static final class OcpMongoShardedDeployerBuilder {
private int configServerCount; private int configServerCount;
private String rootUserName; private String rootUserName;
private String rootPassword; private String rootPassword;
private boolean useInternalAuth; private boolean useKeyfile;
private List<MongoShardKey> shardKeys; private List<MongoShardKey> shardKeys;
private boolean useTls;
private OcpMongoShardedDeployerBuilder() { private OcpMongoShardedDeployerBuilder() {
} }
@ -113,8 +116,13 @@ public OcpMongoShardedDeployerBuilder withRootUser(String rootUserName, String r
return this; return this;
} }
public OcpMongoShardedDeployerBuilder withUseInternalAuth(boolean useInternalAuth) { public OcpMongoShardedDeployerBuilder withUseKeyfile(boolean useKeyfile) {
this.useInternalAuth = useInternalAuth; this.useKeyfile = useKeyfile;
return this;
}
public OcpMongoShardedDeployerBuilder withUseTls(boolean useTls) {
this.useTls = useTls;
return this; return this;
} }
@ -124,7 +132,7 @@ public OcpMongoShardedDeployerBuilder withShardKeys(List<MongoShardKey> shardKey
} }
public OcpMongoShardedDeployer build() { public OcpMongoShardedDeployer build() {
return new OcpMongoShardedDeployer(shardCount, replicaCount, configServerCount, rootUserName, rootPassword, useInternalAuth, ocp, project, shardKeys); return new OcpMongoShardedDeployer(shardCount, replicaCount, configServerCount, rootUserName, rootPassword, useKeyfile, ocp, project, useTls, shardKeys);
} }
} }
} }

View File

@ -25,6 +25,7 @@
import io.fabric8.kubernetes.api.model.ServicePortBuilder; import io.fabric8.kubernetes.api.model.ServicePortBuilder;
import io.fabric8.kubernetes.api.model.ServiceSpecBuilder; import io.fabric8.kubernetes.api.model.ServiceSpecBuilder;
import io.fabric8.kubernetes.api.model.VolumeBuilder; import io.fabric8.kubernetes.api.model.VolumeBuilder;
import io.fabric8.kubernetes.api.model.VolumeMountBuilder;
import io.fabric8.kubernetes.api.model.apps.Deployment; import io.fabric8.kubernetes.api.model.apps.Deployment;
import io.fabric8.kubernetes.api.model.apps.DeploymentBuilder; import io.fabric8.kubernetes.api.model.apps.DeploymentBuilder;
import io.fabric8.kubernetes.api.model.apps.DeploymentSpecBuilder; import io.fabric8.kubernetes.api.model.apps.DeploymentSpecBuilder;
@ -62,11 +63,15 @@ public static Deployment configServerDeployment(int num) {
.build()) .build())
.withSpec(new PodSpecBuilder() .withSpec(new PodSpecBuilder()
.withVolumes(new VolumeBuilder() .withVolumes(new VolumeBuilder()
.withName("volume-" + name)
.withEmptyDir(new EmptyDirVolumeSource()) .withEmptyDir(new EmptyDirVolumeSource())
.withName("volume-" + name)
.build()) .build())
.withContainers(new ContainerBuilder() .withContainers(new ContainerBuilder()
.withName("mongo") .withName("mongo")
.withVolumeMounts(new VolumeMountBuilder()
.withName("volume-" + name)
.withMountPath("/data/db")
.build())
.withReadinessProbe(new ProbeBuilder() .withReadinessProbe(new ProbeBuilder()
.withExec(new ExecActionBuilder() .withExec(new ExecActionBuilder()
.withCommand("mongosh", "localhost:" + OcpMongoShardedConstants.MONGO_CONFIG_PORT) .withCommand("mongosh", "localhost:" + OcpMongoShardedConstants.MONGO_CONFIG_PORT)

View File

@ -26,6 +26,7 @@
import io.fabric8.kubernetes.api.model.ServiceSpecBuilder; import io.fabric8.kubernetes.api.model.ServiceSpecBuilder;
import io.fabric8.kubernetes.api.model.TCPSocketActionBuilder; import io.fabric8.kubernetes.api.model.TCPSocketActionBuilder;
import io.fabric8.kubernetes.api.model.VolumeBuilder; import io.fabric8.kubernetes.api.model.VolumeBuilder;
import io.fabric8.kubernetes.api.model.VolumeMountBuilder;
import io.fabric8.kubernetes.api.model.apps.Deployment; import io.fabric8.kubernetes.api.model.apps.Deployment;
import io.fabric8.kubernetes.api.model.apps.DeploymentBuilder; import io.fabric8.kubernetes.api.model.apps.DeploymentBuilder;
import io.fabric8.kubernetes.api.model.apps.DeploymentSpecBuilder; import io.fabric8.kubernetes.api.model.apps.DeploymentSpecBuilder;
@ -59,6 +60,10 @@ public static Deployment shardDeployment(int shardNum, int replicaNum) {
.build()) .build())
.withContainers(new ContainerBuilder() .withContainers(new ContainerBuilder()
.withName("mongo") .withName("mongo")
.withVolumeMounts(new VolumeMountBuilder()
.withName("volume-" + name)
.withMountPath("/data/db")
.build())
.withReadinessProbe(new ProbeBuilder() .withReadinessProbe(new ProbeBuilder()
.withExec(new ExecActionBuilder() .withExec(new ExecActionBuilder()
.withCommand("mongosh", "localhost:" + OcpMongoShardedConstants.MONGO_SHARD_PORT) .withCommand("mongosh", "localhost:" + OcpMongoShardedConstants.MONGO_SHARD_PORT)

View File

@ -74,7 +74,6 @@ public OcpKafkaConnectController deploy() throws InterruptedException {
ocp, ocp,
http); http);
controller.waitForCluster(); controller.waitForCluster();
return controller; return controller;
} }

View File

@ -15,16 +15,20 @@
import io.debezium.testing.system.tools.ConfigProperties; import io.debezium.testing.system.tools.ConfigProperties;
import io.debezium.testing.system.tools.artifacts.OcpArtifactServerController; import io.debezium.testing.system.tools.artifacts.OcpArtifactServerController;
import io.debezium.testing.system.tools.databases.mongodb.sharded.OcpMongoCertGenerator;
import io.debezium.testing.system.tools.fabric8.FabricBuilderWrapper; import io.debezium.testing.system.tools.fabric8.FabricBuilderWrapper;
import io.fabric8.kubernetes.api.model.ConfigMap; import io.fabric8.kubernetes.api.model.ConfigMap;
import io.fabric8.kubernetes.api.model.ConfigMapKeySelector; import io.fabric8.kubernetes.api.model.ConfigMapKeySelector;
import io.fabric8.kubernetes.api.model.ConfigMapKeySelectorBuilder; import io.fabric8.kubernetes.api.model.ConfigMapKeySelectorBuilder;
import io.fabric8.kubernetes.api.model.ConfigMapVolumeSourceBuilder;
import io.fabric8.kubernetes.api.model.Secret; import io.fabric8.kubernetes.api.model.Secret;
import io.strimzi.api.kafka.model.common.CertSecretSourceBuilder; import io.strimzi.api.kafka.model.common.CertSecretSourceBuilder;
import io.strimzi.api.kafka.model.common.ClientTls; import io.strimzi.api.kafka.model.common.ClientTls;
import io.strimzi.api.kafka.model.common.ClientTlsBuilder; import io.strimzi.api.kafka.model.common.ClientTlsBuilder;
import io.strimzi.api.kafka.model.common.ContainerEnvVarBuilder; import io.strimzi.api.kafka.model.common.ContainerEnvVarBuilder;
import io.strimzi.api.kafka.model.common.template.ContainerTemplateBuilder; import io.strimzi.api.kafka.model.common.template.ContainerTemplateBuilder;
import io.strimzi.api.kafka.model.connect.ExternalConfigurationBuilder;
import io.strimzi.api.kafka.model.connect.ExternalConfigurationVolumeSourceBuilder;
import io.strimzi.api.kafka.model.connect.KafkaConnect; import io.strimzi.api.kafka.model.connect.KafkaConnect;
import io.strimzi.api.kafka.model.connect.KafkaConnectBuilder; import io.strimzi.api.kafka.model.connect.KafkaConnectBuilder;
import io.strimzi.api.kafka.model.connect.KafkaConnectTemplate; import io.strimzi.api.kafka.model.connect.KafkaConnectTemplate;
@ -180,6 +184,33 @@ public FabricKafkaConnectBuilder withLoggingFromConfigMap(ConfigMap configMap) {
} }
/**
* Mount truststore and keystore configmaps to external configuration path with same folder names as configmap names
* @return
*/
public FabricKafkaConnectBuilder withMongoCerts() {
builder
.editSpec()
.withExternalConfiguration(new ExternalConfigurationBuilder()
.withVolumes(new ExternalConfigurationVolumeSourceBuilder()
.withName(OcpMongoCertGenerator.KEYSTORE_CONFIGMAP)
.withConfigMap(new ConfigMapVolumeSourceBuilder()
.withName(OcpMongoCertGenerator.KEYSTORE_CONFIGMAP)
.withDefaultMode(0420)
.build())
.build(),
new ExternalConfigurationVolumeSourceBuilder()
.withName(OcpMongoCertGenerator.TRUSTSTORE_CONFIGMAP)
.withConfigMap(new ConfigMapVolumeSourceBuilder()
.withName(OcpMongoCertGenerator.TRUSTSTORE_CONFIGMAP)
.withDefaultMode(0420)
.build())
.build())
.build())
.endSpec();
return self();
}
public FabricKafkaConnectBuilder withMetricsFromConfigMap(ConfigMap configMap) { public FabricKafkaConnectBuilder withMetricsFromConfigMap(ConfigMap configMap) {
ConfigMapKeySelector configMapKeySelector = new ConfigMapKeySelectorBuilder() ConfigMapKeySelector configMapKeySelector = new ConfigMapKeySelectorBuilder()
.withKey("metrics") .withKey("metrics")

View File

@ -8,6 +8,7 @@
import org.junit.jupiter.api.extension.ExtensionContext; import org.junit.jupiter.api.extension.ExtensionContext;
import io.debezium.testing.system.resources.ConnectorFactories; import io.debezium.testing.system.resources.ConnectorFactories;
import io.debezium.testing.system.tools.ConfigProperties;
import io.debezium.testing.system.tools.databases.mongodb.sharded.OcpMongoShardedController; import io.debezium.testing.system.tools.databases.mongodb.sharded.OcpMongoShardedController;
import io.debezium.testing.system.tools.kafka.ConnectorConfigBuilder; import io.debezium.testing.system.tools.kafka.ConnectorConfigBuilder;
import io.debezium.testing.system.tools.kafka.KafkaConnectController; import io.debezium.testing.system.tools.kafka.KafkaConnectController;
@ -25,6 +26,12 @@ public ShardedMongoConnector(ExtensionContext.Store store) {
@Override @Override
public ConnectorConfigBuilder connectorConfig(String connectorName) { public ConnectorConfigBuilder connectorConfig(String connectorName) {
return new ConnectorFactories(kafkaController).shardedMongo(dbController, connectorName); if (ConfigProperties.DATABASE_MONGO_USE_TLS) {
return new ConnectorFactories(kafkaController).shardedMongoWithTls(dbController, connectorName);
}
else {
return new ConnectorFactories(kafkaController).shardedMongo(dbController, connectorName);
}
} }
} }

View File

@ -35,7 +35,8 @@ protected OcpMongoShardedController databaseController() throws Exception {
.withShardCount(OcpMongoShardedConstants.SHARD_COUNT) .withShardCount(OcpMongoShardedConstants.SHARD_COUNT)
.withReplicaCount(OcpMongoShardedConstants.REPLICAS_IN_SHARD) .withReplicaCount(OcpMongoShardedConstants.REPLICAS_IN_SHARD)
.withShardKeys(getTestShardKeys()) .withShardKeys(getTestShardKeys())
.withUseInternalAuth(ConfigProperties.DATABASE_MONGO_USE_KEYFILE) .withUseKeyfile(ConfigProperties.DATABASE_MONGO_USE_KEYFILE)
.withUseTls(ConfigProperties.DATABASE_MONGO_USE_TLS)
.withRootUser(ConfigProperties.DATABASE_MONGO_USERNAME, ConfigProperties.DATABASE_MONGO_SA_PASSWORD) .withRootUser(ConfigProperties.DATABASE_MONGO_USERNAME, ConfigProperties.DATABASE_MONGO_SA_PASSWORD)
.build(); .build();
controller = deployer.deploy(); controller = deployer.deploy();

View File

@ -18,6 +18,7 @@
import io.debezium.testing.system.tools.YAML; import io.debezium.testing.system.tools.YAML;
import io.debezium.testing.system.tools.artifacts.OcpArtifactServerController; import io.debezium.testing.system.tools.artifacts.OcpArtifactServerController;
import io.debezium.testing.system.tools.artifacts.OcpArtifactServerDeployer; import io.debezium.testing.system.tools.artifacts.OcpArtifactServerDeployer;
import io.debezium.testing.system.tools.databases.mongodb.sharded.OcpMongoCertGenerator;
import io.debezium.testing.system.tools.kafka.KafkaConnectController; import io.debezium.testing.system.tools.kafka.KafkaConnectController;
import io.debezium.testing.system.tools.kafka.KafkaController; import io.debezium.testing.system.tools.kafka.KafkaController;
import io.debezium.testing.system.tools.kafka.OcpKafkaConnectController; import io.debezium.testing.system.tools.kafka.OcpKafkaConnectController;
@ -99,6 +100,10 @@ private void deployConnectCluster(StrimziOperatorController operatorController,
.withConnectorResources(STRIMZI_OPERATOR_CONNECTORS) .withConnectorResources(STRIMZI_OPERATOR_CONNECTORS)
.withBuild(artifactServerController) .withBuild(artifactServerController)
.withPullSecret(operatorController.getPullSecret()); .withPullSecret(operatorController.getPullSecret());
if (ConfigProperties.DATABASE_MONGO_USE_TLS) {
OcpMongoCertGenerator.generateMongoTestCerts(ocp);
builder.withMongoCerts();
}
OcpKafkaConnectDeployer connectDeployer = new OcpKafkaConnectDeployer( OcpKafkaConnectDeployer connectDeployer = new OcpKafkaConnectDeployer(
project, builder, configMap, operatorController, ocp, new OkHttpClient()); project, builder, configMap, operatorController, ocp, new OkHttpClient());

View File

@ -104,7 +104,7 @@ private void addServiceAccountToClusterRoleBinding(String saNamespace, ClusterRo
.withName("default") .withName("default")
.withNamespace(saNamespace) .withNamespace(saNamespace)
.build()); .build());
bindingBuilder.addToUserNames("system:serviceaccount:" + saNamespace + ":default"); bindingBuilder.withUserNames("system:serviceaccount:" + saNamespace + ":default");
} }
/** /**

View File

@ -8,8 +8,10 @@
import java.util.Random; import java.util.Random;
import io.debezium.testing.system.tools.ConfigProperties; import io.debezium.testing.system.tools.ConfigProperties;
import io.debezium.testing.system.tools.certificateutil.CertUtil;
import io.debezium.testing.system.tools.databases.SqlDatabaseController; import io.debezium.testing.system.tools.databases.SqlDatabaseController;
import io.debezium.testing.system.tools.databases.mongodb.MongoDatabaseController; import io.debezium.testing.system.tools.databases.mongodb.MongoDatabaseController;
import io.debezium.testing.system.tools.databases.mongodb.sharded.OcpMongoCertGenerator;
import io.debezium.testing.system.tools.kafka.ConnectorConfigBuilder; import io.debezium.testing.system.tools.kafka.ConnectorConfigBuilder;
import io.debezium.testing.system.tools.kafka.KafkaController; import io.debezium.testing.system.tools.kafka.KafkaController;
@ -104,10 +106,29 @@ public ConnectorConfigBuilder shardedMongo(MongoDatabaseController controller, S
.put("topic.prefix", connectorName) .put("topic.prefix", connectorName)
.put("connector.class", "io.debezium.connector.mongodb.MongoDbConnector") .put("connector.class", "io.debezium.connector.mongodb.MongoDbConnector")
.put("task.max", 1) .put("task.max", 1)
.put("mongodb.user", ConfigProperties.DATABASE_MONGO_DBZ_USERNAME)
.put("mongodb.password", ConfigProperties.DATABASE_MONGO_DBZ_PASSWORD)
.put("mongodb.connection.string", controller.getPublicDatabaseUrl()) .put("mongodb.connection.string", controller.getPublicDatabaseUrl())
.put("mongodb.connection.mode", "sharded") .put("mongodb.connection.mode", "sharded")
.put("mongodb.user", ConfigProperties.DATABASE_MONGO_DBZ_USERNAME)
.put("mongodb.password", ConfigProperties.DATABASE_MONGO_DBZ_PASSWORD)
.addOperationRouterForTable("u", "customers");
return cb;
}
public ConnectorConfigBuilder shardedMongoWithTls(MongoDatabaseController controller, String connectorName) {
ConnectorConfigBuilder cb = new ConnectorConfigBuilder(connectorName);
cb
.put("topic.prefix", connectorName)
.put("connector.class", "io.debezium.connector.mongodb.MongoDbConnector")
.put("task.max", 1)
.put("mongodb.connection.string", controller.getPublicDatabaseUrl())
.put("mongodb.connection.mode", "sharded")
.put("mongodb.ssl.enabled", true)
.put("mongodb.ssl.keystore",
"/opt/kafka/external-configuration/" + OcpMongoCertGenerator.KEYSTORE_CONFIGMAP + "/" + OcpMongoCertGenerator.KEYSTORE_SUBPATH)
.put("mongodb.ssl.keystore.password", CertUtil.KEYSTORE_PASSWORD)
.put("mongodb.ssl.truststore",
"/opt/kafka/external-configuration/" + OcpMongoCertGenerator.TRUSTSTORE_CONFIGMAP + "/" + OcpMongoCertGenerator.TRUSTSTORE_SUBPATH)
.put("mongodb.ssl.truststore.password", CertUtil.KEYSTORE_PASSWORD)
.addOperationRouterForTable("u", "customers"); .addOperationRouterForTable("u", "customers");
return cb; return cb;
} }

View File

@ -20,10 +20,8 @@
import io.debezium.testing.system.assertions.KafkaAssertions; import io.debezium.testing.system.assertions.KafkaAssertions;
import io.debezium.testing.system.tests.ConnectorTest; import io.debezium.testing.system.tests.ConnectorTest;
import io.debezium.testing.system.tools.ConfigProperties;
import io.debezium.testing.system.tools.databases.mongodb.MongoDatabaseClient; import io.debezium.testing.system.tools.databases.mongodb.MongoDatabaseClient;
import io.debezium.testing.system.tools.databases.mongodb.MongoDatabaseController; import io.debezium.testing.system.tools.databases.mongodb.MongoDatabaseController;
import io.debezium.testing.system.tools.databases.mongodb.sharded.MongoShardedUtil;
import io.debezium.testing.system.tools.databases.mongodb.sharded.OcpMongoShardedController; import io.debezium.testing.system.tools.databases.mongodb.sharded.OcpMongoShardedController;
import io.debezium.testing.system.tools.databases.mongodb.sharded.ShardKeyRange; import io.debezium.testing.system.tools.databases.mongodb.sharded.ShardKeyRange;
import io.debezium.testing.system.tools.databases.mongodb.sharded.componentproviders.OcpShardModelProvider; import io.debezium.testing.system.tools.databases.mongodb.sharded.componentproviders.OcpShardModelProvider;
@ -94,10 +92,9 @@ protected void addAndRemoveShardTest(OcpMongoShardedController dbController, Str
// add shard, restart connector, insert to that shard and verify that insert was captured by debezium // add shard, restart connector, insert to that shard and verify that insert was captured by debezium
var key = dbController.getMongo().getShardKey("inventory.customers"); var key = dbController.getMongo().getShardKey("inventory.customers");
var keyRange = new ShardKeyRange(OcpShardModelProvider.getShardReplicaSetName(3), "1100", "1105"); var keyRange = new ShardKeyRange(OcpShardModelProvider.getShardReplicaSetName(3), "1100", "1105");
dbController.addShard(Map.of(key, keyRange)); dbController.getMongo().addShard(Map.of(key, keyRange));
var sets = dbController.getMongo().getShardReplicaSets(); var sets = dbController.getMongo().getShardReplicaSets();
sets.get(sets.size() - 1).executeMongosh( sets.get(sets.size() - 1).executeMongosh(dbController.createDbzUserCommand(), true);
MongoShardedUtil.createDebeziumUserCommand(ConfigProperties.DATABASE_MONGO_DBZ_USERNAME, ConfigProperties.DATABASE_MONGO_DBZ_PASSWORD), true);
connectController.undeployConnector(connectorName); connectController.undeployConnector(connectorName);
connectController.deployConnector(connectorConfig); connectController.deployConnector(connectorConfig);
@ -108,7 +105,7 @@ protected void addAndRemoveShardTest(OcpMongoShardedController dbController, Str
// remove shard, restart connector and verify debezium is still streaming // remove shard, restart connector and verify debezium is still streaming
removeCustomer(dbController, "ffoo@test.com"); removeCustomer(dbController, "ffoo@test.com");
dbController.removeShard(); dbController.getMongo().removeShard();
connectController.undeployConnector(connectorName); connectController.undeployConnector(connectorName);
connectController.deployConnector(connectorConfig); connectController.deployConnector(connectorConfig);

View File

@ -0,0 +1,29 @@
// CREATE DBZ USER
db = db.getSiblingDB('admin');
db.runCommand({
createRole: "listDatabases_x509",
privileges: [
{ resource: { cluster : true }, actions: ["listDatabases"]}
],
roles: []
});
db.runCommand({
createRole: "readChangeStream_x509",
privileges: [
{ resource: { db: "", collection: ""}, actions: [ "find", "changeStream" ] }
],
roles: []
});
db = db.getSiblingDB('$external');
db.runCommand({
createUser: '${userName}',
roles: [
{ role: "listDatabases_x509", db: "admin" },
{ role: "readChangeStream_x509", db: "admin" },
{ role: "readAnyDatabase", db: "admin" },
{ role: "read", db: "config", collection: "shards" },
{ role: "read", db: "local"}
],
writeConcern: { w: 'majority' , wtimeout: 5000 }
});

View File

@ -21,7 +21,16 @@ spec:
name: testsuite-volume name: testsuite-volume
env: env:
- name: TESTSUITE_ARGUMENTS - name: TESTSUITE_ARGUMENTS
value: -Dtest.prepare.strimzi=true -DskipTests=true -Dtest.strimzi.version.kafka=3.5.0 -Dtest.wait.scale=1 -Dtest.strimzi.kc.build=true -Dtest.ocp.project.debezium=debezium-test -Dimage.as=AS_IMAGE -Dtest.strimzi.operator.channel=stable -Dgroups="!avro&openshift" value: >
-Dtest.prepare.strimzi=true
-DskipTests=true
-Dtest.strimzi.version.kafka=3.5.0
-Dtest.wait.scale=1
-Dtest.strimzi.kc.build=true
-Dtest.ocp.project.debezium=debezium-test
-Dimage.as=AS_IMAGE
-Dtest.strimzi.operator.channel=stable
-Dgroups="!avro&openshift"
- name: DBZ_GIT_BRANCH - name: DBZ_GIT_BRANCH
value: "main" value: "main"
- name: DBZ_GIT_REPOSITORY - name: DBZ_GIT_REPOSITORY