Merge pull request #79 from hchiorean/DBZ-92
DBZ-92 Adds more logging information during MySQL snapshot recreation
This commit is contained in:
commit
c159ca88cb
@ -6,10 +6,8 @@
|
||||
package io.debezium.connector.mysql;
|
||||
|
||||
import java.sql.SQLException;
|
||||
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import io.debezium.config.Configuration;
|
||||
import io.debezium.jdbc.JdbcConnection;
|
||||
import io.debezium.jdbc.JdbcConnection.ConnectionFactory;
|
||||
@ -21,7 +19,8 @@
|
||||
*/
|
||||
public class MySqlJdbcContext implements AutoCloseable {
|
||||
|
||||
protected static ConnectionFactory FACTORY = JdbcConnection.patternBasedFactory("jdbc:mysql://${hostname}:${port}/?useInformationSchema=true&nullCatalogMeansCurrent=false");
|
||||
protected static final String MYSQL_CONNECTION_URL = "jdbc:mysql://${hostname}:${port}/?useInformationSchema=true&nullCatalogMeansCurrent=false";
|
||||
protected static ConnectionFactory FACTORY = JdbcConnection.patternBasedFactory(MYSQL_CONNECTION_URL);
|
||||
|
||||
protected final Logger logger = LoggerFactory.getLogger(getClass());
|
||||
protected final Configuration config;
|
||||
@ -83,4 +82,8 @@ public void shutdown() {
|
||||
public void close() {
|
||||
shutdown();
|
||||
}
|
||||
|
||||
protected String connectionString() {
|
||||
return jdbc.connectionString(MYSQL_CONNECTION_URL);
|
||||
}
|
||||
}
|
||||
|
@ -5,6 +5,7 @@
|
||||
*/
|
||||
package io.debezium.connector.mysql;
|
||||
|
||||
import java.sql.SQLException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
@ -131,7 +132,6 @@ protected void doCleanup() {
|
||||
*/
|
||||
protected void execute() {
|
||||
context.configureLoggingContext("snapshot");
|
||||
logger.info("Starting snapshot");
|
||||
final AtomicReference<String> sql = new AtomicReference<>();
|
||||
final JdbcConnection mysql = context.jdbc();
|
||||
final MySqlSchema schema = context.dbSchema();
|
||||
@ -139,6 +139,9 @@ protected void execute() {
|
||||
final SourceInfo source = context.source();
|
||||
final Clock clock = context.clock();
|
||||
final long ts = clock.currentTimeInMillis();
|
||||
logger.info("Starting snapshot for {} with user '{}'", context.connectionString(), mysql.username());
|
||||
logRolesForCurrentUser(sql, mysql);
|
||||
logServerInformation(sql, mysql);
|
||||
try {
|
||||
// ------
|
||||
// STEP 0
|
||||
@ -194,6 +197,10 @@ protected void execute() {
|
||||
// This column exists only in MySQL 5.6.5 or later ...
|
||||
String gtidSet = rs.getString(5);// GTID set, may be null, blank, or contain a GTID set
|
||||
source.setGtidSet(gtidSet);
|
||||
logger.debug("\t using binlog '{}' at position '{}' and gtid '{}'", binlogFilename, binlogPosition,
|
||||
gtidSet);
|
||||
} else {
|
||||
logger.debug("\t using binlog '{}' at position '{}'", binlogFilename, binlogPosition);
|
||||
}
|
||||
source.startSnapshot();
|
||||
}
|
||||
@ -214,7 +221,8 @@ protected void execute() {
|
||||
databaseNames.add(rs.getString(1));
|
||||
}
|
||||
});
|
||||
|
||||
logger.debug("\t list of available databases is: {}", databaseNames);
|
||||
|
||||
// ------
|
||||
// STEP 5
|
||||
// ------
|
||||
@ -232,6 +240,9 @@ protected void execute() {
|
||||
if (filters.tableFilter().test(id)) {
|
||||
tableIds.add(id);
|
||||
tableIdsByDbName.computeIfAbsent(dbName, k -> new ArrayList<>()).add(id);
|
||||
logger.debug("\t including '{}'", id);
|
||||
} else {
|
||||
logger.debug("\t '{}' is filtered out, discarding", id);
|
||||
}
|
||||
}
|
||||
});
|
||||
@ -388,6 +399,34 @@ protected void execute() {
|
||||
failed(e, "Aborting snapshot after running '" + sql.get() + "': " + e.getMessage());
|
||||
}
|
||||
}
|
||||
|
||||
private void logServerInformation(AtomicReference<String> sql, JdbcConnection mysql) {
|
||||
try {
|
||||
sql.set("SHOW VARIABLES LIKE 'version'");
|
||||
mysql.query(sql.get(), rs -> {
|
||||
if (rs.next()) {
|
||||
logger.info("MySql server version is '{}'", rs.getString(2));
|
||||
}
|
||||
});
|
||||
} catch (SQLException e) {
|
||||
logger.info("Cannot determine MySql server version", e);
|
||||
}
|
||||
}
|
||||
|
||||
private void logRolesForCurrentUser(AtomicReference<String> sql, JdbcConnection mysql) {
|
||||
try {
|
||||
List<String> privileges = new ArrayList<>();
|
||||
sql.set("SHOW GRANTS");
|
||||
mysql.query(sql.get(), rs -> {
|
||||
while (rs.next()) {
|
||||
privileges.add(rs.getString(1));
|
||||
}
|
||||
});
|
||||
logger.info("User '{}' has '{}'", mysql.username(), privileges);
|
||||
} catch (SQLException e) {
|
||||
logger.info("Cannot determine the privileges for '{}' ", mysql.username(), e);
|
||||
}
|
||||
}
|
||||
|
||||
protected void enqueueSchemaChanges(String dbName, String ddlStatements) {
|
||||
if (context.includeSchemaChangeRecords() &&
|
||||
|
@ -48,7 +48,7 @@ public static MySQLConnection forTestDatabase(String databaseName, String userna
|
||||
protected static void addDefaults(Configuration.Builder builder) {
|
||||
builder.withDefault(JdbcConfiguration.HOSTNAME, "localhost")
|
||||
.withDefault(JdbcConfiguration.PORT, 3306)
|
||||
.withDefault(JdbcConfiguration.USER, "mysql")
|
||||
.withDefault(JdbcConfiguration.USER, "mysqluser")
|
||||
.withDefault(JdbcConfiguration.PASSWORD, "mysqlpw");
|
||||
}
|
||||
|
||||
|
@ -27,10 +27,8 @@
|
||||
import java.util.concurrent.ConcurrentMap;
|
||||
import java.util.function.Consumer;
|
||||
import java.util.stream.Stream;
|
||||
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import io.debezium.annotation.ThreadSafe;
|
||||
import io.debezium.config.Configuration;
|
||||
import io.debezium.config.Field;
|
||||
@ -228,7 +226,12 @@ public JdbcConnection connect() throws SQLException {
|
||||
public JdbcConnection execute(String... sqlStatements) throws SQLException {
|
||||
return execute(statement -> {
|
||||
for (String sqlStatement : sqlStatements) {
|
||||
if (sqlStatement != null) statement.execute(sqlStatement);
|
||||
if (sqlStatement != null) {
|
||||
if (LOGGER.isTraceEnabled()) {
|
||||
LOGGER.trace("executing '{}'", sqlStatement);
|
||||
}
|
||||
statement.execute(sqlStatement);
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
@ -274,6 +277,9 @@ public static interface StatementPreparer {
|
||||
public JdbcConnection query(String query, ResultSetConsumer resultConsumer) throws SQLException {
|
||||
Connection conn = connection();
|
||||
try (Statement statement = conn.createStatement();) {
|
||||
if (LOGGER.isTraceEnabled()) {
|
||||
LOGGER.trace("running '{}'", query);
|
||||
}
|
||||
try (ResultSet resultSet = statement.executeQuery(query);) {
|
||||
if (resultConsumer != null) {
|
||||
resultConsumer.accept(resultSet);
|
||||
@ -514,6 +520,27 @@ public Set<TableId> readTableNames(String databaseCatalog, String schemaNamePatt
|
||||
return tableIds;
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns a JDBC connection string using the current configuration and url.
|
||||
*
|
||||
* @param urlPattern a {@code String} representing a JDBC connection with variables that will be replaced
|
||||
* @return a {@code String} where the variables in {@code urlPattern} are replaced with values from the configuration
|
||||
*/
|
||||
public String connectionString(String urlPattern) {
|
||||
Properties props = config.asProperties();
|
||||
return findAndReplace(urlPattern, props, JdbcConfiguration.DATABASE, JdbcConfiguration.HOSTNAME, JdbcConfiguration.PORT,
|
||||
JdbcConfiguration.USER, JdbcConfiguration.PASSWORD);
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the username for this connection
|
||||
*
|
||||
* @return a {@code String}, never {@code null}
|
||||
*/
|
||||
public String username() {
|
||||
return config.getString(JdbcConfiguration.USER);
|
||||
}
|
||||
|
||||
/**
|
||||
* Create definitions for each tables in the database, given the catalog name, schema pattern, table filter, and
|
||||
* column filter.
|
||||
|
Loading…
Reference in New Issue
Block a user