From bb1b7d5734726582ac593514e7074a02886cacf3 Mon Sep 17 00:00:00 2001 From: Horia Chiorean Date: Wed, 3 Aug 2016 13:50:56 +0300 Subject: [PATCH] DBZ-92 Adds more logging information during MySQL snapshot recreation --- .../connector/mysql/MySqlJdbcContext.java | 9 ++-- .../connector/mysql/SnapshotReader.java | 43 ++++++++++++++++++- .../connector/mysql/MySQLConnection.java | 2 +- .../java/io/debezium/jdbc/JdbcConnection.java | 33 ++++++++++++-- 4 files changed, 78 insertions(+), 9 deletions(-) diff --git a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlJdbcContext.java b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlJdbcContext.java index d91ede78d..e8f0b7ad7 100644 --- a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlJdbcContext.java +++ b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlJdbcContext.java @@ -6,10 +6,8 @@ package io.debezium.connector.mysql; import java.sql.SQLException; - import org.slf4j.Logger; import org.slf4j.LoggerFactory; - import io.debezium.config.Configuration; import io.debezium.jdbc.JdbcConnection; import io.debezium.jdbc.JdbcConnection.ConnectionFactory; @@ -21,7 +19,8 @@ */ public class MySqlJdbcContext implements AutoCloseable { - protected static ConnectionFactory FACTORY = JdbcConnection.patternBasedFactory("jdbc:mysql://${hostname}:${port}/?useInformationSchema=true&nullCatalogMeansCurrent=false"); + protected static final String MYSQL_CONNECTION_URL = "jdbc:mysql://${hostname}:${port}/?useInformationSchema=true&nullCatalogMeansCurrent=false"; + protected static ConnectionFactory FACTORY = JdbcConnection.patternBasedFactory(MYSQL_CONNECTION_URL); protected final Logger logger = LoggerFactory.getLogger(getClass()); protected final Configuration config; @@ -83,4 +82,8 @@ public void shutdown() { public void close() { shutdown(); } + + protected String connectionString() { + return jdbc.connectionString(MYSQL_CONNECTION_URL); + } } diff --git a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/SnapshotReader.java b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/SnapshotReader.java index d1fb78698..8834e41f1 100644 --- a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/SnapshotReader.java +++ b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/SnapshotReader.java @@ -5,6 +5,7 @@ */ package io.debezium.connector.mysql; +import java.sql.SQLException; import java.util.ArrayList; import java.util.HashMap; import java.util.HashSet; @@ -131,7 +132,6 @@ protected void doCleanup() { */ protected void execute() { context.configureLoggingContext("snapshot"); - logger.info("Starting snapshot"); final AtomicReference sql = new AtomicReference<>(); final JdbcConnection mysql = context.jdbc(); final MySqlSchema schema = context.dbSchema(); @@ -139,6 +139,9 @@ protected void execute() { final SourceInfo source = context.source(); final Clock clock = context.clock(); final long ts = clock.currentTimeInMillis(); + logger.info("Starting snapshot for {} with user '{}'", context.connectionString(), mysql.username()); + logRolesForCurrentUser(sql, mysql); + logServerInformation(sql, mysql); try { // ------ // STEP 0 @@ -194,6 +197,10 @@ protected void execute() { // This column exists only in MySQL 5.6.5 or later ... String gtidSet = rs.getString(5);// GTID set, may be null, blank, or contain a GTID set source.setGtidSet(gtidSet); + logger.debug("\t using binlog '{}' at position '{}' and gtid '{}'", binlogFilename, binlogPosition, + gtidSet); + } else { + logger.debug("\t using binlog '{}' at position '{}'", binlogFilename, binlogPosition); } source.startSnapshot(); } @@ -214,7 +221,8 @@ protected void execute() { databaseNames.add(rs.getString(1)); } }); - + logger.debug("\t list of available databases is: {}", databaseNames); + // ------ // STEP 5 // ------ @@ -232,6 +240,9 @@ protected void execute() { if (filters.tableFilter().test(id)) { tableIds.add(id); tableIdsByDbName.computeIfAbsent(dbName, k -> new ArrayList<>()).add(id); + logger.debug("\t including '{}'", id); + } else { + logger.debug("\t '{}' is filtered out, discarding", id); } } }); @@ -388,6 +399,34 @@ protected void execute() { failed(e, "Aborting snapshot after running '" + sql.get() + "': " + e.getMessage()); } } + + private void logServerInformation(AtomicReference sql, JdbcConnection mysql) { + try { + sql.set("SHOW VARIABLES LIKE 'version'"); + mysql.query(sql.get(), rs -> { + if (rs.next()) { + logger.info("MySql server version is '{}'", rs.getString(2)); + } + }); + } catch (SQLException e) { + logger.info("Cannot determine MySql server version", e); + } + } + + private void logRolesForCurrentUser(AtomicReference sql, JdbcConnection mysql) { + try { + List privileges = new ArrayList<>(); + sql.set("SHOW GRANTS"); + mysql.query(sql.get(), rs -> { + while (rs.next()) { + privileges.add(rs.getString(1)); + } + }); + logger.info("User '{}' has '{}'", mysql.username(), privileges); + } catch (SQLException e) { + logger.info("Cannot determine the privileges for '{}' ", mysql.username(), e); + } + } protected void enqueueSchemaChanges(String dbName, String ddlStatements) { if (context.includeSchemaChangeRecords() && diff --git a/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/MySQLConnection.java b/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/MySQLConnection.java index 6a40c19ee..eeed6ed31 100644 --- a/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/MySQLConnection.java +++ b/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/MySQLConnection.java @@ -48,7 +48,7 @@ public static MySQLConnection forTestDatabase(String databaseName, String userna protected static void addDefaults(Configuration.Builder builder) { builder.withDefault(JdbcConfiguration.HOSTNAME, "localhost") .withDefault(JdbcConfiguration.PORT, 3306) - .withDefault(JdbcConfiguration.USER, "mysql") + .withDefault(JdbcConfiguration.USER, "mysqluser") .withDefault(JdbcConfiguration.PASSWORD, "mysqlpw"); } diff --git a/debezium-core/src/main/java/io/debezium/jdbc/JdbcConnection.java b/debezium-core/src/main/java/io/debezium/jdbc/JdbcConnection.java index 2798b3417..caadc2ecc 100644 --- a/debezium-core/src/main/java/io/debezium/jdbc/JdbcConnection.java +++ b/debezium-core/src/main/java/io/debezium/jdbc/JdbcConnection.java @@ -27,10 +27,8 @@ import java.util.concurrent.ConcurrentMap; import java.util.function.Consumer; import java.util.stream.Stream; - import org.slf4j.Logger; import org.slf4j.LoggerFactory; - import io.debezium.annotation.ThreadSafe; import io.debezium.config.Configuration; import io.debezium.config.Field; @@ -228,7 +226,12 @@ public JdbcConnection connect() throws SQLException { public JdbcConnection execute(String... sqlStatements) throws SQLException { return execute(statement -> { for (String sqlStatement : sqlStatements) { - if (sqlStatement != null) statement.execute(sqlStatement); + if (sqlStatement != null) { + if (LOGGER.isTraceEnabled()) { + LOGGER.trace("executing '{}'", sqlStatement); + } + statement.execute(sqlStatement); + } } }); } @@ -274,6 +277,9 @@ public static interface StatementPreparer { public JdbcConnection query(String query, ResultSetConsumer resultConsumer) throws SQLException { Connection conn = connection(); try (Statement statement = conn.createStatement();) { + if (LOGGER.isTraceEnabled()) { + LOGGER.trace("running '{}'", query); + } try (ResultSet resultSet = statement.executeQuery(query);) { if (resultConsumer != null) { resultConsumer.accept(resultSet); @@ -514,6 +520,27 @@ public Set readTableNames(String databaseCatalog, String schemaNamePatt return tableIds; } + /** + * Returns a JDBC connection string using the current configuration and url. + * + * @param urlPattern a {@code String} representing a JDBC connection with variables that will be replaced + * @return a {@code String} where the variables in {@code urlPattern} are replaced with values from the configuration + */ + public String connectionString(String urlPattern) { + Properties props = config.asProperties(); + return findAndReplace(urlPattern, props, JdbcConfiguration.DATABASE, JdbcConfiguration.HOSTNAME, JdbcConfiguration.PORT, + JdbcConfiguration.USER, JdbcConfiguration.PASSWORD); + } + + /** + * Returns the username for this connection + * + * @return a {@code String}, never {@code null} + */ + public String username() { + return config.getString(JdbcConfiguration.USER); + } + /** * Create definitions for each tables in the database, given the catalog name, schema pattern, table filter, and * column filter.