DBZ-5879 Added logging; removed commented out code

This commit is contained in:
Jiri Pechanec 2023-01-18 14:06:13 +01:00
parent 553cb9b1dd
commit 1ec41fa120
3 changed files with 36 additions and 30 deletions

View File

@ -111,7 +111,7 @@ public final void start(Map<String, String> props) {
stateLock.lock();
try {
state.set(State.INITIAL);
setTaskState(State.INITIAL);
config = Configuration.from(props);
retriableRestartWait = config.getDuration(CommonConnectorConfig.RETRIABLE_RESTART_WAIT, ChronoUnit.MILLIS);
// need to reset the delay or you only get one delayed restart
@ -223,7 +223,7 @@ private boolean startIfNeededAndPossible() {
boolean result;
try {
State currentState = state.get();
State currentState = getTaskState();
if (currentState == State.RUNNING) {
result = true;
}
@ -254,7 +254,7 @@ else if (currentState == State.INITIAL) {
if (currentState != State.RUNNING && result) {
// we successfully started, clear restart state
restartDelay = null;
state.set(State.RUNNING);
setTaskState(State.RUNNING);
}
}
finally {
@ -272,10 +272,6 @@ private void stop(boolean restart) {
stateLock.lock();
try {
// if (!state.compareAndSet(State.RUNNING, State.STOPPED)) {
// LOGGER.info("Connector is already stopped.");
// }
if (restart) {
LOGGER.warn("Going to restart connector after {} sec. after a retriable exception", retriableRestartWait.getSeconds());
}
@ -298,14 +294,14 @@ private void stop(boolean restart) {
doStop();
if (restart) {
state.set(State.RESTARTING);
setTaskState(State.RESTARTING);
if (restartDelay == null) {
restartDelay = ElapsedTimeStrategy.constant(Clock.system(), retriableRestartWait.toMillis());
restartDelay.hasElapsed();
}
}
else {
state.set(State.STOPPED);
setTaskState(State.STOPPED);
}
}
finally {
@ -380,8 +376,18 @@ protected Offsets<P, O> getPreviousOffsets(Partition.Provider<P> provider, Offse
return Offsets.of(offsets);
}
/**
* Sets the new state for the task. The caller must be holding {@link #stateLock} lock.
*
* @param newState
*/
private void setTaskState(State newState) {
State oldState = state.getAndSet(newState);
LOGGER.debug("Setting task state to '{}', previous state was '{}'", newState, oldState);
}
@VisibleForTesting
public State getState() {
public State getTaskState() {
stateLock.lock();
try {
return state.get();

View File

@ -42,11 +42,11 @@ public void setup() {
public void verifyTaskStartsAndStops() throws InterruptedException {
baseSourceTask.start(new HashMap<>());
assertEquals(BaseSourceTask.State.INITIAL, baseSourceTask.getState());
assertEquals(BaseSourceTask.State.INITIAL, baseSourceTask.getTaskState());
baseSourceTask.poll();
assertEquals(BaseSourceTask.State.RUNNING, baseSourceTask.getState());
assertEquals(BaseSourceTask.State.RUNNING, baseSourceTask.getTaskState());
baseSourceTask.stop();
assertEquals(BaseSourceTask.State.STOPPED, baseSourceTask.getState());
assertEquals(BaseSourceTask.State.STOPPED, baseSourceTask.getTaskState());
assertEquals(1, baseSourceTask.startCount.get());
assertEquals(1, baseSourceTask.stopCount.get());
@ -57,9 +57,9 @@ public void verifyTaskStartsAndStops() throws InterruptedException {
public void verifyStartAndStopWithoutPolling() {
baseSourceTask.initialize(mock(SourceTaskContext.class));
baseSourceTask.start(new HashMap<>());
assertEquals(BaseSourceTask.State.INITIAL, baseSourceTask.getState());
assertEquals(BaseSourceTask.State.INITIAL, baseSourceTask.getTaskState());
baseSourceTask.stop();
assertEquals(BaseSourceTask.State.STOPPED, baseSourceTask.getState());
assertEquals(BaseSourceTask.State.STOPPED, baseSourceTask.getTaskState());
assertEquals(0, baseSourceTask.startCount.get());
assertEquals(1, baseSourceTask.stopCount.get());
@ -69,17 +69,17 @@ public void verifyStartAndStopWithoutPolling() {
public void verifyTaskCanBeStartedAfterStopped() throws InterruptedException {
baseSourceTask.start(new HashMap<>());
assertEquals(BaseSourceTask.State.INITIAL, baseSourceTask.getState());
assertEquals(BaseSourceTask.State.INITIAL, baseSourceTask.getTaskState());
baseSourceTask.poll();
assertEquals(BaseSourceTask.State.RUNNING, baseSourceTask.getState());
assertEquals(BaseSourceTask.State.RUNNING, baseSourceTask.getTaskState());
baseSourceTask.stop();
assertEquals(BaseSourceTask.State.STOPPED, baseSourceTask.getState());
assertEquals(BaseSourceTask.State.STOPPED, baseSourceTask.getTaskState());
baseSourceTask.start(new HashMap<>());
assertEquals(BaseSourceTask.State.INITIAL, baseSourceTask.getState());
assertEquals(BaseSourceTask.State.INITIAL, baseSourceTask.getTaskState());
baseSourceTask.poll();
assertEquals(BaseSourceTask.State.RUNNING, baseSourceTask.getState());
assertEquals(BaseSourceTask.State.RUNNING, baseSourceTask.getTaskState());
baseSourceTask.stop();
assertEquals(BaseSourceTask.State.STOPPED, baseSourceTask.getState());
assertEquals(BaseSourceTask.State.STOPPED, baseSourceTask.getTaskState());
assertEquals(2, baseSourceTask.startCount.get());
assertEquals(2, baseSourceTask.stopCount.get());
@ -105,17 +105,17 @@ protected ChangeEventSourceCoordinator<Partition, OffsetContext> start(Configura
CommonConnectorConfig.RETRIABLE_RESTART_WAIT.name(), "1" // wait 1ms between restarts
);
baseSourceTask.start(config);
assertEquals(BaseSourceTask.State.INITIAL, baseSourceTask.getState());
assertEquals(BaseSourceTask.State.INITIAL, baseSourceTask.getTaskState());
pollAndIgnoreRetryException(baseSourceTask);
assertEquals(BaseSourceTask.State.RESTARTING, baseSourceTask.getState());
assertEquals(BaseSourceTask.State.RESTARTING, baseSourceTask.getTaskState());
sleep(100); // wait 10ms in order to satisfy retriable wait
pollAndIgnoreRetryException(baseSourceTask);
assertEquals(BaseSourceTask.State.RESTARTING, baseSourceTask.getState());
assertEquals(BaseSourceTask.State.RESTARTING, baseSourceTask.getTaskState());
sleep(100); // wait 10ms in order to satisfy retriable wait
baseSourceTask.poll();
assertEquals(BaseSourceTask.State.RUNNING, baseSourceTask.getState());
assertEquals(BaseSourceTask.State.RUNNING, baseSourceTask.getTaskState());
baseSourceTask.stop();
assertEquals(BaseSourceTask.State.STOPPED, baseSourceTask.getState());
assertEquals(BaseSourceTask.State.STOPPED, baseSourceTask.getTaskState());
assertEquals(3, baseSourceTask.startCount.get());
assertEquals(3, baseSourceTask.stopCount.get());
@ -125,11 +125,11 @@ protected ChangeEventSourceCoordinator<Partition, OffsetContext> start(Configura
@Test
public void verifyOutOfOrderPollDoesNotStartTask() throws InterruptedException {
baseSourceTask.start(new HashMap<>());
assertEquals(BaseSourceTask.State.INITIAL, baseSourceTask.getState());
assertEquals(BaseSourceTask.State.INITIAL, baseSourceTask.getTaskState());
baseSourceTask.stop();
assertEquals(BaseSourceTask.State.STOPPED, baseSourceTask.getState());
assertEquals(BaseSourceTask.State.STOPPED, baseSourceTask.getTaskState());
baseSourceTask.poll();
assertEquals(BaseSourceTask.State.STOPPED, baseSourceTask.getState());
assertEquals(BaseSourceTask.State.STOPPED, baseSourceTask.getTaskState());
assertEquals(0, baseSourceTask.startCount.get());
assertEquals(1, baseSourceTask.stopCount.get());

View File

@ -438,7 +438,7 @@ protected void waitForNotInitialState() {
.alias("Task has attempted to initialize coordinator")
.pollInterval(100, TimeUnit.MILLISECONDS)
.atMost(waitTimeForRecords() * 30L, TimeUnit.SECONDS)
.until(() -> baseSourceTask.getState() != BaseSourceTask.State.INITIAL);
.until(() -> baseSourceTask.getTaskState() != BaseSourceTask.State.INITIAL);
}
});