diff --git a/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/sink/MongoProcessedSinkRecordData.java b/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/sink/MongoProcessedSinkRecordData.java index 2ea56e323..e5bcc7d82 100644 --- a/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/sink/MongoProcessedSinkRecordData.java +++ b/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/sink/MongoProcessedSinkRecordData.java @@ -16,8 +16,8 @@ import com.mongodb.MongoNamespace; import com.mongodb.client.model.WriteModel; -import io.debezium.connector.mongodb.sink.converters.SinkRecordConverter; import io.debezium.connector.mongodb.sink.converters.SinkDocument; +import io.debezium.connector.mongodb.sink.converters.SinkRecordConverter; import io.debezium.connector.mongodb.sink.eventhandler.relational.RelationalEventHandler; public class MongoProcessedSinkRecordData { diff --git a/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/sink/SinkConnection.java b/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/sink/SinkConnection.java index db6c9a2d3..a0e0bbcd7 100644 --- a/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/sink/SinkConnection.java +++ b/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/sink/SinkConnection.java @@ -28,8 +28,8 @@ public final class SinkConnection { -// private static final String USERS_INFO = "{usersInfo: '%s', showPrivileges: true}"; -// private static final String ROLES_INFO = "{rolesInfo: '%s', showPrivileges: true, showBuiltinRoles: true}"; + // private static final String USERS_INFO = "{usersInfo: '%s', showPrivileges: true}"; + // private static final String ROLES_INFO = "{rolesInfo: '%s', showPrivileges: true, showBuiltinRoles: true}"; public static Optional canConnect(final Config config, final Field connectionStringConfigName) { Optional optionalConnectionString = getConfigByName(config, connectionStringConfigName.name()); @@ -88,116 +88,116 @@ public void clusterDescriptionChanged( return Optional.empty(); } -// public static void checkUserHasActions( -// final MongoClient client, -// final MongoCredential credential, -// final List actions, -// final String dbName, -// final String collectionName, -// final String configName, -// final Config config) { -// -// if (credential == null) { -// return; -// } -// -// try { -// Document usersInfo = client -// .getDatabase(credential.getSource()) -// .runCommand(Document.parse(format(USERS_INFO, credential.getUserName()))); -// -// List unsupportedActions = new ArrayList<>(actions); -// for (final Document userInfo : usersInfo.getList("users", Document.class)) { -// unsupportedActions = removeUserActions( -// userInfo, credential.getSource(), dbName, collectionName, actions); -// -// if (!unsupportedActions.isEmpty() -// && userInfo.getList("inheritedPrivileges", Document.class, emptyList()).isEmpty()) { -// for (final Document inheritedRole : userInfo.getList("inheritedRoles", Document.class, emptyList())) { -// Document rolesInfo = client -// .getDatabase(inheritedRole.getString("db")) -// .runCommand( -// Document.parse(format(ROLES_INFO, inheritedRole.getString("role")))); -// for (final Document roleInfo : rolesInfo.getList("roles", Document.class, emptyList())) { -// unsupportedActions = removeUserActions( -// roleInfo, -// credential.getSource(), -// dbName, -// collectionName, -// unsupportedActions); -// } -// -// if (unsupportedActions.isEmpty()) { -// return; -// } -// } -// } -// if (unsupportedActions.isEmpty()) { -// return; -// } -// } -// -// String missingPermissions = String.join(", ", unsupportedActions); -// getConfigByName(config, configName) -// .ifPresent( -// c -> c.addErrorMessage( -// format( -// "Invalid user permissions. Missing the following action permissions: %s", -// missingPermissions))); -// } -// catch (MongoSecurityException e) { -// getConfigByName(config, configName) -// .ifPresent(c -> c.addErrorMessage("Invalid user permissions authentication failed.")); -// } -// catch (Exception e) { -// LOGGER.warn("Permission validation failed due to: {}", e.getMessage(), e); -// } -// } + // public static void checkUserHasActions( + // final MongoClient client, + // final MongoCredential credential, + // final List actions, + // final String dbName, + // final String collectionName, + // final String configName, + // final Config config) { + // + // if (credential == null) { + // return; + // } + // + // try { + // Document usersInfo = client + // .getDatabase(credential.getSource()) + // .runCommand(Document.parse(format(USERS_INFO, credential.getUserName()))); + // + // List unsupportedActions = new ArrayList<>(actions); + // for (final Document userInfo : usersInfo.getList("users", Document.class)) { + // unsupportedActions = removeUserActions( + // userInfo, credential.getSource(), dbName, collectionName, actions); + // + // if (!unsupportedActions.isEmpty() + // && userInfo.getList("inheritedPrivileges", Document.class, emptyList()).isEmpty()) { + // for (final Document inheritedRole : userInfo.getList("inheritedRoles", Document.class, emptyList())) { + // Document rolesInfo = client + // .getDatabase(inheritedRole.getString("db")) + // .runCommand( + // Document.parse(format(ROLES_INFO, inheritedRole.getString("role")))); + // for (final Document roleInfo : rolesInfo.getList("roles", Document.class, emptyList())) { + // unsupportedActions = removeUserActions( + // roleInfo, + // credential.getSource(), + // dbName, + // collectionName, + // unsupportedActions); + // } + // + // if (unsupportedActions.isEmpty()) { + // return; + // } + // } + // } + // if (unsupportedActions.isEmpty()) { + // return; + // } + // } + // + // String missingPermissions = String.join(", ", unsupportedActions); + // getConfigByName(config, configName) + // .ifPresent( + // c -> c.addErrorMessage( + // format( + // "Invalid user permissions. Missing the following action permissions: %s", + // missingPermissions))); + // } + // catch (MongoSecurityException e) { + // getConfigByName(config, configName) + // .ifPresent(c -> c.addErrorMessage("Invalid user permissions authentication failed.")); + // } + // catch (Exception e) { + // LOGGER.warn("Permission validation failed due to: {}", e.getMessage(), e); + // } + // } /** * Checks the roles info document for matching actions and removes them from the provided list */ -// private static List removeUserActions(final Document rolesInfo, final String authDatabase, final String databaseName, final String collectionName, -// final List userActions) { -// List privileges = rolesInfo.getList("inheritedPrivileges", Document.class, emptyList()); -// if (privileges.isEmpty() || userActions.isEmpty()) { -// return userActions; -// } -// -// List unsupportedUserActions = new ArrayList<>(userActions); -// for (final Document privilege : privileges) { -// Document resource = privilege.get("resource", new Document()); -// if (resource.containsKey("cluster") && resource.getBoolean("cluster")) { -// unsupportedUserActions.removeAll(privilege.getList("actions", String.class, emptyList())); -// } -// else if (resource.containsKey("db") && resource.containsKey("collection")) { -// String database = resource.getString("db"); -// String collection = resource.getString("collection"); -// -// boolean resourceMatches = false; -// boolean collectionMatches = collection.isEmpty() || collection.equals(collectionName); -// if (database.isEmpty() && collectionMatches) { -// resourceMatches = true; -// } -// else if (database.equals(authDatabase) && collection.isEmpty()) { -// resourceMatches = true; -// } -// else if (database.equals(databaseName) && collectionMatches) { -// resourceMatches = true; -// } -// -// if (resourceMatches) { -// unsupportedUserActions.removeAll(privilege.getList("actions", String.class, emptyList())); -// } -// } -// -// if (unsupportedUserActions.isEmpty()) { -// break; -// } -// } -// -// return unsupportedUserActions; -// } + // private static List removeUserActions(final Document rolesInfo, final String authDatabase, final String databaseName, final String collectionName, + // final List userActions) { + // List privileges = rolesInfo.getList("inheritedPrivileges", Document.class, emptyList()); + // if (privileges.isEmpty() || userActions.isEmpty()) { + // return userActions; + // } + // + // List unsupportedUserActions = new ArrayList<>(userActions); + // for (final Document privilege : privileges) { + // Document resource = privilege.get("resource", new Document()); + // if (resource.containsKey("cluster") && resource.getBoolean("cluster")) { + // unsupportedUserActions.removeAll(privilege.getList("actions", String.class, emptyList())); + // } + // else if (resource.containsKey("db") && resource.containsKey("collection")) { + // String database = resource.getString("db"); + // String collection = resource.getString("collection"); + // + // boolean resourceMatches = false; + // boolean collectionMatches = collection.isEmpty() || collection.equals(collectionName); + // if (database.isEmpty() && collectionMatches) { + // resourceMatches = true; + // } + // else if (database.equals(authDatabase) && collection.isEmpty()) { + // resourceMatches = true; + // } + // else if (database.equals(databaseName) && collectionMatches) { + // resourceMatches = true; + // } + // + // if (resourceMatches) { + // unsupportedUserActions.removeAll(privilege.getList("actions", String.class, emptyList())); + // } + // } + // + // if (unsupportedUserActions.isEmpty()) { + // break; + // } + // } + // + // return unsupportedUserActions; + // } public static Optional getConfigByName(final Config config, final String name) { for (final ConfigValue configValue : config.configValues()) { diff --git a/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/sink/StartedMongoDbSinkTask.java b/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/sink/StartedMongoDbSinkTask.java index 0caa519fb..b035d258e 100644 --- a/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/sink/StartedMongoDbSinkTask.java +++ b/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/sink/StartedMongoDbSinkTask.java @@ -12,7 +12,6 @@ import java.util.OptionalLong; import java.util.stream.Collectors; -import io.debezium.DebeziumException; import org.apache.kafka.connect.connector.ConnectRecord; import org.apache.kafka.connect.errors.DataException; import org.apache.kafka.connect.sink.SinkRecord; @@ -25,6 +24,7 @@ import com.mongodb.client.model.BulkWriteOptions; import com.mongodb.client.model.WriteModel; +import io.debezium.DebeziumException; import io.debezium.dlq.ErrorReporter; final class StartedMongoDbSinkTask implements AutoCloseable { diff --git a/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/sink/WriteModelStrategy.java b/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/sink/WriteModelStrategy.java index c8b4b10ce..56c52113e 100644 --- a/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/sink/WriteModelStrategy.java +++ b/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/sink/WriteModelStrategy.java @@ -11,6 +11,9 @@ import io.debezium.connector.mongodb.sink.converters.SinkDocument; +/** + * Strategy for different write models to MongoDB (replace, update, insert/append only, etc). + */ public interface WriteModelStrategy { WriteModel createWriteModel(SinkDocument document); diff --git a/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/sink/converters/bson/DecimalType.java b/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/sink/converters/bson/DecimalType.java index 52707038a..3746d6dfb 100644 --- a/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/sink/converters/bson/DecimalType.java +++ b/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/sink/converters/bson/DecimalType.java @@ -10,42 +10,22 @@ import org.apache.kafka.connect.data.Decimal; import org.apache.kafka.connect.errors.DataException; import org.bson.BsonDecimal128; -import org.bson.BsonDouble; import org.bson.BsonValue; import org.bson.types.Decimal128; public class DecimalType extends AbstractBsonType { - public enum Format { - DECIMAL128, // for MongoDB v3.4+ - LEGACYDOUBLE // results in double approximation - } - - private final Format format; - public DecimalType() { super(Decimal.schema(0)); - this.format = Format.DECIMAL128; - } - - public DecimalType(final Format format) { - super(Decimal.schema(0)); - this.format = format; } @Override public BsonValue toBson(final Object data) { if (data instanceof BigDecimal) { - if (format.equals(Format.DECIMAL128)) { - return new BsonDecimal128(new Decimal128((BigDecimal) data)); - } - if (format.equals(Format.LEGACYDOUBLE)) { - return new BsonDouble(((BigDecimal) data).doubleValue()); - } + return new BsonDecimal128(new Decimal128((BigDecimal) data)); } throw new DataException( - "Decimal conversion not possible when data is of type " + data.getClass().getName() - + " and format is " + format); + "Decimal conversion not possible when data is of type " + data.getClass().getName()); } }