diff --git a/debezium-embedded/src/main/java/io/debezium/embedded/AsyncEmbeddedEngine.java b/debezium-embedded/src/main/java/io/debezium/embedded/AsyncEmbeddedEngine.java index f22e12673..c2e48117e 100644 --- a/debezium-embedded/src/main/java/io/debezium/embedded/AsyncEmbeddedEngine.java +++ b/debezium-embedded/src/main/java/io/debezium/embedded/AsyncEmbeddedEngine.java @@ -94,6 +94,7 @@ public final class AsyncEmbeddedEngine implements DebeziumEngine, AsyncEng private final AtomicReference state = new AtomicReference<>(State.CREATING); // state must be changed only via setEngineState() method private final List tasks = new ArrayList<>(); + private final List> pollingFutures = new ArrayList<>(); private final ExecutorService taskService; private final ExecutorService recordService; @@ -391,7 +392,7 @@ private void runTasksPolling(final List tasks) for (EngineSourceTask task : tasks) { final RecordProcessor processor = selectRecordProcessor(); processor.initialize(recordService, transformations, new SourceRecordCommitter(task)); - taskCompletionService.submit(new PollRecords(task, processor, state)); + pollingFutures.add(taskCompletionService.submit(new PollRecords(task, processor, state))); } for (int i = 0; i < tasks.size(); i++) { @@ -399,8 +400,7 @@ private void runTasksPolling(final List tasks) taskCompletionService.take().get(); } catch (InterruptedException e) { - LOGGER.debug("Task interrupted while polling."); - Thread.currentThread().interrupt(); + LOGGER.info("Task interrupted while polling."); } LOGGER.debug("Task #{} out of {} tasks has stopped polling.", i, tasks.size()); } @@ -461,6 +461,17 @@ private void stopRecordService() { } } + /** + * Stops task polling if they haven't stopped yet. Some tasks may be stuck in the polling, we should interrupt such tasks. + */ + private void stopPollingIfNeeded() { + for (Future pollingFuture : pollingFutures) { + if (!pollingFuture.isDone()) { + pollingFuture.cancel(true); + } + } + } + /** * Stops all the connector's tasks. There are no checks if the tasks were fully stated or already running, stop is always called. * Also tries to stop all the other tasks which may be still running or awaiting execution in the task's thread pool. @@ -526,6 +537,7 @@ private void stopConnector(final List tasks, final State engin if (State.STARING_TASKS.compareTo(engineState) <= 0) { LOGGER.debug("Tasks were already started, stopping record service and tasks."); stopRecordService(); + stopPollingIfNeeded(); stopSourceTasks(tasks); } LOGGER.debug("Stopping the connector.");