DBZ-7996 Move ChangeEventSink class to debezium-core and use the interface for the MongoDB sink connector

relates to https://issues.redhat.com/browse/DBZ-7996
This commit is contained in:
rkerner 2024-09-04 18:35:04 +02:00 committed by Jiri Pechanec
parent ffa58f3a94
commit bfb6938db8
5 changed files with 13 additions and 13 deletions

View File

@ -30,7 +30,7 @@
import io.debezium.connector.jdbc.naming.TableNamingStrategy; import io.debezium.connector.jdbc.naming.TableNamingStrategy;
import io.debezium.connector.jdbc.relational.TableDescriptor; import io.debezium.connector.jdbc.relational.TableDescriptor;
import io.debezium.connector.jdbc.relational.TableId; import io.debezium.connector.jdbc.relational.TableId;
import io.debezium.pipeline.sink.spi.ChangeEventSink; import io.debezium.pipeline.spi.ChangeEventSink;
import io.debezium.util.Stopwatch; import io.debezium.util.Stopwatch;
import io.debezium.util.Strings; import io.debezium.util.Strings;
@ -52,7 +52,6 @@ public class JdbcChangeEventSink implements ChangeEventSink {
private final RecordWriter recordWriter; private final RecordWriter recordWriter;
public JdbcChangeEventSink(JdbcSinkConnectorConfig config, StatelessSession session, DatabaseDialect dialect, RecordWriter recordWriter) { public JdbcChangeEventSink(JdbcSinkConnectorConfig config, StatelessSession session, DatabaseDialect dialect, RecordWriter recordWriter) {
this.config = config; this.config = config;
this.tableNamingStrategy = config.getTableNamingStrategy(); this.tableNamingStrategy = config.getTableNamingStrategy();
this.dialect = dialect; this.dialect = dialect;

View File

@ -28,7 +28,7 @@
import io.debezium.DebeziumException; import io.debezium.DebeziumException;
import io.debezium.connector.jdbc.dialect.DatabaseDialect; import io.debezium.connector.jdbc.dialect.DatabaseDialect;
import io.debezium.connector.jdbc.dialect.DatabaseDialectResolver; import io.debezium.connector.jdbc.dialect.DatabaseDialectResolver;
import io.debezium.pipeline.sink.spi.ChangeEventSink; import io.debezium.pipeline.spi.ChangeEventSink;
import io.debezium.util.Stopwatch; import io.debezium.util.Stopwatch;
import io.debezium.util.Strings; import io.debezium.util.Strings;

View File

@ -26,14 +26,15 @@
import io.debezium.DebeziumException; import io.debezium.DebeziumException;
import io.debezium.dlq.ErrorReporter; import io.debezium.dlq.ErrorReporter;
import io.debezium.pipeline.spi.ChangeEventSink;
final class StartedMongoDbSinkTask implements AutoCloseable { final class MongoDbChangeEventSink implements ChangeEventSink, AutoCloseable {
private final MongoDbSinkConnectorConfig sinkConfig; private final MongoDbSinkConnectorConfig sinkConfig;
private final MongoClient mongoClient; private final MongoClient mongoClient;
private final ErrorReporter errorReporter; private final ErrorReporter errorReporter;
StartedMongoDbSinkTask( MongoDbChangeEventSink(
final MongoDbSinkConnectorConfig sinkConfig, final MongoDbSinkConnectorConfig sinkConfig,
final MongoClient mongoClient, final MongoClient mongoClient,
final ErrorReporter errorReporter) { final ErrorReporter errorReporter) {
@ -51,7 +52,7 @@ public void close() {
} }
} }
void put(final Collection<SinkRecord> records) { public void execute(final Collection<SinkRecord> records) {
try { try {
trackLatestRecordTimestampOffset(records); trackLatestRecordTimestampOffset(records);
if (records.isEmpty()) { if (records.isEmpty()) {

View File

@ -28,7 +28,7 @@
public class MongoDbSinkConnectorTask extends SinkTask { public class MongoDbSinkConnectorTask extends SinkTask {
static final Logger LOGGER = LoggerFactory.getLogger(MongoDbSinkConnectorTask.class); static final Logger LOGGER = LoggerFactory.getLogger(MongoDbSinkConnectorTask.class);
private static final String CONNECTOR_TYPE = "sink"; private static final String CONNECTOR_TYPE = "sink";
private StartedMongoDbSinkTask startedTask; private MongoDbChangeEventSink mongoSink;
private MongoDbConnectionContext connectionContext; private MongoDbConnectionContext connectionContext;
@Override @Override
@ -51,7 +51,7 @@ public void start(final Map<String, String> props) {
try { try {
this.connectionContext = new MongoDbConnectionContext(config); this.connectionContext = new MongoDbConnectionContext(config);
client = this.connectionContext.getMongoClient(); client = this.connectionContext.getMongoClient();
startedTask = new StartedMongoDbSinkTask(sinkConfig, client, createErrorReporter()); mongoSink = new MongoDbChangeEventSink(sinkConfig, client, createErrorReporter());
} }
catch (RuntimeException taskStartingException) { catch (RuntimeException taskStartingException) {
// noinspection EmptyTryBlock // noinspection EmptyTryBlock
@ -81,7 +81,7 @@ public void start(final Map<String, String> props) {
*/ */
@Override @Override
public void put(final Collection<SinkRecord> records) { public void put(final Collection<SinkRecord> records) {
startedTask.put(records); mongoSink.execute(records);
} }
/** /**
@ -107,8 +107,8 @@ public void flush(final Map<TopicPartition, OffsetAndMetadata> currentOffsets) {
@Override @Override
public void stop() { public void stop() {
LOGGER.info("Stopping MongoDB sink task"); LOGGER.info("Stopping MongoDB sink task");
if (startedTask != null) { if (mongoSink != null) {
startedTask.close(); mongoSink.close();
} }
} }
@ -135,6 +135,6 @@ private ErrorReporter createErrorReporter() {
@VisibleForTesting(otherwise = VisibleForTesting.AccessModifier.PRIVATE) @VisibleForTesting(otherwise = VisibleForTesting.AccessModifier.PRIVATE)
static ErrorReporter nopErrorReporter() { static ErrorReporter nopErrorReporter() {
return (record, e) -> { return (record, e) -> {
}; /* do nothing */ };
} }
} }

View File

@ -3,7 +3,7 @@
* *
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/ */
package io.debezium.pipeline.sink.spi; package io.debezium.pipeline.spi;
import java.util.Collection; import java.util.Collection;