DBZ-7616 Align query timeout changes to MariaDB connector

This commit is contained in:
mfvitale 2024-04-18 12:39:03 +02:00 committed by Fiore Mario Vitale
parent a708ef1009
commit 7ee8935694
12 changed files with 108 additions and 20 deletions

View File

@ -664,6 +664,7 @@ public interface SnapshotLockingStrategy {
PORT,
USER,
PASSWORD,
QUERY_TIMEOUT_MS,
ON_CONNECT_STATEMENTS,
SERVER_ID,
SERVER_ID_OFFSET,

View File

@ -106,6 +106,7 @@ public abstract class BinlogStreamingChangeEventSource<P extends BinlogPartition
private static final Logger LOGGER = LoggerFactory.getLogger(BinlogStreamingChangeEventSource.class);
private static final String KEEPALIVE_THREAD_NAME = "blc-keepalive";
private static final String SET_STATEMENT_REGEX = "SET STATEMENT .* FOR";
private final BinaryLogClient client;
private final BinlogStreamingChangeEventSourceMetrics<?, P> metrics;
@ -701,7 +702,7 @@ protected void handleQueryEvent(P partition, O offsetContext, Event event) throw
return;
}
String upperCasedStatementBegin = Strings.getBegin(sql, 7).toUpperCase();
String upperCasedStatementBegin = Strings.getBegin(removeSetStatement(sql), 7).toUpperCase();
if (upperCasedStatementBegin.startsWith("XA ")) {
// This is an XA transaction, and we currently ignore these and do nothing ...
@ -750,6 +751,10 @@ protected void handleQueryEvent(P partition, O offsetContext, Event event) throw
}
}
private String removeSetStatement(String sql) {
return sql.replaceAll(SET_STATEMENT_REGEX, "").trim();
}
/**
* Handle a change in the table metadata.<p></p>
*

View File

@ -1749,7 +1749,7 @@ public void shouldParseQueryIfAvailableAndConnectorOptionEnabled() throws Except
// Should have been an insert with query parsed.
validate(sourceRecord);
assertInsert(sourceRecord, "id", 110);
assertSourceQuery(sourceRecord, insertSqlStatement);
assertSourceQuery(sourceRecord, getExpectedQuery(insertSqlStatement));
}
/**
@ -1805,7 +1805,7 @@ public void parseMultipleInsertStatements() throws Exception {
// Should have been an insert with query parsed.
validate(sourceRecord1);
assertInsert(sourceRecord1, "id", 110);
assertSourceQuery(sourceRecord1, insertSqlStatement1);
assertSourceQuery(sourceRecord1, getExpectedQuery(insertSqlStatement1));
// Grab second event
final SourceRecord sourceRecord2 = records.recordsForTopic(DATABASE.topicForTable(tableName)).get(1);
@ -1813,7 +1813,7 @@ public void parseMultipleInsertStatements() throws Exception {
// Should have been an insert with query parsed.
validate(sourceRecord2);
assertInsert(sourceRecord2, "id", 111);
assertSourceQuery(sourceRecord2, insertSqlStatement2);
assertSourceQuery(sourceRecord2, getExpectedQuery(insertSqlStatement2));
}
/**
@ -1867,7 +1867,7 @@ public void parseMultipleRowInsertStatement() throws Exception {
// Should have been an insert with query parsed.
validate(sourceRecord1);
assertInsert(sourceRecord1, "id", 110);
assertSourceQuery(sourceRecord1, insertSqlStatement);
assertSourceQuery(sourceRecord1, getExpectedQuery(insertSqlStatement));
// Grab second event
final SourceRecord sourceRecord2 = records.recordsForTopic(DATABASE.topicForTable(tableName)).get(1);
@ -1875,7 +1875,7 @@ public void parseMultipleRowInsertStatement() throws Exception {
// Should have been an insert with query parsed.
validate(sourceRecord2);
assertInsert(sourceRecord2, "id", 111);
assertSourceQuery(sourceRecord2, insertSqlStatement);
assertSourceQuery(sourceRecord2, getExpectedQuery(insertSqlStatement));
}
/**
@ -1927,7 +1927,7 @@ public void parseDeleteQuery() throws Exception {
// Should have been a delete with query parsed.
validate(sourceRecord);
assertDelete(sourceRecord, "order_number", 10001);
assertSourceQuery(sourceRecord, deleteSqlStatement);
assertSourceQuery(sourceRecord, getExpectedQuery(deleteSqlStatement));
}
/**
@ -1979,7 +1979,7 @@ public void parseMultiRowDeleteQuery() throws Exception {
// Should have been a delete with query parsed.
validate(sourceRecord1);
assertDelete(sourceRecord1, "order_number", 10002);
assertSourceQuery(sourceRecord1, deleteSqlStatement);
assertSourceQuery(sourceRecord1, getExpectedQuery(deleteSqlStatement));
// Validate second event.
final SourceRecord sourceRecord2 = records.recordsForTopic(DATABASE.topicForTable(tableName)).get(1);
@ -1987,7 +1987,7 @@ public void parseMultiRowDeleteQuery() throws Exception {
// Should have been a delete with query parsed.
validate(sourceRecord2);
assertDelete(sourceRecord2, "order_number", 10004);
assertSourceQuery(sourceRecord2, deleteSqlStatement);
assertSourceQuery(sourceRecord2, getExpectedQuery(deleteSqlStatement));
}
/**
@ -2039,7 +2039,7 @@ public void parseUpdateQuery() throws Exception {
// Should have been a delete with query parsed.
validate(sourceRecord);
assertUpdate(sourceRecord, "id", 109);
assertSourceQuery(sourceRecord, updateSqlStatement);
assertSourceQuery(sourceRecord, getExpectedQuery(updateSqlStatement));
}
/**
@ -2091,7 +2091,7 @@ public void parseMultiRowUpdateQuery() throws Exception {
// Should have been a delete with query parsed.
validate(sourceRecord1);
assertUpdate(sourceRecord1, "order_number", 10001);
assertSourceQuery(sourceRecord1, updateSqlStatement);
assertSourceQuery(sourceRecord1, getExpectedQuery(updateSqlStatement));
// Validate second event
final SourceRecord sourceRecord2 = records.recordsForTopic(DATABASE.topicForTable(tableName)).get(1);
@ -2099,7 +2099,7 @@ public void parseMultiRowUpdateQuery() throws Exception {
// Should have been a delete with query parsed.
validate(sourceRecord2);
assertUpdate(sourceRecord2, "order_number", 10004);
assertSourceQuery(sourceRecord2, updateSqlStatement);
assertSourceQuery(sourceRecord2, getExpectedQuery(updateSqlStatement));
}
/**
@ -2108,7 +2108,7 @@ public void parseMultiRowUpdateQuery() throws Exception {
*/
@Test
@FixFor("DBZ-1234")
public void shouldFailToValidateAdaptivePrecisionMode() throws InterruptedException {
public void shouldFailToValidateAdaptivePrecisionMode() {
config = DATABASE.defaultConfig()
.with(BinlogConnectorConfig.INCLUDE_SCHEMA_CHANGES, true)
.with(BinlogConnectorConfig.SNAPSHOT_MODE, BinlogConnectorConfig.SnapshotMode.NEVER)
@ -2652,6 +2652,11 @@ public void shouldEmitTruncateOperation() throws Exception {
stopConnector();
}
protected String getExpectedQuery(String statement) {
return statement;
}
private static class NoTombStonesHandler implements DebeziumEngine.ChangeConsumer<SourceRecord> {
protected BlockingQueue<SourceRecord> recordQueue;

View File

@ -19,6 +19,8 @@ public interface BinlogConnectorTest<C extends SourceConnector> {
BinlogTestConnection getTestDatabaseConnection(String databaseName);
BinlogTestConnection getTestDatabaseConnection(String databaseName, int queryTimeout);
BinlogTestConnection getTestReplicaDatabaseConnection(String databaseName);
boolean isMariaDb();

View File

@ -5,11 +5,35 @@
*/
package io.debezium.connector.mariadb;
import static org.assertj.core.api.AssertionsForClassTypes.assertThatThrownBy;
import java.sql.SQLException;
import java.sql.SQLTimeoutException;
import org.junit.Test;
import io.debezium.connector.binlog.BinlogConnectionIT;
import io.debezium.connector.binlog.util.BinlogTestConnection;
import io.debezium.connector.binlog.util.TestHelper;
import io.debezium.connector.binlog.util.UniqueDatabase;
/**
* @author Chris Cranford
*/
public class ConnectionIT extends BinlogConnectionIT<MariaDbConnector> implements MariaDbCommon {
@Test
public void whenQueryTakesMoreThenConfiguredQueryTimeoutAnExceptionMustBeThrown() throws SQLException {
final UniqueDatabase DATABASE = TestHelper.getUniqueDatabase("readbinlog", "readbinlog_test");
DATABASE.createAndInitialize();
try (BinlogTestConnection conn = getTestDatabaseConnection(DATABASE.getDatabaseName(), 1000)) {
conn.connect();
assertThatThrownBy(() -> conn.execute("SELECT SLEEP(10)"))
.isInstanceOf(SQLTimeoutException.class)
.hasMessageContaining("Query execution was interrupted (max_statement_time exceeded)");
}
}
}

View File

@ -33,6 +33,11 @@ default BinlogTestConnection getTestDatabaseConnection(String databaseName) {
return MariaDbTestConnection.forTestDatabase(databaseName);
}
@Override
default BinlogTestConnection getTestDatabaseConnection(String databaseName, int queryTimeout) {
return MariaDbTestConnection.forTestDatabase(databaseName, queryTimeout);
}
@Override
default BinlogTestConnection getTestReplicaDatabaseConnection(String databaseName) {
return MariaDbTestConnection.forTestReplicaDatabase(databaseName);

View File

@ -68,4 +68,10 @@ protected MariaDbOffsetContext loadOffsets(Configuration configuration, Map<Stri
protected void assertBinlogPosition(long offsetPosition, long beforeInsertsPosition) {
assertThat(offsetPosition).isGreaterThanOrEqualTo(beforeInsertsPosition);
}
@Override
protected String getExpectedQuery(String statement) {
return "SET STATEMENT max_statement_time=600 FOR " + statement;
}
}

View File

@ -88,6 +88,21 @@ public static MariaDbTestConnection forTestDatabase(String databaseName) {
return new MariaDbTestConnection(getDefaultJdbcConfig(databaseName).build());
}
/**
* Obtain a connection instance to the named test database.
*
*
* @param databaseName the name of the test database
* @param queryTimeout the seconds to wait for query execution
* @return the connection instance; never null
*/
public static MariaDbTestConnection forTestDatabase(String databaseName, int queryTimeout) {
return new MariaDbTestConnection(getDefaultJdbcConfig(databaseName)
.withQueryTimeoutMs(queryTimeout)
.build());
}
/**
* Obtain a connection instance to the named test database.
*

View File

@ -208,8 +208,7 @@ public static SnapshotLockingMode parse(String value, String defaultValue) {
BinlogConnectorConfig.GTID_SOURCE_EXCLUDES)
.type(
JDBC_DRIVER,
JDBC_PROTOCOL,
QUERY_TIMEOUT_MS,)
JDBC_PROTOCOL)
.connector(SNAPSHOT_LOCKING_MODE)
.events(
GTID_SOURCE_INCLUDES,

View File

@ -5,8 +5,32 @@
*/
package io.debezium.connector.mysql;
import static org.assertj.core.api.AssertionsForClassTypes.assertThatThrownBy;
import java.sql.SQLException;
import org.junit.Test;
import com.mysql.cj.jdbc.exceptions.MySQLTimeoutException;
import io.debezium.connector.binlog.BinlogConnectionIT;
import io.debezium.connector.binlog.util.BinlogTestConnection;
import io.debezium.connector.binlog.util.TestHelper;
import io.debezium.connector.binlog.util.UniqueDatabase;
public class ConnectionIT extends BinlogConnectionIT<MySqlConnector> implements MySqlCommon {
@Test
public void whenQueryTakesMoreThenConfiguredQueryTimeoutAnExceptionMustBeThrown() throws SQLException {
final UniqueDatabase DATABASE = TestHelper.getUniqueDatabase("readbinlog", "readbinlog_test");
DATABASE.createAndInitialize();
try (BinlogTestConnection conn = getTestDatabaseConnection(DATABASE.getDatabaseName(), 1000)) {
conn.connect();
assertThatThrownBy(() -> conn.execute("SELECT SLEEP(10)"))
.isInstanceOf(MySQLTimeoutException.class)
.hasMessage("Statement cancelled due to timeout or client request");
}
}
}

View File

@ -32,6 +32,11 @@ default BinlogTestConnection getTestDatabaseConnection(String databaseName) {
return MySqlTestConnection.forTestDatabase(databaseName);
}
@Override
default BinlogTestConnection getTestDatabaseConnection(String databaseName, int queryTimeout) {
return MySqlTestConnection.forTestDatabase(databaseName, queryTimeout);
}
@Override
default BinlogTestConnection getTestReplicaDatabaseConnection(String databaseName) {
return MySqlTestConnection.forTestReplicaDatabase(databaseName);

View File

@ -54,16 +54,13 @@ public static MySqlTestConnection forTestReplicaDatabase(String databaseName) {
*
*
* @param databaseName the name of the test database
* @param queryTimeout
* @param queryTimeout the seconds to wait for query execution
* @return the MySQLConnection instance; never null
*/
public static MySqlTestConnection forTestDatabase(String databaseName, int queryTimeout) {
return new MySqlTestConnection(JdbcConfiguration.copy(
Configuration.fromSystemProperties(DATABASE_CONFIG_PREFIX).merge(Configuration.fromSystemProperties(DRIVER_CONFIG_PREFIX)))
.withDatabase(databaseName)
return new MySqlTestConnection(getDefaultJdbcConfig(databaseName)
.withQueryTimeoutMs(queryTimeout)
.with("characterEncoding", "utf8")
.build());
}