diff --git a/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/MySqlConnectorIT.java b/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/MySqlConnectorIT.java index 20afa4280..b16000db8 100644 --- a/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/MySqlConnectorIT.java +++ b/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/MySqlConnectorIT.java @@ -52,7 +52,9 @@ public class MySqlConnectorIT extends AbstractConnectorTest { .withDbHistoryPath(DB_HISTORY_PATH); // Defines how many initial events are generated from loading the test databases. - private static final int INITIAL_EVENT_COUNT = 9 + 9 + 4 + 5 + 6; + private static final int PRODUCTS_TABLE_EVENT_COUNT = 9; + private static final int ORDERS_TABLE_EVENT_COUNT = 5; + private static final int INITIAL_EVENT_COUNT = PRODUCTS_TABLE_EVENT_COUNT + 9 + 4 + ORDERS_TABLE_EVENT_COUNT + 6; private Configuration config; @@ -435,7 +437,7 @@ public void shouldConsumeAllEventsFromDatabaseUsingSnapshot() throws SQLExceptio // --------------------------------------------------------------------------------------------------------------- // Consume all of the events due to startup and initialization of the database // --------------------------------------------------------------------------------------------------------------- - SourceRecords records = consumeRecordsByTopic(5 + 9 + 9 + 4 + 11 + 1 + 2); // 11 schema change records + 1 SET statement + SourceRecords records = consumeRecordsByTopic(5 + 9 + 9 + 4 + 11 + 1); // 11 schema change records + 1 SET statement assertThat(records.recordsForTopic(DATABASE.getServerName()).size()).isEqualTo(12); assertThat(records.recordsForTopic(DATABASE.topicForTable("products")).size()).isEqualTo(9); assertThat(records.recordsForTopic(DATABASE.topicForTable("products_on_hand")).size()).isEqualTo(9); @@ -777,8 +779,8 @@ public void shouldConsumeAllEventsFromDatabaseUsingSnapshot() throws SQLExceptio @Test public void shouldUseOverriddenSelectStatementDuringSnapshotting() throws SQLException, InterruptedException { - String masterPort = System.getProperty("database.port"); - String replicaPort = System.getProperty("database.replica.port"); + String masterPort = System.getProperty("database.port", "3306"); + String replicaPort = System.getProperty("database.replica.port", "3306"); boolean replicaIsMaster = masterPort.equals(replicaPort); if (!replicaIsMaster) { // Give time for the replica to catch up to the master ... @@ -786,8 +788,8 @@ public void shouldUseOverriddenSelectStatementDuringSnapshotting() throws SQLExc } config = Configuration.create() - .with(MySqlConnectorConfig.HOSTNAME, System.getProperty("database.replica.hostname")) - .with(MySqlConnectorConfig.PORT, System.getProperty("database.replica.port")) + .with(MySqlConnectorConfig.HOSTNAME, System.getProperty("database.replica.hostname", "localhost")) + .with(MySqlConnectorConfig.PORT, System.getProperty("database.replica.port", "3306")) .with(MySqlConnectorConfig.USER, "snapper") .with(MySqlConnectorConfig.PASSWORD, "snapperpass") .with(MySqlConnectorConfig.SERVER_ID, 28765) @@ -825,8 +827,8 @@ public void shouldUseOverriddenSelectStatementDuringSnapshotting() throws SQLExc @Test public void shouldUseMultipleOverriddenSelectStatementsDuringSnapshotting() throws SQLException, InterruptedException { - String masterPort = System.getProperty("database.port"); - String replicaPort = System.getProperty("database.replica.port"); + String masterPort = System.getProperty("database.port", "3306"); + String replicaPort = System.getProperty("database.replica.port", "3306"); boolean replicaIsMaster = masterPort.equals(replicaPort); if (!replicaIsMaster) { // Give time for the replica to catch up to the master ... @@ -835,8 +837,8 @@ public void shouldUseMultipleOverriddenSelectStatementsDuringSnapshotting() thro String tables=String.format("%s.products,%s.products_on_hand", DATABASE.getDatabaseName(), DATABASE.getDatabaseName()); config = Configuration.create() - .with(MySqlConnectorConfig.HOSTNAME, System.getProperty("database.replica.hostname")) - .with(MySqlConnectorConfig.PORT, System.getProperty("database.replica.port")) + .with(MySqlConnectorConfig.HOSTNAME, System.getProperty("database.replica.hostname", "localhost")) + .with(MySqlConnectorConfig.PORT, System.getProperty("database.replica.port", "3306")) .with(MySqlConnectorConfig.USER, "snapper") .with(MySqlConnectorConfig.PASSWORD, "snapperpass") .with(MySqlConnectorConfig.SERVER_ID, 28765) @@ -1126,7 +1128,7 @@ public void shouldNotParseQueryIfServerOptionDisabled() throws Exception { start(MySqlConnector.class, config); // Flush all existing records not related to the test. - consumeRecords(INITIAL_EVENT_COUNT, null); + consumeRecords(PRODUCTS_TABLE_EVENT_COUNT, null); // Define insert query we want to validate. final String insertSqlStatement = "INSERT INTO products VALUES (default,'robot','Toy robot',1.304)"; @@ -1177,7 +1179,7 @@ public void shouldNotParseQueryIfConnectorNotConfiguredTo() throws Exception { start(MySqlConnector.class, config); // Flush all existing records not related to the test. - consumeRecords(INITIAL_EVENT_COUNT, null); + consumeRecords(PRODUCTS_TABLE_EVENT_COUNT, null); // Define insert query we want to validate. final String insertSqlStatement = "INSERT INTO products VALUES (default,'robot','Toy robot',1.304)"; @@ -1229,7 +1231,7 @@ public void shouldParseQueryIfAvailableAndConnectorOptionEnabled() throws Except start(MySqlConnector.class, config); // Flush all existing records not related to the test. - consumeRecords(INITIAL_EVENT_COUNT, null); + consumeRecords(PRODUCTS_TABLE_EVENT_COUNT, null); // Define insert query we want to validate. final String insertSqlStatement = "INSERT INTO products VALUES (default,'robot','Toy robot',1.304)"; @@ -1281,7 +1283,7 @@ public void parseMultipleInsertStatements() throws Exception { start(MySqlConnector.class, config); // Flush all existing records not related to the test. - consumeRecords(INITIAL_EVENT_COUNT, null); + consumeRecords(PRODUCTS_TABLE_EVENT_COUNT, null); // Define insert query we want to validate. final String insertSqlStatement1 = "INSERT INTO products VALUES (default,'robot','Toy robot',1.304)"; @@ -1344,7 +1346,7 @@ public void parseMultipleRowInsertStatement() throws Exception { start(MySqlConnector.class, config); // Flush all existing records not related to the test. - consumeRecords(INITIAL_EVENT_COUNT, null); + consumeRecords(PRODUCTS_TABLE_EVENT_COUNT, null); // Define insert query we want to validate. final String insertSqlStatement = "INSERT INTO products VALUES (default,'robot','Toy robot',1.304), (default,'toaster','Toaster',3.33)"; @@ -1405,7 +1407,7 @@ public void parseDeleteQuery() throws Exception { start(MySqlConnector.class, config); // Flush all existing records not related to the test. - consumeRecords(INITIAL_EVENT_COUNT, null); + consumeRecords(ORDERS_TABLE_EVENT_COUNT, null); // Define insert query we want to validate. final String deleteSqlStatement = "DELETE FROM orders WHERE order_number=10001 LIMIT 1"; @@ -1456,7 +1458,7 @@ public void parseMultiRowDeleteQuery() throws Exception { start(MySqlConnector.class, config); // Flush all existing records not related to the test. - consumeRecords(INITIAL_EVENT_COUNT, null); + consumeRecords(ORDERS_TABLE_EVENT_COUNT, null); // Define insert query we want to validate. final String deleteSqlStatement = "DELETE FROM orders WHERE purchaser=1002"; @@ -1515,7 +1517,7 @@ public void parseUpdateQuery() throws Exception { start(MySqlConnector.class, config); // Flush all existing records not related to the test. - consumeRecords(INITIAL_EVENT_COUNT, null); + consumeRecords(PRODUCTS_TABLE_EVENT_COUNT, null); // Define insert query we want to validate. final String updateSqlStatement = "UPDATE products set name='toaster' where id=109 LIMIT 1"; @@ -1566,7 +1568,7 @@ public void parseMultiRowUpdateQuery() throws Exception { start(MySqlConnector.class, config); // Flush all existing records not related to the test. - consumeRecords(INITIAL_EVENT_COUNT, null); + consumeRecords(ORDERS_TABLE_EVENT_COUNT, null); // Define insert query we want to validate. final String updateSqlStatement = "UPDATE orders set quantity=0 where order_number in (10001, 10004)"; diff --git a/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/MySqlConnectorRegressionIT.java b/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/MySqlConnectorRegressionIT.java index 35d1c63b6..098969623 100644 --- a/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/MySqlConnectorRegressionIT.java +++ b/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/MySqlConnectorRegressionIT.java @@ -549,14 +549,13 @@ public void shouldConsumeAllEventsFromDatabaseUsingSnapshot() throws SQLExceptio // Consume all of the events due to startup and initialization of the database // --------------------------------------------------------------------------------------------------------------- // Testing.Debug.enable(); - int numCreateDatabase = 1; int numTables = 11; int numDataRecords = 20; int numDdlRecords = numTables * 2 + 3; // for each table (1 drop + 1 create) + for each db (1 create + 1 drop + 1 use) int numCreateDefiner = 1; int numSetVariables = 1; SourceRecords records = - consumeRecordsByTopic(numDdlRecords + numSetVariables + numDataRecords + numCreateDefiner + numCreateDatabase); + consumeRecordsByTopic(numDdlRecords + numSetVariables + numDataRecords); stopConnector(); assertThat(records).isNotNull(); assertThat(records.recordsForTopic(DATABASE.getServerName()).size())