diff --git a/COPYRIGHT.txt b/COPYRIGHT.txt index ac0d5c6b0..36c81b330 100644 --- a/COPYRIGHT.txt +++ b/COPYRIGHT.txt @@ -28,6 +28,7 @@ Chris Cranford Chris Riccomini Christian Posta Cliff Wheadon +Collin Van Dyck Cyril Scetbon David Chen David Feinblum diff --git a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/BinlogReader.java b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/BinlogReader.java index 1d3d0268d..92b816d46 100644 --- a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/BinlogReader.java +++ b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/BinlogReader.java @@ -379,18 +379,18 @@ protected void doStart() { // Start the log reader, which starts background threads ... if (isRunning()) { - long timeoutInMilliseconds = context.timeoutInMilliseconds(); + long timeout = context.getConnectorConfig().getConnectionTimeout().toMillis(); long started = context.getClock().currentTimeInMillis(); try { - logger.debug("Attempting to establish binlog reader connection with timeout of {} ms", timeoutInMilliseconds); - client.connect(context.timeoutInMilliseconds()); + logger.debug("Attempting to establish binlog reader connection with timeout of {} ms", timeout); + client.connect(timeout); } catch (TimeoutException e) { // If the client thread is interrupted *before* the client could connect, the client throws a timeout exception // The only way we can distinguish this is if we get the timeout exception before the specified timeout has // elapsed, so we simply check this (within 10%) ... long duration = context.getClock().currentTimeInMillis() - started; - if (duration > (0.9 * context.timeoutInMilliseconds())) { + if (duration > (0.9 * timeout)) { double actualSeconds = TimeUnit.MILLISECONDS.toSeconds(duration); throw new ConnectException("Timed out after " + actualSeconds + " seconds while waiting to connect to MySQL at " + connectionContext.hostname() + ":" + connectionContext.port() + " with user '" + connectionContext.username() + "'", e); diff --git a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlConnector.java b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlConnector.java index 09090d7f6..b1af32b07 100644 --- a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlConnector.java +++ b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlConnector.java @@ -94,7 +94,7 @@ public Config validate(Map connectorConfigs) { && portValue.errorMessages().isEmpty() && userValue.errorMessages().isEmpty()) { // Try to connect to the database ... - try (MySqlJdbcContext jdbcContext = new MySqlJdbcContext(config)) { + try (MySqlJdbcContext jdbcContext = new MySqlJdbcContext(new MySqlConnectorConfig(config))) { jdbcContext.start(); JdbcConnection mysql = jdbcContext.jdbc(); try { diff --git a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlConnectorConfig.java b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlConnectorConfig.java index fa62b1f81..b40d15706 100644 --- a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlConnectorConfig.java +++ b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlConnectorConfig.java @@ -1050,6 +1050,7 @@ public static final Field MASK_COLUMN(int length) { private final GtidNewChannelPosition gitIdNewChannelPosition; private final SnapshotNewTables snapshotNewTables; private final TemporalPrecisionMode temporalPrecisionMode; + private final Duration connectionTimeout; public MySqlConnectorConfig(Configuration config) { super( @@ -1067,6 +1068,8 @@ public MySqlConnectorConfig(Configuration config) { String snapshotNewTables = config.getString(MySqlConnectorConfig.SNAPSHOT_NEW_TABLES); this.snapshotNewTables = SnapshotNewTables.parse(snapshotNewTables, MySqlConnectorConfig.SNAPSHOT_NEW_TABLES.defaultValueAsString()); + + this.connectionTimeout = Duration.ofMillis(config.getLong(MySqlConnectorConfig.CONNECTION_TIMEOUT_MS)); } public SnapshotLockingMode getSnapshotLockingMode() { @@ -1206,4 +1209,8 @@ public String getContextName() { public TemporalPrecisionMode getTemporalPrecisionMode() { return temporalPrecisionMode; } + + public Duration getConnectionTimeout() { + return connectionTimeout; + } } diff --git a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlJdbcContext.java b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlJdbcContext.java index 0aebeabd5..e7275d2b5 100644 --- a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlJdbcContext.java +++ b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlJdbcContext.java @@ -49,20 +49,20 @@ public class MySqlJdbcContext implements AutoCloseable { protected final JdbcConnection jdbc; private final Map originalSystemProperties = new HashMap<>(); - public MySqlJdbcContext(Configuration config) { - this.config = config; // must be set before most methods are used + public MySqlJdbcContext(MySqlConnectorConfig config) { + this.config = config.getConfig(); // must be set before most methods are used // Set up the JDBC connection without actually connecting, with extra MySQL-specific properties // to give us better JDBC database metadata behavior, including using UTF-8 for the client-side character encoding // per https://dev.mysql.com/doc/connector-j/5.1/en/connector-j-reference-charsets.html boolean useSSL = sslModeEnabled(); - Configuration jdbcConfig = config + Configuration jdbcConfig = this.config .filter(x -> !(x.startsWith(DatabaseHistory.CONFIGURATION_FIELD_PREFIX_STRING) || x.equals(MySqlConnectorConfig.DATABASE_HISTORY.name()))) .subset("database.", true); Builder jdbcConfigBuilder = jdbcConfig .edit() - .with("connectTimeout", Integer.toString(connectionTimeoutMs())) + .with("connectTimeout", Long.toString(config.getConnectionTimeout().toMillis())) .with("useSSL", Boolean.toString(useSSL)); final String legacyDateTime = jdbcConfig.getString(JDBC_PROPERTY_LEGACY_DATETIME); @@ -107,8 +107,6 @@ public int port() { return config.getInteger(MySqlConnectorConfig.PORT); } - public int connectionTimeoutMs() { return config.getInteger(MySqlConnectorConfig.CONNECTION_TIMEOUT_MS); } - public SecureConnectionMode sslMode() { String mode = config.getString(MySqlConnectorConfig.SSL_MODE); return SecureConnectionMode.parse(mode); diff --git a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlTaskContext.java b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlTaskContext.java index a7d13187a..5537d2611 100644 --- a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlTaskContext.java +++ b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlTaskContext.java @@ -61,7 +61,7 @@ public MySqlTaskContext(Configuration config, Filters filters, Boolean tableIdCa this.config = config; this.connectorConfig = new MySqlConnectorConfig(config); - this.connectionContext = new MySqlJdbcContext(config); + this.connectionContext = new MySqlJdbcContext(connectorConfig); // Set up the topic selector ... this.topicSelector = MySqlTopicSelector.defaultSelector(connectorConfig.getLogicalName(), connectorConfig.getHeartbeatTopicsPrefix()); @@ -199,10 +199,6 @@ public long serverId() { return config.getLong(MySqlConnectorConfig.SERVER_ID); } - public long timeoutInMilliseconds() { - return config.getLong(MySqlConnectorConfig.CONNECTION_TIMEOUT_MS); - } - public long rowCountForLargeTable() { return config.getLong(MySqlConnectorConfig.ROW_COUNT_FOR_STREAMING_RESULT_SETS); } diff --git a/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/AbstractMySqlConnectorOutputTest.java b/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/AbstractMySqlConnectorOutputTest.java index bccae688a..c8462a6b0 100644 --- a/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/AbstractMySqlConnectorOutputTest.java +++ b/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/AbstractMySqlConnectorOutputTest.java @@ -36,7 +36,7 @@ public class AbstractMySqlConnectorOutputTest extends ConnectorOutputTest { private final Logger logger = LoggerFactory.getLogger(getClass()); private static GtidSet readAvailableGtidSet(Configuration config) { - try (MySqlJdbcContext context = new MySqlJdbcContext(config)) { + try (MySqlJdbcContext context = new MySqlJdbcContext(new MySqlConnectorConfig(config))) { String availableServerGtidStr = context.knownGtidSet(); if (availableServerGtidStr != null && !availableServerGtidStr.trim().isEmpty()) { return new GtidSet(availableServerGtidStr); @@ -130,7 +130,7 @@ protected static void waitForGtidSetsToMatch(Configuration master, Configuration */ protected Map readSystemVariables(Configuration config) throws Exception { Map variables = new HashMap<>(); - try (MySqlJdbcContext context = new MySqlJdbcContext(config)) { + try (MySqlJdbcContext context = new MySqlJdbcContext(new MySqlConnectorConfig(config))) { // Read all of the system variables ... variables.putAll(context.readMySqlSystemVariables()); // Now get the master GTID source ...