diff --git a/debezium-embedded/src/main/java/io/debezium/embedded/RetryingCallable.java b/debezium-embedded/src/main/java/io/debezium/embedded/RetryingCallable.java new file mode 100644 index 000000000..791510fdc --- /dev/null +++ b/debezium-embedded/src/main/java/io/debezium/embedded/RetryingCallable.java @@ -0,0 +1,68 @@ +/* + * Copyright Debezium Authors. + * + * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 + */ +package io.debezium.embedded; + +import java.util.concurrent.Callable; + +import org.apache.kafka.connect.errors.RetriableException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import io.debezium.util.DelayStrategy; + +/** + * Extension to {@link Callable}, which allows to re-try the action if exception is thrown during the execution. + * The action is re-tried {@code retries} number of times. + * The delay between retries is defined by {@link DelayStrategy}, which needs to be provided by the implementing class. + * + * @author vjuranek + */ +public abstract class RetryingCallable implements Callable { + + private static final Logger LOGGER = LoggerFactory.getLogger(RetryingCallable.class); + + private final int retries; + + public RetryingCallable(final int retries) { + this.retries = retries; + } + + public abstract V doCall() throws Exception; + + public abstract DelayStrategy delayStrategy(); + + public V call() throws Exception { + final DelayStrategy delayStrategy = delayStrategy(); + // 0 retries means retries are disabled, + // -1 means infinite retries; int range is not infinite, but in this case probably a sufficient approximation. + // We start from retries-1 as the last call attempt is done out of the retry loop and this last call either + // succeeds or throws an exception which is propagated further. + int attempts = retries; + while (attempts != 0) { + try { + return doCall(); + } + catch (RetriableException e) { + attempts--; + String retriesExplained = retries == -1 ? "infinity" : String.valueOf(retries); + LOGGER.info("Failed with retriable exception, will retry later; attempt #{} out of {}", + retries - attempts, + retriesExplained, + e); + delayStrategy.sleepWhen(true); + // DelayStrategy catches interrupted exception during the sleep and just set back interrupted status. + // We need to re-throw the InterruptedException to avoid unwanted cycles in the retry loop, e.g. when + // executor service running this callable shuts down. Without re-throwing the exception it would + // result into cycling in the retry loop without any sleep in DelayStrategy until the running thread is + // killed by the executor service. + if (Thread.currentThread().isInterrupted()) { + throw new InterruptedException("Callable was interrupted while sleeping in DelayStrategy"); + } + } + } + return doCall(); + } +} diff --git a/debezium-embedded/src/test/java/io/debezium/embedded/RetryingCallableTest.java b/debezium-embedded/src/test/java/io/debezium/embedded/RetryingCallableTest.java new file mode 100644 index 000000000..bcd6cf970 --- /dev/null +++ b/debezium-embedded/src/test/java/io/debezium/embedded/RetryingCallableTest.java @@ -0,0 +1,184 @@ +/* + * Copyright Debezium Authors. + * + * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 + */ +package io.debezium.embedded; + +import static org.assertj.core.api.Assertions.assertThat; + +import java.time.Duration; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; + +import org.apache.kafka.connect.errors.RetriableException; +import org.assertj.core.api.Assertions; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import io.debezium.junit.logging.LogInterceptor; +import io.debezium.util.DelayStrategy; +import io.debezium.util.LoggingContext; + +/** + * Tests for {@link java.util.concurrent.Callable} with retries {@link RetryingCallable}. + * + * @author vjuranek + */ +public class RetryingCallableTest { + + private ExecutorService execService; + + @Before + public void CreateExecutorService() { + execService = Executors.newSingleThreadExecutor(); + } + + @After + public void shutDownExecutorService() { + execService.shutdownNow(); + } + + @Test + public void shouldExecuteNeverFailing() throws InterruptedException, ExecutionException { + final LogInterceptor interceptor = new LogInterceptor(RetryingCallable.class); + Assertions.assertThat(execService.submit(new NeverFailing(0)).get()).isEqualTo(1); + assertThat(interceptor.containsMessage("Failed with retriable exception")).isFalse(); + } + + @Test + public void shouldNotRetryWhenCallableDoesNotFail() throws InterruptedException, ExecutionException { + final LogInterceptor interceptor = new LogInterceptor(RetryingCallable.class); + Assertions.assertThat(execService.submit(new NeverFailing(10)).get()).isEqualTo(1); + assertThat(interceptor.containsMessage("Failed with retriable exception")).isFalse(); + } + + @Test + public void shouldIgnoreInfiniteRetryWhenCallableDoesNotFail() throws InterruptedException, ExecutionException { + final LogInterceptor interceptor = new LogInterceptor(RetryingCallable.class); + Assertions.assertThat(execService.submit(new NeverFailing(EmbeddedEngineConfig.DEFAULT_ERROR_MAX_RETRIES)).get()).isEqualTo(1); + assertThat(interceptor.containsMessage("Failed with retriable exception")).isFalse(); + } + + @Test + public void shouldRetryAsManyTimesAsRequested() throws InterruptedException { + final LogInterceptor interceptor = new LogInterceptor(RetryingCallable.class); + LoggingContext.forConnector(getClass().getSimpleName(), "", "callable"); + + TwoTimesFailing failing = new TwoTimesFailing(10); + try { + execService.submit(failing).get(); + } + catch (ExecutionException e) { + assertThat(e.getCause() instanceof RetriableException).isTrue(); + } + + // Callable should fail 2 times and 3rh time it should succeed. + assertThat(failing.calls).isEqualTo(3); + assertThat(interceptor.countOccurrences("Failed with retriable exception")).isEqualTo(2); + } + + @Test + public void shouldRetryAsManyTimesAsRequestedWhenAlwaysFails() throws InterruptedException { + final LogInterceptor interceptor = new LogInterceptor(RetryingCallable.class); + LoggingContext.forConnector(getClass().getSimpleName(), "", "callable"); + + AlwaysFailing failing = new AlwaysFailing(5); + try { + execService.submit(failing).get(); + } + catch (ExecutionException e) { + assertThat(e.getCause() instanceof RetriableException).isTrue(); + } + + // Should be called 6 times - 1 call + 5 retries. + assertThat(failing.calls).isEqualTo(6); + // But we should see only 5 exception as the call was retried 5 times and on the 6th call failed, which is + // not logged but thrown up to the stack. + assertThat(interceptor.countOccurrences("Failed with retriable exception")).isEqualTo(5); + } + + @Test + public void shouldNotRetryWhenRetriesAreDisabled() throws InterruptedException { + final LogInterceptor interceptor = new LogInterceptor(RetryingCallable.class); + LoggingContext.forConnector(getClass().getSimpleName(), "", "callable"); + + // 0 means that retries are disabled. + AlwaysFailing failing = new AlwaysFailing(0); + try { + execService.submit(failing).get(); + } + catch (ExecutionException e) { + assertThat(e.getCause() instanceof RetriableException).isTrue(); + } + + // Should be called only 1 time. + assertThat(failing.calls).isEqualTo(1); + // And there shouldn't be any call in retry loop. + assertThat(interceptor.containsMessage("Failed with retriable exception")).isFalse(); + } + + @Test + public void shouldKeepRetryingWhenRetryIsInfinite() throws InterruptedException { + final LogInterceptor interceptor = new LogInterceptor(RetryingCallable.class); + LoggingContext.forConnector(getClass().getSimpleName(), "", "callable"); + + // -1 means that retries are disabled. + // Should fail if we change the config defaults, in such case loop in RetryingCallable needs to be adjusted! + AlwaysFailing failing = new AlwaysFailing(EmbeddedEngineConfig.DEFAULT_ERROR_MAX_RETRIES); + execService.submit(failing); + Thread.sleep(3000); + execService.shutdown(); + + // Wait between the calls is 100 ms, so we should have at least 5 calls during 3 seconds sleep. + assertThat(failing.calls).isGreaterThan(5); + assertThat(interceptor.countOccurrences("Failed with retriable exception")).isGreaterThan(5); + } + + private static class NeverFailing extends RetryingCallable { + + protected volatile int calls; + + NeverFailing(final int retries) { + super(retries); + this.calls = 0; + } + + public Integer doCall() throws Exception { + calls++; + return Integer.valueOf(calls); + } + + @Override + public DelayStrategy delayStrategy() { + return DelayStrategy.linear(Duration.ofMillis(100)); + } + } + + private static class AlwaysFailing extends NeverFailing { + AlwaysFailing(final int retries) { + super(retries); + } + + public Integer doCall() throws Exception { + super.doCall(); + throw new RetriableException("Good try, but I always fail"); + } + } + + private static class TwoTimesFailing extends NeverFailing { + TwoTimesFailing(final int retries) { + super(retries); + } + + public Integer doCall() throws Exception { + super.doCall(); + if (calls <= 2) { + throw new RetriableException(String.format("Good try, but I fail this time (call #%s)", calls)); + } + return calls; + } + } +}