DBZ-92, DBZ-97 Makes logging more verbose and changes the snapshot reader to produce separate events for each DDL change

This commit is contained in:
Horia Chiorean 2016-08-09 11:35:23 +03:00
parent 1e2cadf30e
commit 008263ea00
3 changed files with 26 additions and 32 deletions

View File

@ -19,7 +19,6 @@
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.AtomicReference;
import org.apache.kafka.connect.source.SourceRecord; import org.apache.kafka.connect.source.SourceRecord;
import io.debezium.connector.mysql.RecordMakers.RecordsForTable; import io.debezium.connector.mysql.RecordMakers.RecordsForTable;
@ -205,10 +204,10 @@ protected void execute() {
// This column exists only in MySQL 5.6.5 or later ... // This column exists only in MySQL 5.6.5 or later ...
String gtidSet = rs.getString(5);// GTID set, may be null, blank, or contain a GTID set String gtidSet = rs.getString(5);// GTID set, may be null, blank, or contain a GTID set
source.setGtidSet(gtidSet); source.setGtidSet(gtidSet);
logger.debug("\t using binlog '{}' at position '{}' and gtid '{}'", binlogFilename, binlogPosition, logger.info("\t using binlog '{}' at position '{}' and gtid '{}'", binlogFilename, binlogPosition,
gtidSet); gtidSet);
} else { } else {
logger.debug("\t using binlog '{}' at position '{}'", binlogFilename, binlogPosition); logger.info("\t using binlog '{}' at position '{}'", binlogFilename, binlogPosition);
} }
source.startSnapshot(); source.startSnapshot();
} }
@ -229,7 +228,7 @@ protected void execute() {
databaseNames.add(rs.getString(1)); databaseNames.add(rs.getString(1));
} }
}); });
logger.debug("\t list of available databases is: {}", databaseNames); logger.info("\t list of available databases is: {}", databaseNames);
// ------ // ------
// STEP 5 // STEP 5
@ -248,9 +247,9 @@ protected void execute() {
if (filters.tableFilter().test(id)) { if (filters.tableFilter().test(id)) {
tableIds.add(id); tableIds.add(id);
tableIdsByDbName.computeIfAbsent(dbName, k -> new ArrayList<>()).add(id); tableIdsByDbName.computeIfAbsent(dbName, k -> new ArrayList<>()).add(id);
logger.debug("\t including '{}'", id); logger.info("\t including '{}'", id);
} else { } else {
logger.debug("\t '{}' is filtered out, discarding", id); logger.info("\t '{}' is filtered out, discarding", id);
} }
} }
}); });
@ -261,38 +260,31 @@ protected void execute() {
// ------ // ------
// Transform the current schema so that it reflects the *current* state of the MySQL server's contents. // Transform the current schema so that it reflects the *current* state of the MySQL server's contents.
// First, get the DROP TABLE and CREATE TABLE statement (with keys and constraint definitions) for our tables ... // First, get the DROP TABLE and CREATE TABLE statement (with keys and constraint definitions) for our tables ...
logger.info("Step 6: generating DROP and CREATE statements to reflect current database schemas"); logger.info("Step 6: generating DROP and CREATE statements to reflect current database schemas:");
final List<String> ddlStatements = new ArrayList<>();
// Add DROP TABLE statements for all tables that we knew about AND those tables found in the databases ... // Add DROP TABLE statements for all tables that we knew about AND those tables found in the databases ...
Set<TableId> allTableIds = new HashSet<>(schema.tables().tableIds()); Set<TableId> allTableIds = new HashSet<>(schema.tables().tableIds());
allTableIds.addAll(tableIds); allTableIds.addAll(tableIds);
allTableIds.forEach(tableId -> { allTableIds.forEach(tableId -> schema.applyDdl(source, tableId.schema(), "DROP TABLE IF EXISTS " + tableId, this::enqueueSchemaChanges));
ddlStatements.add("DROP TABLE IF EXISTS " + tableId);
});
// Add a DROP DATABASE statement for each database that we no longer know about ... // Add a DROP DATABASE statement for each database that we no longer know about ...
schema.tables().tableIds().stream().map(TableId::catalog) schema.tables().tableIds().stream().map(TableId::catalog)
.filter(Predicates.not(databaseNames::contains)) .filter(Predicates.not(databaseNames::contains))
.forEach(missingDbName -> { .forEach(missingDbName -> schema.applyDdl(source, missingDbName, "DROP DATABASE IF EXISTS " + missingDbName, this::enqueueSchemaChanges));
ddlStatements.add("DROP DATABASE IF EXISTS " + missingDbName);
});
// Now process all of our tables for each database ... // Now process all of our tables for each database ...
for (Map.Entry<String, List<TableId>> entry : tableIdsByDbName.entrySet()) { for (Map.Entry<String, List<TableId>> entry : tableIdsByDbName.entrySet()) {
String dbName = entry.getKey(); String dbName = entry.getKey();
// First drop, create, and then use the named database ... // First drop, create, and then use the named database ...
ddlStatements.add("DROP DATABASE IF EXISTS " + dbName); schema.applyDdl(source, dbName, "DROP DATABASE IF EXISTS " + dbName, this::enqueueSchemaChanges);
ddlStatements.add("CREATE DATABASE " + dbName); schema.applyDdl(source, dbName, "CREATE DATABASE " + dbName, this::enqueueSchemaChanges);
ddlStatements.add("USE " + dbName); schema.applyDdl(source, dbName, "USE " + dbName, this::enqueueSchemaChanges);
for (TableId tableId : entry.getValue()) { for (TableId tableId : entry.getValue()) {
sql.set("SHOW CREATE TABLE " + tableId); sql.set("SHOW CREATE TABLE " + tableId);
mysql.query(sql.get(), rs -> { mysql.query(sql.get(), rs -> {
if (rs.next()) ddlStatements.add(rs.getString(2)); // CREATE TABLE statement if (rs.next()) {
schema.applyDdl(source, dbName, rs.getString(2), this::enqueueSchemaChanges);
}
}); });
} }
} }
// Finally, apply the DDL statements to the schema and then update the record maker...
logger.debug("Step 6b: applying DROP and CREATE statements to connector's table model");
String ddlStatementsStr = String.join(";" + System.lineSeparator(), ddlStatements);
schema.applyDdl(source, null, ddlStatementsStr, this::enqueueSchemaChanges);
context.makeRecord().regenerate(); context.makeRecord().regenerate();
// ------ // ------
@ -348,7 +340,7 @@ protected void execute() {
// Scan the rows in the table ... // Scan the rows in the table ...
long start = clock.currentTimeInMillis(); long start = clock.currentTimeInMillis();
logger.debug("Step 8: - scanning table '{}' ({} of {} tables)", tableId, ++counter, tableIds.size()); logger.info("Step 8: - scanning table '{}' ({} of {} tables)", tableId, ++counter, tableIds.size());
sql.set("SELECT * FROM " + tableId); sql.set("SELECT * FROM " + tableId);
mysql.query(sql.get(), statementFactory, rs -> { mysql.query(sql.get(), statementFactory, rs -> {
long rowNum = 0; long rowNum = 0;
@ -531,10 +523,12 @@ protected SourceRecord replaceOffset(SourceRecord record) {
record.value()); record.value());
} }
protected void enqueueSchemaChanges(String dbName, String ddlStatements) { protected void enqueueSchemaChanges(String dbName, String ddlStatement) {
if (context.includeSchemaChangeRecords() && if (!context.includeSchemaChangeRecords() || ddlStatement.length() == 0) {
context.makeRecord().schemaChanges(dbName, ddlStatements, super::enqueueRecord) > 0) { return;
logger.debug("Recorded DDL statements for database '{}': {}", dbName, ddlStatements); }
if (context.makeRecord().schemaChanges(dbName, ddlStatement, super::enqueueRecord) > 0) {
logger.info("\t{}", ddlStatement);
} }
} }

View File

@ -221,15 +221,15 @@ public void shouldConsumeAllEventsFromDatabaseUsingSnapshot() throws SQLExceptio
// --------------------------------------------------------------------------------------------------------------- // ---------------------------------------------------------------------------------------------------------------
// Consume all of the events due to startup and initialization of the database // Consume all of the events due to startup and initialization of the database
// --------------------------------------------------------------------------------------------------------------- // ---------------------------------------------------------------------------------------------------------------
SourceRecords records = consumeRecordsByTopic(5 + 9 + 9 + 4 + 1); // 1 schema change record SourceRecords records = consumeRecordsByTopic(5 + 9 + 9 + 4 + 11); // 11 schema change records
assertThat(records.recordsForTopic("myServer").size()).isEqualTo(1); assertThat(records.recordsForTopic("myServer").size()).isEqualTo(11);
assertThat(records.recordsForTopic("myServer.connector_test.products").size()).isEqualTo(9); assertThat(records.recordsForTopic("myServer.connector_test.products").size()).isEqualTo(9);
assertThat(records.recordsForTopic("myServer.connector_test.products_on_hand").size()).isEqualTo(9); assertThat(records.recordsForTopic("myServer.connector_test.products_on_hand").size()).isEqualTo(9);
assertThat(records.recordsForTopic("myServer.connector_test.customers").size()).isEqualTo(4); assertThat(records.recordsForTopic("myServer.connector_test.customers").size()).isEqualTo(4);
assertThat(records.recordsForTopic("myServer.connector_test.orders").size()).isEqualTo(5); assertThat(records.recordsForTopic("myServer.connector_test.orders").size()).isEqualTo(5);
assertThat(records.topics().size()).isEqualTo(5); assertThat(records.topics().size()).isEqualTo(5);
assertThat(records.databaseNames().size()).isEqualTo(1); assertThat(records.databaseNames().size()).isEqualTo(1);
assertThat(records.ddlRecordsForDatabase("connector_test").size()).isEqualTo(1); assertThat(records.ddlRecordsForDatabase("connector_test").size()).isEqualTo(11);
assertThat(records.ddlRecordsForDatabase("readbinlog_test")).isNull(); assertThat(records.ddlRecordsForDatabase("readbinlog_test")).isNull();
records.ddlRecordsForDatabase("connector_test").forEach(this::print); records.ddlRecordsForDatabase("connector_test").forEach(this::print);

View File

@ -267,8 +267,8 @@ public void shouldCreateSnapshotOfSingleDatabaseWithSchemaChanges() throws Excep
// The last poll should always return null ... // The last poll should always return null ...
assertThat(records).isNull(); assertThat(records).isNull();
// There should be 1 schema change ... // There should be 11 schema changes ...
assertThat(schemaChanges.recordCount()).isEqualTo(1); assertThat(schemaChanges.recordCount()).isEqualTo(11);
assertThat(schemaChanges.databaseCount()).isEqualTo(1); assertThat(schemaChanges.databaseCount()).isEqualTo(1);
assertThat(schemaChanges.databases()).containsOnly(DB_NAME); assertThat(schemaChanges.databases()).containsOnly(DB_NAME);