DBZ-1632 Exposing connection timeout as Duration in one single method
This commit is contained in:
parent
b817f2fa38
commit
ded3174d55
@ -28,6 +28,7 @@ Chris Cranford
|
||||
Chris Riccomini
|
||||
Christian Posta
|
||||
Cliff Wheadon
|
||||
Collin Van Dyck
|
||||
Cyril Scetbon
|
||||
David Chen
|
||||
David Feinblum
|
||||
|
@ -379,18 +379,18 @@ protected void doStart() {
|
||||
|
||||
// Start the log reader, which starts background threads ...
|
||||
if (isRunning()) {
|
||||
long timeoutInMilliseconds = context.timeoutInMilliseconds();
|
||||
long timeout = context.getConnectorConfig().getConnectionTimeout().toMillis();
|
||||
long started = context.getClock().currentTimeInMillis();
|
||||
try {
|
||||
logger.debug("Attempting to establish binlog reader connection with timeout of {} ms", timeoutInMilliseconds);
|
||||
client.connect(context.timeoutInMilliseconds());
|
||||
logger.debug("Attempting to establish binlog reader connection with timeout of {} ms", timeout);
|
||||
client.connect(timeout);
|
||||
}
|
||||
catch (TimeoutException e) {
|
||||
// If the client thread is interrupted *before* the client could connect, the client throws a timeout exception
|
||||
// The only way we can distinguish this is if we get the timeout exception before the specified timeout has
|
||||
// elapsed, so we simply check this (within 10%) ...
|
||||
long duration = context.getClock().currentTimeInMillis() - started;
|
||||
if (duration > (0.9 * context.timeoutInMilliseconds())) {
|
||||
if (duration > (0.9 * timeout)) {
|
||||
double actualSeconds = TimeUnit.MILLISECONDS.toSeconds(duration);
|
||||
throw new ConnectException("Timed out after " + actualSeconds + " seconds while waiting to connect to MySQL at " +
|
||||
connectionContext.hostname() + ":" + connectionContext.port() + " with user '" + connectionContext.username() + "'", e);
|
||||
|
@ -94,7 +94,7 @@ public Config validate(Map<String, String> connectorConfigs) {
|
||||
&& portValue.errorMessages().isEmpty()
|
||||
&& userValue.errorMessages().isEmpty()) {
|
||||
// Try to connect to the database ...
|
||||
try (MySqlJdbcContext jdbcContext = new MySqlJdbcContext(config)) {
|
||||
try (MySqlJdbcContext jdbcContext = new MySqlJdbcContext(new MySqlConnectorConfig(config))) {
|
||||
jdbcContext.start();
|
||||
JdbcConnection mysql = jdbcContext.jdbc();
|
||||
try {
|
||||
|
@ -1050,6 +1050,7 @@ public static final Field MASK_COLUMN(int length) {
|
||||
private final GtidNewChannelPosition gitIdNewChannelPosition;
|
||||
private final SnapshotNewTables snapshotNewTables;
|
||||
private final TemporalPrecisionMode temporalPrecisionMode;
|
||||
private final Duration connectionTimeout;
|
||||
|
||||
public MySqlConnectorConfig(Configuration config) {
|
||||
super(
|
||||
@ -1067,6 +1068,8 @@ public MySqlConnectorConfig(Configuration config) {
|
||||
|
||||
String snapshotNewTables = config.getString(MySqlConnectorConfig.SNAPSHOT_NEW_TABLES);
|
||||
this.snapshotNewTables = SnapshotNewTables.parse(snapshotNewTables, MySqlConnectorConfig.SNAPSHOT_NEW_TABLES.defaultValueAsString());
|
||||
|
||||
this.connectionTimeout = Duration.ofMillis(config.getLong(MySqlConnectorConfig.CONNECTION_TIMEOUT_MS));
|
||||
}
|
||||
|
||||
public SnapshotLockingMode getSnapshotLockingMode() {
|
||||
@ -1206,4 +1209,8 @@ public String getContextName() {
|
||||
public TemporalPrecisionMode getTemporalPrecisionMode() {
|
||||
return temporalPrecisionMode;
|
||||
}
|
||||
|
||||
public Duration getConnectionTimeout() {
|
||||
return connectionTimeout;
|
||||
}
|
||||
}
|
||||
|
@ -49,20 +49,20 @@ public class MySqlJdbcContext implements AutoCloseable {
|
||||
protected final JdbcConnection jdbc;
|
||||
private final Map<String, String> originalSystemProperties = new HashMap<>();
|
||||
|
||||
public MySqlJdbcContext(Configuration config) {
|
||||
this.config = config; // must be set before most methods are used
|
||||
public MySqlJdbcContext(MySqlConnectorConfig config) {
|
||||
this.config = config.getConfig(); // must be set before most methods are used
|
||||
|
||||
// Set up the JDBC connection without actually connecting, with extra MySQL-specific properties
|
||||
// to give us better JDBC database metadata behavior, including using UTF-8 for the client-side character encoding
|
||||
// per https://dev.mysql.com/doc/connector-j/5.1/en/connector-j-reference-charsets.html
|
||||
boolean useSSL = sslModeEnabled();
|
||||
Configuration jdbcConfig = config
|
||||
Configuration jdbcConfig = this.config
|
||||
.filter(x -> !(x.startsWith(DatabaseHistory.CONFIGURATION_FIELD_PREFIX_STRING) || x.equals(MySqlConnectorConfig.DATABASE_HISTORY.name())))
|
||||
.subset("database.", true);
|
||||
|
||||
Builder jdbcConfigBuilder = jdbcConfig
|
||||
.edit()
|
||||
.with("connectTimeout", Integer.toString(connectionTimeoutMs()))
|
||||
.with("connectTimeout", Long.toString(config.getConnectionTimeout().toMillis()))
|
||||
.with("useSSL", Boolean.toString(useSSL));
|
||||
|
||||
final String legacyDateTime = jdbcConfig.getString(JDBC_PROPERTY_LEGACY_DATETIME);
|
||||
@ -107,8 +107,6 @@ public int port() {
|
||||
return config.getInteger(MySqlConnectorConfig.PORT);
|
||||
}
|
||||
|
||||
public int connectionTimeoutMs() { return config.getInteger(MySqlConnectorConfig.CONNECTION_TIMEOUT_MS); }
|
||||
|
||||
public SecureConnectionMode sslMode() {
|
||||
String mode = config.getString(MySqlConnectorConfig.SSL_MODE);
|
||||
return SecureConnectionMode.parse(mode);
|
||||
|
@ -61,7 +61,7 @@ public MySqlTaskContext(Configuration config, Filters filters, Boolean tableIdCa
|
||||
|
||||
this.config = config;
|
||||
this.connectorConfig = new MySqlConnectorConfig(config);
|
||||
this.connectionContext = new MySqlJdbcContext(config);
|
||||
this.connectionContext = new MySqlJdbcContext(connectorConfig);
|
||||
|
||||
// Set up the topic selector ...
|
||||
this.topicSelector = MySqlTopicSelector.defaultSelector(connectorConfig.getLogicalName(), connectorConfig.getHeartbeatTopicsPrefix());
|
||||
@ -199,10 +199,6 @@ public long serverId() {
|
||||
return config.getLong(MySqlConnectorConfig.SERVER_ID);
|
||||
}
|
||||
|
||||
public long timeoutInMilliseconds() {
|
||||
return config.getLong(MySqlConnectorConfig.CONNECTION_TIMEOUT_MS);
|
||||
}
|
||||
|
||||
public long rowCountForLargeTable() {
|
||||
return config.getLong(MySqlConnectorConfig.ROW_COUNT_FOR_STREAMING_RESULT_SETS);
|
||||
}
|
||||
|
@ -36,7 +36,7 @@ public class AbstractMySqlConnectorOutputTest extends ConnectorOutputTest {
|
||||
private final Logger logger = LoggerFactory.getLogger(getClass());
|
||||
|
||||
private static GtidSet readAvailableGtidSet(Configuration config) {
|
||||
try (MySqlJdbcContext context = new MySqlJdbcContext(config)) {
|
||||
try (MySqlJdbcContext context = new MySqlJdbcContext(new MySqlConnectorConfig(config))) {
|
||||
String availableServerGtidStr = context.knownGtidSet();
|
||||
if (availableServerGtidStr != null && !availableServerGtidStr.trim().isEmpty()) {
|
||||
return new GtidSet(availableServerGtidStr);
|
||||
@ -130,7 +130,7 @@ protected static void waitForGtidSetsToMatch(Configuration master, Configuration
|
||||
*/
|
||||
protected Map<String, String> readSystemVariables(Configuration config) throws Exception {
|
||||
Map<String, String> variables = new HashMap<>();
|
||||
try (MySqlJdbcContext context = new MySqlJdbcContext(config)) {
|
||||
try (MySqlJdbcContext context = new MySqlJdbcContext(new MySqlConnectorConfig(config))) {
|
||||
// Read all of the system variables ...
|
||||
variables.putAll(context.readMySqlSystemVariables());
|
||||
// Now get the master GTID source ...
|
||||
|
Loading…
Reference in New Issue
Block a user