DBZ-7223 Add the MongoDB sink connector

* address PR feedback

closes https://issues.redhat.com/browse/DBZ-7223
This commit is contained in:
rkerner 2024-07-03 14:44:43 +02:00 committed by Jiri Pechanec
parent 56e63b98a8
commit 36710c43ae
5 changed files with 115 additions and 132 deletions

View File

@ -16,8 +16,8 @@
import com.mongodb.MongoNamespace;
import com.mongodb.client.model.WriteModel;
import io.debezium.connector.mongodb.sink.converters.SinkRecordConverter;
import io.debezium.connector.mongodb.sink.converters.SinkDocument;
import io.debezium.connector.mongodb.sink.converters.SinkRecordConverter;
import io.debezium.connector.mongodb.sink.eventhandler.relational.RelationalEventHandler;
public class MongoProcessedSinkRecordData {

View File

@ -28,8 +28,8 @@
public final class SinkConnection {
// private static final String USERS_INFO = "{usersInfo: '%s', showPrivileges: true}";
// private static final String ROLES_INFO = "{rolesInfo: '%s', showPrivileges: true, showBuiltinRoles: true}";
// private static final String USERS_INFO = "{usersInfo: '%s', showPrivileges: true}";
// private static final String ROLES_INFO = "{rolesInfo: '%s', showPrivileges: true, showBuiltinRoles: true}";
public static Optional<MongoClient> canConnect(final Config config, final Field connectionStringConfigName) {
Optional<ConfigValue> optionalConnectionString = getConfigByName(config, connectionStringConfigName.name());
@ -88,116 +88,116 @@ public void clusterDescriptionChanged(
return Optional.empty();
}
// public static void checkUserHasActions(
// final MongoClient client,
// final MongoCredential credential,
// final List<String> actions,
// final String dbName,
// final String collectionName,
// final String configName,
// final Config config) {
//
// if (credential == null) {
// return;
// }
//
// try {
// Document usersInfo = client
// .getDatabase(credential.getSource())
// .runCommand(Document.parse(format(USERS_INFO, credential.getUserName())));
//
// List<String> unsupportedActions = new ArrayList<>(actions);
// for (final Document userInfo : usersInfo.getList("users", Document.class)) {
// unsupportedActions = removeUserActions(
// userInfo, credential.getSource(), dbName, collectionName, actions);
//
// if (!unsupportedActions.isEmpty()
// && userInfo.getList("inheritedPrivileges", Document.class, emptyList()).isEmpty()) {
// for (final Document inheritedRole : userInfo.getList("inheritedRoles", Document.class, emptyList())) {
// Document rolesInfo = client
// .getDatabase(inheritedRole.getString("db"))
// .runCommand(
// Document.parse(format(ROLES_INFO, inheritedRole.getString("role"))));
// for (final Document roleInfo : rolesInfo.getList("roles", Document.class, emptyList())) {
// unsupportedActions = removeUserActions(
// roleInfo,
// credential.getSource(),
// dbName,
// collectionName,
// unsupportedActions);
// }
//
// if (unsupportedActions.isEmpty()) {
// return;
// }
// }
// }
// if (unsupportedActions.isEmpty()) {
// return;
// }
// }
//
// String missingPermissions = String.join(", ", unsupportedActions);
// getConfigByName(config, configName)
// .ifPresent(
// c -> c.addErrorMessage(
// format(
// "Invalid user permissions. Missing the following action permissions: %s",
// missingPermissions)));
// }
// catch (MongoSecurityException e) {
// getConfigByName(config, configName)
// .ifPresent(c -> c.addErrorMessage("Invalid user permissions authentication failed."));
// }
// catch (Exception e) {
// LOGGER.warn("Permission validation failed due to: {}", e.getMessage(), e);
// }
// }
// public static void checkUserHasActions(
// final MongoClient client,
// final MongoCredential credential,
// final List<String> actions,
// final String dbName,
// final String collectionName,
// final String configName,
// final Config config) {
//
// if (credential == null) {
// return;
// }
//
// try {
// Document usersInfo = client
// .getDatabase(credential.getSource())
// .runCommand(Document.parse(format(USERS_INFO, credential.getUserName())));
//
// List<String> unsupportedActions = new ArrayList<>(actions);
// for (final Document userInfo : usersInfo.getList("users", Document.class)) {
// unsupportedActions = removeUserActions(
// userInfo, credential.getSource(), dbName, collectionName, actions);
//
// if (!unsupportedActions.isEmpty()
// && userInfo.getList("inheritedPrivileges", Document.class, emptyList()).isEmpty()) {
// for (final Document inheritedRole : userInfo.getList("inheritedRoles", Document.class, emptyList())) {
// Document rolesInfo = client
// .getDatabase(inheritedRole.getString("db"))
// .runCommand(
// Document.parse(format(ROLES_INFO, inheritedRole.getString("role"))));
// for (final Document roleInfo : rolesInfo.getList("roles", Document.class, emptyList())) {
// unsupportedActions = removeUserActions(
// roleInfo,
// credential.getSource(),
// dbName,
// collectionName,
// unsupportedActions);
// }
//
// if (unsupportedActions.isEmpty()) {
// return;
// }
// }
// }
// if (unsupportedActions.isEmpty()) {
// return;
// }
// }
//
// String missingPermissions = String.join(", ", unsupportedActions);
// getConfigByName(config, configName)
// .ifPresent(
// c -> c.addErrorMessage(
// format(
// "Invalid user permissions. Missing the following action permissions: %s",
// missingPermissions)));
// }
// catch (MongoSecurityException e) {
// getConfigByName(config, configName)
// .ifPresent(c -> c.addErrorMessage("Invalid user permissions authentication failed."));
// }
// catch (Exception e) {
// LOGGER.warn("Permission validation failed due to: {}", e.getMessage(), e);
// }
// }
/**
* Checks the roles info document for matching actions and removes them from the provided list
*/
// private static List<String> removeUserActions(final Document rolesInfo, final String authDatabase, final String databaseName, final String collectionName,
// final List<String> userActions) {
// List<Document> privileges = rolesInfo.getList("inheritedPrivileges", Document.class, emptyList());
// if (privileges.isEmpty() || userActions.isEmpty()) {
// return userActions;
// }
//
// List<String> unsupportedUserActions = new ArrayList<>(userActions);
// for (final Document privilege : privileges) {
// Document resource = privilege.get("resource", new Document());
// if (resource.containsKey("cluster") && resource.getBoolean("cluster")) {
// unsupportedUserActions.removeAll(privilege.getList("actions", String.class, emptyList()));
// }
// else if (resource.containsKey("db") && resource.containsKey("collection")) {
// String database = resource.getString("db");
// String collection = resource.getString("collection");
//
// boolean resourceMatches = false;
// boolean collectionMatches = collection.isEmpty() || collection.equals(collectionName);
// if (database.isEmpty() && collectionMatches) {
// resourceMatches = true;
// }
// else if (database.equals(authDatabase) && collection.isEmpty()) {
// resourceMatches = true;
// }
// else if (database.equals(databaseName) && collectionMatches) {
// resourceMatches = true;
// }
//
// if (resourceMatches) {
// unsupportedUserActions.removeAll(privilege.getList("actions", String.class, emptyList()));
// }
// }
//
// if (unsupportedUserActions.isEmpty()) {
// break;
// }
// }
//
// return unsupportedUserActions;
// }
// private static List<String> removeUserActions(final Document rolesInfo, final String authDatabase, final String databaseName, final String collectionName,
// final List<String> userActions) {
// List<Document> privileges = rolesInfo.getList("inheritedPrivileges", Document.class, emptyList());
// if (privileges.isEmpty() || userActions.isEmpty()) {
// return userActions;
// }
//
// List<String> unsupportedUserActions = new ArrayList<>(userActions);
// for (final Document privilege : privileges) {
// Document resource = privilege.get("resource", new Document());
// if (resource.containsKey("cluster") && resource.getBoolean("cluster")) {
// unsupportedUserActions.removeAll(privilege.getList("actions", String.class, emptyList()));
// }
// else if (resource.containsKey("db") && resource.containsKey("collection")) {
// String database = resource.getString("db");
// String collection = resource.getString("collection");
//
// boolean resourceMatches = false;
// boolean collectionMatches = collection.isEmpty() || collection.equals(collectionName);
// if (database.isEmpty() && collectionMatches) {
// resourceMatches = true;
// }
// else if (database.equals(authDatabase) && collection.isEmpty()) {
// resourceMatches = true;
// }
// else if (database.equals(databaseName) && collectionMatches) {
// resourceMatches = true;
// }
//
// if (resourceMatches) {
// unsupportedUserActions.removeAll(privilege.getList("actions", String.class, emptyList()));
// }
// }
//
// if (unsupportedUserActions.isEmpty()) {
// break;
// }
// }
//
// return unsupportedUserActions;
// }
public static Optional<ConfigValue> getConfigByName(final Config config, final String name) {
for (final ConfigValue configValue : config.configValues()) {

View File

@ -12,7 +12,6 @@
import java.util.OptionalLong;
import java.util.stream.Collectors;
import io.debezium.DebeziumException;
import org.apache.kafka.connect.connector.ConnectRecord;
import org.apache.kafka.connect.errors.DataException;
import org.apache.kafka.connect.sink.SinkRecord;
@ -25,6 +24,7 @@
import com.mongodb.client.model.BulkWriteOptions;
import com.mongodb.client.model.WriteModel;
import io.debezium.DebeziumException;
import io.debezium.dlq.ErrorReporter;
final class StartedMongoDbSinkTask implements AutoCloseable {

View File

@ -11,6 +11,9 @@
import io.debezium.connector.mongodb.sink.converters.SinkDocument;
/**
* Strategy for different write models to MongoDB (replace, update, insert/append only, etc).
*/
public interface WriteModelStrategy {
WriteModel<BsonDocument> createWriteModel(SinkDocument document);

View File

@ -10,42 +10,22 @@
import org.apache.kafka.connect.data.Decimal;
import org.apache.kafka.connect.errors.DataException;
import org.bson.BsonDecimal128;
import org.bson.BsonDouble;
import org.bson.BsonValue;
import org.bson.types.Decimal128;
public class DecimalType extends AbstractBsonType {
public enum Format {
DECIMAL128, // for MongoDB v3.4+
LEGACYDOUBLE // results in double approximation
}
private final Format format;
public DecimalType() {
super(Decimal.schema(0));
this.format = Format.DECIMAL128;
}
public DecimalType(final Format format) {
super(Decimal.schema(0));
this.format = format;
}
@Override
public BsonValue toBson(final Object data) {
if (data instanceof BigDecimal) {
if (format.equals(Format.DECIMAL128)) {
return new BsonDecimal128(new Decimal128((BigDecimal) data));
}
if (format.equals(Format.LEGACYDOUBLE)) {
return new BsonDouble(((BigDecimal) data).doubleValue());
}
return new BsonDecimal128(new Decimal128((BigDecimal) data));
}
throw new DataException(
"Decimal conversion not possible when data is of type " + data.getClass().getName()
+ " and format is " + format);
"Decimal conversion not possible when data is of type " + data.getClass().getName());
}
}