From de53591dba2cc1ccbb0d7f2c6a5dccd258692e58 Mon Sep 17 00:00:00 2001 From: Jiri Pechanec Date: Wed, 24 Aug 2022 14:52:59 +0200 Subject: [PATCH] DBZ-5244 Retry all communication exceptions by default --- .../mongodb/MongoDbErrorHandler.java | 25 ++++-------- .../connector/mysql/MySqlErrorHandler.java | 28 +++---------- .../postgresql/PostgresErrorHandler.java | 39 ++++--------------- .../postgresql/PostgresErrorHandlerTest.java | 12 +++--- .../sqlserver/SqlServerErrorHandler.java | 30 +++----------- .../connector/sqlserver/SnapshotIT.java | 12 ++++-- .../io/debezium/pipeline/ErrorHandler.java | 18 ++++++++- .../main/java/io/debezium/util/Collect.java | 2 +- 8 files changed, 60 insertions(+), 106 deletions(-) diff --git a/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/MongoDbErrorHandler.java b/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/MongoDbErrorHandler.java index 921969d61..281a6ce52 100644 --- a/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/MongoDbErrorHandler.java +++ b/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/MongoDbErrorHandler.java @@ -5,8 +5,14 @@ */ package io.debezium.connector.mongodb; +import java.io.IOException; +import java.util.Set; + +import com.mongodb.MongoException; + import io.debezium.connector.base.ChangeEventQueue; import io.debezium.pipeline.ErrorHandler; +import io.debezium.util.Collect; /** * Error handler for MongoDB. @@ -20,22 +26,7 @@ public MongoDbErrorHandler(MongoDbConnectorConfig connectorConfig, ChangeEventQu } @Override - protected boolean isRetriable(Throwable throwable) { - if (throwable instanceof org.apache.kafka.connect.errors.ConnectException) { - Throwable cause = throwable.getCause(); - while ((cause != null) && (cause != throwable)) { - if (cause instanceof com.mongodb.MongoSocketException || - cause instanceof com.mongodb.MongoTimeoutException || - cause instanceof com.mongodb.MongoExecutionTimeoutException || - cause instanceof com.mongodb.MongoNodeIsRecoveringException) { - return true; - } - else { - cause = cause.getCause(); - } - } - } - - return false; + protected Set> communicationExceptions() { + return Collect.unmodifiableSet(IOException.class, MongoException.class); } } diff --git a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlErrorHandler.java b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlErrorHandler.java index 0bace052f..6313d4e5e 100644 --- a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlErrorHandler.java +++ b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlErrorHandler.java @@ -5,14 +5,13 @@ */ package io.debezium.connector.mysql; -import java.io.EOFException; +import java.io.IOException; import java.sql.SQLException; +import java.util.Set; -import com.github.shyiko.mysql.binlog.network.ServerException; - -import io.debezium.DebeziumException; import io.debezium.connector.base.ChangeEventQueue; import io.debezium.pipeline.ErrorHandler; +import io.debezium.util.Collect; /** * Error handler for MySQL. @@ -21,29 +20,12 @@ */ public class MySqlErrorHandler extends ErrorHandler { - private static final String SQL_CODE_TOO_MANY_CONNECTIONS = "08004"; - public MySqlErrorHandler(MySqlConnectorConfig connectorConfig, ChangeEventQueue queue) { super(MySqlConnector.class, connectorConfig, queue); } @Override - protected boolean isRetriable(Throwable throwable) { - if (throwable instanceof SQLException) { - final SQLException sql = (SQLException) throwable; - return SQL_CODE_TOO_MANY_CONNECTIONS.equals(sql.getSQLState()); - } - else if (throwable instanceof ServerException) { - final ServerException sql = (ServerException) throwable; - return SQL_CODE_TOO_MANY_CONNECTIONS.equals(sql.getSqlState()); - } - else if (throwable instanceof EOFException) { - // Retry with reading binlog error - return throwable.getMessage().contains("Failed to read next byte from position"); - } - else if (throwable instanceof DebeziumException && throwable.getCause() != null) { - return isRetriable(throwable.getCause()); - } - return false; + protected Set> communicationExceptions() { + return Collect.unmodifiableSet(IOException.class, SQLException.class); } } diff --git a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresErrorHandler.java b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresErrorHandler.java index 83f6f57ad..d29e57b61 100644 --- a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresErrorHandler.java +++ b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresErrorHandler.java @@ -5,12 +5,10 @@ */ package io.debezium.connector.postgresql; +import java.io.IOException; +import java.sql.SQLException; import java.util.Set; -import org.postgresql.util.PSQLException; - -import io.debezium.DebeziumException; -import io.debezium.annotation.Immutable; import io.debezium.connector.base.ChangeEventQueue; import io.debezium.pipeline.ErrorHandler; import io.debezium.util.Collect; @@ -22,39 +20,18 @@ */ public class PostgresErrorHandler extends ErrorHandler { - @Immutable - private static final Set RETRIABLE_EXCEPTION_MESSSAGES = Collect.unmodifiableSet( - "Database connection failed when writing to copy", - "Database connection failed when reading from copy", - "An I/O error occurred while sending to the backend", - "ERROR: could not open relation with OID", - "This connection has been closed", - "terminating connection due to unexpected postmaster exit", - "terminating connection due to administrator command"); - public PostgresErrorHandler(PostgresConnectorConfig connectorConfig, ChangeEventQueue queue) { super(PostgresConnector.class, connectorConfig, queue); } @Override - protected boolean isRetriable(Throwable throwable) { - if (isRetriablePsqlException(throwable)) { - return true; - } - else if (throwable instanceof DebeziumException) { - return isRetriablePsqlException(throwable.getCause()); - } - return false; + protected Set> communicationExceptions() { + return Collect.unmodifiableSet(IOException.class, SQLException.class); } - public boolean isRetriablePsqlException(Throwable throwable) { - if (throwable != null && throwable instanceof PSQLException && throwable.getMessage() != null) { - for (String messageText : RETRIABLE_EXCEPTION_MESSSAGES) { - if (throwable.getMessage().contains(messageText)) { - return true; - } - } - } - return false; + // Introduced for testing only + @Override + protected boolean isRetriable(Throwable throwable) { + return super.isRetriable(throwable); } } diff --git a/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/PostgresErrorHandlerTest.java b/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/PostgresErrorHandlerTest.java index c8c28fbd0..cbc561726 100644 --- a/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/PostgresErrorHandlerTest.java +++ b/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/PostgresErrorHandlerTest.java @@ -31,8 +31,8 @@ public void classifiedPSQLExceptionIsRetryable() { } @Test - public void psqlExceptionWithNullErrorMesdsageNotRetryable() { - PSQLException testException = new PSQLException(null, PSQLState.CONNECTION_FAILURE); + public void nonCommunicationExceptionNotRetryable() { + Exception testException = new NullPointerException(); Assertions.assertThat(errorHandler.isRetriable(testException)).isFalse(); } @@ -42,10 +42,10 @@ public void nullThrowableIsNotRetryable() { } @Test - public void unclassifiedPSQLExceptionIsNotRetryable() { - PSQLException testException = new PSQLException( - "definitely not a postgres error", PSQLState.CONNECTION_FAILURE); - Assertions.assertThat(errorHandler.isRetriable(testException)).isFalse(); + public void encapsulatedPSQLExceptionIsRetriable() { + Exception testException = new IllegalArgumentException( + new PSQLException("definitely not a postgres error", PSQLState.CONNECTION_FAILURE)); + Assertions.assertThat(errorHandler.isRetriable(testException)).isTrue(); } @Test diff --git a/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerErrorHandler.java b/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerErrorHandler.java index a3604c081..a560baf15 100644 --- a/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerErrorHandler.java +++ b/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerErrorHandler.java @@ -5,10 +5,13 @@ */ package io.debezium.connector.sqlserver; -import com.microsoft.sqlserver.jdbc.SQLServerException; +import java.io.IOException; +import java.sql.SQLException; +import java.util.Set; import io.debezium.connector.base.ChangeEventQueue; import io.debezium.pipeline.ErrorHandler; +import io.debezium.util.Collect; /** * Error handler for SQL Server. @@ -22,28 +25,7 @@ public SqlServerErrorHandler(SqlServerConnectorConfig connectorConfig, ChangeEve } @Override - protected boolean isRetriable(Throwable throwable) { - if (!(throwable instanceof SQLServerException) && throwable.getCause() instanceof SQLServerException) { - throwable = throwable.getCause(); - } - - return throwable instanceof SQLServerException - && (throwable.getMessage().contains("Connection timed out (Read failed)") - || throwable.getMessage().contains("Connection timed out (Write failed)") - || throwable.getMessage().contains("The connection has been closed.") - || throwable.getMessage().contains("The connection is closed.") - || throwable.getMessage().contains("The login failed.") - || throwable.getMessage().contains("Server is in script upgrade mode.") - || throwable.getMessage().contains("Try the statement later.") - || throwable.getMessage().contains("Connection reset") - || throwable.getMessage().contains("Socket closed") - || throwable.getMessage().contains("SHUTDOWN is in progress") - || throwable.getMessage().contains("The server failed to resume the transaction") - || throwable.getMessage().contains("Verify the connection properties") - || throwable.getMessage().contains("Broken pipe (Write failed)") - || throwable.getMessage() - .startsWith("An insufficient number of arguments were supplied for the procedure or function cdc.fn_cdc_get_all_changes_") - || throwable.getMessage() - .endsWith("was deadlocked on lock resources with another process and has been chosen as the deadlock victim. Rerun the transaction.")); + protected Set> communicationExceptions() { + return Collect.unmodifiableSet(IOException.class, SQLException.class); } } diff --git a/debezium-connector-sqlserver/src/test/java/io/debezium/connector/sqlserver/SnapshotIT.java b/debezium-connector-sqlserver/src/test/java/io/debezium/connector/sqlserver/SnapshotIT.java index 8ba232e1f..f4dfdc03f 100644 --- a/debezium-connector-sqlserver/src/test/java/io/debezium/connector/sqlserver/SnapshotIT.java +++ b/debezium-connector-sqlserver/src/test/java/io/debezium/connector/sqlserver/SnapshotIT.java @@ -14,12 +14,14 @@ import java.sql.SQLException; import java.util.Arrays; import java.util.List; +import java.util.concurrent.TimeUnit; import org.apache.kafka.connect.data.Decimal; import org.apache.kafka.connect.data.Schema; import org.apache.kafka.connect.data.SchemaBuilder; import org.apache.kafka.connect.data.Struct; import org.apache.kafka.connect.source.SourceRecord; +import org.awaitility.Awaitility; import org.fest.assertions.Assertions; import org.fest.assertions.MapAssert; import org.junit.After; @@ -29,6 +31,7 @@ import io.debezium.config.CommonConnectorConfig; import io.debezium.config.Configuration; +import io.debezium.connector.common.BaseSourceTask; import io.debezium.connector.sqlserver.SqlServerConnectorConfig.SnapshotIsolationMode; import io.debezium.connector.sqlserver.SqlServerConnectorConfig.SnapshotMode; import io.debezium.connector.sqlserver.util.TestHelper; @@ -165,7 +168,8 @@ public void takeSnapshotAndStartStreaming() throws Exception { @Test @FixFor("DBZ-1280") public void testDeadlockDetection() throws Exception { - final LogInterceptor logInterceptor = new LogInterceptor(ErrorHandler.class); + final LogInterceptor logInterceptorErrorHandler = new LogInterceptor(ErrorHandler.class); + final LogInterceptor logInterceptorTask = new LogInterceptor(BaseSourceTask.class); final Configuration config = TestHelper.defaultConfig() .with(RelationalDatabaseConnectorConfig.SNAPSHOT_LOCK_TIMEOUT_MS, 1_000) .build(); @@ -176,8 +180,10 @@ public void testDeadlockDetection() throws Exception { connection.setAutoCommit(false).executeWithoutCommitting( "SELECT TOP(0) * FROM dbo.table1 WITH (TABLOCKX)"); consumeRecordsByTopic(INITIAL_RECORDS_PER_TABLE); - assertConnectorNotRunning(); - assertThat(logInterceptor.containsStacktraceElement("Lock request time out period exceeded.")).as("Log contains error related to lock timeout").isTrue(); + Awaitility.await().atMost(TestHelper.waitTimeForLogEntries(), TimeUnit.SECONDS).until( + () -> logInterceptorTask.containsWarnMessage("Going to restart connector after ")); + assertThat(logInterceptorErrorHandler.containsStacktraceElement("Lock request time out period exceeded.")).as("Log contains error related to lock timeout") + .isTrue(); connection.rollback(); } diff --git a/debezium-core/src/main/java/io/debezium/pipeline/ErrorHandler.java b/debezium-core/src/main/java/io/debezium/pipeline/ErrorHandler.java index 369749013..703e9a263 100644 --- a/debezium-core/src/main/java/io/debezium/pipeline/ErrorHandler.java +++ b/debezium-core/src/main/java/io/debezium/pipeline/ErrorHandler.java @@ -5,6 +5,9 @@ */ package io.debezium.pipeline; +import java.io.IOException; +import java.util.Collections; +import java.util.Set; import java.util.concurrent.atomic.AtomicReference; import org.apache.kafka.connect.errors.ConnectException; @@ -56,12 +59,25 @@ public Throwable getProducerThrowable() { return producerThrowable.get(); } + protected Set> communicationExceptions() { + return Collections.singleton(IOException.class); + } + /** * Whether the given throwable is retriable (e.g. an exception indicating a * connection loss) or not. + * By default only I/O exceptions are retriable */ protected boolean isRetriable(Throwable throwable) { - return false; + if (throwable == null) { + return false; + } + for (Class e : communicationExceptions()) { + if (e.isAssignableFrom(throwable.getClass())) { + return true; + } + } + return isRetriable(throwable.getCause()); } /** diff --git a/debezium-core/src/main/java/io/debezium/util/Collect.java b/debezium-core/src/main/java/io/debezium/util/Collect.java index 6ef3e438e..82a325e8a 100644 --- a/debezium-core/src/main/java/io/debezium/util/Collect.java +++ b/debezium-core/src/main/java/io/debezium/util/Collect.java @@ -77,7 +77,7 @@ public static Set unmodifiableSet(Set values, T... additionalValues) { return Collections.unmodifiableSet(newSet); } - @SuppressWarnings("unchecked") + @SafeVarargs public static Set unmodifiableSet(T... values) { return unmodifiableSet(arrayListOf(values)); }