diff --git a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlDdlParser.java b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlDdlParser.java index 4e2d48295..92a3e86aa 100644 --- a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlDdlParser.java +++ b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlDdlParser.java @@ -439,6 +439,13 @@ protected void parseCreateDefinition(Marker start, TableEditor table) { // Obtain the column editor ... String columnName = tokens.consume(); parseCreateColumn(start, table, columnName); + + // ALTER TABLE allows reordering the columns after the definition ... + if (tokens.canConsume("FIRST")) { + table.reorderColumn(columnName, null); + } else if (tokens.canConsume("AFTER")) { + table.reorderColumn(columnName, tokens.consume()); + } } } diff --git a/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/MySqlConnectorIT.java b/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/MySqlConnectorIT.java index 206805b75..e1030aeff 100644 --- a/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/MySqlConnectorIT.java +++ b/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/MySqlConnectorIT.java @@ -89,7 +89,6 @@ public void shouldConsumeAllEventsFromDatabase() throws SQLException, Interrupte //waitForAvailableRecords(10, TimeUnit.SECONDS); // Consume the first records due to startup and initialization of the database ... - //Testing.Print.enable(); SourceRecords records = consumeRecordsByTopic(6+9+9+4+5); assertThat(records.recordsForTopic("kafka-connect").size()).isEqualTo(6); assertThat(records.recordsForTopic("kafka-connect.connector_test.products").size()).isEqualTo(9); @@ -107,7 +106,6 @@ public void shouldConsumeAllEventsFromDatabase() throws SQLException, Interrupte records.forEach(this::validate); // Make sure there are no more ... - Testing.Print.disable(); waitForAvailableRecords(3, TimeUnit.SECONDS); int totalConsumed = consumeAvailableRecords(this::print); assertThat(totalConsumed).isEqualTo(0); @@ -165,9 +163,66 @@ public void shouldConsumeAllEventsFromDatabase() throws SQLException, Interrupte assertInsert(updates.get(0), "id", 2001); assertDelete(updates.get(1), "id", 1001); assertTombstone(updates.get(2), "id", 1001); - - //Testing.Print.enable(); - //updates.forEach(this::printJson); + + // Update one of the records with no schema change ... + try (MySQLConnection db = MySQLConnection.forTestDatabase("connector_test");) { + try (JdbcConnection connection = db.connect()) { + connection.execute("UPDATE products SET weight=1345.67 WHERE id=2001"); + connection.query("SELECT * FROM products", rs -> { + if (Testing.Print.isEnabled()) connection.print(rs); + }); + } + } + + // And consume the one update ... + records = consumeRecordsByTopic(1); + assertThat(records.topics().size()).isEqualTo(1); + updates = records.recordsForTopic("kafka-connect.connector_test.products"); + assertThat(updates.size()).isEqualTo(1); + assertUpdate(updates.get(0), "id", 2001); + updates.forEach(this::validate); + + // Add a column with default to the 'products' table and explicitly update one record ... + try (MySQLConnection db = MySQLConnection.forTestDatabase("connector_test");) { + try (JdbcConnection connection = db.connect()) { + connection.execute("ALTER TABLE products ADD COLUMN volume FLOAT NOT NULL, ADD COLUMN alias VARCHAR(30) NOT NULL AFTER description"); + connection.execute("UPDATE products SET volume=13.5 WHERE id=2001"); + connection.query("SELECT * FROM products", rs -> { + if (Testing.Print.isEnabled()) connection.print(rs); + }); + } + } + + // And consume the one schema change event and one update event ... + records = consumeRecordsByTopic(2); + assertThat(records.topics().size()).isEqualTo(2); + assertThat(records.recordsForTopic("kafka-connect").size()).isEqualTo(1); + updates = records.recordsForTopic("kafka-connect.connector_test.products"); + assertThat(updates.size()).isEqualTo(1); + assertUpdate(updates.get(0), "id", 2001); + updates.forEach(this::validate); + + // Testing.Print.enable(); + // records.forEach(this::printJson); + + // To ensure that the server doesn't generate more events than we're expecting, do something completely different + // with a different table and then read that event: Change the products on hand for one product ... + try (MySQLConnection db = MySQLConnection.forTestDatabase("connector_test");) { + try (JdbcConnection connection = db.connect()) { + connection.execute("UPDATE products_on_hand SET quantity=20 WHERE product_id=109"); + connection.query("SELECT * FROM products_on_hand", rs -> { + if (Testing.Print.isEnabled()) connection.print(rs); + }); + } + } + + // And consume the one update ... + records = consumeRecordsByTopic(1); + assertThat(records.topics().size()).isEqualTo(1); + updates = records.recordsForTopic("kafka-connect.connector_test.products_on_hand"); + assertThat(updates.size()).isEqualTo(1); + assertUpdate(updates.get(0), "product_id", 109); + updates.forEach(this::validate); // Stop the connector ... stopConnector(); diff --git a/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/MySqlDdlParserTest.java b/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/MySqlDdlParserTest.java index 0157189d6..a28d52cf3 100644 --- a/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/MySqlDdlParserTest.java +++ b/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/MySqlDdlParserTest.java @@ -243,6 +243,43 @@ public void shouldParseAlterTableStatementThatAddsCharacterSetForColumns() { assertColumn(t3, "col1", "VARCHAR CHARSET utf8", Types.VARCHAR, 75, -1, true, false, false); } + @Test + public void shouldParseAlterTableStatementAddColumns() { + String ddl = "CREATE TABLE t ( col1 VARCHAR(25) ); "; + parser.parse(ddl, tables); + assertThat(tables.size()).isEqualTo(1); + Table t = tables.forTable(new TableId(null, null, "t")); + assertThat(t).isNotNull(); + assertThat(t.columnNames()).containsExactly("col1"); + assertThat(t.primaryKeyColumnNames()).isEmpty(); + assertColumn(t, "col1", "VARCHAR", Types.VARCHAR, 25, -1, true, false, false); + assertThat(t.columnWithName("col1").position()).isEqualTo(1); + + ddl = "ALTER TABLE t ADD col2 VARCHAR(50) NOT NULL;"; + parser.parse(ddl, tables); + Table t2 = tables.forTable(new TableId(null, null, "t")); + assertThat(t2).isNotNull(); + assertThat(t2.columnNames()).containsExactly("col1","col2"); + assertThat(t2.primaryKeyColumnNames()).isEmpty(); + assertColumn(t2, "col1", "VARCHAR", Types.VARCHAR, 25, -1, true, false, false); + assertColumn(t2, "col2", "VARCHAR", Types.VARCHAR, 50, -1, false, false, false); + assertThat(t2.columnWithName("col1").position()).isEqualTo(1); + assertThat(t2.columnWithName("col2").position()).isEqualTo(2); + + ddl = "ALTER TABLE t ADD col3 FLOAT NOT NULL AFTER col1;"; + parser.parse(ddl, tables); + Table t3 = tables.forTable(new TableId(null, null, "t")); + assertThat(t3).isNotNull(); + assertThat(t3.columnNames()).containsExactly("col1","col3", "col2"); + assertThat(t3.primaryKeyColumnNames()).isEmpty(); + assertColumn(t3, "col1", "VARCHAR", Types.VARCHAR, 25, -1, true, false, false); + assertColumn(t3, "col3", "FLOAT", Types.FLOAT, -1, -1, false, false, false); + assertColumn(t3, "col2", "VARCHAR", Types.VARCHAR, 50, -1, false, false, false); + assertThat(t3.columnWithName("col1").position()).isEqualTo(1); + assertThat(t3.columnWithName("col3").position()).isEqualTo(2); + assertThat(t3.columnWithName("col2").position()).isEqualTo(3); + } + @Test public void shouldParseGrantStatement() { String ddl = "GRANT ALL PRIVILEGES ON `mysql`.* TO 'mysqluser'@'%'";