From 008263ea008ce06889f8e671058659c8595419ad Mon Sep 17 00:00:00 2001
From: Horia Chiorean
Date: Tue, 9 Aug 2016 11:35:23 +0300
Subject: [PATCH] DBZ-92, DBZ-97 Makes logging more verbose and changes the
 snapshot reader to produce separate events for each DDL change

---
 .../connector/mysql/SnapshotReader.java         | 48 ++++++++-----------
 .../connector/mysql/MySqlConnectorIT.java       |  6 +--
 .../connector/mysql/SnapshotReaderIT.java       |  4 +-
 3 files changed, 26 insertions(+), 32 deletions(-)

diff --git a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/SnapshotReader.java b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/SnapshotReader.java
index 7fcc3b420..a87c60413 100644
--- a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/SnapshotReader.java
+++ b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/SnapshotReader.java
@@ -19,7 +19,6 @@
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
-
 import org.apache.kafka.connect.source.SourceRecord;
 
 import io.debezium.connector.mysql.RecordMakers.RecordsForTable;
@@ -205,10 +204,10 @@ protected void execute() {
                         // This column exists only in MySQL 5.6.5 or later ...
                         String gtidSet = rs.getString(5);// GTID set, may be null, blank, or contain a GTID set
                         source.setGtidSet(gtidSet);
-                        logger.debug("\t using binlog '{}' at position '{}' and gtid '{}'", binlogFilename, binlogPosition,
+                        logger.info("\t using binlog '{}' at position '{}' and gtid '{}'", binlogFilename, binlogPosition,
                                     gtidSet);
                     } else {
-                        logger.debug("\t using binlog '{}' at position '{}'", binlogFilename, binlogPosition);
+                        logger.info("\t using binlog '{}' at position '{}'", binlogFilename, binlogPosition);
                     }
                     source.startSnapshot();
                 }
@@ -229,7 +228,7 @@ protected void execute() {
                     databaseNames.add(rs.getString(1));
                 }
             });
-            logger.debug("\t list of available databases is: {}", databaseNames);
+            logger.info("\t list of available databases is: {}", databaseNames);
 
             // ------
             // STEP 5
@@ -248,9 +247,9 @@ protected void execute() {
                     if (filters.tableFilter().test(id)) {
                         tableIds.add(id);
                         tableIdsByDbName.computeIfAbsent(dbName, k -> new ArrayList<>()).add(id);
-                        logger.debug("\t including '{}'", id);
+                        logger.info("\t including '{}'", id);
                     } else {
-                        logger.debug("\t '{}' is filtered out, discarding", id);
+                        logger.info("\t '{}' is filtered out, discarding", id);
                     }
                 }
             });
@@ -261,38 +260,31 @@ protected void execute() {
             // ------
             // Transform the current schema so that it reflects the *current* state of the MySQL server's contents.
             // First, get the DROP TABLE and CREATE TABLE statement (with keys and constraint definitions) for our tables ...
-            logger.info("Step 6: generating DROP and CREATE statements to reflect current database schemas");
-            final List<String> ddlStatements = new ArrayList<>();
+            logger.info("Step 6: generating DROP and CREATE statements to reflect current database schemas:");
             // Add DROP TABLE statements for all tables that we knew about AND those tables found in the databases ...
             Set<TableId> allTableIds = new HashSet<>(schema.tables().tableIds());
             allTableIds.addAll(tableIds);
-            allTableIds.forEach(tableId -> {
-                ddlStatements.add("DROP TABLE IF EXISTS " + tableId);
-            });
+            allTableIds.forEach(tableId -> schema.applyDdl(source, tableId.schema(), "DROP TABLE IF EXISTS " + tableId, this::enqueueSchemaChanges));
             // Add a DROP DATABASE statement for each database that we no longer know about ...
             schema.tables().tableIds().stream().map(TableId::catalog)
                   .filter(Predicates.not(databaseNames::contains))
-                  .forEach(missingDbName -> {
-                      ddlStatements.add("DROP DATABASE IF EXISTS " + missingDbName);
-                  });
+                  .forEach(missingDbName -> schema.applyDdl(source, missingDbName, "DROP DATABASE IF EXISTS " + missingDbName, this::enqueueSchemaChanges));
             // Now process all of our tables for each database ...
             for (Map.Entry<String, List<TableId>> entry : tableIdsByDbName.entrySet()) {
                 String dbName = entry.getKey();
                 // First drop, create, and then use the named database ...
-                ddlStatements.add("DROP DATABASE IF EXISTS " + dbName);
-                ddlStatements.add("CREATE DATABASE " + dbName);
-                ddlStatements.add("USE " + dbName);
+                schema.applyDdl(source, dbName, "DROP DATABASE IF EXISTS " + dbName, this::enqueueSchemaChanges);
+                schema.applyDdl(source, dbName, "CREATE DATABASE " + dbName, this::enqueueSchemaChanges);
+                schema.applyDdl(source, dbName, "USE " + dbName, this::enqueueSchemaChanges);
                 for (TableId tableId : entry.getValue()) {
                     sql.set("SHOW CREATE TABLE " + tableId);
                     mysql.query(sql.get(), rs -> {
-                        if (rs.next()) ddlStatements.add(rs.getString(2)); // CREATE TABLE statement
+                        if (rs.next()) {
+                            schema.applyDdl(source, dbName, rs.getString(2), this::enqueueSchemaChanges);
+                        }
                     });
                 }
             }
-            // Finally, apply the DDL statements to the schema and then update the record maker...
-            logger.debug("Step 6b: applying DROP and CREATE statements to connector's table model");
-            String ddlStatementsStr = String.join(";" + System.lineSeparator(), ddlStatements);
-            schema.applyDdl(source, null, ddlStatementsStr, this::enqueueSchemaChanges);
             context.makeRecord().regenerate();
 
             // ------
@@ -348,7 +340,7 @@ protected void execute() {
 
                 // Scan the rows in the table ...
                 long start = clock.currentTimeInMillis();
-                logger.debug("Step 8: - scanning table '{}' ({} of {} tables)", tableId, ++counter, tableIds.size());
+                logger.info("Step 8: - scanning table '{}' ({} of {} tables)", tableId, ++counter, tableIds.size());
                 sql.set("SELECT * FROM " + tableId);
                 mysql.query(sql.get(), statementFactory, rs -> {
                     long rowNum = 0;
@@ -531,10 +523,12 @@ protected SourceRecord replaceOffset(SourceRecord record) {
                                 record.value());
     }
 
-    protected void enqueueSchemaChanges(String dbName, String ddlStatements) {
-        if (context.includeSchemaChangeRecords() &&
-                context.makeRecord().schemaChanges(dbName, ddlStatements, super::enqueueRecord) > 0) {
-            logger.debug("Recorded DDL statements for database '{}': {}", dbName, ddlStatements);
+    protected void enqueueSchemaChanges(String dbName, String ddlStatement) {
+        if (!context.includeSchemaChangeRecords() || ddlStatement.length() == 0) {
+            return;
+        }
+        if (context.makeRecord().schemaChanges(dbName, ddlStatement, super::enqueueRecord) > 0) {
+            logger.info("\t{}", ddlStatement);
         }
     }
 
diff --git a/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/MySqlConnectorIT.java b/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/MySqlConnectorIT.java
index a857d66c8..a984f66a1 100644
--- a/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/MySqlConnectorIT.java
+++ b/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/MySqlConnectorIT.java
@@ -221,15 +221,15 @@ public void shouldConsumeAllEventsFromDatabaseUsingSnapshot() throws SQLExceptio
         // ---------------------------------------------------------------------------------------------------------------
         // Consume all of the events due to startup and initialization of the database
         // ---------------------------------------------------------------------------------------------------------------
-        SourceRecords records = consumeRecordsByTopic(5 + 9 + 9 + 4 + 1); // 1 schema change record
-        assertThat(records.recordsForTopic("myServer").size()).isEqualTo(1);
+        SourceRecords records = consumeRecordsByTopic(5 + 9 + 9 + 4 + 11); // 11 schema change records
+        assertThat(records.recordsForTopic("myServer").size()).isEqualTo(11);
         assertThat(records.recordsForTopic("myServer.connector_test.products").size()).isEqualTo(9);
         assertThat(records.recordsForTopic("myServer.connector_test.products_on_hand").size()).isEqualTo(9);
         assertThat(records.recordsForTopic("myServer.connector_test.customers").size()).isEqualTo(4);
         assertThat(records.recordsForTopic("myServer.connector_test.orders").size()).isEqualTo(5);
         assertThat(records.topics().size()).isEqualTo(5);
         assertThat(records.databaseNames().size()).isEqualTo(1);
-        assertThat(records.ddlRecordsForDatabase("connector_test").size()).isEqualTo(1);
+        assertThat(records.ddlRecordsForDatabase("connector_test").size()).isEqualTo(11);
         assertThat(records.ddlRecordsForDatabase("readbinlog_test")).isNull();
         records.ddlRecordsForDatabase("connector_test").forEach(this::print);
 
diff --git a/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/SnapshotReaderIT.java b/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/SnapshotReaderIT.java
index 93889b732..2e7638936 100644
--- a/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/SnapshotReaderIT.java
+++ b/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/SnapshotReaderIT.java
@@ -267,8 +267,8 @@ public void shouldCreateSnapshotOfSingleDatabaseWithSchemaChanges() throws Excep
         // The last poll should always return null ...
         assertThat(records).isNull();
 
-        // There should be 1 schema change ...
-        assertThat(schemaChanges.recordCount()).isEqualTo(1);
+        // There should be 11 schema changes ...
+        assertThat(schemaChanges.recordCount()).isEqualTo(11);
         assertThat(schemaChanges.databaseCount()).isEqualTo(1);
         assertThat(schemaChanges.databases()).containsOnly(DB_NAME);
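
Note: the core change above is that SnapshotReader no longer joins the generated DDL into one ';'-separated string and applies it with a single schema.applyDdl(...) call; it now calls applyDdl once per statement, and enqueueSchemaChanges emits one schema-change record per non-empty statement. Below is a minimal, self-contained sketch of that pattern in plain Java. PerStatementDdlSketch, its applyDdl and enqueueSchemaChange methods, and the "inventory" statements are illustrative stand-ins, not the actual Debezium MySqlSchema/SnapshotReader API; the real calls are the schema.applyDdl(source, dbName, ddl, this::enqueueSchemaChanges) lines shown in the diff.

import java.util.Arrays;
import java.util.List;
import java.util.function.BiConsumer;

public class PerStatementDdlSketch {

    // Stand-in for MySqlSchema.applyDdl(...): apply a single DDL statement to an
    // in-memory schema model, then notify the supplied consumer about it.
    static void applyDdl(String dbName, String ddlStatement, BiConsumer<String, String> schemaChangeConsumer) {
        // ... a real implementation would update its table model here ...
        schemaChangeConsumer.accept(dbName, ddlStatement);
    }

    // Stand-in for SnapshotReader.enqueueSchemaChanges(...): skip blank statements
    // and emit one schema-change "record" per remaining statement.
    static void enqueueSchemaChange(String dbName, String ddlStatement) {
        if (ddlStatement.isEmpty()) {
            return;
        }
        System.out.println("schema change record for '" + dbName + "': " + ddlStatement);
    }

    public static void main(String[] args) {
        List<String> generatedDdl = Arrays.asList(
                "DROP DATABASE IF EXISTS inventory",
                "CREATE DATABASE inventory",
                "USE inventory",
                "CREATE TABLE products (id INT PRIMARY KEY, name VARCHAR(255))");
        // One applyDdl call per statement (instead of one call with the whole
        // ';'-joined script) yields one schema-change event per DDL change.
        generatedDdl.forEach(ddl -> applyDdl("inventory", ddl, PerStatementDdlSketch::enqueueSchemaChange));
    }
}

Running the sketch prints one line per DDL statement, which mirrors why the integration tests above now expect 11 schema-change records for the snapshot where they previously expected a single combined record.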