DBZ-5244 Retry all communication exceptions by default

This commit is contained in:
Jiri Pechanec 2022-08-24 14:52:59 +02:00 committed by Chris Cranford
parent 7ccd40129c
commit de53591dba
8 changed files with 60 additions and 106 deletions

View File

@ -5,8 +5,14 @@
*/
package io.debezium.connector.mongodb;
import java.io.IOException;
import java.util.Set;
import com.mongodb.MongoException;
import io.debezium.connector.base.ChangeEventQueue;
import io.debezium.pipeline.ErrorHandler;
import io.debezium.util.Collect;
/**
* Error handler for MongoDB.
@ -20,22 +26,7 @@ public MongoDbErrorHandler(MongoDbConnectorConfig connectorConfig, ChangeEventQu
}
@Override
protected boolean isRetriable(Throwable throwable) {
if (throwable instanceof org.apache.kafka.connect.errors.ConnectException) {
Throwable cause = throwable.getCause();
while ((cause != null) && (cause != throwable)) {
if (cause instanceof com.mongodb.MongoSocketException ||
cause instanceof com.mongodb.MongoTimeoutException ||
cause instanceof com.mongodb.MongoExecutionTimeoutException ||
cause instanceof com.mongodb.MongoNodeIsRecoveringException) {
return true;
}
else {
cause = cause.getCause();
}
}
}
return false;
protected Set<Class<? extends Exception>> communicationExceptions() {
    // Exception types this handler treats as communication failures:
    // java.io.IOException and com.mongodb.MongoException.
    final Set<Class<? extends Exception>> communicationTypes = Collect.unmodifiableSet(IOException.class, MongoException.class);
    return communicationTypes;
}
}

View File

@ -5,14 +5,13 @@
*/
package io.debezium.connector.mysql;
import java.io.EOFException;
import java.io.IOException;
import java.sql.SQLException;
import java.util.Set;
import com.github.shyiko.mysql.binlog.network.ServerException;
import io.debezium.DebeziumException;
import io.debezium.connector.base.ChangeEventQueue;
import io.debezium.pipeline.ErrorHandler;
import io.debezium.util.Collect;
/**
* Error handler for MySQL.
@ -21,29 +20,12 @@
*/
public class MySqlErrorHandler extends ErrorHandler {
private static final String SQL_CODE_TOO_MANY_CONNECTIONS = "08004";
public MySqlErrorHandler(MySqlConnectorConfig connectorConfig, ChangeEventQueue<?> queue) {
super(MySqlConnector.class, connectorConfig, queue);
}
@Override
protected boolean isRetriable(Throwable throwable) {
if (throwable instanceof SQLException) {
final SQLException sql = (SQLException) throwable;
return SQL_CODE_TOO_MANY_CONNECTIONS.equals(sql.getSQLState());
}
else if (throwable instanceof ServerException) {
final ServerException sql = (ServerException) throwable;
return SQL_CODE_TOO_MANY_CONNECTIONS.equals(sql.getSqlState());
}
else if (throwable instanceof EOFException) {
// Retry with reading binlog error
return throwable.getMessage().contains("Failed to read next byte from position");
}
else if (throwable instanceof DebeziumException && throwable.getCause() != null) {
return isRetriable(throwable.getCause());
}
return false;
protected Set<Class<? extends Exception>> communicationExceptions() {
    // Exception types this handler treats as communication failures:
    // java.io.IOException and java.sql.SQLException.
    final Set<Class<? extends Exception>> communicationTypes = Collect.unmodifiableSet(IOException.class, SQLException.class);
    return communicationTypes;
}
}

View File

@ -5,12 +5,10 @@
*/
package io.debezium.connector.postgresql;
import java.io.IOException;
import java.sql.SQLException;
import java.util.Set;
import org.postgresql.util.PSQLException;
import io.debezium.DebeziumException;
import io.debezium.annotation.Immutable;
import io.debezium.connector.base.ChangeEventQueue;
import io.debezium.pipeline.ErrorHandler;
import io.debezium.util.Collect;
@ -22,39 +20,18 @@
*/
public class PostgresErrorHandler extends ErrorHandler {
@Immutable
private static final Set<String> RETRIABLE_EXCEPTION_MESSSAGES = Collect.unmodifiableSet(
"Database connection failed when writing to copy",
"Database connection failed when reading from copy",
"An I/O error occurred while sending to the backend",
"ERROR: could not open relation with OID",
"This connection has been closed",
"terminating connection due to unexpected postmaster exit",
"terminating connection due to administrator command");
public PostgresErrorHandler(PostgresConnectorConfig connectorConfig, ChangeEventQueue<?> queue) {
super(PostgresConnector.class, connectorConfig, queue);
}
@Override
protected boolean isRetriable(Throwable throwable) {
if (isRetriablePsqlException(throwable)) {
return true;
}
else if (throwable instanceof DebeziumException) {
return isRetriablePsqlException(throwable.getCause());
}
return false;
protected Set<Class<? extends Exception>> communicationExceptions() {
    // Exception types this handler treats as communication failures:
    // java.io.IOException and java.sql.SQLException.
    final Set<Class<? extends Exception>> communicationTypes = Collect.unmodifiableSet(IOException.class, SQLException.class);
    return communicationTypes;
}
public boolean isRetriablePsqlException(Throwable throwable) {
if (throwable != null && throwable instanceof PSQLException && throwable.getMessage() != null) {
for (String messageText : RETRIABLE_EXCEPTION_MESSSAGES) {
if (throwable.getMessage().contains(messageText)) {
return true;
}
}
}
return false;
// Introduced for testing only: adds no logic of its own and simply returns
// whatever the parent implementation decides for the given throwable.
@Override
protected boolean isRetriable(Throwable throwable) {
    final boolean retriable = super.isRetriable(throwable);
    return retriable;
}
}

View File

@ -31,8 +31,8 @@ public void classifiedPSQLExceptionIsRetryable() {
}
@Test
public void psqlExceptionWithNullErrorMesdsageNotRetryable() {
PSQLException testException = new PSQLException(null, PSQLState.CONNECTION_FAILURE);
public void nonCommunicationExceptionNotRetryable() {
Exception testException = new NullPointerException();
Assertions.assertThat(errorHandler.isRetriable(testException)).isFalse();
}
@ -42,10 +42,10 @@ public void nullThrowableIsNotRetryable() {
}
@Test
public void unclassifiedPSQLExceptionIsNotRetryable() {
PSQLException testException = new PSQLException(
"definitely not a postgres error", PSQLState.CONNECTION_FAILURE);
Assertions.assertThat(errorHandler.isRetriable(testException)).isFalse();
public void encapsulatedPSQLExceptionIsRetriable() {
Exception testException = new IllegalArgumentException(
new PSQLException("definitely not a postgres error", PSQLState.CONNECTION_FAILURE));
Assertions.assertThat(errorHandler.isRetriable(testException)).isTrue();
}
@Test

View File

@ -5,10 +5,13 @@
*/
package io.debezium.connector.sqlserver;
import com.microsoft.sqlserver.jdbc.SQLServerException;
import java.io.IOException;
import java.sql.SQLException;
import java.util.Set;
import io.debezium.connector.base.ChangeEventQueue;
import io.debezium.pipeline.ErrorHandler;
import io.debezium.util.Collect;
/**
* Error handler for SQL Server.
@ -22,28 +25,7 @@ public SqlServerErrorHandler(SqlServerConnectorConfig connectorConfig, ChangeEve
}
@Override
protected boolean isRetriable(Throwable throwable) {
if (!(throwable instanceof SQLServerException) && throwable.getCause() instanceof SQLServerException) {
throwable = throwable.getCause();
}
return throwable instanceof SQLServerException
&& (throwable.getMessage().contains("Connection timed out (Read failed)")
|| throwable.getMessage().contains("Connection timed out (Write failed)")
|| throwable.getMessage().contains("The connection has been closed.")
|| throwable.getMessage().contains("The connection is closed.")
|| throwable.getMessage().contains("The login failed.")
|| throwable.getMessage().contains("Server is in script upgrade mode.")
|| throwable.getMessage().contains("Try the statement later.")
|| throwable.getMessage().contains("Connection reset")
|| throwable.getMessage().contains("Socket closed")
|| throwable.getMessage().contains("SHUTDOWN is in progress")
|| throwable.getMessage().contains("The server failed to resume the transaction")
|| throwable.getMessage().contains("Verify the connection properties")
|| throwable.getMessage().contains("Broken pipe (Write failed)")
|| throwable.getMessage()
.startsWith("An insufficient number of arguments were supplied for the procedure or function cdc.fn_cdc_get_all_changes_")
|| throwable.getMessage()
.endsWith("was deadlocked on lock resources with another process and has been chosen as the deadlock victim. Rerun the transaction."));
protected Set<Class<? extends Exception>> communicationExceptions() {
    // Exception types this handler treats as communication failures:
    // java.io.IOException and java.sql.SQLException.
    final Set<Class<? extends Exception>> communicationTypes = Collect.unmodifiableSet(IOException.class, SQLException.class);
    return communicationTypes;
}
}

View File

@ -14,12 +14,14 @@
import java.sql.SQLException;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.connect.data.Decimal;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;
import org.awaitility.Awaitility;
import org.fest.assertions.Assertions;
import org.fest.assertions.MapAssert;
import org.junit.After;
@ -29,6 +31,7 @@
import io.debezium.config.CommonConnectorConfig;
import io.debezium.config.Configuration;
import io.debezium.connector.common.BaseSourceTask;
import io.debezium.connector.sqlserver.SqlServerConnectorConfig.SnapshotIsolationMode;
import io.debezium.connector.sqlserver.SqlServerConnectorConfig.SnapshotMode;
import io.debezium.connector.sqlserver.util.TestHelper;
@ -165,7 +168,8 @@ public void takeSnapshotAndStartStreaming() throws Exception {
@Test
@FixFor("DBZ-1280")
public void testDeadlockDetection() throws Exception {
final LogInterceptor logInterceptor = new LogInterceptor(ErrorHandler.class);
final LogInterceptor logInterceptorErrorHandler = new LogInterceptor(ErrorHandler.class);
final LogInterceptor logInterceptorTask = new LogInterceptor(BaseSourceTask.class);
final Configuration config = TestHelper.defaultConfig()
.with(RelationalDatabaseConnectorConfig.SNAPSHOT_LOCK_TIMEOUT_MS, 1_000)
.build();
@ -176,8 +180,10 @@ public void testDeadlockDetection() throws Exception {
connection.setAutoCommit(false).executeWithoutCommitting(
"SELECT TOP(0) * FROM dbo.table1 WITH (TABLOCKX)");
consumeRecordsByTopic(INITIAL_RECORDS_PER_TABLE);
assertConnectorNotRunning();
assertThat(logInterceptor.containsStacktraceElement("Lock request time out period exceeded.")).as("Log contains error related to lock timeout").isTrue();
Awaitility.await().atMost(TestHelper.waitTimeForLogEntries(), TimeUnit.SECONDS).until(
() -> logInterceptorTask.containsWarnMessage("Going to restart connector after "));
assertThat(logInterceptorErrorHandler.containsStacktraceElement("Lock request time out period exceeded.")).as("Log contains error related to lock timeout")
.isTrue();
connection.rollback();
}

View File

@ -5,6 +5,9 @@
*/
package io.debezium.pipeline;
import java.io.IOException;
import java.util.Collections;
import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.kafka.connect.errors.ConnectException;
@ -56,12 +59,25 @@ public Throwable getProducerThrowable() {
return producerThrowable.get();
}
/**
 * Exception types that {@link #isRetriable(Throwable)} treats as communication
 * failures. The default is a single-element set containing {@link IOException};
 * subclasses may override this method to supply a different set.
 *
 * @return the exception classes regarded as communication failures
 */
protected Set<Class<? extends Exception>> communicationExceptions() {
    final Set<Class<? extends Exception>> defaultTypes = Collections.singleton(IOException.class);
    return defaultTypes;
}
/**
* Whether the given throwable is retriable (e.g. an exception indicating a
* connection loss) or not.
* <p>
* A {@code null} throwable is not retriable. Otherwise the throwable is retriable
* when its class is assignable to one of the types returned by
* {@link #communicationExceptions()}; if none matches, the same check is applied
* to {@link Throwable#getCause()}, so a matching exception anywhere in the cause
* chain makes the whole throwable retriable.
* By default only I/O exceptions are retriable.
*
* @param throwable the failure to classify; may be {@code null}
* @return {@code true} if the throwable or one of its causes is a communication exception
*/
protected boolean isRetriable(Throwable throwable) {
// NOTE(review): this bare return looks like the pre-change body left in by the
// diff rendering; confirm against the committed file before relying on it.
return false;
// End of the cause chain (or no throwable at all): nothing matched.
if (throwable == null) {
return false;
}
// Match against every configured communication exception type, subclasses included.
for (Class<? extends Exception> e : communicationExceptions()) {
if (e.isAssignableFrom(throwable.getClass())) {
return true;
}
}
// No match at this level: recurse into the cause.
// NOTE(review): there is no guard against a cyclic cause chain here;
// presumably such chains do not occur in practice — TODO confirm.
return isRetriable(throwable.getCause());
}
/**

View File

@ -77,7 +77,7 @@ public static <T> Set<T> unmodifiableSet(Set<T> values, T... additionalValues) {
return Collections.unmodifiableSet(newSet);
}
/**
 * Builds a set from the given values.
 * <p>
 * The values are first passed to {@code arrayListOf} and the result is handed to
 * another {@code unmodifiableSet} overload. Neither is visible here, so the returned
 * set is presumably unmodifiable, as the method name suggests — verify against that overload.
 *
 * @param values the elements to place in the set
 * @return a set containing the given values
 */
@SuppressWarnings("unchecked")
@SafeVarargs
public static <T> Set<T> unmodifiableSet(T... values) {
return unmodifiableSet(arrayListOf(values));
}