DBZ-5879 Add wait to tests in order to maintain expectations of existing integration tests

This commit is contained in:
Jeremy Ford 2023-01-15 18:25:58 -05:00 committed by Jiri Pechanec
parent 8da86b78e8
commit 553cb9b1dd
2 changed files with 27 additions and 2 deletions

View File

@ -53,7 +53,7 @@ public abstract class BaseSourceTask<P extends Partition, O extends OffsetContex
private static final Duration MAX_POLL_PERIOD_IN_MILLIS = Duration.ofMillis(TimeUnit.HOURS.toMillis(1));
private Configuration config;
protected enum State {
public enum State {
RESTARTING,
RUNNING,
INITIAL,
@ -381,7 +381,7 @@ protected Offsets<P, O> getPreviousOffsets(Partition.Provider<P> provider, Offse
}
@VisibleForTesting
State getState() {
public State getState() {
stateLock.lock();
try {
return state.get();

View File

@ -65,6 +65,7 @@
import io.debezium.config.Configuration;
import io.debezium.config.Instantiator;
import io.debezium.connector.common.BaseSourceTask;
import io.debezium.data.VerifyRecord;
import io.debezium.embedded.EmbeddedEngine.CompletionCallback;
import io.debezium.embedded.EmbeddedEngine.ConnectorCallback;
@ -411,6 +412,10 @@ public void taskStarted() {
// maybe it takes more time to start up, so just log a warning and continue
logger.warn("The connector did not finish starting its task(s) or complete in the expected amount of time");
}
// This allows existing tests to work without modification since they typically assume the
// BaseSourceTask#start(Configuration) method has been execute as part of the Task's start method.
waitForNotInitialState();
}
catch (InterruptedException e) {
if (Thread.interrupted()) {
@ -419,6 +424,26 @@ public void taskStarted() {
}
}
/**
* Wait until the Task state it not {@link BaseSourceTask.State#INITIAL}.
* This indicates that the task has been polled and the internal tasks startIfNecessary method has been called.
* <p/>
* This methos will return immediately if the task is not an instance of {@link BaseSourceTask}.
*/
protected void waitForNotInitialState() {
engine.runWithTask(sourceTask -> {
if (sourceTask instanceof BaseSourceTask) {
BaseSourceTask<?, ?> baseSourceTask = (BaseSourceTask<?, ?>) sourceTask;
Awaitility.await()
.alias("Task has attempted to initialize coordinator")
.pollInterval(100, TimeUnit.MILLISECONDS)
.atMost(waitTimeForRecords() * 30L, TimeUnit.SECONDS)
.until(() -> baseSourceTask.getState() != BaseSourceTask.State.INITIAL);
}
});
}
protected Consumer<SourceRecord> getConsumer(Predicate<SourceRecord> isStopRecord, Consumer<SourceRecord> recordArrivedListener, boolean ignoreRecordsAfterStop) {
return (record) -> {
if (isStopRecord != null && isStopRecord.test(record)) {