DBZ-7605 refactor

This commit is contained in:
Martin Medek 2024-04-10 14:49:43 +02:00 committed by Ondrej Babec
parent b0f663c4c6
commit 19a80fb8cc
8 changed files with 44 additions and 82 deletions

View File

@ -31,10 +31,10 @@ public class CertUtil {
/**
* Uploads a string data as configMap to ocp cluster
* @param project namespace, where to create the configmap
* @param project namespace, where to create the configMap
* @param data content of a file in configMap
* @param configMapName config map name
* @param fileNameInConfigMap filename in configmap
* @param fileNameInConfigMap filename in configMap
* @param ocp ocp client
*/
public static void stringToConfigMap(String project, String data, String configMapName, String fileNameInConfigMap, OpenShiftClient ocp) {
@ -49,10 +49,10 @@ public static void stringToConfigMap(String project, String data, String configM
/**
* Converts keystore to base64 string and uploads as configMap to ocp cluster
* @param project namespace, where to create the configmap
* @param keyStore keystore object to be saved in configmap
* @param configMapName config map name
* @param fileNameInConfigMap filename in configmap
* @param project namespace, where to create the configMap
* @param keyStore keystore object to be saved in configMap
* @param configMapName configMap name
* @param fileNameInConfigMap filename in configMap
* @param ocp ocp client
*/
public static void keystoreToConfigMap(String project, KeyStore keyStore, String configMapName, String fileNameInConfigMap, OpenShiftClient ocp)
@ -85,6 +85,13 @@ public static String exportToBase64PEMString(PrivateKey privateKey) throws IOExc
return sw.toString();
}
/**
* exports certificate from holder to base64 string
* @param holder
* @return
* @throws CertificateException
* @throws IOException
*/
public static String exportToBase64PEMString(X509CertificateHolder holder) throws CertificateException, IOException {
return exportToBase64PEMString(convertHolderToCert(holder));
}

View File

@ -7,10 +7,10 @@
import java.io.IOException;
import java.math.BigInteger;
import java.security.GeneralSecurityException;
import java.security.KeyPair;
import java.security.KeyPairGenerator;
import java.security.KeyStore;
import java.security.NoSuchAlgorithmException;
import java.security.SecureRandom;
import java.security.Security;
import java.security.cert.CertificateException;
@ -55,7 +55,7 @@ public CertificateGenerator(List<CertificateWrapperBuilder> leafSpec) {
}
/**
* generate CA certificate and all leaf certificates specified in certSpecs attribute
* Generates CA certificate and all leaf certificates specified in {@link CertificateGenerator#certSpecs} attribute
* @throws Exception
*/
public void generate() throws Exception {
@ -75,7 +75,7 @@ public void generate() throws Exception {
/**
* Generates keystore containing a leaf private key and its certificate chain. Usable for keystore/truststore generation
* keystore password set by constant in CertUtil class
* keystore password set by constant in {@link CertUtil} class
* @param leafName
* @return
* @throws Exception
@ -104,7 +104,7 @@ public CertificateWrapper getCa() {
return ca;
}
private CertificateWrapper generateCa() throws IOException {
private CertificateWrapper generateCa() throws IOException, NoSuchAlgorithmException {
Security.addProvider(new BouncyCastleProvider());
KeyPair keyPair = generateKeyPair();
@ -148,7 +148,7 @@ private CertificateWrapper generateCa() throws IOException {
.build();
}
private CertificateWrapper genLeafCert(CertificateWrapperBuilder builder) throws OperatorCreationException {
private CertificateWrapper genLeafCert(CertificateWrapperBuilder builder) throws OperatorCreationException, NoSuchAlgorithmException {
KeyPair keyPair = generateKeyPair();
long notBefore = System.currentTimeMillis();
@ -186,15 +186,10 @@ private X509Certificate holderToCert(X509CertificateHolder holder) throws Certif
return converter.getCertificate(holder);
}
private KeyPair generateKeyPair() {
try {
KeyPairGenerator keyPairGenerator = KeyPairGenerator.getInstance(PRIVATE_KEY_ALGORITHM);
keyPairGenerator.initialize(PRIVATE_KEY_SIZE, new SecureRandom());
return keyPairGenerator.generateKeyPair();
}
catch (GeneralSecurityException var2) {
throw new AssertionError(var2);
}
private KeyPair generateKeyPair() throws NoSuchAlgorithmException {
KeyPairGenerator keyPairGenerator = KeyPairGenerator.getInstance(PRIVATE_KEY_ALGORITHM);
keyPairGenerator.initialize(PRIVATE_KEY_SIZE, new SecureRandom());
return keyPairGenerator.generateKeyPair();
}
}

View File

@ -95,7 +95,7 @@ public static void addKeyFileToDeployment(Deployment deployment) {
}
/**
* Modify the mongodb deployment object to mount server and ca certificates (from configmap, name is set by constant) and use them
* Modify the mongodb deployment object to mount server and ca certificates (from configMap, name is set by constant) and use them
* in mongodb deployment for internal and client authentication.
* Mutually exclusive with using keyfile for internal auth
* @param deployment

View File

@ -122,22 +122,22 @@ public ConnectorConfigBuilder addOperationRouterForTable(String op, String table
return addOperationRouter(op, targetTopicName, serverName + ".*\\." + tableName);
}
public ConnectorConfigBuilder addMongoTlsParams() {
this
.put("mongodb.ssl.enabled", true)
.put("mongodb.ssl.keystore",
"/opt/kafka/external-configuration/" + OcpMongoCertGenerator.KEYSTORE_CONFIGMAP + "/" + OcpMongoCertGenerator.KEYSTORE_SUBPATH)
.put("mongodb.ssl.keystore.password", CertUtil.KEYSTORE_PASSWORD)
.put("mongodb.ssl.truststore",
"/opt/kafka/external-configuration/" + OcpMongoCertGenerator.TRUSTSTORE_CONFIGMAP + "/" + OcpMongoCertGenerator.TRUSTSTORE_SUBPATH)
.put("mongodb.ssl.truststore.password", CertUtil.KEYSTORE_PASSWORD);
return this;
}
public ConnectorConfigBuilder addMongoPasswordAuthParams() {
this
.put("mongodb.user", ConfigProperties.DATABASE_MONGO_DBZ_USERNAME)
.put("mongodb.password", ConfigProperties.DATABASE_MONGO_DBZ_PASSWORD);
public ConnectorConfigBuilder addMongoDbzUser() {
if (ConfigProperties.DATABASE_MONGO_USE_TLS) {
this
.put("mongodb.ssl.enabled", true)
.put("mongodb.ssl.keystore",
"/opt/kafka/external-configuration/" + OcpMongoCertGenerator.KEYSTORE_CONFIGMAP + "/" + OcpMongoCertGenerator.KEYSTORE_SUBPATH)
.put("mongodb.ssl.keystore.password", CertUtil.KEYSTORE_PASSWORD)
.put("mongodb.ssl.truststore",
"/opt/kafka/external-configuration/" + OcpMongoCertGenerator.TRUSTSTORE_CONFIGMAP + "/" + OcpMongoCertGenerator.TRUSTSTORE_SUBPATH)
.put("mongodb.ssl.truststore.password", CertUtil.KEYSTORE_PASSWORD);
}
else {
this
.put("mongodb.user", ConfigProperties.DATABASE_MONGO_DBZ_USERNAME)
.put("mongodb.password", ConfigProperties.DATABASE_MONGO_DBZ_PASSWORD);
}
return this;
}

View File

@ -185,7 +185,7 @@ public FabricKafkaConnectBuilder withLoggingFromConfigMap(ConfigMap configMap) {
}
/**
* Mount truststore and keystore configmaps to external configuration path with same folder names as configmap names
* Mount truststore and keystore configMaps to external configuration path with same folder names as configMap names
* @return
*/
public FabricKafkaConnectBuilder withMongoCerts() {

View File

@ -8,7 +8,6 @@
import org.junit.jupiter.api.extension.ExtensionContext;
import io.debezium.testing.system.resources.ConnectorFactories;
import io.debezium.testing.system.tools.ConfigProperties;
import io.debezium.testing.system.tools.databases.mongodb.sharded.OcpMongoShardedController;
import io.debezium.testing.system.tools.kafka.ConnectorConfigBuilder;
import io.debezium.testing.system.tools.kafka.KafkaConnectController;
@ -26,11 +25,6 @@ public ShardedMongoConnector(ExtensionContext.Store store) {
@Override
public ConnectorConfigBuilder connectorConfig(String connectorName) {
if (ConfigProperties.DATABASE_MONGO_USE_TLS) {
return new ConnectorFactories(kafkaController).shardedMongoWithTls(dbController, connectorName);
}
else {
return new ConnectorFactories(kafkaController).shardedMongo(dbController, connectorName);
}
return new ConnectorFactories(kafkaController).shardedMongo(dbController, connectorName);
}
}

View File

@ -8,7 +8,6 @@
import org.junit.jupiter.api.extension.ExtensionContext;
import io.debezium.testing.system.resources.ConnectorFactories;
import io.debezium.testing.system.tools.ConfigProperties;
import io.debezium.testing.system.tools.databases.mongodb.sharded.OcpMongoShardedController;
import io.debezium.testing.system.tools.kafka.ConnectorConfigBuilder;
import io.debezium.testing.system.tools.kafka.KafkaConnectController;
@ -26,11 +25,6 @@ public ShardedReplicaMongoConnector(ExtensionContext.Store store) {
@Override
public ConnectorConfigBuilder connectorConfig(String connectorName) {
if (ConfigProperties.DATABASE_MONGO_USE_TLS) {
return new ConnectorFactories(kafkaController).shardedReplicaMongoWithTls(dbController, connectorName);
}
else {
return new ConnectorFactories(kafkaController).shardedReplicaMongo(dbController, connectorName);
}
return new ConnectorFactories(kafkaController).shardedReplicaMongo(dbController, connectorName);
}
}

View File

@ -106,20 +106,7 @@ public ConnectorConfigBuilder shardedMongo(MongoDatabaseController controller, S
.put("task.max", 1)
.put("mongodb.connection.string", controller.getPublicDatabaseUrl())
.put("mongodb.connection.mode", "sharded")
.addMongoPasswordAuthParams()
.addOperationRouterForTable("u", "customers");
return cb;
}
public ConnectorConfigBuilder shardedMongoWithTls(MongoDatabaseController controller, String connectorName) {
ConnectorConfigBuilder cb = new ConnectorConfigBuilder(connectorName);
cb
.put("topic.prefix", connectorName)
.put("connector.class", "io.debezium.connector.mongodb.MongoDbConnector")
.put("task.max", 1)
.put("mongodb.connection.string", controller.getPublicDatabaseUrl())
.put("mongodb.connection.mode", "sharded")
.addMongoTlsParams()
.addMongoDbzUser()
.addOperationRouterForTable("u", "customers");
return cb;
}
@ -134,22 +121,7 @@ public ConnectorConfigBuilder shardedReplicaMongo(MongoDatabaseController contro
.put("task.max", 4)
.put("mongodb.connection.string", controller.getPublicDatabaseUrl())
.put("mongodb.connection.mode", "replica_set")
.addMongoPasswordAuthParams()
.addOperationRouterForTable("u", "customers");
return cb;
}
public ConnectorConfigBuilder shardedReplicaMongoWithTls(MongoDatabaseController controller, String connectorName) {
// String connectionUrl =;
ConnectorConfigBuilder cb = new ConnectorConfigBuilder(connectorName);
cb
.put("topic.prefix", connectorName)
.put("connector.class", "io.debezium.connector.mongodb.MongoDbConnector")
.put("task.max", 4)
.put("mongodb.connection.string", controller.getPublicDatabaseUrl())
.put("mongodb.connection.mode", "replica_set")
.addMongoTlsParams()
.addMongoDbzUser()
.addOperationRouterForTable("u", "customers");
return cb;
}