DBZ-55 Corrected filtering of DDL statements based upon affected database

Previously, DDL statements were filtered and recorded based upon the database name that appeared in the binlog. However, that name is actually the database to which the client submitting the operation is connected, and is not necessarily the database _affected_ by the operation (e.g., when a statement uses a fully-qualified table name that refers to a different database).
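
For example (an illustrative JDBC sketch, not connector code; the URL and credentials are placeholders, and the scenario mirrors the integration test below):

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.Statement;

public class QualifiedDdlExample {
    public static void main(String[] args) throws Exception {
        // Connect to 'emptydb'; the URL and credentials are placeholders ...
        try (Connection conn = DriverManager.getConnection(
                "jdbc:mysql://localhost:3306/emptydb", "user", "pass");
             Statement stmt = conn.createStatement()) {
            // The binlog event for this DDL names 'emptydb' (the connected database),
            // yet the statement affects 'connector_test' via the fully-qualified name ...
            stmt.execute("CREATE TABLE connector_test.stores (id INT PRIMARY KEY)");
        }
    }
}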

With these changes, the table/database affected by each DDL statement is now used to filter the recording of the statements. The order of the DDL statements is still maintained, but since each DDL statement can apply to a separate database, the statements are batched (in the same original order) based upon the affected database. For example, two statements affecting "db1" will get batched together into one schema change record, followed by one statement affecting "db2" as a second schema change record, followed by another statement affecting "db1" as a third schema change record.
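
A minimal sketch of this batching rule (illustrative only; the batchByDatabase helper is hypothetical and not part of the connector):

import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;

public class BatchingSketch {
    // Group consecutive (affectedDatabase, statement) pairs, starting a new batch
    // whenever the affected database changes, so original statement order is kept.
    static List<Map.Entry<String, List<String>>> batchByDatabase(List<Map.Entry<String, String>> statements) {
        List<Map.Entry<String, List<String>>> batches = new ArrayList<>();
        String currentDb = null;
        List<String> current = null;
        for (Map.Entry<String, String> stmt : statements) {
            if (!stmt.getKey().equals(currentDb)) {
                currentDb = stmt.getKey();
                current = new ArrayList<>();
                batches.add(new SimpleEntry<>(currentDb, current));
            }
            current.add(stmt.getValue());
        }
        return batches;
    }

    public static void main(String[] args) {
        List<Map.Entry<String, String>> statements = Arrays.asList(
                new SimpleEntry<>("db1", "CREATE TABLE db1.a (id INT)"),
                new SimpleEntry<>("db1", "ALTER TABLE db1.a ADD COLUMN x INT"),
                new SimpleEntry<>("db2", "CREATE TABLE db2.b (id INT)"),
                new SimpleEntry<>("db1", "DROP TABLE db1.a"));
        // Prints three batches: db1 (2 statements), db2 (1), db1 (1) ...
        batchByDatabase(statements).forEach(batch ->
                System.out.println(batch.getKey() + " -> " + batch.getValue()));
    }
}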

Meanwhile, this change does not affect how the database history records the changes: it still records them as submitted, using a single record for each separate binlog event/position. This is much safer, as each binlog event (with its specific position) is written atomically to the history stream. Also, since the database history stream is what the connector uses upon recovery, the database history records are now written _after_ any schema change records to ensure that, upon recovery after failure, no schema change records are lost (they instead have at-least-once delivery guarantees).
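
The resulting write ordering can be sketched as follows (the method names are hypothetical stand-ins, not the connector's actual API):

public class WriteOrderingSketch {
    // Hypothetical stand-in for emitting a schema change record to Kafka Connect.
    void emitSchemaChangeRecords(String databaseName, String ddl) {
        System.out.println("schema change record: " + databaseName + " => " + ddl);
    }
    // Hypothetical stand-in for appending to the database history stream.
    void recordInDatabaseHistory(String databaseName, String ddl) {
        System.out.println("history record: " + ddl);
    }
    void handleDdlEvent(String databaseName, String ddl) {
        // Emit the (filtered) schema change records first; if we fail before the
        // history write below, the event is re-processed on recovery and the
        // records are re-emitted, giving at-least-once delivery.
        emitSchemaChangeRecords(databaseName, ddl);
        // Only then append the event to the database history, which is what the
        // connector replays upon recovery.
        recordInDatabaseHistory(databaseName, ddl);
    }
    public static void main(String[] args) {
        new WriteOrderingSketch().handleDdlEvent("db1", "CREATE TABLE db1.t (id INT)");
    }
}
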
This commit is contained in:
Randall Hauch 2016-05-23 11:01:14 -05:00
parent 4840650c41
commit dc5a379764
11 changed files with 573 additions and 35 deletions

View File

@ -148,7 +148,7 @@ protected void parseCreate(Marker marker) {
} else if (tokens.matches("VIEW")) {
parseCreateView(marker);
} else if (tokens.matchesAnyOf("DATABASE", "SCHEMA")) {
parseCreateUnknown(marker);
parseCreateDatabase(marker);
} else if (tokens.matchesAnyOf("EVENT")) {
parseCreateUnknown(marker);
} else if (tokens.matchesAnyOf("FUNCTION", "PROCEDURE")) {
@ -167,6 +167,31 @@ protected void parseCreate(Marker marker) {
this::parseCreateUnknown);
}
}
protected void parseCreateDatabase(Marker start) {
tokens.consumeAnyOf("DATABASE","SCHEMA");
tokens.canConsume("IF","NOT","EXISTS");
String dbName = tokens.consume();
consumeRemainingStatement(start);
signalCreateDatabase(dbName, start);
debugParsed(start);
}
protected void parseAlterDatabase(Marker start) {
tokens.consumeAnyOf("DATABASE","SCHEMA");
String dbName = tokens.consume();
consumeRemainingStatement(start);
signalAlterDatabase(dbName, null, start);
debugParsed(start);
}
protected void parseDropDatabase(Marker start) {
tokens.consumeAnyOf("DATABASE","SCHEMA");
tokens.canConsume("IF","EXISTS");
String dbName = tokens.consume();
signalDropDatabase(dbName, start);
debugParsed(start);
}
protected void parseCreateTable(Marker start) {
tokens.canConsume("TEMPORARY");
@ -751,6 +776,8 @@ protected void parseAlter(Marker marker) {
if (tokens.matches("TABLE") || tokens.matches("IGNORE", "TABLE")) {
parseAlterTable(marker);
debugParsed(marker);
} else if (tokens.matchesAnyOf("DATABASE", "SCHEMA")) {
parseAlterDatabase(marker);
} else {
parseAlterUnknown(marker);
}
@ -928,6 +955,8 @@ protected void parseDrop(Marker marker) {
parseDropView(marker);
} else if (tokens.matches("INDEX")) {
parseDropIndex(marker);
} else if (tokens.matchesAnyOf("DATABASE", "SCHEMA")) {
parseDropDatabase(marker);
} else {
parseDropUnknown(marker);
}

View File

@ -40,6 +40,7 @@
import io.debezium.relational.TableSchema;
import io.debezium.relational.TableSchemaBuilder;
import io.debezium.relational.Tables;
import io.debezium.relational.ddl.DdlChanges;
import io.debezium.relational.history.DatabaseHistory;
import io.debezium.relational.history.HistoryRecord.Fields;
import io.debezium.relational.mapping.ColumnMappers;
@ -84,6 +85,7 @@ public Struct schemaChangeRecordValue(SourceInfo source, String databaseName, St
private final DatabaseHistory dbHistory;
private final TopicSelector topicSelector;
private final MySqlDdlParser ddlParser;
private final DdlChanges ddlChanges;
private final Tables tables;
private final TableSchemaBuilder schemaBuilder = new TableSchemaBuilder();
private final Map<TableId, TableSchema> tableSchemaByTableId = new HashMap<>();
@ -114,6 +116,8 @@ public TableConverters(TopicSelector topicSelector, DatabaseHistory dbHistory,
this.columnFilter = columnFilter;
this.columnMappers = columnSelectors;
this.ddlParser = new MySqlDdlParser(false); // don't include views
this.ddlChanges = new DdlChanges(this.ddlParser.terminator());
this.ddlParser.addListener(ddlChanges);
this.recordSchemaChangesInSourceRecords = recordSchemaChangesInSourceRecords;
Predicate<TableId> knownTables = (id) -> !unknownTableIds.contains(id); // known if not unknown
this.tableFilter = tableFilter != null ? tableFilter.and(knownTables) : knownTables;
@ -141,31 +145,67 @@ public void rotateLogs(Event event, SourceInfo source, Consumer<SourceRecord> re
public void updateTableCommand(Event event, SourceInfo source, Consumer<SourceRecord> recorder) {
QueryEventData command = event.getData();
// The command's database is the one that the client was using when submitting the DDL statements,
// and that might not be the database(s) affected by the DDL statements ...
String databaseName = command.getDatabase();
String ddlStatements = command.getSql();
if (ignoredQueryStatements.contains(ddlStatements)) return;
logger.debug("Received update table command: {}", event);
try {
this.ddlChanges.reset();
this.ddlParser.setCurrentSchema(databaseName);
this.ddlParser.parse(ddlStatements, tables);
} catch (ParsingException e) {
logger.error("Error parsing DDL statement and updating tables: {}", ddlStatements, e);
} finally {
// Record the DDL statements so that we can later recover them if needed ...
dbHistory.record(source.partition(), source.offset(), databaseName, tables, ddlStatements);
if (recordSchemaChangesInSourceRecords) {
if (recordSchemaChangesInSourceRecords && dbFilter.test(databaseName)) {
String serverName = source.serverName();
String topicName = topicSelector.getTopic(serverName);
Integer partition = 0;
Struct key = schemaChangeRecordKey(databaseName);
Struct value = schemaChangeRecordValue(source, databaseName, ddlStatements);
SourceRecord record = new SourceRecord(source.partition(), source.offset(),
topicName, partition,
SCHEMA_CHANGE_RECORD_KEY_SCHEMA, key,
SCHEMA_CHANGE_RECORD_VALUE_SCHEMA, value);
recorder.accept(record);
// We are supposed to _also_ record the schema changes as SourceRecords, but these need to be filtered
// by database. Unfortunately, the databaseName on the event might not be the same database as that
// being modified by the DDL statements (since the DDL statements can have fully-qualified names).
// Therefore, we have to look at each statement to figure out which database it applies to and then
// record the DDL statements (still in the same order) to those databases.
if ( !ddlChanges.isEmpty() && ddlChanges.applyToMoreDatabasesThan(databaseName) ) {
// We understood at least some of the DDL statements and can figure out to which database they apply.
// They also apply to more databases than 'databaseName', so we need to apply the DDL statements in
// the same order they were read for each _affected_ database, grouped together if multiple apply
// to the same _affected_ database...
ddlChanges.groupStatementStringsByDatabase((dbName, statements) -> {
if (dbFilter.test(dbName)) {
String serverName = source.serverName();
String topicName = topicSelector.getTopic(serverName);
Integer partition = 0;
Struct key = schemaChangeRecordKey(dbName);
Struct value = schemaChangeRecordValue(source, dbName, statements);
SourceRecord record = new SourceRecord(source.partition(), source.offset(),
topicName, partition,
SCHEMA_CHANGE_RECORD_KEY_SCHEMA, key,
SCHEMA_CHANGE_RECORD_VALUE_SCHEMA, value);
recorder.accept(record);
}
});
} else if (dbFilter.test(databaseName)) {
// Either all of the statements applied to 'databaseName', or we didn't understand any of the statements.
// But the database filter includes 'databaseName' so we should forward all of the statements ...
String serverName = source.serverName();
String topicName = topicSelector.getTopic(serverName);
Integer partition = 0;
Struct key = schemaChangeRecordKey(databaseName);
Struct value = schemaChangeRecordValue(source, databaseName, ddlStatements);
SourceRecord record = new SourceRecord(source.partition(), source.offset(),
topicName, partition,
SCHEMA_CHANGE_RECORD_KEY_SCHEMA, key,
SCHEMA_CHANGE_RECORD_VALUE_SCHEMA, value);
recorder.accept(record);
}
}
// Record the DDL statements so that we can later recover them if needed. We do this _after_ writing the
// schema change records so that failure recovery (which is based on the history) won't lose
// schema change records.
dbHistory.record(source.partition(), source.offset(), databaseName, tables, ddlStatements);
}
// Figure out what changed ...

View File

@ -86,9 +86,10 @@ public void shouldConsumeAllEventsFromDatabase() throws SQLException, Interrupte
.build();
// Start the connector ...
start(MySqlConnector.class, config);
//waitForAvailableRecords(10, TimeUnit.SECONDS);
// Consume the first records due to startup and initialization of the database ...
// ---------------------------------------------------------------------------------------------------------------
// Consume all of the events due to startup and initialization of the database
// ---------------------------------------------------------------------------------------------------------------
SourceRecords records = consumeRecordsByTopic(6+9+9+4+5);
assertThat(records.recordsForTopic("kafka-connect").size()).isEqualTo(6);
assertThat(records.recordsForTopic("kafka-connect.connector_test.products").size()).isEqualTo(9);
@ -99,19 +100,22 @@ public void shouldConsumeAllEventsFromDatabase() throws SQLException, Interrupte
assertThat(records.databaseNames().size()).isEqualTo(1);
assertThat(records.ddlRecordsForDatabase("connector_test").size()).isEqualTo(6);
assertThat(records.ddlRecordsForDatabase("readbinlog_test")).isNull();
records.ddlRecordsForDatabase("connector_test").forEach(this::print);
// Check that all records are valid, can be serialized and deserialized ...
records.forEach(this::validate);
// Make sure there are no more ...
// ---------------------------------------------------------------------------------------------------------------
// Stopping the connector does not lose events recorded when connector is not running
// ---------------------------------------------------------------------------------------------------------------
// Make sure there are no more events and then stop the connector ...
waitForAvailableRecords(3, TimeUnit.SECONDS);
int totalConsumed = consumeAvailableRecords(this::print);
assertThat(totalConsumed).isEqualTo(0);
stopConnector();
// Make some changes to data only ...
// Make some changes to data only while the connector is stopped ...
try (MySQLConnection db = MySQLConnection.forTestDatabase("connector_test");) {
try (JdbcConnection connection = db.connect()) {
connection.query("SELECT * FROM products", rs -> {
@ -130,7 +134,9 @@ public void shouldConsumeAllEventsFromDatabase() throws SQLException, Interrupte
assertThat(records.recordsForTopic("kafka-connect.connector_test.products").size()).isEqualTo(1);
assertThat(records.topics().size()).isEqualTo(1);
// Create an additional few records ...
// ---------------------------------------------------------------------------------------------------------------
// Simple INSERT
// ---------------------------------------------------------------------------------------------------------------
try (MySQLConnection db = MySQLConnection.forTestDatabase("connector_test");) {
try (JdbcConnection connection = db.connect()) {
connection.execute("INSERT INTO products VALUES (1001,'roy','old robot',1234.56);");
@ -147,7 +153,9 @@ public void shouldConsumeAllEventsFromDatabase() throws SQLException, Interrupte
List<SourceRecord> inserts = records.recordsForTopic("kafka-connect.connector_test.products");
assertInsert(inserts.get(0), "id", 1001);
// Update one of the records by changing its primary key ...
// ---------------------------------------------------------------------------------------------------------------
// Changing the primary key of a row should result in 3 events: INSERT, DELETE, and TOMBSTONE
// ---------------------------------------------------------------------------------------------------------------
try (MySQLConnection db = MySQLConnection.forTestDatabase("connector_test");) {
try (JdbcConnection connection = db.connect()) {
connection.execute("UPDATE products SET id=2001, description='really old robot' WHERE id=1001");
@ -164,7 +172,9 @@ public void shouldConsumeAllEventsFromDatabase() throws SQLException, Interrupte
assertDelete(updates.get(1), "id", 1001);
assertTombstone(updates.get(2), "id", 1001);
// Update one of the records with no schema change ...
// ---------------------------------------------------------------------------------------------------------------
// Simple UPDATE (with no schema changes)
// ---------------------------------------------------------------------------------------------------------------
try (MySQLConnection db = MySQLConnection.forTestDatabase("connector_test");) {
try (JdbcConnection connection = db.connect()) {
connection.execute("UPDATE products SET weight=1345.67 WHERE id=2001");
@ -182,10 +192,13 @@ public void shouldConsumeAllEventsFromDatabase() throws SQLException, Interrupte
assertUpdate(updates.get(0), "id", 2001);
updates.forEach(this::validate);
// ---------------------------------------------------------------------------------------------------------------
// Change our schema with a fully-qualified name; we should still see this event
// ---------------------------------------------------------------------------------------------------------------
// Add a column with default to the 'products' table and explicitly update one record ...
try (MySQLConnection db = MySQLConnection.forTestDatabase("connector_test");) {
try (JdbcConnection connection = db.connect()) {
connection.execute("ALTER TABLE products ADD COLUMN volume FLOAT NOT NULL, ADD COLUMN alias VARCHAR(30) NOT NULL AFTER description");
connection.execute("ALTER TABLE connector_test.products ADD COLUMN volume FLOAT NOT NULL, ADD COLUMN alias VARCHAR(30) NOT NULL AFTER description");
connection.execute("UPDATE products SET volume=13.5 WHERE id=2001");
connection.query("SELECT * FROM products", rs -> {
if (Testing.Print.isEnabled()) connection.print(rs);
@ -202,11 +215,32 @@ public void shouldConsumeAllEventsFromDatabase() throws SQLException, Interrupte
assertUpdate(updates.get(0), "id", 2001);
updates.forEach(this::validate);
// Testing.Print.enable();
// records.forEach(this::printJson);
// ---------------------------------------------------------------------------------------------------------------
// DBZ-55 Change our schema using a different database and a fully-qualified name; we should still see this event
// ---------------------------------------------------------------------------------------------------------------
// Connect to a different database, but use the fully qualified name for a table in our database ...
try (MySQLConnection db = MySQLConnection.forTestDatabase("emptydb");) {
try (JdbcConnection connection = db.connect()) {
connection.execute("CREATE TABLE connector_test.stores ("
+ " id INT(11) PRIMARY KEY NOT NULL AUTO_INCREMENT,"
+ " first_name VARCHAR(255) NOT NULL,"
+ " last_name VARCHAR(255) NOT NULL,"
+ " email VARCHAR(255) NOT NULL );");
}
}
// To ensure that the server doesn't generate more events than we're expecting, do something completely different
// with a different table and then read that event: Change the products on hand for one product ...
// And consume the one schema change event only ...
records = consumeRecordsByTopic(1);
assertThat(records.topics().size()).isEqualTo(1);
assertThat(records.recordsForTopic("kafka-connect").size()).isEqualTo(1);
//Testing.Print.enable();
records.recordsForTopic("kafka-connect").forEach(this::validate);
// ---------------------------------------------------------------------------------------------------------------
// Make sure there are no additional events
// ---------------------------------------------------------------------------------------------------------------
// Do something completely different with a table we've not modified yet and then read that event.
try (MySQLConnection db = MySQLConnection.forTestDatabase("connector_test");) {
try (JdbcConnection connection = db.connect()) {
connection.execute("UPDATE products_on_hand SET quantity=20 WHERE product_id=109");
@ -216,7 +250,7 @@ public void shouldConsumeAllEventsFromDatabase() throws SQLException, Interrupte
}
}
// And consume the one update ...
// And make sure we consume that one update ...
records = consumeRecordsByTopic(1);
assertThat(records.topics().size()).isEqualTo(1);
updates = records.recordsForTopic("kafka-connect.connector_test.products_on_hand");
@ -224,7 +258,9 @@ public void shouldConsumeAllEventsFromDatabase() throws SQLException, Interrupte
assertUpdate(updates.get(0), "product_id", 109);
updates.forEach(this::validate);
// ---------------------------------------------------------------------------------------------------------------
// Stop the connector ...
// ---------------------------------------------------------------------------------------------------------------
stopConnector();
}

View File

@ -301,7 +301,7 @@ public void shouldParseTestStatements() {
parser.parse(readFile("ddl/mysql-test-statements.ddl"), tables);
Testing.print(tables);
assertThat(tables.size()).isEqualTo(6);
assertThat(listener.total()).isEqualTo(49);
assertThat(listener.total()).isEqualTo(62);
// listener.forEach(this::printEvent);
}
@ -316,7 +316,7 @@ public void shouldParseSomeLinesFromCreateStatements() {
public void shouldParseMySql56InitializationStatements() {
parser.parse(readLines(1, "ddl/mysql-test-init-5.6.ddl"), tables);
assertThat(tables.size()).isEqualTo(85);
assertThat(listener.total()).isEqualTo(112);
assertThat(listener.total()).isEqualTo(118);
listener.forEach(this::printEvent);
}
@ -324,7 +324,7 @@ public void shouldParseMySql56InitializationStatements() {
public void shouldParseMySql57InitializationStatements() {
parser.parse(readLines(1, "ddl/mysql-test-init-5.7.ddl"), tables);
assertThat(tables.size()).isEqualTo(123);
assertThat(listener.total()).isEqualTo(125);
assertThat(listener.total()).isEqualTo(132);
listener.forEach(this::printEvent);
}

View File

@ -0,0 +1,160 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.relational.ddl;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import io.debezium.annotation.NotThreadSafe;
/**
* A {@link DdlParserListener} that accumulates changes so they can be consumed in their original order, grouped by database.
*
* @author Randall Hauch
*/
@NotThreadSafe
public class DdlChanges implements DdlParserListener {
private final String terminator;
private final List<Event> events = new ArrayList<>();
private final Set<String> databaseNames = new HashSet<>();
/**
* Create a new changes object with ';' as the terminator token.
*/
public DdlChanges() {
this(null);
}
/**
* Create a new changes object with the designated terminator token.
*
* @param terminator the token used to terminate each statement; may be null
*/
public DdlChanges(String terminator) {
this.terminator = terminator != null ? terminator : ";";
}
/**
* Clear all accumulated changes.
*
* @return this object for method chaining; never null
*/
public DdlChanges reset() {
events.clear();
databaseNames.clear();
return this;
}
@Override
public void handle(Event event) {
events.add(event);
databaseNames.add(getDatabase(event));
}
/**
* Consume the events in the same order they were {@link #handle(io.debezium.relational.ddl.DdlParserListener.Event) recorded},
* but grouped by database name. Multiple sequential statements that were applied to the same database are grouped together.
* @param consumer the consumer
*/
public void groupStatementStringsByDatabase(DatabaseStatementStringConsumer consumer) {
groupEventsByDatabase((DatabaseEventConsumer)(dbName,eventList)->{
StringBuilder statements = new StringBuilder();
eventList.forEach(event->{
statements.append(event.statement());
statements.append(terminator);
});
consumer.consume(dbName, statements.toString());
});
}
/**
* Consume the events in the same order they were {@link #handle(io.debezium.relational.ddl.DdlParserListener.Event) recorded},
* but grouped by database name. Multiple sequential statements that were applied to the same database are grouped together.
* @param consumer the consumer
*/
public void groupStatementsByDatabase(DatabaseStatementConsumer consumer) {
groupEventsByDatabase((DatabaseEventConsumer)(dbName,eventList)->{
List<String> statements = new ArrayList<>();
eventList.forEach(event->statements.add(event.statement()));
consumer.consume(dbName, statements);
});
}
/**
* Consume the events in the same order they were {@link #handle(io.debezium.relational.ddl.DdlParserListener.Event) recorded},
* but grouped by database name. Multiple sequential statements that were applied to the same database are grouped together.
* @param consumer the consumer
*/
public void groupEventsByDatabase(DatabaseEventConsumer consumer) {
if ( isEmpty() ) return;
if ( databaseNames.size() <= 1 ) {
consumer.consume(databaseNames.iterator().next(), events);
return;
}
List<Event> dbEvents = new ArrayList<>();
String currentDatabase = null;
for (Event event : events) {
String dbName = getDatabase(event);
if (currentDatabase == null || dbName.equals(currentDatabase)) {
currentDatabase = dbName;
// Accumulate the statement ...
dbEvents.add(event);
} else {
// Submit the statements accumulated for the previous database ...
consumer.consume(currentDatabase, dbEvents);
// ... and begin a new batch with this event's database ...
dbEvents = new ArrayList<>();
dbEvents.add(event);
currentDatabase = dbName;
}
}
// Submit any statements accumulated for the final database ...
if (!dbEvents.isEmpty()) {
consumer.consume(currentDatabase, dbEvents);
}
}
protected String getDatabase(Event event) {
switch (event.type()) {
case CREATE_TABLE:
case ALTER_TABLE:
case DROP_TABLE:
TableEvent tableEvent = (TableEvent) event;
return tableEvent.tableId().catalog();
case CREATE_INDEX:
case DROP_INDEX:
TableIndexEvent tableIndexEvent = (TableIndexEvent) event;
return tableIndexEvent.tableId().catalog();
case CREATE_DATABASE:
case ALTER_DATABASE:
case DROP_DATABASE:
DatabaseEvent dbEvent = (DatabaseEvent) event;
return dbEvent.databaseName();
}
assert false : "Should never happen";
return null;
}
public boolean isEmpty() {
return events.isEmpty();
}
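/**
* Determine whether the accumulated changes apply to any database other than the one supplied.
* @param name the database name to compare against
* @return {@code true} if at least one change applies to a different database, or {@code false} otherwise
*/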
public boolean applyToMoreDatabasesThan( String name ) {
return databaseNames.contains(name) ? databaseNames.size() > 1 : databaseNames.size() > 0;
}
@Override
public String toString() {
return events.toString();
}
public static interface DatabaseEventConsumer {
void consume(String databaseName, List<Event> events);
}
public static interface DatabaseStatementConsumer {
void consume(String databaseName, List<String> ddlStatements);
}
public static interface DatabaseStatementStringConsumer {
void consume(String databaseName, String ddlStatements);
}
}

View File

@ -26,6 +26,8 @@
import io.debezium.relational.Table;
import io.debezium.relational.TableId;
import io.debezium.relational.Tables;
import io.debezium.relational.ddl.DdlParserListener.DatabaseAlteredEvent;
import io.debezium.relational.ddl.DdlParserListener.DatabaseCreatedEvent;
import io.debezium.relational.ddl.DdlParserListener.TableAlteredEvent;
import io.debezium.relational.ddl.DdlParserListener.TableCreatedEvent;
import io.debezium.relational.ddl.DdlParserListener.TableDroppedEvent;
@ -128,7 +130,11 @@ protected void initializeStatementStarts(TokenSet statementStartTokens) {
statementStartTokens.add("CREATE", "ALTER", "DROP", "INSERT", "SET", "GRANT", "REVOKE");
}
protected final String terminator() {
/**
* The token used to terminate a DDL statement.
* @return the terminating token; never null
*/
public final String terminator() {
return terminator;
}
@ -370,6 +376,37 @@ protected void signalEvent(DdlParserListener.Event event) {
}
}
/**
* Signal a create database event to all listeners.
*
* @param databaseName the database name; may not be null
* @param statementStart the start of the statement; may not be null
*/
protected void signalCreateDatabase(String databaseName, Marker statementStart) {
signalEvent(new DatabaseCreatedEvent(databaseName, statement(statementStart)));
}
/**
* Signal an alter database event to all listeners.
*
* @param databaseName the database name; may not be null
* @param previousDatabaseName the previous name of the database if it was renamed, or null if it was not renamed
* @param statementStart the start of the statement; may not be null
*/
protected void signalAlterDatabase(String databaseName, String previousDatabaseName, Marker statementStart) {
signalEvent(new DatabaseAlteredEvent(databaseName, previousDatabaseName, statement(statementStart)));
}
/**
* Signal a drop database event to all listeners.
*
* @param databaseName the database name; may not be null
* @param statementStart the start of the statement; may not be null
*/
protected void signalDropDatabase(String databaseName, Marker statementStart) {
signalEvent(new DatabaseDroppedEvent(databaseName, statement(statementStart)));
}
/**
* Signal a create table event to all listeners.
*

View File

@ -5,6 +5,7 @@
*/
package io.debezium.relational.ddl;
import io.debezium.annotation.Immutable;
import io.debezium.relational.TableId;
/**
@ -32,11 +33,13 @@ public interface DdlParserListener {
public static enum EventType {
CREATE_TABLE, ALTER_TABLE, DROP_TABLE,
CREATE_INDEX, DROP_INDEX,
CREATE_DATABASE, ALTER_DATABASE, DROP_DATABASE,
}
/**
* The base class for all concrete events.
*/
@Immutable
public static abstract class Event {
private final String statement;
private final EventType type;
@ -66,6 +69,7 @@ public String statement() {
/**
* The base class for all table-related events.
*/
@Immutable
public static abstract class TableEvent extends Event {
private final TableId tableId;
private final boolean isView;
@ -101,6 +105,7 @@ public String toString() {
/**
* An event describing the creation (or replacement) of a table.
*/
@Immutable
public static class TableCreatedEvent extends TableEvent {
public TableCreatedEvent(TableId tableId, String ddlStatement, boolean isView) {
super(EventType.CREATE_TABLE, tableId, ddlStatement, isView);
@ -110,6 +115,7 @@ public TableCreatedEvent(TableId tableId, String ddlStatement, boolean isView) {
/**
* An event describing the altering of a table.
*/
@Immutable
public static class TableAlteredEvent extends TableEvent {
private final TableId previousTableId;
@ -138,6 +144,7 @@ public String toString() {
/**
* An event describing the dropping of a table.
*/
@Immutable
public static class TableDroppedEvent extends TableEvent {
public TableDroppedEvent(TableId tableId, String ddlStatement, boolean isView) {
super(EventType.DROP_TABLE, tableId, ddlStatement, isView);
@ -147,6 +154,7 @@ public TableDroppedEvent(TableId tableId, String ddlStatement, boolean isView) {
/**
* The abstract base class for all index-related events.
*/
@Immutable
public static abstract class TableIndexEvent extends Event {
private final TableId tableId;
private final String indexName;
@ -185,6 +193,7 @@ public String toString() {
/**
* An event describing the creation of an index on a table.
*/
@Immutable
public static class TableIndexCreatedEvent extends TableIndexEvent {
public TableIndexCreatedEvent(String indexName, TableId tableId, String ddlStatement) {
super(EventType.CREATE_INDEX, indexName, tableId, ddlStatement);
@ -194,9 +203,84 @@ public TableIndexCreatedEvent(String indexName, TableId tableId, String ddlState
/**
* An event describing the dropping of an index on a table.
*/
@Immutable
public static class TableIndexDroppedEvent extends TableIndexEvent {
public TableIndexDroppedEvent(String indexName, TableId tableId, String ddlStatement) {
super(EventType.DROP_INDEX, indexName, tableId, ddlStatement);
}
}
/**
* The base class for all database-related events.
*/
@Immutable
public static abstract class DatabaseEvent extends Event {
private final String databaseName;
public DatabaseEvent(EventType type, String databaseName, String ddlStatement) {
super(type, ddlStatement);
this.databaseName = databaseName;
}
/**
* Get the database name affected by this event.
* @return the database name; never null
*/
public String databaseName() {
return databaseName;
}
@Override
public String toString() {
return databaseName() + " => " + statement();
}
}
/**
* An event describing the creation of a database.
*/
@Immutable
public static class DatabaseCreatedEvent extends DatabaseEvent {
public DatabaseCreatedEvent(String databaseName, String ddlStatement) {
super(EventType.CREATE_DATABASE, databaseName, ddlStatement);
}
}
/**
* An event describing the altering of a database.
*/
@Immutable
public static class DatabaseAlteredEvent extends DatabaseEvent {
private final String previousDatabaseName;
public DatabaseAlteredEvent(String databaseName, String previousDatabaseName, String ddlStatement) {
super(EventType.ALTER_DATABASE, databaseName, ddlStatement);
this.previousDatabaseName = previousDatabaseName;
}
/**
* If the database was renamed, then get the old name of the database before it was renamed.
* @return the database's previous name; may be null if the alter did not change the database's name
*/
public String previousDatabaseName() {
return previousDatabaseName;
}
@Override
public String toString() {
if ( previousDatabaseName != null ) {
return databaseName() + " (was " + previousDatabaseName() + ") => " + statement();
}
return databaseName() + " => " + statement();
}
}
/**
* An event describing the dropping of a database.
*/
@Immutable
public static class DatabaseDroppedEvent extends DatabaseEvent {
public DatabaseDroppedEvent(String databaseName, String ddlStatement) {
super(EventType.DROP_DATABASE, databaseName, ddlStatement);
}
}
}

View File

@ -131,11 +131,38 @@ protected void parseCreate(Marker marker) {
} else if (tokens.matches("VIEW") || tokens.matches("RECURSIVE", "VIEW")) {
parseCreateView(marker);
debugParsed(marker);
} else if (tokens.matchesAnyOf("DATABASE", "SCHEMA")) {
parseCreateDatabase(marker);
} else {
parseCreateUnknown(marker);
}
}
protected void parseCreateDatabase(Marker start) {
tokens.consumeAnyOf("DATABASE","SCHEMA");
tokens.canConsume("IF","NOT","EXISTS");
String dbName = tokens.consume();
consumeRemainingStatement(start);
signalCreateDatabase(dbName, start);
debugParsed(start);
}
protected void parseAlterDatabase(Marker start) {
tokens.consumeAnyOf("DATABASE","SCHEMA");
String dbName = tokens.consume();
consumeRemainingStatement(start);
signalAlterDatabase(dbName, null, start);
debugParsed(start);
}
protected void parseDropDatabase(Marker start) {
tokens.consumeAnyOf("DATABASE","SCHEMA");
tokens.canConsume("IF","EXISTS");
String dbName = tokens.consume();
signalDropDatabase(dbName, start);
debugParsed(start);
}
protected void parseCreateTable(Marker start) {
tokens.canConsumeAnyOf("GLOBAL", "LOCAL", "TEMPORARY");
tokens.consume("TABLE");
@ -555,6 +582,8 @@ protected void parseAlter(Marker marker) {
if (tokens.matches("TABLE") || tokens.matches("IGNORE", "TABLE")) {
parseAlterTable(marker);
debugParsed(marker);
} else if (tokens.matchesAnyOf("DATABASE", "SCHEMA")) {
parseAlterDatabase(marker);
} else {
parseAlterUnknown(marker);
}
@ -659,6 +688,8 @@ protected void parseDrop(Marker marker) {
} else if (tokens.matches("VIEW")) {
parseDropView(marker);
debugParsed(marker);
} else if (tokens.matchesAnyOf("DATABASE", "SCHEMA")) {
parseDropDatabase(marker);
} else {
parseDropUnknown(marker);
}

View File

@ -40,7 +40,7 @@ public interface DatabaseHistory {
* @param position the point in history where these DDL changes were made, which may be used when
* {@link #recover(Map, Map, Tables, DdlParser) recovering} the schema to some point in history; may not be
* null
* @param databaseName the name of the database whose schema is being changed; may not be null
* @param databaseName the name of the database whose schema is being changed; may be null
* @param schema the current definition of the database schema; may not be null
* @param ddl the DDL statements that describe the changes to the database schema; may not be null
*/

View File

@ -0,0 +1,98 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.relational.ddl;
import static org.junit.Assert.fail;
import org.junit.Before;
import org.junit.Test;
import static org.fest.assertions.Assertions.assertThat;
import io.debezium.relational.Tables;
import io.debezium.relational.ddl.DdlParserListener.EventType;
public class DdlChangesTest {
private DdlChanges changes;
private DdlParser parser;
private Tables tables;
@Before
public void beforeEach() {
changes = new DdlChanges();
parser = new DdlParserSql2003();
parser.addListener(changes);
tables = new Tables();
}
@Test
public void shouldParseMultipleStatementsWithDefaultDatabase() {
parser.setCurrentSchema("mydb");
String ddl = "CREATE TABLE foo ( " + System.lineSeparator()
+ " c1 INTEGER NOT NULL, " + System.lineSeparator()
+ " c2 VARCHAR(22) " + System.lineSeparator()
+ "); " + System.lineSeparator()
+ "-- This is a comment" + System.lineSeparator()
+ "DROP TABLE foo;" + System.lineSeparator();
parser.parse(ddl, tables);
assertThat(tables.size()).isEqualTo(0); // table created and dropped
changes.groupEventsByDatabase((dbName, list) -> {
assertThat(dbName).isEqualTo("mydb");
assertThat(list.size()).isEqualTo(2);
assertThat(list.get(0).type()).isEqualTo(EventType.CREATE_TABLE);
assertThat(list.get(1).type()).isEqualTo(EventType.DROP_TABLE);
});
}
@Test
public void shouldParseMultipleStatementsWithFullyQualifiedDatabase() {
parser.setCurrentSchema("mydb");
String ddl = "CREATE TABLE other.foo ( " + System.lineSeparator()
+ " c1 INTEGER NOT NULL, " + System.lineSeparator()
+ " c2 VARCHAR(22) " + System.lineSeparator()
+ "); " + System.lineSeparator()
+ "-- This is a comment" + System.lineSeparator()
+ "DROP TABLE other.foo;" + System.lineSeparator();
parser.parse(ddl, tables);
assertThat(tables.size()).isEqualTo(0); // table created and dropped
changes.groupEventsByDatabase((dbName, list) -> {
assertThat(dbName).isEqualTo("other");
assertThat(list.size()).isEqualTo(2);
assertThat(list.get(0).type()).isEqualTo(EventType.CREATE_TABLE);
assertThat(list.get(1).type()).isEqualTo(EventType.DROP_TABLE);
});
}
@Test
public void shouldParseMultipleStatementsWithNoCurrentSchemaAndFullyQualifiedDatabase() {
String ddl = "CREATE TABLE other.foo ( " + System.lineSeparator()
+ " c1 INTEGER NOT NULL, " + System.lineSeparator()
+ " c2 VARCHAR(22) " + System.lineSeparator()
+ "); " + System.lineSeparator()
+ "-- This is a comment" + System.lineSeparator()
+ "DROP TABLE other.foo;" + System.lineSeparator();
parser.parse(ddl, tables);
assertThat(tables.size()).isEqualTo(0); // table created and dropped
for (int i = 0; i != 5; ++i) {
changes.groupEventsByDatabase((dbName, list) -> {
assertThat(dbName).isEqualTo("other");
assertThat(list.size()).isEqualTo(2);
assertThat(list.get(0).type()).isEqualTo(EventType.CREATE_TABLE);
assertThat(list.get(1).type()).isEqualTo(EventType.DROP_TABLE);
});
}
changes.reset();
changes.groupEventsByDatabase((dbName, list) -> {
fail("Should not have any changes");
});
}
}

View File

@ -273,6 +273,10 @@ protected int consumeRecords(int numberOfRecords, Consumer<SourceRecord> recordC
if (recordConsumer != null) {
recordConsumer.accept(record);
}
if ( Testing.Debug.isEnabled() ) {
Testing.debug("Consumed record " + recordsConsumed + " / " + numberOfRecords + " (" + (numberOfRecords-recordsConsumed) + " more)");
debug(record);
}
}
}
return recordsConsumed;
@ -501,6 +505,17 @@ protected void validate(SourceRecord record) {
SchemaAndValue keyWithSchema = null;
SchemaAndValue valueWithSchema = null;
try {
// The key should never be null ...
assertThat(record.key()).isNotNull();
assertThat(record.keySchema()).isNotNull();
// If the value is not null there must be a schema; otherwise, the schema should also be null ...
if ( record.value() == null ) {
assertThat(record.valueSchema()).isNull();
} else {
assertThat(record.valueSchema()).isNotNull();
}
// First serialize and deserialize the key ...
byte[] keyBytes = keyJsonConverter.fromConnectData(record.topic(), record.keySchema(), record.key());
keyJson = keyJsonDeserializer.deserialize(record.topic(), keyBytes);
@ -538,7 +553,7 @@ protected void validate(SourceRecord record) {
}
}
protected void print(SourceRecord record) {
protected String printToString(SourceRecord record) {
StringBuilder sb = new StringBuilder("SourceRecord{");
sb.append("sourcePartition=").append(record.sourcePartition());
sb.append(", sourceOffset=").append(record.sourceOffset());
@ -549,7 +564,15 @@ protected void print(SourceRecord record) {
sb.append(", value=");
append(record.value(), sb);
sb.append("}");
Testing.print(sb.toString());
return sb.toString();
}
protected void print(SourceRecord record) {
Testing.print(printToString(record));
}
protected void debug(SourceRecord record) {
Testing.debug(printToString(record));
}
protected void printJson(SourceRecord record) {