Merge pull request #48 from rhauch/dbz-45

DBZ-45 Confirmed and tested support for 'before' and 'after' states in UPDATE events
This commit is contained in:
Randall Hauch 2016-05-20 12:19:35 -05:00
commit 4840650c41
3 changed files with 104 additions and 5 deletions

View File

@ -439,6 +439,13 @@ protected void parseCreateDefinition(Marker start, TableEditor table) {
// Obtain the column editor ...
String columnName = tokens.consume();
parseCreateColumn(start, table, columnName);
// ALTER TABLE allows reordering the columns after the definition ...
if (tokens.canConsume("FIRST")) {
table.reorderColumn(columnName, null);
} else if (tokens.canConsume("AFTER")) {
table.reorderColumn(columnName, tokens.consume());
}
}
}

View File

@ -89,7 +89,6 @@ public void shouldConsumeAllEventsFromDatabase() throws SQLException, Interrupte
//waitForAvailableRecords(10, TimeUnit.SECONDS);
// Consume the first records due to startup and initialization of the database ...
//Testing.Print.enable();
SourceRecords records = consumeRecordsByTopic(6+9+9+4+5);
assertThat(records.recordsForTopic("kafka-connect").size()).isEqualTo(6);
assertThat(records.recordsForTopic("kafka-connect.connector_test.products").size()).isEqualTo(9);
@ -107,7 +106,6 @@ public void shouldConsumeAllEventsFromDatabase() throws SQLException, Interrupte
records.forEach(this::validate);
// Make sure there are no more ...
Testing.Print.disable();
waitForAvailableRecords(3, TimeUnit.SECONDS);
int totalConsumed = consumeAvailableRecords(this::print);
assertThat(totalConsumed).isEqualTo(0);
@ -165,9 +163,66 @@ public void shouldConsumeAllEventsFromDatabase() throws SQLException, Interrupte
assertInsert(updates.get(0), "id", 2001);
assertDelete(updates.get(1), "id", 1001);
assertTombstone(updates.get(2), "id", 1001);
//Testing.Print.enable();
//updates.forEach(this::printJson);
// Update one of the records with no schema change ...
try (MySQLConnection db = MySQLConnection.forTestDatabase("connector_test");) {
try (JdbcConnection connection = db.connect()) {
connection.execute("UPDATE products SET weight=1345.67 WHERE id=2001");
connection.query("SELECT * FROM products", rs -> {
if (Testing.Print.isEnabled()) connection.print(rs);
});
}
}
// And consume the one update ...
records = consumeRecordsByTopic(1);
assertThat(records.topics().size()).isEqualTo(1);
updates = records.recordsForTopic("kafka-connect.connector_test.products");
assertThat(updates.size()).isEqualTo(1);
assertUpdate(updates.get(0), "id", 2001);
updates.forEach(this::validate);
// Add a column with default to the 'products' table and explicitly update one record ...
try (MySQLConnection db = MySQLConnection.forTestDatabase("connector_test");) {
try (JdbcConnection connection = db.connect()) {
connection.execute("ALTER TABLE products ADD COLUMN volume FLOAT NOT NULL, ADD COLUMN alias VARCHAR(30) NOT NULL AFTER description");
connection.execute("UPDATE products SET volume=13.5 WHERE id=2001");
connection.query("SELECT * FROM products", rs -> {
if (Testing.Print.isEnabled()) connection.print(rs);
});
}
}
// And consume the one schema change event and one update event ...
records = consumeRecordsByTopic(2);
assertThat(records.topics().size()).isEqualTo(2);
assertThat(records.recordsForTopic("kafka-connect").size()).isEqualTo(1);
updates = records.recordsForTopic("kafka-connect.connector_test.products");
assertThat(updates.size()).isEqualTo(1);
assertUpdate(updates.get(0), "id", 2001);
updates.forEach(this::validate);
// Testing.Print.enable();
// records.forEach(this::printJson);
// To ensure that the server doesn't generate more events than we're expecting, do something completely different
// with a different table and then read that event: Change the products on hand for one product ...
try (MySQLConnection db = MySQLConnection.forTestDatabase("connector_test");) {
try (JdbcConnection connection = db.connect()) {
connection.execute("UPDATE products_on_hand SET quantity=20 WHERE product_id=109");
connection.query("SELECT * FROM products_on_hand", rs -> {
if (Testing.Print.isEnabled()) connection.print(rs);
});
}
}
// And consume the one update ...
records = consumeRecordsByTopic(1);
assertThat(records.topics().size()).isEqualTo(1);
updates = records.recordsForTopic("kafka-connect.connector_test.products_on_hand");
assertThat(updates.size()).isEqualTo(1);
assertUpdate(updates.get(0), "product_id", 109);
updates.forEach(this::validate);
// Stop the connector ...
stopConnector();

View File

@ -243,6 +243,43 @@ public void shouldParseAlterTableStatementThatAddsCharacterSetForColumns() {
assertColumn(t3, "col1", "VARCHAR CHARSET utf8", Types.VARCHAR, 75, -1, true, false, false);
}
@Test
public void shouldParseAlterTableStatementAddColumns() {
String ddl = "CREATE TABLE t ( col1 VARCHAR(25) ); ";
parser.parse(ddl, tables);
assertThat(tables.size()).isEqualTo(1);
Table t = tables.forTable(new TableId(null, null, "t"));
assertThat(t).isNotNull();
assertThat(t.columnNames()).containsExactly("col1");
assertThat(t.primaryKeyColumnNames()).isEmpty();
assertColumn(t, "col1", "VARCHAR", Types.VARCHAR, 25, -1, true, false, false);
assertThat(t.columnWithName("col1").position()).isEqualTo(1);
ddl = "ALTER TABLE t ADD col2 VARCHAR(50) NOT NULL;";
parser.parse(ddl, tables);
Table t2 = tables.forTable(new TableId(null, null, "t"));
assertThat(t2).isNotNull();
assertThat(t2.columnNames()).containsExactly("col1","col2");
assertThat(t2.primaryKeyColumnNames()).isEmpty();
assertColumn(t2, "col1", "VARCHAR", Types.VARCHAR, 25, -1, true, false, false);
assertColumn(t2, "col2", "VARCHAR", Types.VARCHAR, 50, -1, false, false, false);
assertThat(t2.columnWithName("col1").position()).isEqualTo(1);
assertThat(t2.columnWithName("col2").position()).isEqualTo(2);
ddl = "ALTER TABLE t ADD col3 FLOAT NOT NULL AFTER col1;";
parser.parse(ddl, tables);
Table t3 = tables.forTable(new TableId(null, null, "t"));
assertThat(t3).isNotNull();
assertThat(t3.columnNames()).containsExactly("col1","col3", "col2");
assertThat(t3.primaryKeyColumnNames()).isEmpty();
assertColumn(t3, "col1", "VARCHAR", Types.VARCHAR, 25, -1, true, false, false);
assertColumn(t3, "col3", "FLOAT", Types.FLOAT, -1, -1, false, false, false);
assertColumn(t3, "col2", "VARCHAR", Types.VARCHAR, 50, -1, false, false, false);
assertThat(t3.columnWithName("col1").position()).isEqualTo(1);
assertThat(t3.columnWithName("col3").position()).isEqualTo(2);
assertThat(t3.columnWithName("col2").position()).isEqualTo(3);
}
@Test
public void shouldParseGrantStatement() {
String ddl = "GRANT ALL PRIVILEGES ON `mysql`.* TO 'mysqluser'@'%'";