diff --git a/debezium-connector-jdbc/src/main/java/io/debezium/connector/jdbc/JdbcChangeEventSink.java b/debezium-connector-jdbc/src/main/java/io/debezium/connector/jdbc/JdbcChangeEventSink.java index acc4ae179..ea9bd20f2 100644 --- a/debezium-connector-jdbc/src/main/java/io/debezium/connector/jdbc/JdbcChangeEventSink.java +++ b/debezium-connector-jdbc/src/main/java/io/debezium/connector/jdbc/JdbcChangeEventSink.java @@ -30,7 +30,7 @@ import io.debezium.connector.jdbc.naming.TableNamingStrategy; import io.debezium.connector.jdbc.relational.TableDescriptor; import io.debezium.connector.jdbc.relational.TableId; -import io.debezium.pipeline.sink.spi.ChangeEventSink; +import io.debezium.pipeline.spi.ChangeEventSink; import io.debezium.util.Stopwatch; import io.debezium.util.Strings; @@ -52,7 +52,6 @@ public class JdbcChangeEventSink implements ChangeEventSink { private final RecordWriter recordWriter; public JdbcChangeEventSink(JdbcSinkConnectorConfig config, StatelessSession session, DatabaseDialect dialect, RecordWriter recordWriter) { - this.config = config; this.tableNamingStrategy = config.getTableNamingStrategy(); this.dialect = dialect; diff --git a/debezium-connector-jdbc/src/main/java/io/debezium/connector/jdbc/JdbcSinkConnectorTask.java b/debezium-connector-jdbc/src/main/java/io/debezium/connector/jdbc/JdbcSinkConnectorTask.java index e93685413..8fad316a5 100644 --- a/debezium-connector-jdbc/src/main/java/io/debezium/connector/jdbc/JdbcSinkConnectorTask.java +++ b/debezium-connector-jdbc/src/main/java/io/debezium/connector/jdbc/JdbcSinkConnectorTask.java @@ -28,7 +28,7 @@ import io.debezium.DebeziumException; import io.debezium.connector.jdbc.dialect.DatabaseDialect; import io.debezium.connector.jdbc.dialect.DatabaseDialectResolver; -import io.debezium.pipeline.sink.spi.ChangeEventSink; +import io.debezium.pipeline.spi.ChangeEventSink; import io.debezium.util.Stopwatch; import io.debezium.util.Strings; diff --git a/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/sink/StartedMongoDbSinkTask.java b/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/sink/MongoDbChangeEventSink.java similarity index 95% rename from debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/sink/StartedMongoDbSinkTask.java rename to debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/sink/MongoDbChangeEventSink.java index b035d258e..9cc95cf74 100644 --- a/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/sink/StartedMongoDbSinkTask.java +++ b/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/sink/MongoDbChangeEventSink.java @@ -26,14 +26,15 @@ import io.debezium.DebeziumException; import io.debezium.dlq.ErrorReporter; +import io.debezium.pipeline.spi.ChangeEventSink; -final class StartedMongoDbSinkTask implements AutoCloseable { +final class MongoDbChangeEventSink implements ChangeEventSink, AutoCloseable { private final MongoDbSinkConnectorConfig sinkConfig; private final MongoClient mongoClient; private final ErrorReporter errorReporter; - StartedMongoDbSinkTask( + MongoDbChangeEventSink( final MongoDbSinkConnectorConfig sinkConfig, final MongoClient mongoClient, final ErrorReporter errorReporter) { @@ -51,7 +52,7 @@ public void close() { } } - void put(final Collection records) { + public void execute(final Collection records) { try { trackLatestRecordTimestampOffset(records); if (records.isEmpty()) { diff --git a/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/sink/MongoDbSinkConnectorTask.java b/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/sink/MongoDbSinkConnectorTask.java index 50eeaa510..02c9990de 100644 --- a/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/sink/MongoDbSinkConnectorTask.java +++ b/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/sink/MongoDbSinkConnectorTask.java @@ -28,7 +28,7 @@ public class MongoDbSinkConnectorTask extends SinkTask { static final Logger LOGGER = LoggerFactory.getLogger(MongoDbSinkConnectorTask.class); private static final String CONNECTOR_TYPE = "sink"; - private StartedMongoDbSinkTask startedTask; + private MongoDbChangeEventSink mongoSink; private MongoDbConnectionContext connectionContext; @Override @@ -51,7 +51,7 @@ public void start(final Map props) { try { this.connectionContext = new MongoDbConnectionContext(config); client = this.connectionContext.getMongoClient(); - startedTask = new StartedMongoDbSinkTask(sinkConfig, client, createErrorReporter()); + mongoSink = new MongoDbChangeEventSink(sinkConfig, client, createErrorReporter()); } catch (RuntimeException taskStartingException) { // noinspection EmptyTryBlock @@ -81,7 +81,7 @@ public void start(final Map props) { */ @Override public void put(final Collection records) { - startedTask.put(records); + mongoSink.execute(records); } /** @@ -107,8 +107,8 @@ public void flush(final Map currentOffsets) { @Override public void stop() { LOGGER.info("Stopping MongoDB sink task"); - if (startedTask != null) { - startedTask.close(); + if (mongoSink != null) { + mongoSink.close(); } } @@ -135,6 +135,6 @@ private ErrorReporter createErrorReporter() { @VisibleForTesting(otherwise = VisibleForTesting.AccessModifier.PRIVATE) static ErrorReporter nopErrorReporter() { return (record, e) -> { - }; + /* do nothing */ }; } } diff --git a/debezium-connector-jdbc/src/main/java/io/debezium/pipeline/sink/spi/ChangeEventSink.java b/debezium-core/src/main/java/io/debezium/pipeline/spi/ChangeEventSink.java similarity index 94% rename from debezium-connector-jdbc/src/main/java/io/debezium/pipeline/sink/spi/ChangeEventSink.java rename to debezium-core/src/main/java/io/debezium/pipeline/spi/ChangeEventSink.java index 9eef26612..bbf0ecc77 100644 --- a/debezium-connector-jdbc/src/main/java/io/debezium/pipeline/sink/spi/ChangeEventSink.java +++ b/debezium-core/src/main/java/io/debezium/pipeline/spi/ChangeEventSink.java @@ -3,7 +3,7 @@ * * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 */ -package io.debezium.pipeline.sink.spi; +package io.debezium.pipeline.spi; import java.util.Collection;