DBZ-5879 Support retrying database connection failures during connector start

Reworked when task requests start from subclasses
Add support for restarting
This commit is contained in:
Jeremy Ford 2023-01-11 21:34:43 -05:00 committed by Jiri Pechanec
parent e8729bb0eb
commit f2a6f0b8db
4 changed files with 279 additions and 42 deletions

View File

@ -1691,7 +1691,7 @@ public void exportedSnapshotShouldNotSkipRecordOfParallelTxPgoutput() throws Exc
@Test
@FixFor("DBZ-1437")
public void shouldPeformSnapshotOnceForInitialOnlySnapshotMode() throws Exception {
public void shouldPerformSnapshotOnceForInitialOnlySnapshotMode() throws Exception {
// This captures all logged messages, allowing us to verify log message was written.
final LogInterceptor logInterceptor = new LogInterceptor(InitialOnlySnapshotter.class);

View File

@ -71,6 +71,11 @@
<artifactId>junit</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-core</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.assertj</groupId>
<artifactId>assertj-core</artifactId>
@ -93,6 +98,12 @@
<artifactId>kafka_${version.kafka.scala}</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>connect-api</artifactId>
<!-- <version>${version.kafka}</version>-->
<!-- <scope>test</scope>-->
</dependency>
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>

View File

@ -27,6 +27,7 @@
import org.slf4j.LoggerFactory;
import io.debezium.annotation.SingleThreadAccess;
import io.debezium.annotation.VisibleForTesting;
import io.debezium.config.CommonConnectorConfig;
import io.debezium.config.Configuration;
import io.debezium.config.Field;
@ -50,13 +51,16 @@ public abstract class BaseSourceTask<P extends Partition, O extends OffsetContex
private static final Logger LOGGER = LoggerFactory.getLogger(BaseSourceTask.class);
private static final Duration INITIAL_POLL_PERIOD_IN_MILLIS = Duration.ofMillis(TimeUnit.SECONDS.toMillis(5));
private static final Duration MAX_POLL_PERIOD_IN_MILLIS = Duration.ofMillis(TimeUnit.HOURS.toMillis(1));
private Configuration config;
protected enum State {
RESTARTING,
RUNNING,
STOPPED;
INITIAL,
STOPPED
}
private final AtomicReference<State> state = new AtomicReference<State>(State.STOPPED);
private final AtomicReference<State> state = new AtomicReference<>(State.INITIAL);
/**
* Used to ensure that start(), stop() and commitRecord() calls are serialized.
@ -65,11 +69,6 @@ protected enum State {
private volatile ElapsedTimeStrategy restartDelay;
/**
* Raw connector properties, kept here so they can be passed again in case of a restart.
*/
private volatile Map<String, String> props;
/**
* The change event source coordinator for those connectors adhering to the new
* framework structure, {@code null} for legacy-style connectors.
@ -112,13 +111,8 @@ public final void start(Map<String, String> props) {
stateLock.lock();
try {
if (!state.compareAndSet(State.STOPPED, State.RUNNING)) {
LOGGER.info("Connector has already been started");
return;
}
this.props = props;
Configuration config = Configuration.from(props);
state.set(State.INITIAL);
config = Configuration.from(props);
retriableRestartWait = config.getDuration(CommonConnectorConfig.RETRIABLE_RESTART_WAIT, ChronoUnit.MILLIS);
// need to reset the delay or you only get one delayed restart
restartDelay = null;
@ -132,8 +126,6 @@ public final void start(Map<String, String> props) {
LOGGER.info(" {} = {}", propName, propValue);
});
}
this.coordinator = start(config);
}
finally {
stateLock.unlock();
@ -145,7 +137,8 @@ protected Configuration withMaskedSensitiveOptions(Configuration config) {
}
/**
* Called once when starting this source task.
* Called when starting this source task. This method can throw a {@link RetriableException} to indicate
* that the task should attempt to retry the start later.
*
* @param config
* the task configuration; implementations should wrap it in a dedicated implementation of
@ -155,18 +148,19 @@ protected Configuration withMaskedSensitiveOptions(Configuration config) {
@Override
public final List<SourceRecord> poll() throws InterruptedException {
boolean started = startIfNeededAndPossible();
// in backoff period after a retriable exception
if (!started) {
// WorkerSourceTask calls us immediately after we return the empty list.
// This turns into a throttling so we need to make a pause before we return
// the control back.
Metronome.parker(Duration.of(2, ChronoUnit.SECONDS), Clock.SYSTEM).pause();
return Collections.emptyList();
}
try {
boolean started = startIfNeededAndPossible();
// in backoff period after a retriable exception
if (!started) {
// WorkerSourceTask calls us immediately after we return the empty list.
// This turns into a throttling so we need to make a pause before we return
// the control back.
Metronome.parker(Duration.of(2, ChronoUnit.SECONDS), Clock.SYSTEM).pause();
return Collections.emptyList();
}
final List<SourceRecord> records = doPoll();
logStatistics(records);
return records;
@ -227,22 +221,46 @@ private void updateLastOffset(Map<String, ?> partition, Map<String, ?> lastOffse
private boolean startIfNeededAndPossible() {
stateLock.lock();
boolean result;
try {
if (state.get() == State.RUNNING) {
return true;
State currentState = state.get();
if (currentState == State.RUNNING) {
result = true;
}
else if (restartDelay != null && restartDelay.hasElapsed()) {
start(props);
return true;
else if (currentState == State.RESTARTING) {
// we're in restart mode... check if it's time to restart
if (restartDelay.hasElapsed()) {
LOGGER.info("Attempting to restart task.");
this.coordinator = start(config);
LOGGER.info("Successfully restarted task");
result = true;
}
else {
LOGGER.info("Awaiting end of restart backoff period after a retriable error");
result = false;
}
}
else if (currentState == State.INITIAL) {
LOGGER.info("Attempting to start task");
this.coordinator = start(config);
LOGGER.info("Successfully started task");
result = true;
}
else {
LOGGER.info("Awaiting end of restart backoff period after a retriable error");
return false;
LOGGER.warn("Attempting to start task but task has been stopped.");
result = false;
}
if (currentState != State.RUNNING && result) {
// we successfully started, clear restart state
restartDelay = null;
state.set(State.RUNNING);
}
}
finally {
stateLock.unlock();
}
return result;
}
@Override
@ -254,10 +272,9 @@ private void stop(boolean restart) {
stateLock.lock();
try {
if (!state.compareAndSet(State.RUNNING, State.STOPPED)) {
LOGGER.info("Connector has already been stopped");
return;
}
// if (!state.compareAndSet(State.RUNNING, State.STOPPED)) {
// LOGGER.info("Connector is already stopped.");
// }
if (restart) {
LOGGER.warn("Going to restart connector after {} sec. after a retriable exception", retriableRestartWait.getSeconds());
@ -269,6 +286,7 @@ private void stop(boolean restart) {
try {
if (coordinator != null) {
coordinator.stop();
coordinator = null;
}
}
catch (InterruptedException e) {
@ -279,9 +297,15 @@ private void stop(boolean restart) {
doStop();
if (restart && restartDelay == null) {
restartDelay = ElapsedTimeStrategy.constant(Clock.system(), retriableRestartWait.toMillis());
restartDelay.hasElapsed();
if (restart) {
state.set(State.RESTARTING);
if (restartDelay == null) {
restartDelay = ElapsedTimeStrategy.constant(Clock.system(), retriableRestartWait.toMillis());
restartDelay.hasElapsed();
}
}
else {
state.set(State.STOPPED);
}
}
finally {
@ -355,4 +379,15 @@ protected Offsets<P, O> getPreviousOffsets(Partition.Provider<P> provider, Offse
return Offsets.of(offsets);
}
/**
 * Returns the current lifecycle state of this task.
 * <p>
 * The value is read while holding {@code stateLock}, the same lock taken by the start and stop
 * paths, so a caller does not observe the state in the middle of one of those transitions.
 * Package-private and intended for tests only.
 *
 * @return the current {@link State}
 */
@VisibleForTesting
State getState() {
stateLock.lock();
try {
return state.get();
}
finally {
stateLock.unlock();
}
}
}

View File

@ -0,0 +1,191 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.connector.common;
import static org.junit.Assert.assertEquals;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.kafka.connect.errors.RetriableException;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.source.SourceTaskContext;
import org.junit.Before;
import org.junit.Test;
import io.debezium.config.CommonConnectorConfig;
import io.debezium.config.Configuration;
import io.debezium.config.Field;
import io.debezium.pipeline.ChangeEventSourceCoordinator;
import io.debezium.pipeline.spi.OffsetContext;
import io.debezium.pipeline.spi.Partition;
/**
 * Unit tests for the lifecycle state machine of {@link BaseSourceTask}
 * ({@code INITIAL -> RUNNING -> STOPPED}, plus {@code RESTARTING} after a retriable start failure).
 * <p>
 * The tests drive a minimal {@link MyBaseSourceTask} test double that merely counts how often the
 * subclass hooks are invoked and hands out a mocked {@link ChangeEventSourceCoordinator}.
 */
public class BaseSourceTaskTest {

    /**
     * Pause between polls in the restart test. It is deliberately much longer than the 1 ms
     * retriable restart wait configured there, so the back-off has always elapsed on the next poll.
     */
    private static final long RESTART_BACKOFF_SLEEP_MS = 100;

    private final MyBaseSourceTask baseSourceTask = new MyBaseSourceTask();

    /** Initializes the shared task with a mocked context before every test. */
    @Before
    public void setup() {
        baseSourceTask.initialize(mock(SourceTaskContext.class));
    }

    /** A start/poll/stop cycle starts the subclass once and stops the task and coordinator once. */
    @Test
    public void verifyTaskStartsAndStops() throws InterruptedException {
        baseSourceTask.start(new HashMap<>());
        assertEquals(BaseSourceTask.State.INITIAL, baseSourceTask.getState());

        baseSourceTask.poll();
        assertEquals(BaseSourceTask.State.RUNNING, baseSourceTask.getState());

        baseSourceTask.stop();
        assertEquals(BaseSourceTask.State.STOPPED, baseSourceTask.getState());

        assertEquals(1, baseSourceTask.startCount.get());
        assertEquals(1, baseSourceTask.stopCount.get());
        verify(baseSourceTask.coordinator).stop();
    }

    /** Stopping before the first poll must never invoke the subclass start hook. */
    @Test
    public void verifyStartAndStopWithoutPolling() {
        // The task is already initialized by setup(); no second initialize() call is needed.
        baseSourceTask.start(new HashMap<>());
        assertEquals(BaseSourceTask.State.INITIAL, baseSourceTask.getState());

        baseSourceTask.stop();
        assertEquals(BaseSourceTask.State.STOPPED, baseSourceTask.getState());

        assertEquals(0, baseSourceTask.startCount.get());
        assertEquals(1, baseSourceTask.stopCount.get());
    }

    /** A stopped task can go through a second complete start/poll/stop cycle. */
    @Test
    public void verifyTaskCanBeStartedAfterStopped() throws InterruptedException {
        baseSourceTask.start(new HashMap<>());
        assertEquals(BaseSourceTask.State.INITIAL, baseSourceTask.getState());

        baseSourceTask.poll();
        assertEquals(BaseSourceTask.State.RUNNING, baseSourceTask.getState());

        baseSourceTask.stop();
        assertEquals(BaseSourceTask.State.STOPPED, baseSourceTask.getState());

        baseSourceTask.start(new HashMap<>());
        assertEquals(BaseSourceTask.State.INITIAL, baseSourceTask.getState());

        baseSourceTask.poll();
        assertEquals(BaseSourceTask.State.RUNNING, baseSourceTask.getState());

        baseSourceTask.stop();
        assertEquals(BaseSourceTask.State.STOPPED, baseSourceTask.getState());

        assertEquals(2, baseSourceTask.startCount.get());
        assertEquals(2, baseSourceTask.stopCount.get());
        verify(baseSourceTask.coordinator, times(2)).stop();
    }

    /**
     * A task whose first two start attempts fail with a {@link RetriableException} stays in
     * {@code RESTARTING} until the third attempt succeeds.
     */
    @Test
    public void verifyTaskRestartsSuccessfully() throws InterruptedException {
        // Named differently from the shared field so it is obvious which task is under test.
        MyBaseSourceTask retryingTask = new MyBaseSourceTask() {
            @Override
            protected ChangeEventSourceCoordinator<Partition, OffsetContext> start(Configuration config) {
                ChangeEventSourceCoordinator<Partition, OffsetContext> result = super.start(config);
                // super.start() has already incremented startCount, so attempts 1 and 2 fail.
                if (startCount.get() < 3) {
                    throw new RetriableException("Retry " + startCount.get());
                }
                return result;
            }
        };
        retryingTask.initialize(mock(SourceTaskContext.class));

        Map<String, String> config = Map.of(
                CommonConnectorConfig.RETRIABLE_RESTART_WAIT.name(), "1" // wait 1ms between restarts
        );

        retryingTask.start(config);
        assertEquals(BaseSourceTask.State.INITIAL, retryingTask.getState());

        pollAndIgnoreRetryException(retryingTask);
        assertEquals(BaseSourceTask.State.RESTARTING, retryingTask.getState());

        sleep(RESTART_BACKOFF_SLEEP_MS); // let the 1ms retriable restart wait elapse
        pollAndIgnoreRetryException(retryingTask);
        assertEquals(BaseSourceTask.State.RESTARTING, retryingTask.getState());

        sleep(RESTART_BACKOFF_SLEEP_MS); // let the 1ms retriable restart wait elapse
        retryingTask.poll();
        assertEquals(BaseSourceTask.State.RUNNING, retryingTask.getState());

        retryingTask.stop();
        assertEquals(BaseSourceTask.State.STOPPED, retryingTask.getState());

        assertEquals(3, retryingTask.startCount.get());
        // Expected: one doStop() per failed attempt plus one for the final stop().
        assertEquals(3, retryingTask.stopCount.get());
        // Only the successful start handed a coordinator to the task, so it is stopped once.
        verify(retryingTask.coordinator, times(1)).stop();
    }

    /** Polling a task that has already been stopped must not start it. */
    @Test
    public void verifyOutOfOrderPollDoesNotStartTask() throws InterruptedException {
        baseSourceTask.start(new HashMap<>());
        assertEquals(BaseSourceTask.State.INITIAL, baseSourceTask.getState());

        baseSourceTask.stop();
        assertEquals(BaseSourceTask.State.STOPPED, baseSourceTask.getState());

        baseSourceTask.poll();
        assertEquals(BaseSourceTask.State.STOPPED, baseSourceTask.getState());

        assertEquals(0, baseSourceTask.startCount.get());
        assertEquals(1, baseSourceTask.stopCount.get());
    }

    /**
     * Polls the task once and swallows a {@link RetriableException}, if one is thrown.
     *
     * @param baseSourceTask the task to poll
     * @throws InterruptedException if the poll is interrupted
     */
    private static void pollAndIgnoreRetryException(BaseSourceTask<Partition, OffsetContext> baseSourceTask) throws InterruptedException {
        try {
            baseSourceTask.poll();
        }
        catch (RetriableException ignored) {
            // Intentionally ignored: callers assert on the resulting task state instead.
        }
    }

    /**
     * Sleeps for the given time. If interrupted, the interrupt flag is restored and the
     * interruption is rethrown unchecked with its cause preserved.
     *
     * @param millis how long to sleep, in milliseconds
     */
    private static void sleep(long millis) {
        try {
            Thread.sleep(millis);
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException("Interrupted while waiting " + millis + " ms", e);
        }
    }

    /**
     * Minimal {@link BaseSourceTask} used as a test double: it counts invocations of the start and
     * stop hooks and always returns the same mocked coordinator.
     */
    public static class MyBaseSourceTask extends BaseSourceTask<Partition, OffsetContext> {

        /** Records handed back from {@link #doPoll()}; empty unless a test adds to it. */
        final List<SourceRecord> records = new ArrayList<>();
        /** Number of times {@link #start(Configuration)} has been invoked. */
        final AtomicInteger startCount = new AtomicInteger();
        /** Number of times {@link #doStop()} has been invoked. */
        final AtomicInteger stopCount = new AtomicInteger();

        // Mockito cannot produce a generically typed mock, hence the unchecked assignment.
        @SuppressWarnings("unchecked")
        final ChangeEventSourceCoordinator<Partition, OffsetContext> coordinator = mock(ChangeEventSourceCoordinator.class);

        @Override
        protected ChangeEventSourceCoordinator<Partition, OffsetContext> start(Configuration config) {
            startCount.incrementAndGet();
            return coordinator;
        }

        @Override
        protected List<SourceRecord> doPoll() {
            return records;
        }

        @Override
        protected void doStop() {
            stopCount.incrementAndGet();
        }

        @Override
        protected Iterable<Field> getAllConfigurationFields() {
            return List.of(Field.create("f1"));
        }

        @Override
        public String version() {
            return "1.0";
        }
    }
}