From bfb6938db8c44bae04d85c0326021b9e8eda929f Mon Sep 17 00:00:00 2001 From: rkerner Date: Wed, 4 Sep 2024 18:35:04 +0200 Subject: [PATCH] DBZ-7996 Move ChangeEventSink class to debezium-core and use the interface for the MongoDB sink connector relates to https://issues.redhat.com/browse/DBZ-7996 --- .../debezium/connector/jdbc/JdbcChangeEventSink.java | 3 +-- .../connector/jdbc/JdbcSinkConnectorTask.java | 2 +- ...goDbSinkTask.java => MongoDbChangeEventSink.java} | 7 ++++--- .../mongodb/sink/MongoDbSinkConnectorTask.java | 12 ++++++------ .../io/debezium/pipeline}/spi/ChangeEventSink.java | 2 +- 5 files changed, 13 insertions(+), 13 deletions(-) rename debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/sink/{StartedMongoDbSinkTask.java => MongoDbChangeEventSink.java} (95%) rename {debezium-connector-jdbc/src/main/java/io/debezium/pipeline/sink => debezium-core/src/main/java/io/debezium/pipeline}/spi/ChangeEventSink.java (94%) diff --git a/debezium-connector-jdbc/src/main/java/io/debezium/connector/jdbc/JdbcChangeEventSink.java b/debezium-connector-jdbc/src/main/java/io/debezium/connector/jdbc/JdbcChangeEventSink.java index acc4ae179..ea9bd20f2 100644 --- a/debezium-connector-jdbc/src/main/java/io/debezium/connector/jdbc/JdbcChangeEventSink.java +++ b/debezium-connector-jdbc/src/main/java/io/debezium/connector/jdbc/JdbcChangeEventSink.java @@ -30,7 +30,7 @@ import io.debezium.connector.jdbc.naming.TableNamingStrategy; import io.debezium.connector.jdbc.relational.TableDescriptor; import io.debezium.connector.jdbc.relational.TableId; -import io.debezium.pipeline.sink.spi.ChangeEventSink; +import io.debezium.pipeline.spi.ChangeEventSink; import io.debezium.util.Stopwatch; import io.debezium.util.Strings; @@ -52,7 +52,6 @@ public class JdbcChangeEventSink implements ChangeEventSink { private final RecordWriter recordWriter; public JdbcChangeEventSink(JdbcSinkConnectorConfig config, StatelessSession session, DatabaseDialect dialect, RecordWriter recordWriter) { - this.config = config; this.tableNamingStrategy = config.getTableNamingStrategy(); this.dialect = dialect; diff --git a/debezium-connector-jdbc/src/main/java/io/debezium/connector/jdbc/JdbcSinkConnectorTask.java b/debezium-connector-jdbc/src/main/java/io/debezium/connector/jdbc/JdbcSinkConnectorTask.java index e93685413..8fad316a5 100644 --- a/debezium-connector-jdbc/src/main/java/io/debezium/connector/jdbc/JdbcSinkConnectorTask.java +++ b/debezium-connector-jdbc/src/main/java/io/debezium/connector/jdbc/JdbcSinkConnectorTask.java @@ -28,7 +28,7 @@ import io.debezium.DebeziumException; import io.debezium.connector.jdbc.dialect.DatabaseDialect; import io.debezium.connector.jdbc.dialect.DatabaseDialectResolver; -import io.debezium.pipeline.sink.spi.ChangeEventSink; +import io.debezium.pipeline.spi.ChangeEventSink; import io.debezium.util.Stopwatch; import io.debezium.util.Strings; diff --git a/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/sink/StartedMongoDbSinkTask.java b/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/sink/MongoDbChangeEventSink.java similarity index 95% rename from debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/sink/StartedMongoDbSinkTask.java rename to debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/sink/MongoDbChangeEventSink.java index b035d258e..9cc95cf74 100644 --- a/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/sink/StartedMongoDbSinkTask.java +++ b/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/sink/MongoDbChangeEventSink.java @@ -26,14 +26,15 @@ import io.debezium.DebeziumException; import io.debezium.dlq.ErrorReporter; +import io.debezium.pipeline.spi.ChangeEventSink; -final class StartedMongoDbSinkTask implements AutoCloseable { +final class MongoDbChangeEventSink implements ChangeEventSink, AutoCloseable { private final MongoDbSinkConnectorConfig sinkConfig; private final MongoClient mongoClient; private final ErrorReporter errorReporter; - StartedMongoDbSinkTask( + MongoDbChangeEventSink( final MongoDbSinkConnectorConfig sinkConfig, final MongoClient mongoClient, final ErrorReporter errorReporter) { @@ -51,7 +52,7 @@ public void close() { } } - void put(final Collection records) { + public void execute(final Collection records) { try { trackLatestRecordTimestampOffset(records); if (records.isEmpty()) { diff --git a/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/sink/MongoDbSinkConnectorTask.java b/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/sink/MongoDbSinkConnectorTask.java index 50eeaa510..02c9990de 100644 --- a/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/sink/MongoDbSinkConnectorTask.java +++ b/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/sink/MongoDbSinkConnectorTask.java @@ -28,7 +28,7 @@ public class MongoDbSinkConnectorTask extends SinkTask { static final Logger LOGGER = LoggerFactory.getLogger(MongoDbSinkConnectorTask.class); private static final String CONNECTOR_TYPE = "sink"; - private StartedMongoDbSinkTask startedTask; + private MongoDbChangeEventSink mongoSink; private MongoDbConnectionContext connectionContext; @Override @@ -51,7 +51,7 @@ public void start(final Map props) { try { this.connectionContext = new MongoDbConnectionContext(config); client = this.connectionContext.getMongoClient(); - startedTask = new StartedMongoDbSinkTask(sinkConfig, client, createErrorReporter()); + mongoSink = new MongoDbChangeEventSink(sinkConfig, client, createErrorReporter()); } catch (RuntimeException taskStartingException) { // noinspection EmptyTryBlock @@ -81,7 +81,7 @@ public void start(final Map props) { */ @Override public void put(final Collection records) { - startedTask.put(records); + mongoSink.execute(records); } /** @@ -107,8 +107,8 @@ public void flush(final Map currentOffsets) { @Override public void stop() { LOGGER.info("Stopping MongoDB sink task"); - if (startedTask != null) { - startedTask.close(); + if (mongoSink != null) { + mongoSink.close(); } } @@ -135,6 +135,6 @@ private ErrorReporter createErrorReporter() { @VisibleForTesting(otherwise = VisibleForTesting.AccessModifier.PRIVATE) static ErrorReporter nopErrorReporter() { return (record, e) -> { - }; + /* do nothing */ }; } } diff --git a/debezium-connector-jdbc/src/main/java/io/debezium/pipeline/sink/spi/ChangeEventSink.java b/debezium-core/src/main/java/io/debezium/pipeline/spi/ChangeEventSink.java similarity index 94% rename from debezium-connector-jdbc/src/main/java/io/debezium/pipeline/sink/spi/ChangeEventSink.java rename to debezium-core/src/main/java/io/debezium/pipeline/spi/ChangeEventSink.java index 9eef26612..bbf0ecc77 100644 --- a/debezium-connector-jdbc/src/main/java/io/debezium/pipeline/sink/spi/ChangeEventSink.java +++ b/debezium-core/src/main/java/io/debezium/pipeline/spi/ChangeEventSink.java @@ -3,7 +3,7 @@ * * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 */ -package io.debezium.pipeline.sink.spi; +package io.debezium.pipeline.spi; import java.util.Collection;