DBZ-7110 Remove deprecated EmbeddedEngine interface

This commit is contained in:
Vojtech Juranek 2023-11-06 13:31:11 +01:00 committed by Jiri Pechanec
parent 68481038b3
commit c34cf6920c
5 changed files with 22 additions and 132 deletions

View File

@ -79,7 +79,7 @@ public class ConvertingEngineBuilder<R> implements Builder<R> {
}
ConvertingEngineBuilder(KeyValueHeaderChangeEventFormat<?, ?, ?> format) {
this.delegate = EmbeddedEngine.create();
this.delegate = new EmbeddedEngine.BuilderImpl();
this.formatKey = format.getKeyFormat();
this.formatValue = format.getValueFormat();
this.formatHeader = format.getHeaderFormat();

View File

@ -84,7 +84,7 @@
@ThreadSafe
public final class EmbeddedEngine implements DebeziumEngine<SourceRecord>, EmbeddedEngineConfig {
public static final class BuilderImpl implements Builder {
public static final class BuilderImpl implements Builder<SourceRecord> {
private Configuration config;
private DebeziumEngine.ChangeConsumer<SourceRecord> handler;
private ClassLoader classLoader;
@ -93,12 +93,6 @@ public static final class BuilderImpl implements Builder {
private DebeziumEngine.ConnectorCallback connectorCallback;
private OffsetCommitPolicy offsetCommitPolicy = null;
@Override
public Builder using(Configuration config) {
this.config = config;
return this;
}
@Override
public Builder using(Properties config) {
this.config = Configuration.from(config);
@ -111,12 +105,6 @@ public Builder using(ClassLoader classLoader) {
return this;
}
@Override
public Builder using(Clock clock) {
this.clock = clock;
return this;
}
@Override
public Builder using(DebeziumEngine.CompletionCallback completionCallback) {
this.completionCallback = completionCallback;
@ -153,13 +141,14 @@ public Builder notifying(DebeziumEngine.ChangeConsumer<SourceRecord> handler) {
@Override
public Builder using(java.time.Clock clock) {
return using(new Clock() {
this.clock = new Clock() {
@Override
public long currentTimeInMillis() {
return clock.millis();
}
});
};
return this;
}
@Override
@ -175,31 +164,6 @@ public EmbeddedEngine build() {
return new EmbeddedEngine(config, classLoader, clock,
handler, completionCallback, connectorCallback, offsetCommitPolicy);
}
// backward compatibility methods
@Override
public Builder using(CompletionCallback completionCallback) {
return using((DebeziumEngine.CompletionCallback) completionCallback);
}
@Override
public Builder using(ConnectorCallback connectorCallback) {
return using((DebeziumEngine.ConnectorCallback) connectorCallback);
}
}
/**
* A callback function to be notified when the connector completes.
*/
@Deprecated
public interface CompletionCallback extends DebeziumEngine.CompletionCallback {
}
/**
* Callback function which informs users about the various stages a connector goes through during startup
*/
@Deprecated
public interface ConnectorCallback extends DebeziumEngine.ConnectorCallback {
}
/**
@ -308,25 +272,8 @@ public boolean hasError() {
}
}
/**
* Contract passed to {@link ChangeConsumer}s, allowing them to commit single records as they have been processed
* and to signal that offsets may be flushed eventually.
*/
@ThreadSafe
@Deprecated
public interface RecordCommitter extends DebeziumEngine.RecordCommitter<SourceRecord> {
}
/**
* A contract invoked by the embedded engine when it has received a batch of change records to be processed. Allows
* to process multiple records in one go, acknowledging their processing once that's done.
*/
@Deprecated
public interface ChangeConsumer extends DebeziumEngine.ChangeConsumer<SourceRecord> {
}
private static ChangeConsumer buildDefaultChangeConsumer(Consumer<SourceRecord> consumer) {
return new ChangeConsumer() {
private static ChangeConsumer<SourceRecord> buildDefaultChangeConsumer(Consumer<SourceRecord> consumer) {
return new DebeziumEngine.ChangeConsumer<SourceRecord>() {
/**
* the default implementation that is compatible with the old Consumer api.
@ -358,60 +305,6 @@ public void handleBatch(List<SourceRecord> records, DebeziumEngine.RecordCommitt
};
}
/**
* A builder to set up and create {@link EmbeddedEngine} instances.
*/
@Deprecated
public interface Builder extends DebeziumEngine.Builder<SourceRecord> {
/**
* Use the specified configuration for the connector. The configuration is assumed to already be valid.
*
* @param config the configuration
* @return this builder object so methods can be chained together; never null
*/
Builder using(Configuration config);
/**
* Use the specified clock when needing to determine the current time. Passing <code>null</code> or not calling this
* method results in the connector using the {@link Clock#system() system clock}.
*
* @param clock the clock
* @return this builder object so methods can be chained together; never null
*/
Builder using(Clock clock);
// backward compatibility methods
@Override
Builder notifying(Consumer<SourceRecord> consumer);
@Override
Builder notifying(DebeziumEngine.ChangeConsumer<SourceRecord> handler);
@Override
Builder using(ClassLoader classLoader);
Builder using(CompletionCallback completionCallback);
Builder using(ConnectorCallback connectorCallback);
@Override
Builder using(OffsetCommitPolicy policy);
@Override
EmbeddedEngine build();
}
/**
* Obtain a new {@link Builder} instance that can be used to construct runnable {@link EmbeddedEngine} instances.
*
* @return the new builder; never null
*/
@Deprecated
public static Builder create() {
return new BuilderImpl();
}
private static final Logger LOGGER = LoggerFactory.getLogger(EmbeddedEngine.class);
private final Configuration config;
@ -920,7 +813,7 @@ private int getErrorsMaxRetries() {
* @return the new recordCommitter to be used for a given batch
*/
protected RecordCommitter buildRecordCommitter(OffsetStorageWriter offsetWriter, SourceTask task, Duration commitTimeout) {
return new RecordCommitter() {
return new DebeziumEngine.RecordCommitter<SourceRecord>() {
@Override
public synchronized void markProcessed(SourceRecord record) throws InterruptedException {

View File

@ -69,8 +69,6 @@
import io.debezium.config.Configuration;
import io.debezium.config.Instantiator;
import io.debezium.data.VerifyRecord;
import io.debezium.embedded.EmbeddedEngine.CompletionCallback;
import io.debezium.embedded.EmbeddedEngine.ConnectorCallback;
import io.debezium.embedded.EmbeddedEngine.EmbeddedConfig;
import io.debezium.engine.DebeziumEngine;
import io.debezium.function.BooleanConsumer;
@ -219,12 +217,12 @@ protected int getMaximumEnqueuedRecordCount() {
}
/**
* Create a {@link CompletionCallback} that logs when the engine fails to start the connector or when the connector
* Create a {@link DebeziumEngine.CompletionCallback} that logs when the engine fails to start the connector or when the connector
* stops running after completing successfully or due to an error
*
* @return the logging {@link CompletionCallback}
* @return the logging {@link DebeziumEngine.CompletionCallback}
*/
protected CompletionCallback loggingCompletion() {
protected DebeziumEngine.CompletionCallback loggingCompletion() {
return (success, msg, error) -> {
if (success) {
logger.info(msg);
@ -370,7 +368,7 @@ protected void start(Class<? extends SourceConnector> connectorClass, Configurat
.with(EmbeddedEngineConfig.OFFSET_FLUSH_INTERVAL_MS, 0)
.build();
latch = new CountDownLatch(1);
CompletionCallback wrapperCallback = (success, msg, error) -> {
DebeziumEngine.CompletionCallback wrapperCallback = (success, msg, error) -> {
try {
if (callback != null) {
callback.handle(success, msg, error);
@ -385,7 +383,7 @@ protected void start(Class<? extends SourceConnector> connectorClass, Configurat
Testing.debug("Stopped connector");
};
ConnectorCallback connectorCallback = new ConnectorCallback() {
DebeziumEngine.ConnectorCallback connectorCallback = new DebeziumEngine.ConnectorCallback() {
@Override
public void taskStarted() {
// if this is called, it means a task has been started successfully so we can continue
@ -406,8 +404,8 @@ public void connectorStopped() {
};
// Create the connector ...
EmbeddedEngine.Builder builder = EmbeddedEngine.create();
builder.using(config)
EmbeddedEngine.Builder builder = new EmbeddedEngine.BuilderImpl();
builder.using(config.asProperties())
.notifying(getConsumer(isStopRecord, recordArrivedListener, ignoreRecordsAfterStop))
.using(this.getClass().getClassLoader())
.using(wrapperCallback)
@ -415,7 +413,7 @@ public void connectorStopped() {
if (changeConsumer != null) {
builder.notifying(changeConsumer);
}
engine = new TestingEmbeddedEngine(builder.build());
engine = new TestingEmbeddedEngine((EmbeddedEngine) builder.build());
// Submit the connector for asynchronous execution ...
assertThat(executor).isNull();
@ -626,7 +624,7 @@ protected SourceRecords consumeRecordsByTopic(int numRecords) throws Interrupted
* This is most useful in corner cases when there can be a duplicate records between snapshot
* and streaming switch.
*
* @param numRecords the number of records that should be consumed
* @param recordsToRead the number of records that should be consumed
* @param tripCondition condition to satisfy to stop skipping records
* @return the collector into which the records were captured; never null
* @throws InterruptedException if the thread was interrupted while waiting for a record to be returned

View File

@ -60,7 +60,6 @@
import io.debezium.document.Document;
import io.debezium.document.DocumentReader;
import io.debezium.document.Value;
import io.debezium.embedded.EmbeddedEngine.CompletionCallback;
import io.debezium.embedded.EmbeddedEngine.CompletionResult;
import io.debezium.engine.DebeziumEngine;
import io.debezium.engine.RecordChangeEvent;
@ -796,7 +795,7 @@ protected void runConnector(TestSpecification spec) {
* @param spec the test specification
* @param callback the function that should be called when the connector is stopped
*/
protected void runConnector(TestSpecification spec, CompletionCallback callback) {
protected void runConnector(TestSpecification spec, DebeziumEngine.CompletionCallback callback) {
PreviousContext preRunContext = LoggingContext.forConnector(getClass().getSimpleName(), "runner", spec.name());
final Configuration environmentConfig = Configuration.copy(spec.environment()).build();
final Configuration connectorConfig = spec.config();

View File

@ -126,18 +126,18 @@ public void doSetup() {
.build();
Consumer<SourceRecord> recordArrivedListener = this::processRecord;
this.engine = EmbeddedEngine.create()
.using(config)
this.engine = (EmbeddedEngine) new EmbeddedEngine.BuilderImpl()
.using(config.asProperties())
.notifying((record) -> {
if (!engine.isRunning() || Thread.currentThread().isInterrupted()) {
return;
}
while (!consumedLines.offer(record)) {
while (!consumedLines.offer((SourceRecord) record)) {
if (!engine.isRunning() || Thread.currentThread().isInterrupted()) {
return;
}
}
recordArrivedListener.accept(record);
recordArrivedListener.accept((SourceRecord) record);
})
.using(this.getClass().getClassLoader())
.build();