DBZ-7024 Interrupt polling if needed

Some polling tasks may be stuck and we need to interrupt polling during
the shutdown not have to wait for TASK_MANAGEMENT_TIMEOUT_MS to timeout.

Also, when we start to interrput polling, we have to remove interruption
of the main thread in the `catch` part. It was a bug anyway as it
interrputed the main thread what we definitelly don't want to happen in
any case.
This commit is contained in:
Vojtech Juranek 2024-02-07 14:30:17 +01:00 committed by Jiri Pechanec
parent 40131c0531
commit e2d2cff7fd

View File

@ -94,6 +94,7 @@ public final class AsyncEmbeddedEngine<R> implements DebeziumEngine<R>, AsyncEng
private final AtomicReference<State> state = new AtomicReference<>(State.CREATING); // state must be changed only via setEngineState() method
private final List<EngineSourceTask> tasks = new ArrayList<>();
private final List<Future<Void>> pollingFutures = new ArrayList<>();
private final ExecutorService taskService;
private final ExecutorService recordService;
@ -391,7 +392,7 @@ private void runTasksPolling(final List<EngineSourceTask> tasks)
for (EngineSourceTask task : tasks) {
final RecordProcessor processor = selectRecordProcessor();
processor.initialize(recordService, transformations, new SourceRecordCommitter(task));
taskCompletionService.submit(new PollRecords(task, processor, state));
pollingFutures.add(taskCompletionService.submit(new PollRecords(task, processor, state)));
}
for (int i = 0; i < tasks.size(); i++) {
@ -399,8 +400,7 @@ private void runTasksPolling(final List<EngineSourceTask> tasks)
taskCompletionService.take().get();
}
catch (InterruptedException e) {
LOGGER.debug("Task interrupted while polling.");
Thread.currentThread().interrupt();
LOGGER.info("Task interrupted while polling.");
}
LOGGER.debug("Task #{} out of {} tasks has stopped polling.", i, tasks.size());
}
@ -461,6 +461,17 @@ private void stopRecordService() {
}
}
/**
* Stops task polling if they haven't stopped yet. Some tasks may be stuck in the polling, we should interrupt such tasks.
*/
private void stopPollingIfNeeded() {
for (Future<Void> pollingFuture : pollingFutures) {
if (!pollingFuture.isDone()) {
pollingFuture.cancel(true);
}
}
}
/**
* Stops all the connector's tasks. There are no checks if the tasks were fully stated or already running, stop is always called.
* Also tries to stop all the other tasks which may be still running or awaiting execution in the task's thread pool.
@ -526,6 +537,7 @@ private void stopConnector(final List<EngineSourceTask> tasks, final State engin
if (State.STARING_TASKS.compareTo(engineState) <= 0) {
LOGGER.debug("Tasks were already started, stopping record service and tasks.");
stopRecordService();
stopPollingIfNeeded();
stopSourceTasks(tasks);
}
LOGGER.debug("Stopping the connector.");