diff --git a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlConnectorConfig.java b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlConnectorConfig.java index 71e46aa90..8c343204c 100644 --- a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlConnectorConfig.java +++ b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlConnectorConfig.java @@ -208,7 +208,8 @@ public static SnapshotLockingMode parse(String value, String defaultValue) { BinlogConnectorConfig.GTID_SOURCE_EXCLUDES) .type( JDBC_DRIVER, - JDBC_PROTOCOL) + JDBC_PROTOCOL, + QUERY_TIMEOUT_MS,) .connector(SNAPSHOT_LOCKING_MODE) .events( GTID_SOURCE_INCLUDES, diff --git a/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/MySqlTestConnection.java b/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/MySqlTestConnection.java index 03f644cb1..fc293131f 100644 --- a/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/MySqlTestConnection.java +++ b/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/MySqlTestConnection.java @@ -41,7 +41,7 @@ public static MySqlTestConnection forTestDatabase(String databaseName) { /** * Obtain a connection instance to the named test replica database. - * if no replica, obtain same connection with {@link #forTestDatabase(String) forTestDatabase} + * if no replica, obtain same connection with {@link #forTestDatabase(String, int) forTestDatabase} * @param databaseName the name of the test replica database * @return the MySQLConnection instance; never null */ @@ -49,6 +49,24 @@ public static MySqlTestConnection forTestReplicaDatabase(String databaseName) { return new MySqlTestConnection(getReplicaJdbcConfig(databaseName).build()); } + /** + * Obtain a connection instance to the named test database. + * + * + * @param databaseName the name of the test database + * @param queryTimeout + * @return the MySQLConnection instance; never null + */ + + public static MySqlTestConnection forTestDatabase(String databaseName, int queryTimeout) { + return new MySqlTestConnection(JdbcConfiguration.copy( + Configuration.fromSystemProperties(DATABASE_CONFIG_PREFIX).merge(Configuration.fromSystemProperties(DRIVER_CONFIG_PREFIX))) + .withDatabase(databaseName) + .withQueryTimeoutMs(queryTimeout) + .with("characterEncoding", "utf8") + .build()); + } + /** * Obtain a connection instance to the named test database. * @param databaseName the name of the test database diff --git a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/OracleConnectorConfig.java b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/OracleConnectorConfig.java index 74fe2f45c..05a5cce4c 100644 --- a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/OracleConnectorConfig.java +++ b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/OracleConnectorConfig.java @@ -645,6 +645,7 @@ public class OracleConnectorConfig extends HistorizedRelationalDatabaseConnector USER, PASSWORD, DATABASE_NAME, + QUERY_TIMEOUT_MS, PDB_NAME, XSTREAM_SERVER_NAME, SNAPSHOT_MODE, diff --git a/debezium-connector-oracle/src/test/java/io/debezium/connector/oracle/ConnectionIT.java b/debezium-connector-oracle/src/test/java/io/debezium/connector/oracle/ConnectionIT.java new file mode 100644 index 000000000..78b6d3eee --- /dev/null +++ b/debezium-connector-oracle/src/test/java/io/debezium/connector/oracle/ConnectionIT.java @@ -0,0 +1,58 @@ +/* + * Copyright Debezium Authors. + * + * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 + */ +package io.debezium.connector.oracle; + +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +import java.sql.SQLException; +import java.sql.SQLTimeoutException; + +import org.junit.Test; + +import io.debezium.config.Configuration; +import io.debezium.connector.oracle.util.TestHelper; +import io.debezium.util.Testing; + +public class ConnectionIT implements Testing { + + @Test + public void shouldDoStuffWithDatabase() throws SQLException { + + Configuration config = TestHelper.testConfig().with("database.query.timeout.ms", "1000").build(); + + try (OracleConnection conn = TestHelper.testConnection(config)) { + conn.connect(); + TestHelper.dropTable(conn, "debezium.customer"); + conn.execute("create table debezium.customer (" + + " id numeric(9,0) not null, " + + " name varchar2(1000), " + + " score decimal(6, 2), " + + " registered timestamp, " + + " primary key (id)" + + ")"); + + conn.execute("SELECT * FROM debezium.customer"); + } + } + + @Test + public void whenQueryTakesMoreThenConfiguredQueryTimeoutAnExceptionMustBeThrown() throws SQLException { + + Configuration config = TestHelper.defaultConfig().with("database.query.timeout.ms", "1000").build(); + + try (OracleConnection conn = TestHelper.testConnection(config)) { + conn.connect(); + + assertThatThrownBy(() -> conn.execute("begin\n" + + " dbms_lock.sleep(10);\n" + + "end;")) + .isInstanceOf(SQLTimeoutException.class) + .hasMessage("ORA-01013: user requested cancel of current operation\n" + + "ORA-06512: at \"SYS.DBMS_LOCK\", line 215\n" + + "ORA-06512: at line 2\n"); + } + } +} diff --git a/debezium-connector-oracle/src/test/java/io/debezium/connector/oracle/util/TestHelper.java b/debezium-connector-oracle/src/test/java/io/debezium/connector/oracle/util/TestHelper.java index 35913404a..cd07e8023 100644 --- a/debezium-connector-oracle/src/test/java/io/debezium/connector/oracle/util/TestHelper.java +++ b/debezium-connector-oracle/src/test/java/io/debezium/connector/oracle/util/TestHelper.java @@ -256,7 +256,7 @@ private static JdbcConfiguration adminJdbcConfig() { /** * Returns a configuration builder based on the test schema and user account settings. */ - private static Configuration.Builder testConfig() { + public static Configuration.Builder testConfig() { JdbcConfiguration jdbcConfiguration = testJdbcConfig(); Configuration.Builder builder = Configuration.create(); @@ -312,6 +312,15 @@ public static OracleConnection testConnection() { return createConnection(config, JdbcConfiguration.adapt(jdbcConfig), false); } + /** + * Return a test connection that is suitable for performing test database changes in tests. + */ + public static OracleConnection testConnection(Configuration config) { + + Configuration jdbcConfig = config.subset(DATABASE_PREFIX, true); + return createConnection(config, JdbcConfiguration.adapt(jdbcConfig), false); + } + /** * Return a connection that is suitable for performing test database changes that require * an administrator role permission. diff --git a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresConnectorConfig.java b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresConnectorConfig.java index 64e694747..44e6a34c3 100755 --- a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresConnectorConfig.java +++ b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresConnectorConfig.java @@ -1123,6 +1123,7 @@ protected SourceInfoStructMaker getSourceInfoStruc USER, PASSWORD, DATABASE_NAME, + QUERY_TIMEOUT_MS, PLUGIN_NAME, SLOT_NAME, PUBLICATION_NAME, diff --git a/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/ConnectionIT.java b/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/ConnectionIT.java new file mode 100644 index 000000000..409b30616 --- /dev/null +++ b/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/ConnectionIT.java @@ -0,0 +1,54 @@ +/* + * Copyright Debezium Authors. + * + * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 + */ +package io.debezium.connector.postgresql; + +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +import java.sql.SQLException; + +import org.junit.Test; +import org.postgresql.util.PSQLException; + +import io.debezium.config.Configuration; +import io.debezium.connector.postgresql.connection.PostgresConnection; +import io.debezium.jdbc.JdbcConfiguration; +import io.debezium.util.Testing; + +public class ConnectionIT implements Testing { + + @Test + public void shouldDoStuffWithDatabase() throws SQLException { + + try (PostgresConnection conn = TestHelper.create()) { + conn.connect(); + conn.execute("DROP TABLE IF EXISTS customer"); + conn.execute("create table customer (" + + " id numeric(9,0) not null, " + + " name varchar(1000), " + + " score decimal(6, 2), " + + " registered timestamp, " + + " primary key (id)" + + ")"); + + conn.execute("SELECT * FROM customer"); + } + } + + @Test + public void whenQueryTakesMoreThenConfiguredQueryTimeoutAnExceptionMustBeThrown() throws SQLException { + + Configuration config = TestHelper.defaultJdbcConfig().edit() + .with("query.timeout.ms", "1000").build(); + + try (PostgresConnection conn = TestHelper.create(JdbcConfiguration.adapt(config))) { + conn.connect(); + + assertThatThrownBy(() -> conn.execute("SELECT pg_sleep(10)")) + .isInstanceOf(PSQLException.class) + .hasMessage("ERROR: canceling statement due to user request"); + } + } +} diff --git a/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/TestHelper.java b/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/TestHelper.java index 8da485c3c..79a669431 100644 --- a/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/TestHelper.java +++ b/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/TestHelper.java @@ -141,6 +141,16 @@ public static PostgresConnection create() { return new PostgresConnection(defaultJdbcConfig(), CONNECTION_TEST); } + /** + * Obtain a default DB connection. + * + * @param jdbcConfiguration jdbc configuration to use + * @return the PostgresConnection instance; never null + */ + public static PostgresConnection create(JdbcConfiguration jdbcConfiguration) { + return new PostgresConnection(jdbcConfiguration, CONNECTION_TEST); + } + /** * Obtain a DB connection providing type registry. * diff --git a/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerConnectorConfig.java b/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerConnectorConfig.java index dbc27a03c..1b734b10a 100644 --- a/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerConnectorConfig.java +++ b/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerConnectorConfig.java @@ -486,6 +486,7 @@ public static DataQueryMode parse(String value, String defaultValue) { PORT, USER, PASSWORD, + QUERY_TIMEOUT_MS, INSTANCE) .connector( SNAPSHOT_MODE, diff --git a/debezium-connector-sqlserver/src/test/java/io/debezium/connector/sqlserver/SqlServerConnectionIT.java b/debezium-connector-sqlserver/src/test/java/io/debezium/connector/sqlserver/SqlServerConnectionIT.java index 4957ecae2..91563e19c 100644 --- a/debezium-connector-sqlserver/src/test/java/io/debezium/connector/sqlserver/SqlServerConnectionIT.java +++ b/debezium-connector-sqlserver/src/test/java/io/debezium/connector/sqlserver/SqlServerConnectionIT.java @@ -7,6 +7,7 @@ package io.debezium.connector.sqlserver; import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; import java.math.BigDecimal; import java.math.BigInteger; @@ -26,6 +27,7 @@ import org.junit.Before; import org.junit.Test; +import io.debezium.config.Configuration; import io.debezium.connector.sqlserver.util.TestHelper; import io.debezium.doc.FixFor; import io.debezium.jdbc.JdbcValueConverters; @@ -582,6 +584,22 @@ public void shouldNotConnectToAnyOfMultipleDatabase() throws Exception { } } + @Test + public void whenQueryTakesMoreThenConfiguredQueryTimeoutAnExceptionMustBeThrown() throws SQLException { + + TestHelper.createTestDatabase(); + Configuration config = TestHelper.defaultConnectorConfig() + .with("database.query.timeout.ms", "1000").build(); + + try (SqlServerConnection conn = TestHelper.testConnection(config)) { + conn.connect(); + + assertThatThrownBy(() -> conn.execute("WAITFOR DELAY '00:01'")) + .isInstanceOf(SQLException.class) + .hasMessage("The query has timed out."); + } + } + private long toMillis(OffsetDateTime datetime) { return datetime.toInstant().toEpochMilli(); } diff --git a/debezium-core/src/main/java/io/debezium/jdbc/JdbcConfiguration.java b/debezium-core/src/main/java/io/debezium/jdbc/JdbcConfiguration.java index 28b79424b..dd6bedbad 100644 --- a/debezium-core/src/main/java/io/debezium/jdbc/JdbcConfiguration.java +++ b/debezium-core/src/main/java/io/debezium/jdbc/JdbcConfiguration.java @@ -74,12 +74,18 @@ public interface JdbcConfiguration extends Configuration { .withDefault(600000) .withValidation(Field::isOptional); + Field QUERY_TIMEOUT_MS = Field.create("query.timeout.ms") + .withDisplayName("Time to wait for a query to execute, given in milliseconds. Defaults to 600 seconds (600,000 ms); zero means there is no limit.") + .withType(Type.INT) + .withDefault(600000) + .withValidation(Field::isOptional); + /** * The set of names of the pre-defined JDBC configuration fields, including {@link #DATABASE}, {@link #USER}, * {@link #PASSWORD}, {@link #HOSTNAME}, and {@link #PORT}. */ Set ALL_KNOWN_FIELDS = Collect.unmodifiableSet(Field::name, DATABASE, USER, PASSWORD, HOSTNAME, PORT, ON_CONNECT_STATEMENTS, - CONNECTION_FACTORY_CLASS, CONNECTION_TIMEOUT_MS); + CONNECTION_FACTORY_CLASS, CONNECTION_TIMEOUT_MS, QUERY_TIMEOUT_MS); /** * Obtain a {@link JdbcConfiguration} adapter for the given {@link Configuration}. @@ -189,6 +195,16 @@ default Builder withConnectionFactoryClass(String connectionFactoryClass) { default Builder withConnectionTimeoutMs(int connectionTimeoutMs) { return with(CONNECTION_TIMEOUT_MS, connectionTimeoutMs); } + + /** + * Use the given query timeout in the resulting configuration. + * + * @param queryTimeoutMs query timeout in ms + * @return this builder object so methods can be chained together; never null + */ + default Builder withQueryTimeoutMs(int queryTimeoutMs) { + return with(QUERY_TIMEOUT_MS, queryTimeoutMs); + } } /** @@ -395,4 +411,13 @@ default String getConnectionFactoryClassName() { default Duration getConnectionTimeout() { return Duration.ofMillis(getInteger(CONNECTION_TIMEOUT_MS)); } + + /** + * Get the query timeout from the configuration. + * + * @return the specified value, or null if there is none. + */ + default Duration getQueryTimeout() { + return Duration.ofMillis(getInteger(QUERY_TIMEOUT_MS)); + } } diff --git a/debezium-core/src/main/java/io/debezium/jdbc/JdbcConnection.java b/debezium-core/src/main/java/io/debezium/jdbc/JdbcConnection.java index bb76c1db2..e35a5e8f4 100644 --- a/debezium-core/src/main/java/io/debezium/jdbc/JdbcConnection.java +++ b/debezium-core/src/main/java/io/debezium/jdbc/JdbcConnection.java @@ -327,6 +327,7 @@ private static String findAndReplace(String url, String name, Properties props, private final String openingQuoteCharacter; private final String closingQuoteCharacter; private volatile Connection conn; + private final int queryTimeout; /** * Create a new instance with the given configuration and connection factory. @@ -356,6 +357,7 @@ protected JdbcConnection(JdbcConfiguration config, ConnectionFactory connectionF this.openingQuoteCharacter = openingQuotingChar; this.closingQuoteCharacter = closingQuotingChar; this.conn = null; + this.queryTimeout = (int) config.getQueryTimeout().toSeconds(); } /** @@ -433,7 +435,11 @@ public JdbcConnection execute(String... sqlStatements) throws SQLException { */ public JdbcConnection execute(Operations operations) throws SQLException { Connection conn = connection(); - try (Statement statement = conn.createStatement();) { + try (Statement statement = conn.createStatement()) { + statement.setQueryTimeout(queryTimeout); + if (LOGGER.isTraceEnabled()) { + LOGGER.trace("Executing query with {}s timeout", queryTimeout); + } operations.apply(statement); commit(); } @@ -536,9 +542,10 @@ public JdbcConnection call(String sql, CallPreparer callPreparer, ResultSetConsu */ public JdbcConnection query(String query, StatementFactory statementFactory, ResultSetConsumer resultConsumer) throws SQLException { Connection conn = connection(); - try (Statement statement = statementFactory.createStatement(conn);) { + try (Statement statement = statementFactory.createStatement(conn)) { + statement.setQueryTimeout(queryTimeout); if (LOGGER.isTraceEnabled()) { - LOGGER.trace("running '{}'", query); + LOGGER.trace("running '{}' with {}s timeout", query, queryTimeout); } try (ResultSet resultSet = statement.executeQuery(query);) { if (resultConsumer != null) { @@ -624,9 +631,10 @@ public JdbcConnection prepareQuery(String[] multiQuery, StatementPreparer[] prep public T queryAndMap(String query, StatementFactory statementFactory, ResultSetMapper mapper) throws SQLException { Objects.requireNonNull(mapper, "Mapper must be provided"); Connection conn = connection(); - try (Statement statement = statementFactory.createStatement(conn);) { + try (Statement statement = statementFactory.createStatement(conn)) { + statement.setQueryTimeout(queryTimeout); if (LOGGER.isTraceEnabled()) { - LOGGER.trace("running '{}'", query); + LOGGER.trace("running '{}' with {}s timeout", query, queryTimeout); } try (ResultSet resultSet = statement.executeQuery(query);) { return mapper.apply(resultSet); @@ -637,9 +645,10 @@ public T queryAndMap(String query, StatementFactory statementFactory, Result public JdbcConnection queryWithBlockingConsumer(String query, StatementFactory statementFactory, BlockingResultSetConsumer resultConsumer) throws SQLException, InterruptedException { Connection conn = connection(); - try (Statement statement = statementFactory.createStatement(conn);) { + try (Statement statement = statementFactory.createStatement(conn)) { + statement.setQueryTimeout(queryTimeout); if (LOGGER.isTraceEnabled()) { - LOGGER.trace("running '{}'", query); + LOGGER.trace("running '{}' with {}s timeout", query, queryTimeout); } try (ResultSet resultSet = statement.executeQuery(query);) { if (resultConsumer != null) { @@ -679,7 +688,11 @@ public JdbcConnection prepareQueryWithBlockingConsumer(String preparedQueryStrin throws SQLException, InterruptedException { final PreparedStatement statement = createPreparedStatement(preparedQueryString); preparer.accept(statement); - try (ResultSet resultSet = statement.executeQuery();) { + statement.setQueryTimeout(queryTimeout); + if (LOGGER.isTraceEnabled()) { + LOGGER.trace("Executing '{}' with {}s timeout", preparedQueryString, queryTimeout); + } + try (ResultSet resultSet = statement.executeQuery()) { if (resultConsumer != null) { resultConsumer.accept(resultSet); } @@ -757,7 +770,10 @@ public JdbcConnection prepareUpdate(String stmt, StatementPreparer preparer) thr if (preparer != null) { preparer.accept(statement); } - LOGGER.trace("Executing statement '{}'", stmt); + statement.setQueryTimeout(queryTimeout); + if (LOGGER.isTraceEnabled()) { + LOGGER.trace("Executing statement '{}' with {}s timeout", stmt, queryTimeout); + } statement.execute(); return this; } @@ -1426,7 +1442,12 @@ private PreparedStatement createPreparedStatement(String preparedQueryString) { return statementCache.computeIfAbsent(preparedQueryString, query -> { try { LOGGER.trace("Inserting prepared statement '{}' removed from the cache", query); - return connection().prepareStatement(query); + PreparedStatement preparedStatement = connection().prepareStatement(query); + preparedStatement.setQueryTimeout(queryTimeout); + if (LOGGER.isTraceEnabled()) { + LOGGER.trace("PreparedStatement '{}' with {}s timeout", preparedQueryString, queryTimeout); + } + return preparedStatement; } catch (SQLException e) { throw new ConnectException(e); @@ -1447,9 +1468,10 @@ public JdbcConnection executeWithoutCommitting(String... statements) throws SQLE throw new DebeziumException("Cannot execute without committing because auto-commit is enabled"); } try (Statement statement = conn.createStatement()) { + statement.setQueryTimeout(queryTimeout); for (String stmt : statements) { if (LOGGER.isTraceEnabled()) { - LOGGER.trace("Executing statement {}", stmt); + LOGGER.trace("Executing statement '{}' with {}s timeout", stmt, queryTimeout); } statement.execute(stmt); } @@ -1534,6 +1556,10 @@ public Optional nullsSortLast() { public Statement readTableStatement(CommonConnectorConfig connectorConfig, OptionalLong tableSize) throws SQLException { int fetchSize = connectorConfig.getSnapshotFetchSize(); final Statement statement = connection().createStatement(); // the default cursor is FORWARD_ONLY + statement.setQueryTimeout(queryTimeout); + if (LOGGER.isTraceEnabled()) { + LOGGER.trace("Created Statement with {}s timeout", queryTimeout); + } statement.setFetchSize(fetchSize); return statement; } @@ -1544,6 +1570,10 @@ public Statement readTableStatement(CommonConnectorConfig connectorConfig, Optio public PreparedStatement readTablePreparedStatement(CommonConnectorConfig connectorConfig, String sql, OptionalLong tableSize) throws SQLException { int fetchSize = connectorConfig.getSnapshotFetchSize(); final PreparedStatement statement = connection().prepareStatement(sql); // the default cursor is FORWARD_ONLY + statement.setQueryTimeout(queryTimeout); + if (LOGGER.isTraceEnabled()) { + LOGGER.trace("PreparedStatement '{}' with {}s timeout", sql, queryTimeout); + } statement.setFetchSize(fetchSize); return statement; } diff --git a/debezium-core/src/main/java/io/debezium/relational/RelationalDatabaseConnectorConfig.java b/debezium-core/src/main/java/io/debezium/relational/RelationalDatabaseConnectorConfig.java index a13aed716..9ef62cf4d 100644 --- a/debezium-core/src/main/java/io/debezium/relational/RelationalDatabaseConnectorConfig.java +++ b/debezium-core/src/main/java/io/debezium/relational/RelationalDatabaseConnectorConfig.java @@ -249,6 +249,15 @@ public static SnapshotTablesRowCountOrder parse(String value, String defaultValu .required() .withDescription("The name of the database from which the connector should capture changes"); + public static final Field QUERY_TIMEOUT_MS = Field.create(DATABASE_CONFIG_PREFIX + JdbcConfiguration.QUERY_TIMEOUT_MS) + .withDisplayName("Query timeout") + .withType(Type.INT) + .withGroup(Field.createGroupEntry(Field.Group.CONNECTION, 12)) + .withDefault(600000) + .withImportance(Importance.LOW) + .withValidation(Field::isOptional) + .withDescription("Time to wait for a query to execute, given in milliseconds. Defaults to 600 seconds (600,000 ms); zero means there is no limit."); + /** * A comma-separated list of regular expressions that match the fully-qualified names of tables to be monitored. * Fully-qualified names for tables are of the form {@code .} or