DBZ-7223 Add the MongoDB sink connector

* remove as-yet-unused commented-out code
* move the MongoDbSinkConnector class to the root-level package
* address PR feedback about the container NETWORK change in MongoDbDatabaseProvider and its possible side effects

closes https://issues.redhat.com/browse/DBZ-7223
Author:    rkerner
Date:      2024-07-03 19:50:11 +02:00
Committer: Jiri Pechanec
Commit:    143e8c3208
Parent:    f72339d93f

6 changed files with 26 additions and 168 deletions

MongoDbSinkConnector.java View File

@@ -3,7 +3,7 @@
  *
  * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
  */
-package io.debezium.connector.mongodb.sink;
+package io.debezium.connector.mongodb;
 
 import java.util.ArrayList;
 import java.util.List;
@@ -16,6 +16,10 @@
 import io.debezium.annotation.Immutable;
 import io.debezium.config.Configuration;
+import io.debezium.connector.mongodb.sink.Module;
+import io.debezium.connector.mongodb.sink.MongoDbSinkConnectorConfig;
+import io.debezium.connector.mongodb.sink.MongoDbSinkConnectorTask;
+import io.debezium.connector.mongodb.sink.SinkConnection;
 
 public class MongoDbSinkConnector extends SinkConnector {
@@ -62,6 +66,7 @@ public Config validate(Map<String, String> connectorConfigs) {
         MongoDbSinkConnectorConfig sinkConfig;
         try {
             sinkConfig = new MongoDbSinkConnectorConfig(Configuration.from(connectorConfigs));
+            sinkConfig.validate();
         }
         catch (Exception e) {
             return config;
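
The added sinkConfig.validate() call surfaces cross-field configuration problems during Connector.validate(), before any task starts; the catch block simply returns the Config with whatever per-field errors were already recorded. The commit does not show the body of validate(). One plausible shape, reusing the validateAndRecord(...) helper visible in the config class below (an illustrative sketch, not the committed code):

    // Illustrative sketch only: the actual body of
    // MongoDbSinkConnectorConfig.validate() is not part of this diff.
    public void validate() {
        // validateAndRecord returns false if any Field fails validation
        if (!config.validateAndRecord(CONFIG_DEFINITION.all(), LOGGER::error)) {
            throw new DebeziumException("Invalid MongoDB sink connector configuration; see log for details");
        }
    }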

MongoDbSinkConnectorConfig.java View File

@@ -90,36 +90,13 @@ public class MongoDbSinkConnectorConfig implements SharedMongoDbConnectorConfig,
             .withDescription("Specifies how many records to attempt to batch together into the destination table, when possible. " +
                     "You can also configure the connectors underlying consumers max.poll.records using consumer.override.max.poll.records in the connector configuration.");
 
-    // public static final Field FIELD_INCLUDE_LIST_FIELD = Field.create(FIELD_INCLUDE_LIST)
-    //         .withDisplayName("Include Fields")
-    //         .withType(ConfigDef.Type.LIST)
-    //         .withGroup(Field.createGroupEntry(Field.Group.FILTERS, 1))
-    //         .withWidth(ConfigDef.Width.LONG)
-    //         .withImportance(ConfigDef.Importance.MEDIUM)
-    //         .withValidation(Field::isListOfRegex)
-    //         .withDescription("A comma-separated list of regular expressions matching fully-qualified names of fields that "
-    //                 + "should be included in change events. The field names must be delimited by the format <topic>:<field> ");
-
-    // public static final Field FIELD_EXCLUDE_LIST_FIELD = Field.create(FIELD_EXCLUDE_LIST)
-    //         .withDisplayName("Exclude Fields")
-    //         .withType(ConfigDef.Type.LIST)
-    //         .withGroup(Field.createGroupEntry(Field.Group.FILTERS, 2))
-    //         .withWidth(ConfigDef.Width.LONG)
-    //         .withImportance(ConfigDef.Importance.MEDIUM)
-    //         .withValidation(Field::isListOfRegex)
-    //         .withDescription("A comma-separated list of regular expressions matching fully-qualified names of fields that "
-    //                 + "should be excluded from change events. The field names must be delimited by the format <topic>:<field> ");
-
     protected static final ConfigDefinition CONFIG_DEFINITION = ConfigDefinition.editor()
             .connector(
                     SINK_DATABASE_NAME,
                     CONNECTION_STRING,
                     TABLE_NAMING_STRATEGY_FIELD,
                     COLUMN_NAMING_STRATEGY_FIELD,
-                    BATCH_SIZE_FIELD
-            // FIELD_INCLUDE_LIST_FIELD,
-            // FIELD_EXCLUDE_LIST_FIELD
-            )
+                    BATCH_SIZE_FIELD)
             .create();
 
     /**
@@ -141,7 +118,6 @@ public WriteModelStrategy getDeleteWriteModelStrategy() {
 
     private final String sinkDatabaseName;
     private final String tableNameFormat;
-    // private final Set<String> dataTypeMapping;
     private final TableNamingStrategy tableNamingStrategy;
     private final ColumnNamingStrategy columnNamingStrategy;
     private FieldFilterFactory.FieldNameFilter fieldsFilter;
@@ -186,7 +162,7 @@ public boolean validateAndRecord(Iterable<Field> fields, Consumer<String> problems) {
         return config.validateAndRecord(fields, problems);
     }
 
-    protected static ConfigDef configDef() {
+    public static ConfigDef configDef() {
         return CONFIG_DEFINITION.configDef();
     }
 
@@ -194,9 +170,6 @@ public int getBatchSize() {
         return batchSize;
     }
 
-    // public Set<String> getDataTypeMapping() {
-    //     return dataTypeMapping;
-    // }
     public TableNamingStrategy getTableNamingStrategy() {
         return tableNamingStrategy;
     }
@@ -205,10 +178,6 @@ public ColumnNamingStrategy getColumnNamingStrategy() {
         return columnNamingStrategy;
     }
 
-    // public FieldFilterFactory.FieldNameFilter getFieldsFilter() {
-    //     return fieldsFilter;
-    // }
-
     public String getContextName() {
         return Module.contextName();
     }
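
The protected-to-public change on configDef() is a direct consequence of the package move in the first file: MongoDbSinkConnector now lives in io.debezium.connector.mongodb while the config class stays in io.debezium.connector.mongodb.sink, and since the connector is not a subclass of the config class, protected (package-plus-subclass) access no longer reaches it. The call site presumably follows the standard Kafka Connect pattern; a sketch (the diff confirms the visibility change, not this exact method body):

    // In io.debezium.connector.mongodb.MongoDbSinkConnector (sketch):
    @Override
    public ConfigDef config() {
        // cross-package call that compiles only now that configDef() is public
        return MongoDbSinkConnectorConfig.configDef();
    }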

View File

@@ -79,12 +79,7 @@ private <T> Optional<T> tryProcess(final Supplier<Optional<T>> supplier) {
         }
         catch (Exception e) {
             exception = e;
-            // if (config.logErrors()) {
             LOGGER.error("Unable to process record {}", sinkRecord, e);
-            // }
-            // if (!config.tolerateErrors()) {
-            //     throw e;
-            // }
         }
         return Optional.empty();
     }
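
With the commented-out logErrors/tolerateErrors branches deleted, the error handling is now unconditional: log and swallow. Reassembled from the context lines, the surviving method reads as follows (the supplier.get() call and the exception/sinkRecord members are inferred from the visible fragment, not shown in full by the diff):

    private <T> Optional<T> tryProcess(final Supplier<Optional<T>> supplier) {
        try {
            return supplier.get();    // inferred: the hunk starts mid-try
        }
        catch (Exception e) {
            exception = e;            // remembered for later reporting
            LOGGER.error("Unable to process record {}", sinkRecord, e);
        }
        return Optional.empty();
    }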

SinkConnection.java View File

@@ -28,9 +28,6 @@
 
 public final class SinkConnection {
 
-    // private static final String USERS_INFO = "{usersInfo: '%s', showPrivileges: true}";
-    // private static final String ROLES_INFO = "{rolesInfo: '%s', showPrivileges: true, showBuiltinRoles: true}";
-
     public static Optional<MongoClient> canConnect(final Config config, final Field connectionStringConfigName) {
         Optional<ConfigValue> optionalConnectionString = getConfigByName(config, connectionStringConfigName.name());
         if (optionalConnectionString.isPresent() && optionalConnectionString.get().errorMessages().isEmpty()) {
@@ -88,117 +85,6 @@ public void clusterDescriptionChanged(
         return Optional.empty();
     }
 
-    // public static void checkUserHasActions(
-    //         final MongoClient client,
-    //         final MongoCredential credential,
-    //         final List<String> actions,
-    //         final String dbName,
-    //         final String collectionName,
-    //         final String configName,
-    //         final Config config) {
-    //
-    //     if (credential == null) {
-    //         return;
-    //     }
-    //
-    //     try {
-    //         Document usersInfo = client
-    //                 .getDatabase(credential.getSource())
-    //                 .runCommand(Document.parse(format(USERS_INFO, credential.getUserName())));
-    //
-    //         List<String> unsupportedActions = new ArrayList<>(actions);
-    //         for (final Document userInfo : usersInfo.getList("users", Document.class)) {
-    //             unsupportedActions = removeUserActions(
-    //                     userInfo, credential.getSource(), dbName, collectionName, actions);
-    //
-    //             if (!unsupportedActions.isEmpty()
-    //                     && userInfo.getList("inheritedPrivileges", Document.class, emptyList()).isEmpty()) {
-    //                 for (final Document inheritedRole : userInfo.getList("inheritedRoles", Document.class, emptyList())) {
-    //                     Document rolesInfo = client
-    //                             .getDatabase(inheritedRole.getString("db"))
-    //                             .runCommand(
-    //                                     Document.parse(format(ROLES_INFO, inheritedRole.getString("role"))));
-    //                     for (final Document roleInfo : rolesInfo.getList("roles", Document.class, emptyList())) {
-    //                         unsupportedActions = removeUserActions(
-    //                                 roleInfo,
-    //                                 credential.getSource(),
-    //                                 dbName,
-    //                                 collectionName,
-    //                                 unsupportedActions);
-    //                     }
-    //
-    //                     if (unsupportedActions.isEmpty()) {
-    //                         return;
-    //                     }
-    //                 }
-    //             }
-    //             if (unsupportedActions.isEmpty()) {
-    //                 return;
-    //             }
-    //         }
-    //
-    //         String missingPermissions = String.join(", ", unsupportedActions);
-    //         getConfigByName(config, configName)
-    //                 .ifPresent(
-    //                         c -> c.addErrorMessage(
-    //                                 format(
-    //                                         "Invalid user permissions. Missing the following action permissions: %s",
-    //                                         missingPermissions)));
-    //     }
-    //     catch (MongoSecurityException e) {
-    //         getConfigByName(config, configName)
-    //                 .ifPresent(c -> c.addErrorMessage("Invalid user permissions authentication failed."));
-    //     }
-    //     catch (Exception e) {
-    //         LOGGER.warn("Permission validation failed due to: {}", e.getMessage(), e);
-    //     }
-    // }
-
-    /**
-     * Checks the roles info document for matching actions and removes them from the provided list
-     */
-    // private static List<String> removeUserActions(final Document rolesInfo, final String authDatabase, final String databaseName, final String collectionName,
-    //                                               final List<String> userActions) {
-    //     List<Document> privileges = rolesInfo.getList("inheritedPrivileges", Document.class, emptyList());
-    //     if (privileges.isEmpty() || userActions.isEmpty()) {
-    //         return userActions;
-    //     }
-    //
-    //     List<String> unsupportedUserActions = new ArrayList<>(userActions);
-    //     for (final Document privilege : privileges) {
-    //         Document resource = privilege.get("resource", new Document());
-    //         if (resource.containsKey("cluster") && resource.getBoolean("cluster")) {
-    //             unsupportedUserActions.removeAll(privilege.getList("actions", String.class, emptyList()));
-    //         }
-    //         else if (resource.containsKey("db") && resource.containsKey("collection")) {
-    //             String database = resource.getString("db");
-    //             String collection = resource.getString("collection");
-    //
-    //             boolean resourceMatches = false;
-    //             boolean collectionMatches = collection.isEmpty() || collection.equals(collectionName);
-    //             if (database.isEmpty() && collectionMatches) {
-    //                 resourceMatches = true;
-    //             }
-    //             else if (database.equals(authDatabase) && collection.isEmpty()) {
-    //                 resourceMatches = true;
-    //             }
-    //             else if (database.equals(databaseName) && collectionMatches) {
-    //                 resourceMatches = true;
-    //             }
-    //
-    //             if (resourceMatches) {
-    //                 unsupportedUserActions.removeAll(privilege.getList("actions", String.class, emptyList()));
-    //             }
-    //         }
-    //
-    //         if (unsupportedUserActions.isEmpty()) {
-    //             break;
-    //         }
-    //     }
-    //
-    //     return unsupportedUserActions;
-    // }
-
     public static Optional<ConfigValue> getConfigByName(final Config config, final String name) {
         for (final ConfigValue configValue : config.configValues()) {
             if (configValue.name().equals(name)) {
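
The hunk cuts off inside getConfigByName(); its tail is not part of the change. For readability, the full helper presumably completes the linear scan like this (a sketch consistent with the visible lines and with Kafka's Config/ConfigValue API):

    public static Optional<ConfigValue> getConfigByName(final Config config, final String name) {
        for (final ConfigValue configValue : config.configValues()) {
            if (configValue.name().equals(name)) {
                return Optional.of(configValue);    // first match wins
            }
        }
        return Optional.empty();                    // no config value with that name
    }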

View File

@@ -16,8 +16,6 @@
 import org.apache.kafka.connect.runtime.SinkConnectorConfig;
 import org.assertj.core.api.Assertions;
 import org.awaitility.Awaitility;
-import org.hamcrest.CoreMatchers;
-import org.hamcrest.MatcherAssert;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -25,7 +23,6 @@
 import io.debezium.config.CommonConnectorConfig;
 import io.debezium.config.Configuration;
-import io.debezium.connector.mongodb.sink.MongoDbSinkConnector;
 import io.debezium.connector.mongodb.sink.MongoDbSinkConnectorConfig;
 import io.debezium.testing.testcontainers.Connector;
 import io.debezium.testing.testcontainers.ConnectorConfiguration;
 
@@ -115,8 +112,8 @@ default void checkSinkConnectorWritesRecords() {
                 .until(() -> listCollections(DATABASE_NAME).size() >= 6);
         List<String> collections = listCollections(DATABASE_NAME);
         LOGGER.debug("List collections: {}", Arrays.toString(collections.toArray()));
-        Assertions.assertThat(listCollections(DATABASE_NAME).size()).isEqualTo(6);
-        MatcherAssert.assertThat(collections, CoreMatchers.hasItems("dbserver1_inventory_addresses", "dbserver1_inventory_orders", "dbserver1_inventory_customers",
-                "dbserver1_inventory_products_on_hand", "dbserver1_inventory_geom", "dbserver1_inventory_products"));
+        Assertions.assertThat(listCollections(DATABASE_NAME)).hasSize(6);
+        Assertions.assertThat(collections).containsExactlyInAnyOrder("dbserver1_inventory_addresses", "dbserver1_inventory_orders", "dbserver1_inventory_customers",
+                "dbserver1_inventory_products_on_hand", "dbserver1_inventory_geom", "dbserver1_inventory_products");
     }
 }
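
The assertion swap tightens semantics as well as trimming a dependency: Hamcrest's hasItems is a subset check, whereas AssertJ's containsExactlyInAnyOrder also fails on unexpected extra elements, so the separate hasSize(6) check is now redundancy rather than load-bearing. A self-contained illustration (class and values are placeholders):

    import java.util.List;
    import org.assertj.core.api.Assertions;
    import org.hamcrest.CoreMatchers;
    import org.hamcrest.MatcherAssert;

    public class AssertionSemantics {
        public static void main(String[] args) {
            List<String> collections = List.of("a", "b", "extra");
            // Hamcrest: subset check, the stray "extra" element is tolerated -> passes
            MatcherAssert.assertThat(collections, CoreMatchers.hasItems("a", "b"));
            // AssertJ: exact multiset match ignoring order -> throws AssertionError here
            Assertions.assertThat(collections).containsExactlyInAnyOrder("a", "b");
        }
    }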

MongoDbDatabaseProvider.java View File

@@ -40,11 +40,13 @@ private static MongoDbReplicaSet.Builder dockerReplicaSetBuilder() {
      * @return MongoDb Replica set
      */
     public static MongoDbReplicaSet dockerReplicaSet() {
-        var replicaSet = dockerReplicaSetBuilder();
+        var replicaSetBuilder = dockerReplicaSetBuilder();
         if (NETWORK != null) {
-            replicaSet.network(NETWORK);
+            replicaSetBuilder.network(NETWORK);
         }
-        return replicaSet.build();
+        var replicaSet = replicaSetBuilder.build();
+        NETWORK = null;
+        return replicaSet;
     }
 
     /**
@@ -53,11 +55,13 @@ public static MongoDbReplicaSet dockerReplicaSet() {
      * @return MongoDb Replica set
      */
     public static MongoDbReplicaSet dockerAuthReplicaSet() {
-        var replicaSet = dockerReplicaSetBuilder().authEnabled(true);
+        var replicaSetBuilder = dockerReplicaSetBuilder().authEnabled(true);
         if (NETWORK != null) {
-            replicaSet.network(NETWORK);
+            replicaSetBuilder.network(NETWORK);
         }
-        return replicaSet.build();
+        var replicaSet = replicaSetBuilder.build();
+        NETWORK = null;
+        return replicaSet;
     }
 
     /**
@@ -79,11 +83,13 @@ private static MongoDbShardedCluster.Builder mongoDbShardedClusterBuilder() {
     }
 
     public static MongoDbShardedCluster mongoDbShardedCluster() {
-        var cluster = mongoDbShardedClusterBuilder();
+        var clusterBuilder = mongoDbShardedClusterBuilder();
         if (NETWORK != null) {
-            cluster.network(NETWORK);
+            clusterBuilder.network(NETWORK);
         }
-        return cluster.build();
+        var shardedCluster = clusterBuilder.build();
+        NETWORK = null;
+        return shardedCluster;
     }
 
     public static MongoDbShardedCluster mongoDbShardedCluster(Network network) {
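
The builder renames are cosmetic; the substantive fix for the review comment is the NETWORK = null reset after each build(). Because the field is static, a network installed for one environment would otherwise leak into every later factory call in the same JVM. With the reset, sharing a network becomes an explicit per-call decision. A sketch of the behavior the reset guarantees (the setNetwork(...) helper is an assumption; only the static NETWORK field and the factory methods appear in the diff):

    // Hypothetical usage; MongoDbDatabaseProvider.setNetwork(...) is assumed here.
    Network shared = Network.newNetwork();    // org.testcontainers.containers.Network
    MongoDbDatabaseProvider.setNetwork(shared);
    var first = MongoDbDatabaseProvider.dockerReplicaSet();     // joins `shared`
    var second = MongoDbDatabaseProvider.dockerReplicaSet();    // NETWORK was reset to null
                                                                // after the first build(),
                                                                // so this gets a default network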