DBZ-7223 Add the MongoDB sink connector

closes https://issues.redhat.com/browse/DBZ-7223
This commit is contained in:
rkerner 2024-06-19 12:41:42 +02:00 committed by Jiri Pechanec
parent 4806fb4567
commit 3799e3d76b
50 changed files with 2770 additions and 34 deletions

View File

@ -36,6 +36,7 @@
import io.debezium.connector.SourceInfoStructMaker;
import io.debezium.connector.mongodb.connection.DefaultMongoDbAuthProvider;
import io.debezium.connector.mongodb.connection.MongoDbAuthProvider;
import io.debezium.connector.mongodb.shared.SharedMongoDbConnectorConfig;
import io.debezium.data.Envelope;
import io.debezium.schema.DefaultTopicNamingStrategy;
import io.debezium.spi.schema.DataCollectionId;
@ -44,7 +45,7 @@
/**
* The configuration properties.
*/
public class MongoDbConnectorConfig extends CommonConnectorConfig {
public class MongoDbConnectorConfig extends CommonConnectorConfig implements SharedMongoDbConnectorConfig {
private static final Logger LOGGER = LoggerFactory.getLogger(MongoDbConnectorConfig.class);
@ -619,16 +620,6 @@ public static OversizeHandlingMode parse(String value, String defaultValue) {
.withDefault(false)
.withType(Type.BOOLEAN);
// MongoDb fields in Connection Group start from 1 (topic.prefix is 0)
public static final Field CONNECTION_STRING = Field.create("mongodb.connection.string")
.withDisplayName("Connection String")
.withType(Type.STRING)
.withGroup(Field.createGroupEntry(Field.Group.CONNECTION, 1))
.withWidth(Width.MEDIUM)
.withImportance(Importance.HIGH)
.withValidation(MongoDbConnectorConfig::validateConnectionString)
.withDescription("Database connection string.");
public static final Field USER = Field.create("mongodb.user")
.withDisplayName("User")
.withType(Type.STRING)
@ -1169,24 +1160,6 @@ private static int validateOversizeSkipThreshold(Configuration config, Field fie
return 0;
}
private static int validateConnectionString(Configuration config, Field field, ValidationOutput problems) {
String connectionStringValue = config.getString(field);
if (connectionStringValue == null) {
problems.accept(field, null, "Missing connection string");
return 1;
}
try {
ConnectionString cs = new ConnectionString(connectionStringValue);
}
catch (Exception e) {
problems.accept(field, connectionStringValue, "Invalid connection string");
return 1;
}
return 0;
}
private static int validateFieldExcludeList(Configuration config, Field field, ValidationOutput problems) {
int problemCount = 0;
String fieldExcludeList = config.getString(FIELD_EXCLUDE_LIST);
@ -1454,11 +1427,6 @@ private static int resolveSnapshotMaxThreads(Configuration config) {
return config.getInteger(SNAPSHOT_MAX_THREADS);
}
private static ConnectionString resolveConnectionString(Configuration config) {
var connectionString = config.getString(MongoDbConnectorConfig.CONNECTION_STRING);
return new ConnectionString(connectionString);
}
@Override
public Optional<String[]> parseSignallingMessage(Struct value) {
final String after = value.getString(Envelope.FieldName.AFTER);

View File

@ -0,0 +1,50 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.connector.mongodb.shared;
import org.apache.kafka.common.config.ConfigDef;
import com.mongodb.ConnectionString;
import io.debezium.config.Configuration;
import io.debezium.config.Field;
public interface SharedMongoDbConnectorConfig {
// MongoDb fields in Connection Group start from 1 (topic.prefix is 0)
Field CONNECTION_STRING = Field.create("mongodb.connection.string")
.withDisplayName("Connection String")
.withType(ConfigDef.Type.STRING)
.withGroup(Field.createGroupEntry(Field.Group.CONNECTION, 1))
.withWidth(ConfigDef.Width.MEDIUM)
.withImportance(ConfigDef.Importance.HIGH)
.withValidation(SharedMongoDbConnectorConfig::validateConnectionString)
.withDescription("Database connection string.");
private static int validateConnectionString(Configuration config, Field field, Field.ValidationOutput problems) {
String connectionStringValue = config.getString(field);
if (connectionStringValue == null) {
problems.accept(field, null, "Missing connection string");
return 1;
}
try {
new ConnectionString(connectionStringValue);
}
catch (Exception e) {
problems.accept(field, connectionStringValue, "Invalid connection string");
return 1;
}
return 0;
}
default ConnectionString resolveConnectionString(Configuration config) {
var connectionString = config.getString(CONNECTION_STRING);
return new ConnectionString(connectionString);
}
}
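
For illustration only (not part of this commit), a minimal sketch of how a configuration class picks up the shared connection-string field and the default resolution method; it assumes Debezium's Configuration.create() builder API, and SharedConfigExample plus the URI are hypothetical.

package io.debezium.connector.mongodb.shared;

import com.mongodb.ConnectionString;

import io.debezium.config.Configuration;

// Hypothetical example class; any config type that implements the shared interface
// inherits the CONNECTION_STRING field and the resolveConnectionString(..) default method.
public class SharedConfigExample implements SharedMongoDbConnectorConfig {

    public static void main(String[] args) {
        Configuration config = Configuration.create()
                .with(CONNECTION_STRING.name(), "mongodb://localhost:27017/?replicaSet=rs0") // example URI
                .build();

        ConnectionString cs = new SharedConfigExample().resolveConnectionString(config);
        System.out.println(cs.getHosts()); // [localhost:27017]
    }
}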

View File

@ -0,0 +1,26 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.connector.mongodb.sink;
import org.bson.BsonDocument;
import com.mongodb.client.model.WriteModel;
import io.debezium.connector.mongodb.sink.converters.SinkDocument;
public class DefaultWriteModelStrategy implements WriteModelStrategy {
private final WriteModelStrategy writeModelStrategy = new ReplaceDefaultStrategy();
@Override
public WriteModel<BsonDocument> createWriteModel(final SinkDocument document) {
return writeModelStrategy.createWriteModel(document);
}
WriteModelStrategy getWriteModelStrategy() {
return writeModelStrategy;
}
}

View File

@ -0,0 +1,57 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.connector.mongodb.sink;
import static io.debezium.connector.mongodb.sink.MongoDbSinkConnectorConfig.ID_FIELD;
import org.apache.kafka.connect.errors.DataException;
import org.apache.kafka.connect.sink.SinkRecord;
import org.bson.BsonDocument;
import org.bson.BsonValue;
import com.mongodb.client.model.DeleteOneModel;
import com.mongodb.client.model.WriteModel;
import io.debezium.connector.mongodb.sink.converters.SinkDocument;
public class DeleteDefaultStrategy implements WriteModelStrategy {
private final IdStrategy idStrategy;
public DeleteDefaultStrategy() {
this(new DefaultIdFieldStrategy());
}
public DeleteDefaultStrategy(final IdStrategy idStrategy) {
this.idStrategy = idStrategy;
}
public IdStrategy getIdStrategy() {
return this.idStrategy;
}
@Override
public WriteModel<BsonDocument> createWriteModel(final SinkDocument document) {
document.getKeyDoc()
.orElseThrow(() -> new DataException("Could not build the WriteModel,the key document was missing unexpectedly"));
BsonDocument deleteFilter;
if (idStrategy instanceof DefaultIdFieldStrategy) {
deleteFilter = idStrategy.generateId(document, null).asDocument();
}
else {
deleteFilter = new BsonDocument(ID_FIELD, idStrategy.generateId(document, null));
}
return new DeleteOneModel<>(deleteFilter);
}
static class DefaultIdFieldStrategy implements IdStrategy {
@Override
public BsonValue generateId(final SinkDocument doc, final SinkRecord orig) {
BsonDocument kd = doc.getKeyDoc().get();
return kd.containsKey(ID_FIELD) ? kd : new BsonDocument(ID_FIELD, kd);
}
}
}
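
As an illustration (not shipped with the connector), a small sketch of the delete path: a key-only SinkDocument is turned into a DeleteOneModel whose filter is built by the DefaultIdFieldStrategy. DeleteStrategyExample is a hypothetical class name.

package io.debezium.connector.mongodb.sink;

import org.bson.BsonDocument;

import com.mongodb.client.model.DeleteOneModel;
import com.mongodb.client.model.WriteModel;

import io.debezium.connector.mongodb.sink.converters.SinkDocument;

// Hypothetical usage sketch for DeleteDefaultStrategy.
public class DeleteStrategyExample {

    public static void main(String[] args) {
        BsonDocument keyDoc = BsonDocument.parse("{\"id\": 42}");
        SinkDocument doc = new SinkDocument(keyDoc, null); // a missing value document signals a delete

        WriteModel<BsonDocument> model = new DeleteDefaultStrategy().createWriteModel(doc);
        // DefaultIdFieldStrategy wraps the key under "_id", so the filter becomes {"_id": {"id": 42}}
        System.out.println(((DeleteOneModel<BsonDocument>) model).getFilter());
    }
}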

View File

@ -0,0 +1,19 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.connector.mongodb.sink;
import org.apache.kafka.connect.sink.SinkRecord;
import org.bson.BsonValue;
import io.debezium.connector.mongodb.sink.converters.SinkDocument;
/**
* Strategy that returns the identity of the MongoDB document. This is typically the _id field of the document.
*/
public interface IdStrategy {
BsonValue generateId(SinkDocument doc, SinkRecord orig);
}

View File

@ -0,0 +1,37 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.connector.mongodb.sink;
import java.util.Properties;
import io.debezium.util.IoUtil;
/**
* Information about this module.
*
* @author Hossein Torabi
*/
public class Module {
private static final Properties INFO = IoUtil.loadProperties(Module.class, "io/debezium/connector/mongodb/build.version");
public static String version() {
return INFO.getProperty("version");
}
/**
* @return symbolic name of the connector plugin
*/
public static String name() {
return "mongodb-sink";
}
/**
* @return context name used in log MDC and JMX metrics
*/
public static String contextName() {
return "MongoDbSink";
}
}

View File

@ -0,0 +1,74 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.connector.mongodb.sink;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import org.apache.kafka.common.config.Config;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.connector.Task;
import org.apache.kafka.connect.sink.SinkConnector;
import io.debezium.annotation.Immutable;
import io.debezium.config.Configuration;
public class MongoDbSinkConnector extends SinkConnector {
@Immutable
private Map<String, String> properties;
@Override
public String version() {
return Module.version();
}
@Override
public void start(Map<String, String> props) {
this.properties = Map.copyOf(props);
}
@Override
public Class<? extends Task> taskClass() {
return MongoDbSinkConnectorTask.class;
}
@Override
public List<Map<String, String>> taskConfigs(int maxTasks) {
final List<Map<String, String>> configs = new ArrayList<>(maxTasks);
for (int i = 0; i < maxTasks; ++i) {
configs.add(properties);
}
return configs;
}
@Override
public void stop() {
}
@Override
public ConfigDef config() {
return MongoDbSinkConnectorConfig.configDef();
}
@Override
public Config validate(Map<String, String> connectorConfigs) {
Config config = super.validate(connectorConfigs);
MongoDbSinkConnectorConfig sinkConfig;
try {
sinkConfig = new MongoDbSinkConnectorConfig(Configuration.from(connectorConfigs));
}
catch (Exception e) {
return config;
}
SinkConnection.canConnect(config, MongoDbSinkConnectorConfig.CONNECTION_STRING);
return config;
}
}
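
For orientation (illustration only, not part of the commit), a hypothetical minimal sink configuration driven through the connector's lifecycle methods; the property keys come from this commit, the values are placeholders.

package io.debezium.connector.mongodb.sink;

import java.util.List;
import java.util.Map;

// Hypothetical example class.
public class SinkConnectorConfigExample {

    public static void main(String[] args) {
        Map<String, String> props = Map.of(
                "connector.class", MongoDbSinkConnector.class.getName(),
                "topics", "inventory.customers",
                "mongodb.connection.string", "mongodb://localhost:27017",
                "sink.database", "inventory");

        MongoDbSinkConnector connector = new MongoDbSinkConnector();
        connector.start(props);
        // Every task receives the same immutable property map.
        List<Map<String, String>> taskConfigs = connector.taskConfigs(2);
        System.out.println(taskConfigs.size()); // 2
        connector.stop();
    }
}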

View File

@ -0,0 +1,232 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.connector.mongodb.sink;
import java.util.function.Consumer;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.errors.ConnectException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.mongodb.ConnectionString;
import io.debezium.config.ConfigDefinition;
import io.debezium.config.Configuration;
import io.debezium.config.Field;
import io.debezium.connector.SinkConnectorConfig;
import io.debezium.connector.mongodb.shared.SharedMongoDbConnectorConfig;
import io.debezium.table.ColumnNamingStrategy;
import io.debezium.table.DefaultColumnNamingStrategy;
import io.debezium.table.DefaultTableNamingStrategy;
import io.debezium.table.FieldFilterFactory;
import io.debezium.table.TableNamingStrategy;
import io.debezium.util.Strings;
public class MongoDbSinkConnectorConfig implements SharedMongoDbConnectorConfig, SinkConnectorConfig {
private static final Logger LOGGER = LoggerFactory.getLogger(MongoDbSinkConnectorConfig.class);
public static final String ID_FIELD = "_id";
public static final String SINK_DATABASE = "sink.database";
public static final String INSERT_MODE = "insert.mode";
public static final String DELETE_ENABLED = "delete.enabled";
public static final String TRUNCATE_ENABLED = "truncate.enabled";
public static final String TABLE_NAME_FORMAT = "table.name.format";
public static final String TABLE_NAMING_STRATEGY = "table.naming.strategy";
public static final String COLUMN_NAMING_STRATEGY = "column.naming.strategy";
public static final String BATCH_SIZE = "batch.size";
public static final String FIELD_INCLUDE_LIST = "field.include.list";
public static final String FIELD_EXCLUDE_LIST = "field.exclude.list";
public static final Field SINK_DATABASE_NAME = Field.create(SINK_DATABASE)
.withDisplayName("The sink MongoDB database name.")
.withType(ConfigDef.Type.STRING)
.withGroup(Field.createGroupEntry(Field.Group.CONNECTOR, 2))
.withWidth(ConfigDef.Width.MEDIUM)
.withImportance(ConfigDef.Importance.HIGH)
.withDescription("The name of the MongoDB database to which the connector writes to.")
.required();
public static final Field TABLE_NAME_FORMAT_FIELD = Field.create(TABLE_NAME_FORMAT)
.withDisplayName("A format string for the table")
.withType(ConfigDef.Type.STRING)
.withGroup(Field.createGroupEntry(Field.Group.CONNECTOR, 3))
.withWidth(ConfigDef.Width.MEDIUM)
.withImportance(ConfigDef.Importance.MEDIUM)
.withDefault("${topic}")
.withDescription("A format string for the table, which may contain '${topic}' as a placeholder for the original topic name.");
public static final Field TABLE_NAMING_STRATEGY_FIELD = Field.create(TABLE_NAMING_STRATEGY)
.withDisplayName("Name of the strategy class that implements the TablingNamingStrategy interface")
.withType(ConfigDef.Type.CLASS)
.withGroup(Field.createGroupEntry(Field.Group.CONNECTOR_ADVANCED, 2))
.withWidth(ConfigDef.Width.LONG)
.withImportance(ConfigDef.Importance.LOW)
.withDefault(DefaultTableNamingStrategy.class.getName())
.withDescription("Name of the strategy class that implements the TableNamingStrategy interface.");
public static final Field COLUMN_NAMING_STRATEGY_FIELD = Field.create(COLUMN_NAMING_STRATEGY)
.withDisplayName("Name of the strategy class that implements the ColumnNamingStrategy interface")
.withType(ConfigDef.Type.CLASS)
.withGroup(Field.createGroupEntry(Field.Group.CONNECTOR_ADVANCED, 3))
.withWidth(ConfigDef.Width.LONG)
.withImportance(ConfigDef.Importance.LOW)
.withDefault(DefaultColumnNamingStrategy.class.getName())
.withDescription("Name of the strategy class that implements the ColumnNamingStrategy interface.");
public static final Field BATCH_SIZE_FIELD = Field.create(BATCH_SIZE)
.withDisplayName("Specifies how many records to attempt to batch together into the destination table, when possible. " +
"You can also configure the connectors underlying consumers max.poll.records using consumer.override.max.poll.records in the connector configuration.")
.withType(ConfigDef.Type.INT)
.withGroup(Field.createGroupEntry(Field.Group.CONNECTOR_ADVANCED, 4))
.withWidth(ConfigDef.Width.SHORT)
.withImportance(ConfigDef.Importance.MEDIUM)
.withDefault(500)
.withDescription("Specifies how many records to attempt to batch together into the destination table, when possible. " +
"You can also configure the connectors underlying consumers max.poll.records using consumer.override.max.poll.records in the connector configuration.");
// public static final Field FIELD_INCLUDE_LIST_FIELD = Field.create(FIELD_INCLUDE_LIST)
// .withDisplayName("Include Fields")
// .withType(ConfigDef.Type.LIST)
// .withGroup(Field.createGroupEntry(Field.Group.FILTERS, 1))
// .withWidth(ConfigDef.Width.LONG)
// .withImportance(ConfigDef.Importance.MEDIUM)
// .withValidation(Field::isListOfRegex)
// .withDescription("A comma-separated list of regular expressions matching fully-qualified names of fields that "
// + "should be included in change events. The field names must be delimited by the format <topic>:<field> ");
// public static final Field FIELD_EXCLUDE_LIST_FIELD = Field.create(FIELD_EXCLUDE_LIST)
// .withDisplayName("Exclude Fields")
// .withType(ConfigDef.Type.LIST)
// .withGroup(Field.createGroupEntry(Field.Group.FILTERS, 2))
// .withWidth(ConfigDef.Width.LONG)
// .withImportance(ConfigDef.Importance.MEDIUM)
// .withValidation(Field::isListOfRegex)
// .withDescription("A comma-separated list of regular expressions matching fully-qualified names of fields that "
// + "should be excluded from change events. The field names must be delimited by the format <topic>:<field> ");
protected static final ConfigDefinition CONFIG_DEFINITION = ConfigDefinition.editor()
.connector(
SINK_DATABASE_NAME,
CONNECTION_STRING,
TABLE_NAMING_STRATEGY_FIELD,
COLUMN_NAMING_STRATEGY_FIELD,
BATCH_SIZE_FIELD
// FIELD_INCLUDE_LIST_FIELD,
// FIELD_EXCLUDE_LIST_FIELD
)
.create();
/**
* The set of {@link Field}s defined as part of this configuration.
*/
public static Field.Set ALL_FIELDS = Field.setOf(CONFIG_DEFINITION.all());
public WriteModelStrategy getWriteModelStrategy() {
return new DefaultWriteModelStrategy();
}
public WriteModelStrategy getDeleteWriteModelStrategy() {
return new DeleteDefaultStrategy();
}
private final Configuration config;
private final ConnectionString connectionString;
private final String sinkDatabaseName;
private final String tableNameFormat;
// private final Set<String> dataTypeMapping;
private final TableNamingStrategy tableNamingStrategy;
private final ColumnNamingStrategy columnNamingStrategy;
private FieldFilterFactory.FieldNameFilter fieldsFilter;
private final int batchSize;
public MongoDbSinkConnectorConfig(Configuration config) {
this.config = config;
this.connectionString = resolveConnectionString(config);
this.sinkDatabaseName = config.getString(SINK_DATABASE_NAME);
this.tableNameFormat = config.getString(TABLE_NAME_FORMAT_FIELD);
this.tableNamingStrategy = config.getInstance(TABLE_NAMING_STRATEGY_FIELD, TableNamingStrategy.class);
this.columnNamingStrategy = config.getInstance(COLUMN_NAMING_STRATEGY_FIELD, ColumnNamingStrategy.class);
this.batchSize = config.getInteger(BATCH_SIZE_FIELD);
String fieldExcludeList = config.getString(FIELD_EXCLUDE_LIST);
String fieldIncludeList = config.getString(FIELD_INCLUDE_LIST);
this.fieldsFilter = FieldFilterFactory.createFieldFilter(fieldIncludeList, fieldExcludeList);
}
public void validate() {
if (!config.validateAndRecord(MongoDbSinkConnectorConfig.ALL_FIELDS, LOGGER::error)) {
throw new ConnectException("Error configuring an instance of " + getClass().getSimpleName() + "; check the logs for details");
}
if (LOGGER.isInfoEnabled()) {
LOGGER.info("Starting {} with configuration:", getClass().getSimpleName());
config.withMaskedPasswords().forEach((propName, propValue) -> {
LOGGER.info(" {} = {}", propName, propValue);
});
}
String columnExcludeList = config.getString(FIELD_EXCLUDE_LIST);
String columnIncludeList = config.getString(FIELD_INCLUDE_LIST);
if (!Strings.isNullOrEmpty(columnExcludeList) && !Strings.isNullOrEmpty(columnIncludeList)) {
throw new ConnectException("Cannot define both column.exclude.list and column.include.list. Please specify only one.");
}
}
public boolean validateAndRecord(Iterable<Field> fields, Consumer<String> problems) {
return config.validateAndRecord(fields, problems);
}
protected static ConfigDef configDef() {
return CONFIG_DEFINITION.configDef();
}
public int getBatchSize() {
return batchSize;
}
// public Set<String> getDataTypeMapping() {
// return dataTypeMapping;
// }
public TableNamingStrategy getTableNamingStrategy() {
return tableNamingStrategy;
}
public ColumnNamingStrategy getColumnNamingStrategy() {
return columnNamingStrategy;
}
// public FieldFilterFactory.FieldNameFilter getFieldsFilter() {
// return fieldsFilter;
// }
public String getContextName() {
return Module.contextName();
}
public String getConnectorName() {
return Module.name();
}
public ConnectionString getConnectionString() {
return this.connectionString;
}
public String getSinkDatabaseName() {
return sinkDatabaseName;
}
public String getTableNameFormat() {
return tableNameFormat;
}
}

View File

@ -0,0 +1,140 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.connector.mongodb.sink;
import java.util.Collection;
import java.util.Map;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.sink.ErrantRecordReporter;
import org.apache.kafka.connect.sink.SinkRecord;
import org.apache.kafka.connect.sink.SinkTask;
import org.apache.kafka.connect.sink.SinkTaskContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.mongodb.client.MongoClient;
import com.mongodb.internal.VisibleForTesting;
import io.debezium.config.Configuration;
import io.debezium.connector.mongodb.connection.MongoDbConnectionContext;
import io.debezium.dlq.ErrorReporter;
public class MongoDbSinkConnectorTask extends SinkTask {
static final Logger LOGGER = LoggerFactory.getLogger(MongoDbSinkConnectorTask.class);
private static final String CONNECTOR_TYPE = "sink";
private StartedMongoDbSinkTask startedTask;
private MongoDbConnectionContext connectionContext;
@Override
public String version() {
return Module.version();
}
/**
* Start the Task. This should handle any configuration parsing and one-time setup of the task.
*
* @param props initial configuration
*/
@SuppressWarnings("try")
@Override
public void start(final Map<String, String> props) {
LOGGER.info("Starting MongoDB sink task");
var config = Configuration.from(props);
final MongoDbSinkConnectorConfig sinkConfig = new MongoDbSinkConnectorConfig(config);
MongoClient client = null;
try {
this.connectionContext = new MongoDbConnectionContext(config);
client = this.connectionContext.getMongoClient();
startedTask = new StartedMongoDbSinkTask(sinkConfig, client, createErrorReporter());
}
catch (RuntimeException taskStartingException) {
// noinspection EmptyTryBlock
try (MongoClient autoCloseableClient = client) {
// just using try-with-resources to ensure they all get closed, even in the case of
// exceptions
}
catch (RuntimeException resourceReleasingException) {
taskStartingException.addSuppressed(resourceReleasingException);
}
throw new ConnectException("Failed to start MongoDB sink task", taskStartingException);
}
LOGGER.debug("Started MongoDB sink task");
}
/**
* Put the records in the sink. Usually this should send the records to the sink asynchronously
* and immediately return.
*
* <p>If this operation fails, the SinkTask may throw a {@link
* org.apache.kafka.connect.errors.RetriableException} to indicate that the framework should
* attempt to retry the same call again. Other exceptions will cause the task to be stopped
* immediately. {@link SinkTaskContext#timeout(long)} can be used to set the maximum time before
* the batch will be retried.
*
* @param records the set of records to send
*/
@Override
public void put(final Collection<SinkRecord> records) {
startedTask.put(records);
}
/**
* Flush all records that have been {@link #put(Collection)} for the specified topic-partitions.
*
* @param currentOffsets the current offset state as of the last call to {@link
#put(Collection)}, provided for convenience but could also be determined by tracking all
* offsets included in the {@link SinkRecord}s passed to {@link #put}.
*/
@Override
public void flush(final Map<TopicPartition, OffsetAndMetadata> currentOffsets) {
// NOTE: flush is not used for now...
LOGGER.debug("Flush called - noop");
}
/**
* Perform any cleanup to stop this task. In SinkTasks, this method is invoked only once
* outstanding calls to other methods have completed (e.g., {@link #put(Collection)} has returned)
* and a final {@link #flush(Map)} and offset commit has completed. Implementations of this method
* should only need to perform final cleanup operations, such as closing network connections to
* the sink system.
*/
@Override
public void stop() {
LOGGER.info("Stopping MongoDB sink task");
if (startedTask != null) {
startedTask.close();
}
}
private ErrorReporter createErrorReporter() {
ErrorReporter result = nopErrorReporter();
if (context != null) {
try {
ErrantRecordReporter errantRecordReporter = context.errantRecordReporter();
if (errantRecordReporter != null) {
result = errantRecordReporter::report;
}
else {
LOGGER.info("Errant record reporter not configured.");
}
}
catch (NoClassDefFoundError | NoSuchMethodError e) {
// Will occur in Connect runtimes earlier than 2.6
LOGGER.info("Kafka versions prior to 2.6 do not support the errant record reporter.");
}
}
return result;
}
@VisibleForTesting(otherwise = VisibleForTesting.AccessModifier.PRIVATE)
static ErrorReporter nopErrorReporter() {
return (record, e) -> {
};
}
}

View File

@ -0,0 +1,91 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.connector.mongodb.sink;
import java.util.Optional;
import java.util.function.Supplier;
import org.apache.kafka.connect.sink.SinkRecord;
import org.bson.BsonDocument;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.mongodb.MongoNamespace;
import com.mongodb.client.model.WriteModel;
import io.debezium.connector.mongodb.sink.converters.SinkRecordConverter;
import io.debezium.connector.mongodb.sink.converters.SinkDocument;
import io.debezium.connector.mongodb.sink.eventhandler.relational.RelationalEventHandler;
public class MongoProcessedSinkRecordData {
private static final Logger LOGGER = LoggerFactory.getLogger(MongoProcessedSinkRecordData.class);
private final MongoDbSinkConnectorConfig config;
private final MongoNamespace namespace;
private final SinkRecord sinkRecord;
private final SinkDocument sinkDocument;
private final WriteModel<BsonDocument> writeModel;
private Exception exception;
private final String databaseName;
MongoProcessedSinkRecordData(final SinkRecord sinkRecord, final MongoDbSinkConnectorConfig sinkConfig) {
this.sinkRecord = sinkRecord;
this.databaseName = sinkConfig.getSinkDatabaseName();
this.config = sinkConfig;
this.sinkDocument = new SinkRecordConverter().convert(sinkRecord);
this.namespace = createNamespace();
this.writeModel = createWriteModel();
}
public MongoDbSinkConnectorConfig getConfig() {
return config;
}
public MongoNamespace getNamespace() {
return namespace;
}
public SinkRecord getSinkRecord() {
return sinkRecord;
}
public WriteModel<BsonDocument> getWriteModel() {
return writeModel;
}
public Exception getException() {
return exception;
}
private MongoNamespace createNamespace() {
return tryProcess(
() -> Optional.of(new MongoNamespace(databaseName, config.getTableNamingStrategy().resolveTableName(config, sinkRecord))))
.orElse(null);
}
private WriteModel<BsonDocument> createWriteModel() {
return tryProcess(
() -> new RelationalEventHandler(config).handle(sinkDocument))
.orElse(null);
}
private <T> Optional<T> tryProcess(final Supplier<Optional<T>> supplier) {
try {
return supplier.get();
}
catch (Exception e) {
exception = e;
// if (config.logErrors()) {
LOGGER.error("Unable to process record {}", sinkRecord, e);
// }
// if (!config.tolerateErrors()) {
// throw e;
// }
}
return Optional.empty();
}
}

View File

@ -0,0 +1,67 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.connector.mongodb.sink;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import org.apache.kafka.connect.sink.SinkRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import io.debezium.dlq.ErrorReporter;
final class MongoSinkRecordProcessor {
private static final Logger LOGGER = LoggerFactory.getLogger(MongoSinkRecordProcessor.class);
static List<List<MongoProcessedSinkRecordData>> orderedGroupByTopicAndNamespace(
final Collection<SinkRecord> records,
final MongoDbSinkConnectorConfig sinkConfig,
final ErrorReporter errorReporter) {
LOGGER.debug("Number of sink records to process: {}", records.size());
List<List<MongoProcessedSinkRecordData>> orderedProcessedSinkRecordData = new ArrayList<>();
List<MongoProcessedSinkRecordData> currentGroup = new ArrayList<>();
MongoProcessedSinkRecordData previous = null;
for (SinkRecord record : records) {
MongoProcessedSinkRecordData processedData = new MongoProcessedSinkRecordData(record, sinkConfig);
if (processedData.getException() != null) {
errorReporter.report(processedData.getSinkRecord(), processedData.getException());
continue;
}
else if (processedData.getNamespace() == null || processedData.getWriteModel() == null) {
// Some CDC events can be no-ops (e.g. tombstone events)
continue;
}
if (previous == null) {
previous = processedData;
}
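// Start a new batch when the configured maximum batch size is reached, or when the topic
// or the target MongoDB namespace changes, so that each bulk write targets a single collection.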
int maxBatchSize = processedData.getConfig().getBatchSize();
if (maxBatchSize > 0 && currentGroup.size() == maxBatchSize
|| !previous.getSinkRecord().topic().equals(processedData.getSinkRecord().topic())
|| !previous.getNamespace().equals(processedData.getNamespace())) {
orderedProcessedSinkRecordData.add(currentGroup);
currentGroup = new ArrayList<>();
}
previous = processedData;
currentGroup.add(processedData);
}
if (!currentGroup.isEmpty()) {
orderedProcessedSinkRecordData.add(currentGroup);
}
return orderedProcessedSinkRecordData;
}
private MongoSinkRecordProcessor() {
}
}

View File

@ -0,0 +1,40 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.connector.mongodb.sink;
import static io.debezium.connector.mongodb.sink.MongoDbSinkConnectorConfig.ID_FIELD;
import org.apache.kafka.connect.errors.DataException;
import org.bson.BsonDocument;
import org.bson.BsonValue;
import com.mongodb.client.model.ReplaceOneModel;
import com.mongodb.client.model.ReplaceOptions;
import com.mongodb.client.model.WriteModel;
import io.debezium.connector.mongodb.sink.converters.SinkDocument;
public class ReplaceDefaultStrategy implements WriteModelStrategy {
private static final ReplaceOptions REPLACE_OPTIONS = new ReplaceOptions().upsert(true);
@Override
public WriteModel<BsonDocument> createWriteModel(final SinkDocument document) {
BsonDocument vd = document
.getValueDoc()
.orElseThrow(
() -> new DataException(
"Could not build the WriteModel,the value document was missing unexpectedly"));
BsonValue idValue = vd.get(ID_FIELD);
if (idValue == null) {
throw new DataException(
"Could not build the WriteModel,the `_id` field was missing unexpectedly");
}
return new ReplaceOneModel<>(new BsonDocument(ID_FIELD, idValue), vd, REPLACE_OPTIONS);
}
}
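
Illustration only (not part of the commit): a hypothetical sketch showing that a value document carrying "_id" yields an upsert-enabled ReplaceOneModel filtered on that id.

package io.debezium.connector.mongodb.sink;

import org.bson.BsonDocument;

import com.mongodb.client.model.ReplaceOneModel;

import io.debezium.connector.mongodb.sink.converters.SinkDocument;

// Hypothetical usage sketch for ReplaceDefaultStrategy.
public class ReplaceStrategyExample {

    public static void main(String[] args) {
        BsonDocument valueDoc = BsonDocument.parse("{\"_id\": 1, \"name\": \"debezium\"}");
        SinkDocument doc = new SinkDocument(null, valueDoc);

        ReplaceOneModel<BsonDocument> replace = (ReplaceOneModel<BsonDocument>) new ReplaceDefaultStrategy().createWriteModel(doc);
        System.out.println(replace.getFilter());                    // {"_id": 1}
        System.out.println(replace.getReplaceOptions().isUpsert()); // true
    }
}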

View File

@ -0,0 +1,213 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.connector.mongodb.sink;
import java.util.Optional;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.kafka.common.config.Config;
import org.apache.kafka.common.config.ConfigValue;
import org.apache.kafka.connect.errors.ConnectException;
import com.mongodb.ConnectionString;
import com.mongodb.MongoClientSettings;
import com.mongodb.ReadPreference;
import com.mongodb.client.MongoClient;
import com.mongodb.client.MongoClients;
import com.mongodb.event.ClusterClosedEvent;
import com.mongodb.event.ClusterDescriptionChangedEvent;
import com.mongodb.event.ClusterListener;
import com.mongodb.event.ClusterOpeningEvent;
import io.debezium.config.Field;
public final class SinkConnection {
// private static final String USERS_INFO = "{usersInfo: '%s', showPrivileges: true}";
// private static final String ROLES_INFO = "{rolesInfo: '%s', showPrivileges: true, showBuiltinRoles: true}";
public static Optional<MongoClient> canConnect(final Config config, final Field connectionStringConfigName) {
Optional<ConfigValue> optionalConnectionString = getConfigByName(config, connectionStringConfigName.name());
if (optionalConnectionString.isPresent() && optionalConnectionString.get().errorMessages().isEmpty()) {
ConfigValue configValue = optionalConnectionString.get();
AtomicBoolean connected = new AtomicBoolean();
CountDownLatch latch = new CountDownLatch(1);
ConnectionString connectionString = new ConnectionString((String) configValue.value());
MongoClientSettings mongoClientSettings = MongoClientSettings.builder()
.applyConnectionString(connectionString)
.applyToClusterSettings(
b -> b.addClusterListener(
new ClusterListener() {
@Override
public void clusterOpening(final ClusterOpeningEvent event) {
}
@Override
public void clusterClosed(final ClusterClosedEvent event) {
}
@Override
public void clusterDescriptionChanged(
final ClusterDescriptionChangedEvent event) {
ReadPreference readPreference = connectionString.getReadPreference() != null
? connectionString.getReadPreference()
: ReadPreference.primaryPreferred();
if (!connected.get()
&& event.getNewDescription().hasReadableServer(readPreference)) {
connected.set(true);
latch.countDown();
}
}
}))
.build();
long latchTimeout = mongoClientSettings.getSocketSettings().getConnectTimeout(TimeUnit.MILLISECONDS) + 500;
MongoClient mongoClient = MongoClients.create(mongoClientSettings);
try {
if (!latch.await(latchTimeout, TimeUnit.MILLISECONDS)) {
configValue.addErrorMessage("Unable to connect to the server.");
mongoClient.close();
}
}
catch (InterruptedException e) {
mongoClient.close();
throw new ConnectException(e);
}
if (configValue.errorMessages().isEmpty()) {
return Optional.of(mongoClient);
}
}
return Optional.empty();
}
// public static void checkUserHasActions(
// final MongoClient client,
// final MongoCredential credential,
// final List<String> actions,
// final String dbName,
// final String collectionName,
// final String configName,
// final Config config) {
//
// if (credential == null) {
// return;
// }
//
// try {
// Document usersInfo = client
// .getDatabase(credential.getSource())
// .runCommand(Document.parse(format(USERS_INFO, credential.getUserName())));
//
// List<String> unsupportedActions = new ArrayList<>(actions);
// for (final Document userInfo : usersInfo.getList("users", Document.class)) {
// unsupportedActions = removeUserActions(
// userInfo, credential.getSource(), dbName, collectionName, actions);
//
// if (!unsupportedActions.isEmpty()
// && userInfo.getList("inheritedPrivileges", Document.class, emptyList()).isEmpty()) {
// for (final Document inheritedRole : userInfo.getList("inheritedRoles", Document.class, emptyList())) {
// Document rolesInfo = client
// .getDatabase(inheritedRole.getString("db"))
// .runCommand(
// Document.parse(format(ROLES_INFO, inheritedRole.getString("role"))));
// for (final Document roleInfo : rolesInfo.getList("roles", Document.class, emptyList())) {
// unsupportedActions = removeUserActions(
// roleInfo,
// credential.getSource(),
// dbName,
// collectionName,
// unsupportedActions);
// }
//
// if (unsupportedActions.isEmpty()) {
// return;
// }
// }
// }
// if (unsupportedActions.isEmpty()) {
// return;
// }
// }
//
// String missingPermissions = String.join(", ", unsupportedActions);
// getConfigByName(config, configName)
// .ifPresent(
// c -> c.addErrorMessage(
// format(
// "Invalid user permissions. Missing the following action permissions: %s",
// missingPermissions)));
// }
// catch (MongoSecurityException e) {
// getConfigByName(config, configName)
// .ifPresent(c -> c.addErrorMessage("Invalid user permissions authentication failed."));
// }
// catch (Exception e) {
// LOGGER.warn("Permission validation failed due to: {}", e.getMessage(), e);
// }
// }
/**
* Checks the roles info document for matching actions and removes them from the provided list
*/
// private static List<String> removeUserActions(final Document rolesInfo, final String authDatabase, final String databaseName, final String collectionName,
// final List<String> userActions) {
// List<Document> privileges = rolesInfo.getList("inheritedPrivileges", Document.class, emptyList());
// if (privileges.isEmpty() || userActions.isEmpty()) {
// return userActions;
// }
//
// List<String> unsupportedUserActions = new ArrayList<>(userActions);
// for (final Document privilege : privileges) {
// Document resource = privilege.get("resource", new Document());
// if (resource.containsKey("cluster") && resource.getBoolean("cluster")) {
// unsupportedUserActions.removeAll(privilege.getList("actions", String.class, emptyList()));
// }
// else if (resource.containsKey("db") && resource.containsKey("collection")) {
// String database = resource.getString("db");
// String collection = resource.getString("collection");
//
// boolean resourceMatches = false;
// boolean collectionMatches = collection.isEmpty() || collection.equals(collectionName);
// if (database.isEmpty() && collectionMatches) {
// resourceMatches = true;
// }
// else if (database.equals(authDatabase) && collection.isEmpty()) {
// resourceMatches = true;
// }
// else if (database.equals(databaseName) && collectionMatches) {
// resourceMatches = true;
// }
//
// if (resourceMatches) {
// unsupportedUserActions.removeAll(privilege.getList("actions", String.class, emptyList()));
// }
// }
//
// if (unsupportedUserActions.isEmpty()) {
// break;
// }
// }
//
// return unsupportedUserActions;
// }
public static Optional<ConfigValue> getConfigByName(final Config config, final String name) {
for (final ConfigValue configValue : config.configValues()) {
if (configValue.name().equals(name)) {
return Optional.of(configValue);
}
}
return Optional.empty();
}
private SinkConnection() {
}
}
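
Illustration only: a hypothetical sketch of the connectivity probe as MongoDbSinkConnector#validate drives it, assuming a MongoDB instance may be reachable at the example URI and that Kafka's Config is built from ConfigDef.validate(..); SinkConnectionExample and the property values are placeholders.

package io.debezium.connector.mongodb.sink;

import java.util.Map;
import java.util.Optional;

import org.apache.kafka.common.config.Config;

import com.mongodb.client.MongoClient;

// Hypothetical example class.
public class SinkConnectionExample {

    public static void main(String[] args) {
        Map<String, String> props = Map.of(
                "mongodb.connection.string", "mongodb://localhost:27017",
                "sink.database", "inventory");

        // Run the regular ConfigDef validation, then probe connectivity the same way the connector does.
        Config config = new Config(MongoDbSinkConnectorConfig.configDef().validate(props));
        Optional<MongoClient> client = SinkConnection.canConnect(config, MongoDbSinkConnectorConfig.CONNECTION_STRING);
        System.out.println("Reachable: " + client.isPresent());
        client.ifPresent(MongoClient::close);
    }
}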

View File

@ -0,0 +1,141 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.connector.mongodb.sink;
import static io.debezium.connector.mongodb.sink.MongoDbSinkConnectorTask.LOGGER;
import java.util.Collection;
import java.util.List;
import java.util.OptionalLong;
import java.util.stream.Collectors;
import io.debezium.DebeziumException;
import org.apache.kafka.connect.connector.ConnectRecord;
import org.apache.kafka.connect.errors.DataException;
import org.apache.kafka.connect.sink.SinkRecord;
import org.bson.BsonDocument;
import com.mongodb.MongoBulkWriteException;
import com.mongodb.MongoNamespace;
import com.mongodb.bulk.BulkWriteResult;
import com.mongodb.client.MongoClient;
import com.mongodb.client.model.BulkWriteOptions;
import com.mongodb.client.model.WriteModel;
import io.debezium.dlq.ErrorReporter;
final class StartedMongoDbSinkTask implements AutoCloseable {
private final MongoDbSinkConnectorConfig sinkConfig;
private final MongoClient mongoClient;
private final ErrorReporter errorReporter;
StartedMongoDbSinkTask(
final MongoDbSinkConnectorConfig sinkConfig,
final MongoClient mongoClient,
final ErrorReporter errorReporter) {
this.sinkConfig = sinkConfig;
this.mongoClient = mongoClient;
this.errorReporter = errorReporter;
}
@SuppressWarnings("try")
@Override
public void close() {
try (MongoClient autoCloseable = mongoClient) {
// just using try-with-resources to ensure they all get closed, even in the case of
// exceptions
}
}
void put(final Collection<SinkRecord> records) {
try {
trackLatestRecordTimestampOffset(records);
if (records.isEmpty()) {
LOGGER.debug("No sink records to process for current poll operation");
}
else {
List<List<MongoProcessedSinkRecordData>> batches = MongoSinkRecordProcessor.orderedGroupByTopicAndNamespace(
records, sinkConfig, errorReporter);
for (List<MongoProcessedSinkRecordData> batch : batches) {
bulkWriteBatch(batch);
}
}
}
catch (Exception e) {
throw new DebeziumException(e);
}
}
private void trackLatestRecordTimestampOffset(final Collection<SinkRecord> records) {
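// Note: the maximum record timestamp is computed below but is currently not used any further.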
OptionalLong latestRecord = records.stream()
.filter(v -> v.timestamp() != null)
.mapToLong(ConnectRecord::timestamp)
.max();
}
private void bulkWriteBatch(final List<MongoProcessedSinkRecordData> batch) {
if (batch.isEmpty()) {
return;
}
MongoNamespace namespace = batch.get(0).getNamespace();
List<WriteModel<BsonDocument>> writeModels = batch.stream()
.map(MongoProcessedSinkRecordData::getWriteModel)
.collect(Collectors.toList());
boolean bulkWriteOrdered = true;
try {
LOGGER.debug(
"Bulk writing {} document(s) into collection [{}] via an {} bulk write",
writeModels.size(),
namespace.getFullName(),
bulkWriteOrdered ? "ordered" : "unordered");
BulkWriteResult result = mongoClient
.getDatabase(namespace.getDatabaseName())
.getCollection(namespace.getCollectionName(), BsonDocument.class)
.bulkWrite(writeModels, new BulkWriteOptions().ordered(bulkWriteOrdered));
LOGGER.debug("Mongodb bulk write result: {}", result);
}
catch (RuntimeException e) {
handleTolerableWriteException(
batch.stream()
.map(MongoProcessedSinkRecordData::getSinkRecord)
.collect(Collectors.toList()),
bulkWriteOrdered,
e,
true,
true);
}
}
private void handleTolerableWriteException(
final List<SinkRecord> batch,
final boolean ordered,
final RuntimeException e,
final boolean logErrors,
final boolean tolerateErrors) {
if (e instanceof MongoBulkWriteException) {
throw new DataException(e);
}
else {
if (logErrors) {
log(batch, e);
}
if (tolerateErrors) {
batch.forEach(record -> errorReporter.report(record, e));
}
else {
throw new DataException(e);
}
}
}
private static void log(final Collection<SinkRecord> records, final RuntimeException e) {
LOGGER.error("Failed to put into the sink the following records: {}", records, e);
}
}

View File

@ -0,0 +1,17 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.connector.mongodb.sink;
import org.bson.BsonDocument;
import com.mongodb.client.model.WriteModel;
import io.debezium.connector.mongodb.sink.converters.SinkDocument;
public interface WriteModelStrategy {
WriteModel<BsonDocument> createWriteModel(SinkDocument document);
}

View File

@ -0,0 +1,26 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.connector.mongodb.sink.converters;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.errors.DataException;
import org.bson.BsonDocument;
import org.bson.RawBsonDocument;
import org.bson.codecs.BsonDocumentCodec;
class ByteArrayValueConverter implements SinkValueConverter {
private static final BsonDocumentCodec BSON_DOCUMENT_CODEC = new BsonDocumentCodec();
@Override
public BsonDocument convert(final Schema schema, final Object value) {
if (value == null) {
throw new DataException("BSON conversion failed due to record key and/or value was null");
}
return new RawBsonDocument((byte[]) value).decode(BSON_DOCUMENT_CODEC);
}
}

View File

@ -0,0 +1,22 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.connector.mongodb.sink.converters;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.errors.DataException;
import org.bson.BsonDocument;
class JsonStringValueConverter implements SinkValueConverter {
@Override
public BsonDocument convert(final Schema schema, final Object value) {
if (value == null) {
throw new DataException("JSON string conversion failed due to record key and/or value was null");
}
return BsonDocument.parse((String) value);
}
}

View File

@ -0,0 +1,204 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.connector.mongodb.sink.converters;
import static java.lang.String.format;
import java.io.InvalidObjectException;
import java.io.ObjectInputStream;
import java.util.Collection;
import java.util.Map;
import java.util.Set;
import java.util.function.BiFunction;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.errors.DataException;
import org.apache.kafka.connect.sink.SinkRecord;
import org.bson.BsonDocument;
import org.bson.BsonValue;
import com.mongodb.lang.Nullable;
/**
* A lazy representation of a MongoDB document.
*/
public class LazyBsonDocument extends BsonDocument {
private static final long serialVersionUID = 1L;
private final transient SinkRecord record;
private final transient Type dataType;
private final transient BiFunction<Schema, Object, BsonDocument> converter;
private BsonDocument unwrapped;
public enum Type {
KEY,
VALUE
}
/**
* Construct a new instance that lazily converts the given sink record on first access.
*
* @param record the sink record to convert
* @param dataType whether to convert the record key or the record value
* @param converter the converter for the sink record
*/
public LazyBsonDocument(
final SinkRecord record,
final Type dataType,
final BiFunction<Schema, Object, BsonDocument> converter) {
if (record == null) {
throw new IllegalArgumentException("record can not be null");
}
else if (dataType == null) {
throw new IllegalArgumentException("dataType can not be null");
}
else if (converter == null) {
throw new IllegalArgumentException("converter can not be null");
}
this.record = record;
this.dataType = dataType;
this.converter = converter;
}
@Override
public int size() {
return getUnwrapped().size();
}
@Override
public boolean isEmpty() {
return getUnwrapped().isEmpty();
}
@Override
public boolean containsKey(final Object key) {
return getUnwrapped().containsKey(key);
}
@Override
public boolean containsValue(final Object value) {
return getUnwrapped().containsValue(value);
}
@Override
public BsonValue get(final Object key) {
return getUnwrapped().get(key);
}
@Override
public BsonValue put(final String key, final BsonValue value) {
return getUnwrapped().put(key, value);
}
@Override
public BsonValue remove(final Object key) {
return getUnwrapped().remove(key);
}
@Override
public void putAll(final Map<? extends String, ? extends BsonValue> m) {
getUnwrapped().putAll(m);
}
@Override
public void clear() {
getUnwrapped().clear();
}
@Override
public Set<String> keySet() {
return getUnwrapped().keySet();
}
@Override
public Collection<BsonValue> values() {
return getUnwrapped().values();
}
@Override
public Set<Entry<String, BsonValue>> entrySet() {
return getUnwrapped().entrySet();
}
@Override
public boolean equals(final Object o) {
return getUnwrapped().equals(o);
}
@Override
public int hashCode() {
return getUnwrapped().hashCode();
}
@Override
public String toString() {
return getUnwrapped().toString();
}
@Override
public BsonDocument clone() {
return unwrapped != null
? unwrapped.clone()
: new LazyBsonDocument(record, dataType, converter);
}
private BsonDocument getUnwrapped() {
if (unwrapped == null) {
switch (dataType) {
case KEY:
try {
unwrapped = converter.apply(record.keySchema(), record.key());
}
catch (Exception e) {
throw new DataException(
format(
"Could not convert key %s into a BsonDocument.",
unambiguousToString(record.key())),
e);
}
break;
case VALUE:
try {
unwrapped = converter.apply(record.valueSchema(), record.value());
}
catch (Exception e) {
throw new DataException(
format(
"Could not convert value %s into a BsonDocument.",
unambiguousToString(record.value())),
e);
}
break;
default:
throw new DataException(format("Unknown data type %s.", dataType));
}
}
return unwrapped;
}
// see https://docs.oracle.com/javase/6/docs/platform/serialization/spec/output.html
private Object writeReplace() {
return getUnwrapped();
}
// see https://docs.oracle.com/javase/6/docs/platform/serialization/spec/input.html
private void readObject(final ObjectInputStream stream) throws InvalidObjectException {
throw new InvalidObjectException("Proxy required");
}
private static String unambiguousToString(@Nullable final Object v) {
String stringValue = String.valueOf(v);
if (v == null) {
return format("'%s' (null reference)", stringValue);
}
else if (stringValue.equals(String.valueOf((Object) null))) {
return format("'%s' (%s, not a null reference)", stringValue, v.getClass().getName());
}
else {
return format("'%s' (%s)", stringValue, v.getClass().getName());
}
}
}
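
Illustration only: a hypothetical sketch of the lazy behaviour, showing that the supplied conversion function runs on the first read of the document rather than at construction time; the record and the lambda are examples.

package io.debezium.connector.mongodb.sink.converters;

import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.sink.SinkRecord;
import org.bson.BsonDocument;

// Hypothetical example class.
public class LazyBsonDocumentExample {

    public static void main(String[] args) {
        SinkRecord record = new SinkRecord(
                "inventory.customers", 0,
                Schema.STRING_SCHEMA, "{\"id\": 1}",
                Schema.STRING_SCHEMA, "{\"name\": \"debezium\"}",
                42L);

        BsonDocument lazyValue = new LazyBsonDocument(record, LazyBsonDocument.Type.VALUE, (schema, data) -> {
            System.out.println("converting now"); // printed only on first access
            return BsonDocument.parse((String) data);
        });

        System.out.println("document created");          // no conversion has happened yet
        System.out.println(lazyValue.getString("name")); // first access triggers the conversion
    }
}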

View File

@ -0,0 +1,29 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.connector.mongodb.sink.converters;
import java.util.Map;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.errors.DataException;
import org.bson.BsonDocument;
import org.bson.Document;
import com.mongodb.MongoClientSettings;
/** Used for converting Maps, e.g. schema-less JSON. */
class MapValueConverter implements SinkValueConverter {
@SuppressWarnings({ "unchecked" })
@Override
public BsonDocument convert(final Schema schema, final Object value) {
if (value == null) {
throw new DataException("JSON conversion failed due to record key and/or value was null");
}
return new Document((Map<String, Object>) value)
.toBsonDocument(Document.class, MongoClientSettings.getDefaultCodecRegistry());
}
}
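
Illustration only: a hypothetical same-package sketch (the converter is package-private) turning a schema-less map value into a BsonDocument.

package io.debezium.connector.mongodb.sink.converters;

import java.util.Map;

import org.bson.BsonDocument;

// Hypothetical example class.
public class MapValueConverterExample {

    public static void main(String[] args) {
        Map<String, Object> value = Map.of("_id", 1, "name", "debezium");

        BsonDocument doc = new MapValueConverter().convert(null, value); // the schema is not used here
        System.out.println(doc.toJson());
    }
}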

View File

@ -0,0 +1,222 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.connector.mongodb.sink.converters;
import static java.util.Arrays.asList;
import static java.util.Collections.unmodifiableSet;
import static org.apache.kafka.connect.data.Schema.Type.ARRAY;
import static org.apache.kafka.connect.data.Schema.Type.MAP;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.kafka.connect.data.Date;
import org.apache.kafka.connect.data.Decimal;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.data.Time;
import org.apache.kafka.connect.data.Timestamp;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.errors.DataException;
import org.bson.BsonArray;
import org.bson.BsonDocument;
import org.bson.BsonNull;
import org.bson.BsonValue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import io.debezium.connector.mongodb.sink.converters.bson.AbstractBsonType;
import io.debezium.connector.mongodb.sink.converters.bson.BooleanType;
import io.debezium.connector.mongodb.sink.converters.bson.BytesType;
import io.debezium.connector.mongodb.sink.converters.bson.DateType;
import io.debezium.connector.mongodb.sink.converters.bson.DecimalType;
import io.debezium.connector.mongodb.sink.converters.bson.Float32Type;
import io.debezium.connector.mongodb.sink.converters.bson.Float64Type;
import io.debezium.connector.mongodb.sink.converters.bson.Int16Type;
import io.debezium.connector.mongodb.sink.converters.bson.Int32Type;
import io.debezium.connector.mongodb.sink.converters.bson.Int64Type;
import io.debezium.connector.mongodb.sink.converters.bson.Int8Type;
import io.debezium.connector.mongodb.sink.converters.bson.StringType;
import io.debezium.connector.mongodb.sink.converters.bson.TimeType;
import io.debezium.connector.mongodb.sink.converters.bson.TimestampType;
/** Used for converting Struct-based data with a schema, such as Avro, Protobuf or JSON with schemas enabled. */
class SchemaValueConverter implements SinkValueConverter {
private static final Logger LOGGER = LoggerFactory.getLogger(SchemaValueConverter.class);
private static final Set<String> LOGICAL_TYPE_NAMES = unmodifiableSet(
new HashSet<>(
asList(
Date.LOGICAL_NAME,
Decimal.LOGICAL_NAME,
Time.LOGICAL_NAME,
Timestamp.LOGICAL_NAME)));
private final Map<Schema.Type, AbstractBsonType> schemaTypeToBsonType = new HashMap<>();
private final Map<String, AbstractBsonType> logicalSchemaTypeToBsonType = new HashMap<>();
SchemaValueConverter() {
registerType(new BooleanType());
registerType(new BytesType());
registerType(new Float32Type());
registerType(new Float64Type());
registerType(new Int8Type());
registerType(new Int16Type());
registerType(new Int32Type());
registerType(new Int64Type());
registerType(new StringType());
registerLogicalType(new DateType());
registerLogicalType(new DecimalType());
registerLogicalType(new TimeType());
registerLogicalType(new TimestampType());
}
@Override
public BsonDocument convert(final Schema schema, final Object value) {
if (schema == null || value == null) {
throw new DataException("Schema-ed conversion failed due to record key and/or value was null");
}
return toBsonDoc(schema, value).asDocument();
}
private void registerType(final AbstractBsonType bsonType) {
schemaTypeToBsonType.put(bsonType.getSchema().type(), bsonType);
}
private void registerLogicalType(final AbstractBsonType bsonType) {
logicalSchemaTypeToBsonType.put(bsonType.getSchema().name(), bsonType);
}
private BsonValue toBsonDoc(final Schema schema, final Object value) {
if (value == null) {
return BsonNull.VALUE;
}
BsonDocument doc = new BsonDocument();
if (schema.type() == MAP) {
Schema fieldSchema = schema.valueSchema();
Map m = (Map) value;
for (Object entry : m.keySet()) {
String key = (String) entry;
if (fieldSchema.type().isPrimitive()) {
doc.put(key, getBsonType(fieldSchema).toBson(m.get(key), fieldSchema));
}
else if (fieldSchema.type().equals(ARRAY)) {
doc.put(key, toBsonArray(fieldSchema, m.get(key)));
}
else {
if (m.get(key) == null) {
doc.put(key, BsonNull.VALUE);
}
else {
doc.put(key, toBsonDoc(fieldSchema, m.get(key)));
}
}
}
}
else {
schema.fields().forEach(f -> doc.put(f.name(), processField((Struct) value, f)));
}
return doc;
}
private BsonValue toBsonArray(final Schema schema, final Object value) {
if (value == null) {
return BsonNull.VALUE;
}
Schema fieldSchema = schema.valueSchema();
BsonArray bsonArray = new BsonArray();
List<?> myList = (List) value;
myList.forEach(
v -> {
if (fieldSchema.type().isPrimitive()) {
if (v == null) {
bsonArray.add(BsonNull.VALUE);
}
else {
bsonArray.add(getBsonType(fieldSchema).toBson(v));
}
}
else if (fieldSchema.type().equals(ARRAY)) {
bsonArray.add(toBsonArray(fieldSchema, v));
}
else {
bsonArray.add(toBsonDoc(fieldSchema, v));
}
});
return bsonArray;
}
private BsonValue processField(final Struct struct, final Field field) {
LOGGER.trace("processing field '{}'", field.name());
if (struct.get(field.name()) == null) {
LOGGER.trace("no field in struct -> adding null");
return BsonNull.VALUE;
}
if (isSupportedLogicalType(field.schema())) {
return getBsonType(field.schema()).toBson(struct.get(field), field.schema());
}
try {
switch (field.schema().type()) {
case BOOLEAN:
case FLOAT32:
case FLOAT64:
case INT8:
case INT16:
case INT32:
case INT64:
case STRING:
case BYTES:
return handlePrimitiveField(struct, field);
case STRUCT:
case MAP:
return toBsonDoc(field.schema(), struct.get(field));
case ARRAY:
return toBsonArray(field.schema(), struct.get(field));
default:
throw new DataException("Unexpected / unsupported schema type " + field.schema().type());
}
}
catch (Exception e) {
throw new DataException("Error while processing field " + field.name(), e);
}
}
private BsonValue handlePrimitiveField(final Struct struct, final Field field) {
LOGGER.trace("handling primitive type '{}'", field.schema().type());
return getBsonType(field.schema()).toBson(struct.get(field), field.schema());
}
private boolean isSupportedLogicalType(final Schema schema) {
if (schema.name() == null) {
return false;
}
return LOGICAL_TYPE_NAMES.contains(schema.name());
}
private AbstractBsonType getBsonType(final Schema schema) {
AbstractBsonType bsonType;
if (isSupportedLogicalType(schema)) {
bsonType = logicalSchemaTypeToBsonType.get(schema.name());
}
else {
bsonType = schemaTypeToBsonType.get(schema.type());
}
if (bsonType == null) {
throw new ConnectException(
"error no registered bsonType found for " + schema.type().getName());
}
return bsonType;
}
}
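
Illustration only: a hypothetical same-package sketch (the converter is package-private) converting a simple Connect Struct with a schema into a BsonDocument.

package io.debezium.connector.mongodb.sink.converters;

import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;
import org.bson.BsonDocument;

// Hypothetical example class.
public class SchemaValueConverterExample {

    public static void main(String[] args) {
        Schema schema = SchemaBuilder.struct().name("customer")
                .field("id", Schema.INT64_SCHEMA)
                .field("name", Schema.OPTIONAL_STRING_SCHEMA)
                .build();
        Struct value = new Struct(schema)
                .put("id", 1L)
                .put("name", "debezium");

        BsonDocument doc = new SchemaValueConverter().convert(schema, value);
        System.out.println(doc.toJson()); // {"id": 1, "name": "debezium"}
    }
}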

View File

@ -0,0 +1,33 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.connector.mongodb.sink.converters;
import java.util.Optional;
import org.bson.BsonDocument;
public final class SinkDocument implements Cloneable {
private final BsonDocument keyDoc;
private final BsonDocument valueDoc;
public SinkDocument(final BsonDocument keyDoc, final BsonDocument valueDoc) {
this.keyDoc = keyDoc;
this.valueDoc = valueDoc;
}
public Optional<BsonDocument> getKeyDoc() {
return Optional.ofNullable(keyDoc);
}
public Optional<BsonDocument> getValueDoc() {
return Optional.ofNullable(valueDoc);
}
@Override
public SinkDocument clone() {
return new SinkDocument(keyDoc != null ? keyDoc.clone() : null, valueDoc != null ? valueDoc.clone() : null);
}
}

View File

@ -0,0 +1,79 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.connector.mongodb.sink.converters;
import java.util.Map;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.errors.DataException;
import org.apache.kafka.connect.sink.SinkRecord;
import org.bson.BsonDocument;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import io.debezium.connector.mongodb.sink.converters.LazyBsonDocument.Type;
/**
* Converts a Kafka Connect record into one BsonDocument for the record key and one for the record value, choosing the correct SinkValueConverter based on the record type.
*/
public class SinkRecordConverter {
private static final Logger LOGGER = LoggerFactory.getLogger(SinkRecordConverter.class);
private static final SinkValueConverter SCHEMA_VALUE_CONVERTER = new SchemaValueConverter();
private static final SinkValueConverter MAP_VALUE_CONVERTER = new MapValueConverter();
private static final SinkValueConverter JSON_STRING_VALUE_CONVERTER = new JsonStringValueConverter();
private static final SinkValueConverter BYTE_ARRAY_VALUE_CONVERTER = new ByteArrayValueConverter();
public SinkDocument convert(final SinkRecord record) {
LOGGER.trace("record: {}", record);
BsonDocument keyDoc = null;
if (record.key() != null) {
keyDoc = new LazyBsonDocument(
record,
Type.KEY,
(schema, data) -> getConverter(schema, data).convert(schema, data));
}
BsonDocument valueDoc = null;
if (record.value() != null) {
valueDoc = new LazyBsonDocument(
record,
Type.VALUE,
(Schema schema, Object data) -> getConverter(schema, data).convert(schema, data));
}
return new SinkDocument(keyDoc, valueDoc);
}
private SinkValueConverter getConverter(final Schema schema, final Object data) {
if (schema != null && data instanceof Struct) {
LOGGER.debug("Using schema-ed converter");
return SCHEMA_VALUE_CONVERTER;
}
// structured JSON without schema
if (data instanceof Map) {
LOGGER.debug("Using schemaless / map converter");
return MAP_VALUE_CONVERTER;
}
// JSON string
if (data instanceof String) {
LOGGER.debug("Using JSON string converter");
return JSON_STRING_VALUE_CONVERTER;
}
// BSON bytes
if (data instanceof byte[]) {
LOGGER.debug("Using BSON converter");
return BYTE_ARRAY_VALUE_CONVERTER;
}
throw new DataException("No converter found for unexpected object type: " + data.getClass().getName());
}
}
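As a rough usage sketch (the topic, schema, and field names below are invented for illustration, not taken from the connector), converting a schema-backed Connect record could look like the following; the converter picks the Struct-based converter because the value carries a schema and is a Struct:

package io.debezium.connector.mongodb.sink.converters;

import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.sink.SinkRecord;

public class SinkRecordConverterExample {
    public static void main(String[] args) {
        // Hypothetical schema and record, for illustration only.
        Schema valueSchema = SchemaBuilder.struct().name("customer")
                .field("id", Schema.INT32_SCHEMA)
                .field("name", Schema.STRING_SCHEMA)
                .build();
        Struct value = new Struct(valueSchema).put("id", 42).put("name", "Alice");
        SinkRecord record = new SinkRecord("server1.inventory.customers", 0, null, null, valueSchema, value, 0L);

        SinkDocument doc = new SinkRecordConverter().convert(record);
        // The key is null, so only the value document is present.
        doc.getValueDoc().ifPresent(d -> System.out.println(d.toJson()));
    }
}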

View File

@ -0,0 +1,17 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.connector.mongodb.sink.converters;
import org.apache.kafka.connect.data.Schema;
import org.bson.BsonDocument;
/**
* Interface to convert a value (from a Kafka Connect record) into a BsonDocument representing the value.
*/
public interface SinkValueConverter {
BsonDocument convert(Schema schema, Object value);
}

View File

@ -0,0 +1,53 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.connector.mongodb.sink.converters.bson;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.errors.DataException;
import org.bson.BsonNull;
import org.bson.BsonValue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public abstract class AbstractBsonType {
private static final Logger LOGGER = LoggerFactory.getLogger(AbstractBsonType.class);
private final Schema schema;
public AbstractBsonType(final Schema schema) {
this.schema = schema;
}
public Schema getSchema() {
return schema;
}
public abstract BsonValue toBson(Object data);
public BsonValue toBson(final Object data, final Schema fieldSchema) {
if (!fieldSchema.isOptional()) {
if (data == null) {
throw new DataException("Schema of field \"" + fieldSchema.name() + "\" is not defined as optional but value is null");
}
LOGGER.trace("Non-optional field \"{}\" with value \"{}\"", fieldSchema.name(), data);
return toBson(data);
}
if (data != null) {
LOGGER.trace("Optional field \"{}\" with value \"{}\"", fieldSchema.name(), data);
return toBson(data);
}
if (fieldSchema.defaultValue() != null) {
LOGGER.trace("Optional field \"{}\" with no data / null value but default value is \"{}\"", fieldSchema.name(), fieldSchema.defaultValue());
return toBson(fieldSchema.defaultValue());
}
LOGGER.trace("Optional field \"{}\" with no data / null value and no default value thus value set to \"{}\"", fieldSchema.name(), BsonNull.VALUE);
return BsonNull.VALUE;
}
}
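A small sketch of the null/default handling above, using an inline subclass so the example stays self-contained (the subclass itself is hypothetical): a non-null value is always converted, an optional schema's declared default is the fallback, and BsonNull is the last resort.

package io.debezium.connector.mongodb.sink.converters.bson;

import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.bson.BsonString;
import org.bson.BsonValue;

public class OptionalHandlingExample {
    public static void main(String[] args) {
        // Inline string-handling subclass, just for this illustration.
        AbstractBsonType stringLike = new AbstractBsonType(Schema.STRING_SCHEMA) {
            @Override
            public BsonValue toBson(Object data) {
                return new BsonString((String) data);
            }
        };

        Schema optionalWithDefault = SchemaBuilder.string().optional().defaultValue("n/a").build();
        System.out.println(stringLike.toBson("hello", optionalWithDefault)); // value wins over the default
        System.out.println(stringLike.toBson(null, optionalWithDefault));    // null falls back to the default "n/a"
        System.out.println(stringLike.toBson(null, Schema.OPTIONAL_STRING_SCHEMA)); // no default -> BsonNull
    }
}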

View File

@ -0,0 +1,21 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.connector.mongodb.sink.converters.bson;
import org.apache.kafka.connect.data.Schema;
import org.bson.BsonBoolean;
import org.bson.BsonValue;
public class BooleanType extends AbstractBsonType {
public BooleanType() {
super(Schema.BOOLEAN_SCHEMA);
}
public BsonValue toBson(final Object data) {
return new BsonBoolean((Boolean) data);
}
}

View File

@ -0,0 +1,32 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.connector.mongodb.sink.converters.bson;
import java.nio.ByteBuffer;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.errors.DataException;
import org.bson.BsonBinary;
import org.bson.BsonValue;
public class BytesType extends AbstractBsonType {
public BytesType() {
super(Schema.BYTES_SCHEMA);
}
@Override
public BsonValue toBson(final Object data) {
if (data instanceof ByteBuffer) {
return new BsonBinary(((ByteBuffer) data).array());
}
if (data instanceof byte[]) {
return new BsonBinary((byte[]) data);
}
throw new DataException("Bytes field conversion failed due to unexpected object type " + data.getClass().getName());
}
}

View File

@ -0,0 +1,22 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.connector.mongodb.sink.converters.bson;
import org.apache.kafka.connect.data.Date;
import org.bson.BsonDateTime;
import org.bson.BsonValue;
public class DateType extends AbstractBsonType {
public DateType() {
super(Date.SCHEMA);
}
@Override
public BsonValue toBson(final Object data) {
return new BsonDateTime(((java.util.Date) data).getTime());
}
}

View File

@ -0,0 +1,51 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.connector.mongodb.sink.converters.bson;
import java.math.BigDecimal;
import org.apache.kafka.connect.data.Decimal;
import org.apache.kafka.connect.errors.DataException;
import org.bson.BsonDecimal128;
import org.bson.BsonDouble;
import org.bson.BsonValue;
import org.bson.types.Decimal128;
public class DecimalType extends AbstractBsonType {
public enum Format {
DECIMAL128, // for MongoDB v3.4+
LEGACYDOUBLE // results in double approximation
}
private final Format format;
public DecimalType() {
super(Decimal.schema(0));
this.format = Format.DECIMAL128;
}
public DecimalType(final Format format) {
super(Decimal.schema(0));
this.format = format;
}
@Override
public BsonValue toBson(final Object data) {
if (data instanceof BigDecimal) {
if (format.equals(Format.DECIMAL128)) {
return new BsonDecimal128(new Decimal128((BigDecimal) data));
}
if (format.equals(Format.LEGACYDOUBLE)) {
return new BsonDouble(((BigDecimal) data).doubleValue());
}
}
throw new DataException(
"Decimal conversion not possible when data is of type " + data.getClass().getName()
+ " and format is " + format);
}
}
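For instance (a hedged sketch, the value below is arbitrary), the two formats differ only in the BSON representation they emit:

package io.debezium.connector.mongodb.sink.converters.bson;

import java.math.BigDecimal;

public class DecimalTypeExample {
    public static void main(String[] args) {
        BigDecimal amount = new BigDecimal("12345.6789");
        // Default format keeps exact precision as BSON Decimal128.
        System.out.println(new DecimalType().toBson(amount));
        // Legacy format approximates the value as a BSON double.
        System.out.println(new DecimalType(DecimalType.Format.LEGACYDOUBLE).toBson(amount));
    }
}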

View File

@ -0,0 +1,22 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.connector.mongodb.sink.converters.bson;
import org.apache.kafka.connect.data.Schema;
import org.bson.BsonDouble;
import org.bson.BsonValue;
public class Float32Type extends AbstractBsonType {
public Float32Type() {
super(Schema.FLOAT32_SCHEMA);
}
@Override
public BsonValue toBson(final Object data) {
return new BsonDouble((Float) data);
}
}

View File

@ -0,0 +1,22 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.connector.mongodb.sink.converters.bson;
import org.apache.kafka.connect.data.Schema;
import org.bson.BsonDouble;
import org.bson.BsonValue;
public class Float64Type extends AbstractBsonType {
public Float64Type() {
super(Schema.FLOAT64_SCHEMA);
}
@Override
public BsonValue toBson(final Object data) {
return new BsonDouble((Double) data);
}
}

View File

@ -0,0 +1,22 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.connector.mongodb.sink.converters.bson;
import org.apache.kafka.connect.data.Schema;
import org.bson.BsonInt32;
import org.bson.BsonValue;
public class Int16Type extends AbstractBsonType {
public Int16Type() {
super(Schema.INT16_SCHEMA);
}
@Override
public BsonValue toBson(final Object data) {
return new BsonInt32(((Short) data).intValue());
}
}

View File

@ -0,0 +1,22 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.connector.mongodb.sink.converters.bson;
import org.apache.kafka.connect.data.Schema;
import org.bson.BsonInt32;
import org.bson.BsonValue;
public class Int32Type extends AbstractBsonType {
public Int32Type() {
super(Schema.INT32_SCHEMA);
}
@Override
public BsonValue toBson(final Object data) {
return new BsonInt32((Integer) data);
}
}

View File

@ -0,0 +1,22 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.connector.mongodb.sink.converters.bson;
import org.apache.kafka.connect.data.Schema;
import org.bson.BsonInt64;
import org.bson.BsonValue;
public class Int64Type extends AbstractBsonType {
public Int64Type() {
super(Schema.INT64_SCHEMA);
}
@Override
public BsonValue toBson(final Object data) {
return new BsonInt64((Long) data);
}
}

View File

@ -0,0 +1,22 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.connector.mongodb.sink.converters.bson;
import org.apache.kafka.connect.data.Schema;
import org.bson.BsonInt32;
import org.bson.BsonValue;
public class Int8Type extends AbstractBsonType {
public Int8Type() {
super(Schema.INT8_SCHEMA);
}
@Override
public BsonValue toBson(final Object data) {
return new BsonInt32(((Byte) data).intValue());
}
}

View File

@ -0,0 +1,22 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.connector.mongodb.sink.converters.bson;
import org.apache.kafka.connect.data.Schema;
import org.bson.BsonString;
import org.bson.BsonValue;
public class StringType extends AbstractBsonType {
public StringType() {
super(Schema.STRING_SCHEMA);
}
@Override
public BsonValue toBson(final Object data) {
return new BsonString((String) data);
}
}

View File

@ -0,0 +1,22 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.connector.mongodb.sink.converters.bson;
import org.apache.kafka.connect.data.Time;
import org.bson.BsonDateTime;
import org.bson.BsonValue;
public class TimeType extends AbstractBsonType {
public TimeType() {
super(Time.SCHEMA);
}
@Override
public BsonValue toBson(final Object data) {
return new BsonDateTime(((java.util.Date) data).getTime());
}
}

View File

@ -0,0 +1,22 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.connector.mongodb.sink.converters.bson;
import org.apache.kafka.connect.data.Timestamp;
import org.bson.BsonDateTime;
import org.bson.BsonValue;
public class TimestampType extends AbstractBsonType {
public TimestampType() {
super(Timestamp.SCHEMA);
}
@Override
public BsonValue toBson(final Object data) {
return new BsonDateTime(((java.util.Date) data).getTime());
}
}

View File

@ -0,0 +1,67 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.connector.mongodb.sink.eventhandler;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import org.apache.kafka.connect.errors.DataException;
import org.bson.BsonDocument;
import com.mongodb.client.model.WriteModel;
import io.debezium.connector.mongodb.sink.MongoDbSinkConnectorConfig;
import io.debezium.connector.mongodb.sink.converters.SinkDocument;
import io.debezium.data.Envelope.Operation;
public abstract class EventHandler {
private static final String OPERATION_TYPE_FIELD_PATH = "op";
private static final String DDL_FIELD_PATH = "ddl";
private static final EventOperation NOOP_CDC_OPERATION = (doc, columnNamingStrategy) -> null;
private final MongoDbSinkConnectorConfig config;
private final Map<Operation, EventOperation> operations = new HashMap<>();
public EventHandler(final MongoDbSinkConnectorConfig config) {
this.config = config;
}
public MongoDbSinkConnectorConfig getConfig() {
return config;
}
public abstract Optional<WriteModel<BsonDocument>> handle(SinkDocument doc);
protected void registerOperations(final Map<Operation, EventOperation> operations) {
this.operations.putAll(operations);
}
public EventOperation getEventOperation(final BsonDocument doc) {
try {
if (!doc.containsKey(OPERATION_TYPE_FIELD_PATH) && doc.containsKey(DDL_FIELD_PATH)) {
return NOOP_CDC_OPERATION;
}
if (!doc.containsKey(OPERATION_TYPE_FIELD_PATH)
|| !doc.get(OPERATION_TYPE_FIELD_PATH).isString()) {
throw new DataException("Value document is missing or CDC operation is not a string");
}
EventOperation op = operations.get(
Operation.forCode(doc.get(OPERATION_TYPE_FIELD_PATH).asString().getValue()));
if (op == null) {
throw new DataException(
"No CDC operation found in mapping for op="
+ doc.get(OPERATION_TYPE_FIELD_PATH).asString().getValue());
}
return op;
}
catch (IllegalArgumentException exc) {
throw new DataException("Parsing CDC operation failed", exc);
}
}
}

View File

@ -0,0 +1,18 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.connector.mongodb.sink.eventhandler;
import org.bson.BsonDocument;
import com.mongodb.client.model.WriteModel;
import io.debezium.connector.mongodb.sink.converters.SinkDocument;
import io.debezium.table.ColumnNamingStrategy;
public interface EventOperation {
WriteModel<BsonDocument> perform(SinkDocument doc, ColumnNamingStrategy columnNamingStrategy);
}

View File

@ -0,0 +1,40 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.connector.mongodb.sink.eventhandler.relational;
import org.apache.kafka.connect.errors.DataException;
import org.bson.BsonDocument;
import com.mongodb.client.model.DeleteOneModel;
import com.mongodb.client.model.WriteModel;
import io.debezium.connector.mongodb.sink.converters.SinkDocument;
import io.debezium.connector.mongodb.sink.eventhandler.EventOperation;
import io.debezium.data.Envelope.Operation;
import io.debezium.table.ColumnNamingStrategy;
public class RelationalDeleteEvent implements EventOperation {
@Override
public WriteModel<BsonDocument> perform(final SinkDocument doc, final ColumnNamingStrategy columnNamingStrategy) {
BsonDocument keyDoc = doc.getKeyDoc()
.orElseThrow(
() -> new DataException("Key document must not be missing for delete operation"));
BsonDocument valueDoc = doc.getValueDoc()
.orElseThrow(
() -> new DataException("Value document must not be missing for delete operation"));
try {
BsonDocument filterDoc = RelationalEventHandler.generateFilterDoc(keyDoc, valueDoc, Operation.DELETE, columnNamingStrategy);
return new DeleteOneModel<>(filterDoc);
}
catch (Exception exc) {
throw new DataException(exc);
}
}
}

View File

@ -0,0 +1,121 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.connector.mongodb.sink.eventhandler.relational;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import org.apache.kafka.connect.errors.DataException;
import org.bson.BsonDocument;
import org.bson.BsonInvalidOperationException;
import org.bson.BsonObjectId;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.mongodb.client.model.WriteModel;
import io.debezium.connector.mongodb.sink.MongoDbSinkConnectorConfig;
import io.debezium.connector.mongodb.sink.converters.SinkDocument;
import io.debezium.connector.mongodb.sink.eventhandler.EventHandler;
import io.debezium.connector.mongodb.sink.eventhandler.EventOperation;
import io.debezium.data.Envelope.Operation;
import io.debezium.table.ColumnNamingStrategy;
public class RelationalEventHandler extends EventHandler {
private static final Logger LOGGER = LoggerFactory.getLogger(RelationalEventHandler.class);
private static final String ID_FIELD = "_id";
private static final String JSON_DOC_BEFORE_FIELD = "before";
private static final String JSON_DOC_AFTER_FIELD = "after";
private static final Map<Operation, EventOperation> DEFAULT_OPERATIONS = new HashMap<>() {
{
put(Operation.CREATE, new RelationalInsertEvent());
put(Operation.READ, new RelationalInsertEvent());
put(Operation.UPDATE, new RelationalUpdateEvent());
put(Operation.DELETE, new RelationalDeleteEvent());
}
};
public RelationalEventHandler(final MongoDbSinkConnectorConfig config) {
this(config, DEFAULT_OPERATIONS);
}
public RelationalEventHandler(final MongoDbSinkConnectorConfig config, final Map<Operation, EventOperation> operations) {
super(config);
registerOperations(operations);
}
@Override
public Optional<WriteModel<BsonDocument>> handle(final SinkDocument doc) {
BsonDocument keyDoc = doc.getKeyDoc().orElseGet(BsonDocument::new);
BsonDocument valueDoc = doc.getValueDoc().orElseGet(BsonDocument::new);
if (valueDoc.isEmpty()) {
LOGGER.debug("Skipping debezium tombstone event for kafka topic compaction");
return Optional.empty();
}
return Optional.ofNullable(getEventOperation(valueDoc).perform(new SinkDocument(keyDoc, valueDoc), getConfig().getColumnNamingStrategy()));
}
static BsonDocument generateFilterDoc(final BsonDocument keyDoc, final BsonDocument valueDoc, final Operation opType, final ColumnNamingStrategy columnNamingStrategy) {
if (keyDoc.keySet().isEmpty()) {
if (opType.equals(Operation.CREATE) || opType.equals(Operation.READ)) {
// create: no PK info in keyDoc -> generate ObjectId
return new BsonDocument(ID_FIELD, new BsonObjectId());
}
// update or delete: no PK info in keyDoc -> take everything in 'before' field
try {
BsonDocument filter = valueDoc.getDocument(JSON_DOC_BEFORE_FIELD);
if (filter.isEmpty()) {
throw new BsonInvalidOperationException("value doc before field is empty");
}
return filter;
}
catch (BsonInvalidOperationException exc) {
throw new DataException(
"Value doc 'before' field is empty or has invalid type"
+ " for update/delete operation. -> defensive actions taken!",
exc);
}
}
// build filter document composed of all PK columns
BsonDocument pk = new BsonDocument();
for (String f : keyDoc.keySet()) {
pk.put(columnNamingStrategy.resolveColumnName(f), keyDoc.get(f));
}
return new BsonDocument(ID_FIELD, pk);
}
static BsonDocument generateUpsertOrReplaceDoc(final BsonDocument keyDoc, final BsonDocument valueDoc, final BsonDocument filterDoc,
final ColumnNamingStrategy columnNamingStrategy) {
if (!valueDoc.containsKey(JSON_DOC_AFTER_FIELD)
|| valueDoc.get(JSON_DOC_AFTER_FIELD).isNull()
|| !valueDoc.get(JSON_DOC_AFTER_FIELD).isDocument()
|| valueDoc.getDocument(JSON_DOC_AFTER_FIELD).isEmpty()) {
throw new DataException(
"Value document must contain non-empty 'after' field"
+ " of type document for insert/update operation");
}
BsonDocument upsertDoc = new BsonDocument();
if (filterDoc.containsKey(ID_FIELD)) {
upsertDoc.put(ID_FIELD, filterDoc.get(ID_FIELD));
}
BsonDocument afterDoc = valueDoc.getDocument(JSON_DOC_AFTER_FIELD);
for (String f : afterDoc.keySet()) {
if (!keyDoc.containsKey(f)) {
upsertDoc.put(columnNamingStrategy.resolveColumnName(f), afterDoc.get(f));
}
}
return upsertDoc;
}
}
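To make the filter/replacement generation above concrete, here is a hedged sketch (field names and values are invented; the example class sits in the same package because the helpers are package-private) of what an update event keyed on id 42 produces:

package io.debezium.connector.mongodb.sink.eventhandler.relational;

import org.bson.BsonDocument;

import io.debezium.data.Envelope.Operation;
import io.debezium.table.DefaultColumnNamingStrategy;

public class RelationalDocsExample {
    public static void main(String[] args) {
        BsonDocument keyDoc = BsonDocument.parse("{\"id\": 42}");
        BsonDocument valueDoc = BsonDocument.parse(
                "{\"op\": \"u\","
                        + " \"before\": {\"id\": 42, \"name\": \"old\"},"
                        + " \"after\": {\"id\": 42, \"name\": \"new\"}}");

        // Filter is built from the key columns; the replacement carries _id plus all non-key 'after' fields.
        BsonDocument filter = RelationalEventHandler.generateFilterDoc(
                keyDoc, valueDoc, Operation.UPDATE, new DefaultColumnNamingStrategy());
        BsonDocument replacement = RelationalEventHandler.generateUpsertOrReplaceDoc(
                keyDoc, valueDoc, filter, new DefaultColumnNamingStrategy());

        System.out.println(filter.toJson());      // {"_id": {"id": 42}}
        System.out.println(replacement.toJson()); // {"_id": {"id": 42}, "name": "new"}
    }
}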

View File

@ -0,0 +1,44 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.connector.mongodb.sink.eventhandler.relational;
import org.apache.kafka.connect.errors.DataException;
import org.bson.BsonDocument;
import com.mongodb.client.model.ReplaceOneModel;
import com.mongodb.client.model.ReplaceOptions;
import com.mongodb.client.model.WriteModel;
import io.debezium.connector.mongodb.sink.converters.SinkDocument;
import io.debezium.connector.mongodb.sink.eventhandler.EventOperation;
import io.debezium.data.Envelope.Operation;
import io.debezium.table.ColumnNamingStrategy;
public class RelationalInsertEvent implements EventOperation {
private static final ReplaceOptions REPLACE_OPTIONS = new ReplaceOptions().upsert(true);
@Override
public WriteModel<BsonDocument> perform(final SinkDocument doc, final ColumnNamingStrategy columnNamingStrategy) {
BsonDocument keyDoc = doc.getKeyDoc()
.orElseThrow(
() -> new DataException("Key document must not be missing for insert operation"));
BsonDocument valueDoc = doc.getValueDoc()
.orElseThrow(
() -> new DataException("Value document must not be missing for insert operation"));
try {
BsonDocument filterDoc = RelationalEventHandler.generateFilterDoc(keyDoc, valueDoc, Operation.CREATE, columnNamingStrategy);
BsonDocument upsertDoc = RelationalEventHandler.generateUpsertOrReplaceDoc(keyDoc, valueDoc, filterDoc, columnNamingStrategy);
return new ReplaceOneModel<>(filterDoc, upsertDoc, REPLACE_OPTIONS);
}
catch (Exception exc) {
throw new DataException(exc);
}
}
}

View File

@ -0,0 +1,44 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.connector.mongodb.sink.eventhandler.relational;
import org.apache.kafka.connect.errors.DataException;
import org.bson.BsonDocument;
import com.mongodb.client.model.ReplaceOneModel;
import com.mongodb.client.model.ReplaceOptions;
import com.mongodb.client.model.WriteModel;
import io.debezium.connector.mongodb.sink.converters.SinkDocument;
import io.debezium.connector.mongodb.sink.eventhandler.EventOperation;
import io.debezium.data.Envelope.Operation;
import io.debezium.table.ColumnNamingStrategy;
public class RelationalUpdateEvent implements EventOperation {
private static final ReplaceOptions REPLACE_OPTIONS = new ReplaceOptions().upsert(true);
@Override
public WriteModel<BsonDocument> perform(final SinkDocument doc, final ColumnNamingStrategy columnNamingStrategy) {
BsonDocument keyDoc = doc.getKeyDoc()
.orElseThrow(
() -> new DataException("Key document must not be missing for update operation"));
BsonDocument valueDoc = doc.getValueDoc()
.orElseThrow(
() -> new DataException("Value document must not be missing for update operation"));
try {
BsonDocument filterDoc = RelationalEventHandler.generateFilterDoc(keyDoc, valueDoc, Operation.UPDATE, columnNamingStrategy);
BsonDocument replaceDoc = RelationalEventHandler.generateUpsertOrReplaceDoc(keyDoc, valueDoc, filterDoc, columnNamingStrategy);
return new ReplaceOneModel<>(filterDoc, replaceDoc, REPLACE_OPTIONS);
}
catch (Exception exc) {
throw new DataException(exc);
}
}
}

View File

@ -0,0 +1,11 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.connector;
public interface SinkConnectorConfig {
String getTableNameFormat();
}

View File

@ -0,0 +1,12 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.dlq;
import org.apache.kafka.connect.sink.SinkRecord;
public interface ErrorReporter {
void report(SinkRecord record, Exception e);
}

View File

@ -0,0 +1,21 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.table;
/**
* A pluggable strategy contract for defining how column names are resolved from Kafka fields.
*
* @author Chris Cranford
*/
public interface ColumnNamingStrategy {
/**
* Resolves the logical field name from the change event to a column name.
*
* @param fieldName the field name, should not be {@code null}.
* @return the resolved logical column name, never {@code null}.
*/
String resolveColumnName(String fieldName);
}
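As a trivial illustration of the contract (a hypothetical strategy, not part of the connector), a custom implementation could normalize column names:

package io.debezium.table;

import java.util.Locale;

// Hypothetical strategy that lower-cases every resolved column name.
public class LowerCaseColumnNamingStrategy implements ColumnNamingStrategy {
    @Override
    public String resolveColumnName(String fieldName) {
        return fieldName.toLowerCase(Locale.ROOT);
    }
}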

View File

@ -0,0 +1,20 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.table;
/**
* The default implementation of the {@link ColumnNamingStrategy} that simply returns the field's
* name as the column name in the destination table.
*
* @author Chris Cranford
*/
public class DefaultColumnNamingStrategy implements ColumnNamingStrategy {
@Override
public String resolveColumnName(String fieldName) {
// Default behavior is a no-op
return fieldName;
}
}

View File

@ -0,0 +1,71 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.table;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.errors.DataException;
import org.apache.kafka.connect.sink.SinkRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import io.debezium.connector.SinkConnectorConfig;
import io.debezium.data.Envelope;
/**
* Default implementation of the {@link TableNamingStrategy} where the table name is derived
* from the topic name, replacing any {@code dot} characters with {@code underscore}, and
* optionally from fields of the record's {@code source} block via {@code ${source.<field>}} placeholders.
*
* @author Chris Cranford
*/
public class DefaultTableNamingStrategy implements TableNamingStrategy {
private static final Logger LOGGER = LoggerFactory.getLogger(DefaultTableNamingStrategy.class);
private final Pattern sourcePattern = Pattern.compile("\\$\\{(source\\.)(.*?)}");
@Override
public String resolveTableName(SinkConnectorConfig config, SinkRecord record) {
// Default behavior is to replace dots with underscores
final String topicName = record.topic().replace(".", "_");
String table = config.getTableNameFormat().replace("${topic}", topicName);
table = resolveTableNameBySource(config, record, table);
return table;
}
private String resolveTableNameBySource(SinkConnectorConfig config, SinkRecord record, String tableFormat) {
String table = tableFormat;
if (table.contains("${source.")) {
if (isTombstone(record)) {
LOGGER.warn(
"Ignore this record because it seems to be a tombstone that doesn't have source field, then cannot resolve table name in topic '{}', partition '{}', offset '{}'",
record.topic(), record.kafkaPartition(), record.kafkaOffset());
return null;
}
try {
Struct source = ((Struct) record.value()).getStruct(Envelope.FieldName.SOURCE);
Matcher matcher = sourcePattern.matcher(table);
while (matcher.find()) {
String target = matcher.group();
table = table.replace(target, source.getString(matcher.group(2)));
}
}
catch (DataException e) {
LOGGER.error("Failed to resolve table name with format '{}', check source field in topic '{}'", config.getTableNameFormat(), record.topic(), e);
throw e;
}
}
return table;
}
private boolean isTombstone(SinkRecord record) {
return record.value() == null;
}
}
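A usage sketch under invented names (the topic, source fields, and table name format are all assumptions), showing how a ${source.table} placeholder is resolved from the record's source block:

package io.debezium.table;

import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.sink.SinkRecord;

import io.debezium.connector.SinkConnectorConfig;

public class TableNamingExample {
    public static void main(String[] args) {
        Schema sourceSchema = SchemaBuilder.struct().field("table", Schema.STRING_SCHEMA).build();
        Schema valueSchema = SchemaBuilder.struct().name("envelope").field("source", sourceSchema).build();
        Struct value = new Struct(valueSchema)
                .put("source", new Struct(sourceSchema).put("table", "customers"));
        SinkRecord record = new SinkRecord("server1.inventory.customers", 0, null, null, valueSchema, value, 0L);

        // SinkConnectorConfig declares a single method, so a lambda is enough for this sketch.
        SinkConnectorConfig config = () -> "dbz_${source.table}";
        System.out.println(new DefaultTableNamingStrategy().resolveTableName(config, record)); // dbz_customers
    }
}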

View File

@ -0,0 +1,60 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.table;
import io.debezium.util.Strings;
/**
* A factory for creating {@link FieldNameFilter} instances from field include/exclude lists.
*
* @author Anisha Mohanty
*/
public class FieldFilterFactory {
@FunctionalInterface
public interface FieldNameFilter {
boolean matches(String topicName, String columnName);
}
/** Default filter that always includes a field */
public static final FieldNameFilter DEFAULT_FILTER = (topicName, columnName) -> true;
private static FieldNameFilter createFilter(String fieldList, boolean include) {
String[] entries = fieldList.split(",");
return (topicName, fieldName) -> {
for (String entry : entries) {
String[] parts = entry.split(":");
if (parts.length == 2 && parts[0].equals(topicName) && parts[1].equals(fieldName)) {
return include;
}
}
return !include;
};
}
private static FieldNameFilter createIncludeFilter(String fieldIncludeList) {
return createFilter(fieldIncludeList, true);
}
private static FieldNameFilter createExcludeFilter(String fieldExcludeList) {
return createFilter(fieldExcludeList, false);
}
public static FieldNameFilter createFieldFilter(String includeList, String excludeList) {
if (!Strings.isNullOrEmpty(excludeList)) {
return createExcludeFilter(excludeList);
}
else if (!Strings.isNullOrEmpty(includeList)) {
return createIncludeFilter(includeList);
}
else {
// Always match and include as no filters were specified.
return DEFAULT_FILTER;
}
}
}
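A quick sketch of how the factory behaves (topic and field names are invented): when an exclude list is given it takes precedence, matching fields are filtered out, and everything else passes.

package io.debezium.table;

import io.debezium.table.FieldFilterFactory.FieldNameFilter;

public class FieldFilterExample {
    public static void main(String[] args) {
        // Entries follow the 'topic:field' form parsed by createFilter above.
        FieldNameFilter filter = FieldFilterFactory.createFieldFilter(null, "orders:internal_note");
        System.out.println(filter.matches("orders", "internal_note")); // false -> excluded
        System.out.println(filter.matches("orders", "total"));         // true  -> kept
    }
}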

View File

@ -0,0 +1,26 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.table;
import org.apache.kafka.connect.sink.SinkRecord;
import io.debezium.connector.SinkConnectorConfig;
/**
* A pluggable strategy contract for defining how table names are resolved from Kafka records.
*
* @author Chris Cranford
*/
public interface TableNamingStrategy {
/**
* Resolves the logical table name from the sink record.
*
* @param config sink connector configuration, should not be {@code null}
* @param record Kafka sink record, should not be {@code null}
* @return the resolved logical table name; if {@code null} the record should not be processed
*/
String resolveTableName(SinkConnectorConfig config, SinkRecord record);
}