DBZ-7616 Add query timeout on JdbcConnection controlled by query.timeout.ms property

This commit is contained in:
mfvitale 2024-04-12 10:01:48 +02:00 committed by Fiore Mario Vitale
parent 294fdb9d3f
commit 8206d2d98d
13 changed files with 250 additions and 15 deletions

View File

@ -208,7 +208,8 @@ public static SnapshotLockingMode parse(String value, String defaultValue) {
BinlogConnectorConfig.GTID_SOURCE_EXCLUDES)
.type(
JDBC_DRIVER,
JDBC_PROTOCOL)
JDBC_PROTOCOL,
QUERY_TIMEOUT_MS,)
.connector(SNAPSHOT_LOCKING_MODE)
.events(
GTID_SOURCE_INCLUDES,

View File

@ -41,7 +41,7 @@ public static MySqlTestConnection forTestDatabase(String databaseName) {
/**
* Obtain a connection instance to the named test replica database.
* if no replica, obtain same connection with {@link #forTestDatabase(String) forTestDatabase}
* if no replica, obtain same connection with {@link #forTestDatabase(String, int) forTestDatabase}
* @param databaseName the name of the test replica database
* @return the MySQLConnection instance; never null
*/
@ -49,6 +49,24 @@ public static MySqlTestConnection forTestReplicaDatabase(String databaseName) {
return new MySqlTestConnection(getReplicaJdbcConfig(databaseName).build());
}
/**
* Obtain a connection instance to the named test database.
*
*
* @param databaseName the name of the test database
* @param queryTimeout
* @return the MySQLConnection instance; never null
*/
public static MySqlTestConnection forTestDatabase(String databaseName, int queryTimeout) {
return new MySqlTestConnection(JdbcConfiguration.copy(
Configuration.fromSystemProperties(DATABASE_CONFIG_PREFIX).merge(Configuration.fromSystemProperties(DRIVER_CONFIG_PREFIX)))
.withDatabase(databaseName)
.withQueryTimeoutMs(queryTimeout)
.with("characterEncoding", "utf8")
.build());
}
/**
* Obtain a connection instance to the named test database.
* @param databaseName the name of the test database

View File

@ -645,6 +645,7 @@ public class OracleConnectorConfig extends HistorizedRelationalDatabaseConnector
USER,
PASSWORD,
DATABASE_NAME,
QUERY_TIMEOUT_MS,
PDB_NAME,
XSTREAM_SERVER_NAME,
SNAPSHOT_MODE,

View File

@ -0,0 +1,58 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.connector.oracle;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import java.sql.SQLException;
import java.sql.SQLTimeoutException;
import org.junit.Test;
import io.debezium.config.Configuration;
import io.debezium.connector.oracle.util.TestHelper;
import io.debezium.util.Testing;
public class ConnectionIT implements Testing {
@Test
public void shouldDoStuffWithDatabase() throws SQLException {
Configuration config = TestHelper.testConfig().with("database.query.timeout.ms", "1000").build();
try (OracleConnection conn = TestHelper.testConnection(config)) {
conn.connect();
TestHelper.dropTable(conn, "debezium.customer");
conn.execute("create table debezium.customer (" +
" id numeric(9,0) not null, " +
" name varchar2(1000), " +
" score decimal(6, 2), " +
" registered timestamp, " +
" primary key (id)" +
")");
conn.execute("SELECT * FROM debezium.customer");
}
}
@Test
public void whenQueryTakesMoreThenConfiguredQueryTimeoutAnExceptionMustBeThrown() throws SQLException {
Configuration config = TestHelper.defaultConfig().with("database.query.timeout.ms", "1000").build();
try (OracleConnection conn = TestHelper.testConnection(config)) {
conn.connect();
assertThatThrownBy(() -> conn.execute("begin\n" +
" dbms_lock.sleep(10);\n" +
"end;"))
.isInstanceOf(SQLTimeoutException.class)
.hasMessage("ORA-01013: user requested cancel of current operation\n" +
"ORA-06512: at \"SYS.DBMS_LOCK\", line 215\n" +
"ORA-06512: at line 2\n");
}
}
}

View File

@ -256,7 +256,7 @@ private static JdbcConfiguration adminJdbcConfig() {
/**
* Returns a configuration builder based on the test schema and user account settings.
*/
private static Configuration.Builder testConfig() {
public static Configuration.Builder testConfig() {
JdbcConfiguration jdbcConfiguration = testJdbcConfig();
Configuration.Builder builder = Configuration.create();
@ -312,6 +312,15 @@ public static OracleConnection testConnection() {
return createConnection(config, JdbcConfiguration.adapt(jdbcConfig), false);
}
/**
* Return a test connection that is suitable for performing test database changes in tests.
*/
public static OracleConnection testConnection(Configuration config) {
Configuration jdbcConfig = config.subset(DATABASE_PREFIX, true);
return createConnection(config, JdbcConfiguration.adapt(jdbcConfig), false);
}
/**
* Return a connection that is suitable for performing test database changes that require
* an administrator role permission.

View File

@ -1123,6 +1123,7 @@ protected SourceInfoStructMaker<? extends AbstractSourceInfo> getSourceInfoStruc
USER,
PASSWORD,
DATABASE_NAME,
QUERY_TIMEOUT_MS,
PLUGIN_NAME,
SLOT_NAME,
PUBLICATION_NAME,

View File

@ -0,0 +1,54 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.connector.postgresql;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import java.sql.SQLException;
import org.junit.Test;
import org.postgresql.util.PSQLException;
import io.debezium.config.Configuration;
import io.debezium.connector.postgresql.connection.PostgresConnection;
import io.debezium.jdbc.JdbcConfiguration;
import io.debezium.util.Testing;
public class ConnectionIT implements Testing {
@Test
public void shouldDoStuffWithDatabase() throws SQLException {
try (PostgresConnection conn = TestHelper.create()) {
conn.connect();
conn.execute("DROP TABLE IF EXISTS customer");
conn.execute("create table customer (" +
" id numeric(9,0) not null, " +
" name varchar(1000), " +
" score decimal(6, 2), " +
" registered timestamp, " +
" primary key (id)" +
")");
conn.execute("SELECT * FROM customer");
}
}
@Test
public void whenQueryTakesMoreThenConfiguredQueryTimeoutAnExceptionMustBeThrown() throws SQLException {
Configuration config = TestHelper.defaultJdbcConfig().edit()
.with("query.timeout.ms", "1000").build();
try (PostgresConnection conn = TestHelper.create(JdbcConfiguration.adapt(config))) {
conn.connect();
assertThatThrownBy(() -> conn.execute("SELECT pg_sleep(10)"))
.isInstanceOf(PSQLException.class)
.hasMessage("ERROR: canceling statement due to user request");
}
}
}

View File

@ -141,6 +141,16 @@ public static PostgresConnection create() {
return new PostgresConnection(defaultJdbcConfig(), CONNECTION_TEST);
}
/**
* Obtain a default DB connection.
*
* @param jdbcConfiguration jdbc configuration to use
* @return the PostgresConnection instance; never null
*/
public static PostgresConnection create(JdbcConfiguration jdbcConfiguration) {
return new PostgresConnection(jdbcConfiguration, CONNECTION_TEST);
}
/**
* Obtain a DB connection providing type registry.
*

View File

@ -486,6 +486,7 @@ public static DataQueryMode parse(String value, String defaultValue) {
PORT,
USER,
PASSWORD,
QUERY_TIMEOUT_MS,
INSTANCE)
.connector(
SNAPSHOT_MODE,

View File

@ -7,6 +7,7 @@
package io.debezium.connector.sqlserver;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import java.math.BigDecimal;
import java.math.BigInteger;
@ -26,6 +27,7 @@
import org.junit.Before;
import org.junit.Test;
import io.debezium.config.Configuration;
import io.debezium.connector.sqlserver.util.TestHelper;
import io.debezium.doc.FixFor;
import io.debezium.jdbc.JdbcValueConverters;
@ -582,6 +584,22 @@ public void shouldNotConnectToAnyOfMultipleDatabase() throws Exception {
}
}
@Test
public void whenQueryTakesMoreThenConfiguredQueryTimeoutAnExceptionMustBeThrown() throws SQLException {
TestHelper.createTestDatabase();
Configuration config = TestHelper.defaultConnectorConfig()
.with("database.query.timeout.ms", "1000").build();
try (SqlServerConnection conn = TestHelper.testConnection(config)) {
conn.connect();
assertThatThrownBy(() -> conn.execute("WAITFOR DELAY '00:01'"))
.isInstanceOf(SQLException.class)
.hasMessage("The query has timed out.");
}
}
private long toMillis(OffsetDateTime datetime) {
return datetime.toInstant().toEpochMilli();
}

View File

@ -74,12 +74,18 @@ public interface JdbcConfiguration extends Configuration {
.withDefault(600000)
.withValidation(Field::isOptional);
Field QUERY_TIMEOUT_MS = Field.create("query.timeout.ms")
.withDisplayName("Time to wait for a query to execute, given in milliseconds. Defaults to 600 seconds (600,000 ms); zero means there is no limit.")
.withType(Type.INT)
.withDefault(600000)
.withValidation(Field::isOptional);
/**
* The set of names of the pre-defined JDBC configuration fields, including {@link #DATABASE}, {@link #USER},
* {@link #PASSWORD}, {@link #HOSTNAME}, and {@link #PORT}.
*/
Set<String> ALL_KNOWN_FIELDS = Collect.unmodifiableSet(Field::name, DATABASE, USER, PASSWORD, HOSTNAME, PORT, ON_CONNECT_STATEMENTS,
CONNECTION_FACTORY_CLASS, CONNECTION_TIMEOUT_MS);
CONNECTION_FACTORY_CLASS, CONNECTION_TIMEOUT_MS, QUERY_TIMEOUT_MS);
/**
* Obtain a {@link JdbcConfiguration} adapter for the given {@link Configuration}.
@ -189,6 +195,16 @@ default Builder withConnectionFactoryClass(String connectionFactoryClass) {
default Builder withConnectionTimeoutMs(int connectionTimeoutMs) {
return with(CONNECTION_TIMEOUT_MS, connectionTimeoutMs);
}
/**
* Use the given query timeout in the resulting configuration.
*
* @param queryTimeoutMs query timeout in ms
* @return this builder object so methods can be chained together; never null
*/
default Builder withQueryTimeoutMs(int queryTimeoutMs) {
return with(QUERY_TIMEOUT_MS, queryTimeoutMs);
}
}
/**
@ -395,4 +411,13 @@ default String getConnectionFactoryClassName() {
default Duration getConnectionTimeout() {
return Duration.ofMillis(getInteger(CONNECTION_TIMEOUT_MS));
}
/**
* Get the query timeout from the configuration.
*
* @return the specified value, or null if there is none.
*/
default Duration getQueryTimeout() {
return Duration.ofMillis(getInteger(QUERY_TIMEOUT_MS));
}
}

View File

@ -327,6 +327,7 @@ private static String findAndReplace(String url, String name, Properties props,
private final String openingQuoteCharacter;
private final String closingQuoteCharacter;
private volatile Connection conn;
private final int queryTimeout;
/**
* Create a new instance with the given configuration and connection factory.
@ -356,6 +357,7 @@ protected JdbcConnection(JdbcConfiguration config, ConnectionFactory connectionF
this.openingQuoteCharacter = openingQuotingChar;
this.closingQuoteCharacter = closingQuotingChar;
this.conn = null;
this.queryTimeout = (int) config.getQueryTimeout().toSeconds();
}
/**
@ -433,7 +435,11 @@ public JdbcConnection execute(String... sqlStatements) throws SQLException {
*/
public JdbcConnection execute(Operations operations) throws SQLException {
Connection conn = connection();
try (Statement statement = conn.createStatement();) {
try (Statement statement = conn.createStatement()) {
statement.setQueryTimeout(queryTimeout);
if (LOGGER.isTraceEnabled()) {
LOGGER.trace("Executing query with {}s timeout", queryTimeout);
}
operations.apply(statement);
commit();
}
@ -536,9 +542,10 @@ public JdbcConnection call(String sql, CallPreparer callPreparer, ResultSetConsu
*/
public JdbcConnection query(String query, StatementFactory statementFactory, ResultSetConsumer resultConsumer) throws SQLException {
Connection conn = connection();
try (Statement statement = statementFactory.createStatement(conn);) {
try (Statement statement = statementFactory.createStatement(conn)) {
statement.setQueryTimeout(queryTimeout);
if (LOGGER.isTraceEnabled()) {
LOGGER.trace("running '{}'", query);
LOGGER.trace("running '{}' with {}s timeout", query, queryTimeout);
}
try (ResultSet resultSet = statement.executeQuery(query);) {
if (resultConsumer != null) {
@ -624,9 +631,10 @@ public JdbcConnection prepareQuery(String[] multiQuery, StatementPreparer[] prep
public <T> T queryAndMap(String query, StatementFactory statementFactory, ResultSetMapper<T> mapper) throws SQLException {
Objects.requireNonNull(mapper, "Mapper must be provided");
Connection conn = connection();
try (Statement statement = statementFactory.createStatement(conn);) {
try (Statement statement = statementFactory.createStatement(conn)) {
statement.setQueryTimeout(queryTimeout);
if (LOGGER.isTraceEnabled()) {
LOGGER.trace("running '{}'", query);
LOGGER.trace("running '{}' with {}s timeout", query, queryTimeout);
}
try (ResultSet resultSet = statement.executeQuery(query);) {
return mapper.apply(resultSet);
@ -637,9 +645,10 @@ public <T> T queryAndMap(String query, StatementFactory statementFactory, Result
public JdbcConnection queryWithBlockingConsumer(String query, StatementFactory statementFactory, BlockingResultSetConsumer resultConsumer)
throws SQLException, InterruptedException {
Connection conn = connection();
try (Statement statement = statementFactory.createStatement(conn);) {
try (Statement statement = statementFactory.createStatement(conn)) {
statement.setQueryTimeout(queryTimeout);
if (LOGGER.isTraceEnabled()) {
LOGGER.trace("running '{}'", query);
LOGGER.trace("running '{}' with {}s timeout", query, queryTimeout);
}
try (ResultSet resultSet = statement.executeQuery(query);) {
if (resultConsumer != null) {
@ -679,7 +688,11 @@ public JdbcConnection prepareQueryWithBlockingConsumer(String preparedQueryStrin
throws SQLException, InterruptedException {
final PreparedStatement statement = createPreparedStatement(preparedQueryString);
preparer.accept(statement);
try (ResultSet resultSet = statement.executeQuery();) {
statement.setQueryTimeout(queryTimeout);
if (LOGGER.isTraceEnabled()) {
LOGGER.trace("Executing '{}' with {}s timeout", preparedQueryString, queryTimeout);
}
try (ResultSet resultSet = statement.executeQuery()) {
if (resultConsumer != null) {
resultConsumer.accept(resultSet);
}
@ -757,7 +770,10 @@ public JdbcConnection prepareUpdate(String stmt, StatementPreparer preparer) thr
if (preparer != null) {
preparer.accept(statement);
}
LOGGER.trace("Executing statement '{}'", stmt);
statement.setQueryTimeout(queryTimeout);
if (LOGGER.isTraceEnabled()) {
LOGGER.trace("Executing statement '{}' with {}s timeout", stmt, queryTimeout);
}
statement.execute();
return this;
}
@ -1426,7 +1442,12 @@ private PreparedStatement createPreparedStatement(String preparedQueryString) {
return statementCache.computeIfAbsent(preparedQueryString, query -> {
try {
LOGGER.trace("Inserting prepared statement '{}' removed from the cache", query);
return connection().prepareStatement(query);
PreparedStatement preparedStatement = connection().prepareStatement(query);
preparedStatement.setQueryTimeout(queryTimeout);
if (LOGGER.isTraceEnabled()) {
LOGGER.trace("PreparedStatement '{}' with {}s timeout", preparedQueryString, queryTimeout);
}
return preparedStatement;
}
catch (SQLException e) {
throw new ConnectException(e);
@ -1447,9 +1468,10 @@ public JdbcConnection executeWithoutCommitting(String... statements) throws SQLE
throw new DebeziumException("Cannot execute without committing because auto-commit is enabled");
}
try (Statement statement = conn.createStatement()) {
statement.setQueryTimeout(queryTimeout);
for (String stmt : statements) {
if (LOGGER.isTraceEnabled()) {
LOGGER.trace("Executing statement {}", stmt);
LOGGER.trace("Executing statement '{}' with {}s timeout", stmt, queryTimeout);
}
statement.execute(stmt);
}
@ -1534,6 +1556,10 @@ public Optional<Boolean> nullsSortLast() {
public Statement readTableStatement(CommonConnectorConfig connectorConfig, OptionalLong tableSize) throws SQLException {
int fetchSize = connectorConfig.getSnapshotFetchSize();
final Statement statement = connection().createStatement(); // the default cursor is FORWARD_ONLY
statement.setQueryTimeout(queryTimeout);
if (LOGGER.isTraceEnabled()) {
LOGGER.trace("Created Statement with {}s timeout", queryTimeout);
}
statement.setFetchSize(fetchSize);
return statement;
}
@ -1544,6 +1570,10 @@ public Statement readTableStatement(CommonConnectorConfig connectorConfig, Optio
public PreparedStatement readTablePreparedStatement(CommonConnectorConfig connectorConfig, String sql, OptionalLong tableSize) throws SQLException {
int fetchSize = connectorConfig.getSnapshotFetchSize();
final PreparedStatement statement = connection().prepareStatement(sql); // the default cursor is FORWARD_ONLY
statement.setQueryTimeout(queryTimeout);
if (LOGGER.isTraceEnabled()) {
LOGGER.trace("PreparedStatement '{}' with {}s timeout", sql, queryTimeout);
}
statement.setFetchSize(fetchSize);
return statement;
}

View File

@ -249,6 +249,15 @@ public static SnapshotTablesRowCountOrder parse(String value, String defaultValu
.required()
.withDescription("The name of the database from which the connector should capture changes");
public static final Field QUERY_TIMEOUT_MS = Field.create(DATABASE_CONFIG_PREFIX + JdbcConfiguration.QUERY_TIMEOUT_MS)
.withDisplayName("Query timeout")
.withType(Type.INT)
.withGroup(Field.createGroupEntry(Field.Group.CONNECTION, 12))
.withDefault(600000)
.withImportance(Importance.LOW)
.withValidation(Field::isOptional)
.withDescription("Time to wait for a query to execute, given in milliseconds. Defaults to 600 seconds (600,000 ms); zero means there is no limit.");
/**
* A comma-separated list of regular expressions that match the fully-qualified names of tables to be monitored.
* Fully-qualified names for tables are of the form {@code <databaseName>.<tableName>} or