DBZ-1016 Fix expected records count
This commit is contained in:
parent
68766e9355
commit
415830003a
@ -52,7 +52,9 @@ public class MySqlConnectorIT extends AbstractConnectorTest {
|
|||||||
.withDbHistoryPath(DB_HISTORY_PATH);
|
.withDbHistoryPath(DB_HISTORY_PATH);
|
||||||
|
|
||||||
// Defines how many initial events are generated from loading the test databases.
|
// Defines how many initial events are generated from loading the test databases.
|
||||||
private static final int INITIAL_EVENT_COUNT = 9 + 9 + 4 + 5 + 6;
|
private static final int PRODUCTS_TABLE_EVENT_COUNT = 9;
|
||||||
|
private static final int ORDERS_TABLE_EVENT_COUNT = 5;
|
||||||
|
private static final int INITIAL_EVENT_COUNT = PRODUCTS_TABLE_EVENT_COUNT + 9 + 4 + ORDERS_TABLE_EVENT_COUNT + 6;
|
||||||
|
|
||||||
private Configuration config;
|
private Configuration config;
|
||||||
|
|
||||||
@ -435,7 +437,7 @@ public void shouldConsumeAllEventsFromDatabaseUsingSnapshot() throws SQLExceptio
|
|||||||
// ---------------------------------------------------------------------------------------------------------------
|
// ---------------------------------------------------------------------------------------------------------------
|
||||||
// Consume all of the events due to startup and initialization of the database
|
// Consume all of the events due to startup and initialization of the database
|
||||||
// ---------------------------------------------------------------------------------------------------------------
|
// ---------------------------------------------------------------------------------------------------------------
|
||||||
SourceRecords records = consumeRecordsByTopic(5 + 9 + 9 + 4 + 11 + 1 + 2); // 11 schema change records + 1 SET statement
|
SourceRecords records = consumeRecordsByTopic(5 + 9 + 9 + 4 + 11 + 1); // 11 schema change records + 1 SET statement
|
||||||
assertThat(records.recordsForTopic(DATABASE.getServerName()).size()).isEqualTo(12);
|
assertThat(records.recordsForTopic(DATABASE.getServerName()).size()).isEqualTo(12);
|
||||||
assertThat(records.recordsForTopic(DATABASE.topicForTable("products")).size()).isEqualTo(9);
|
assertThat(records.recordsForTopic(DATABASE.topicForTable("products")).size()).isEqualTo(9);
|
||||||
assertThat(records.recordsForTopic(DATABASE.topicForTable("products_on_hand")).size()).isEqualTo(9);
|
assertThat(records.recordsForTopic(DATABASE.topicForTable("products_on_hand")).size()).isEqualTo(9);
|
||||||
@ -777,8 +779,8 @@ public void shouldConsumeAllEventsFromDatabaseUsingSnapshot() throws SQLExceptio
|
|||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void shouldUseOverriddenSelectStatementDuringSnapshotting() throws SQLException, InterruptedException {
|
public void shouldUseOverriddenSelectStatementDuringSnapshotting() throws SQLException, InterruptedException {
|
||||||
String masterPort = System.getProperty("database.port");
|
String masterPort = System.getProperty("database.port", "3306");
|
||||||
String replicaPort = System.getProperty("database.replica.port");
|
String replicaPort = System.getProperty("database.replica.port", "3306");
|
||||||
boolean replicaIsMaster = masterPort.equals(replicaPort);
|
boolean replicaIsMaster = masterPort.equals(replicaPort);
|
||||||
if (!replicaIsMaster) {
|
if (!replicaIsMaster) {
|
||||||
// Give time for the replica to catch up to the master ...
|
// Give time for the replica to catch up to the master ...
|
||||||
@ -786,8 +788,8 @@ public void shouldUseOverriddenSelectStatementDuringSnapshotting() throws SQLExc
|
|||||||
}
|
}
|
||||||
|
|
||||||
config = Configuration.create()
|
config = Configuration.create()
|
||||||
.with(MySqlConnectorConfig.HOSTNAME, System.getProperty("database.replica.hostname"))
|
.with(MySqlConnectorConfig.HOSTNAME, System.getProperty("database.replica.hostname", "localhost"))
|
||||||
.with(MySqlConnectorConfig.PORT, System.getProperty("database.replica.port"))
|
.with(MySqlConnectorConfig.PORT, System.getProperty("database.replica.port", "3306"))
|
||||||
.with(MySqlConnectorConfig.USER, "snapper")
|
.with(MySqlConnectorConfig.USER, "snapper")
|
||||||
.with(MySqlConnectorConfig.PASSWORD, "snapperpass")
|
.with(MySqlConnectorConfig.PASSWORD, "snapperpass")
|
||||||
.with(MySqlConnectorConfig.SERVER_ID, 28765)
|
.with(MySqlConnectorConfig.SERVER_ID, 28765)
|
||||||
@ -825,8 +827,8 @@ public void shouldUseOverriddenSelectStatementDuringSnapshotting() throws SQLExc
|
|||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void shouldUseMultipleOverriddenSelectStatementsDuringSnapshotting() throws SQLException, InterruptedException {
|
public void shouldUseMultipleOverriddenSelectStatementsDuringSnapshotting() throws SQLException, InterruptedException {
|
||||||
String masterPort = System.getProperty("database.port");
|
String masterPort = System.getProperty("database.port", "3306");
|
||||||
String replicaPort = System.getProperty("database.replica.port");
|
String replicaPort = System.getProperty("database.replica.port", "3306");
|
||||||
boolean replicaIsMaster = masterPort.equals(replicaPort);
|
boolean replicaIsMaster = masterPort.equals(replicaPort);
|
||||||
if (!replicaIsMaster) {
|
if (!replicaIsMaster) {
|
||||||
// Give time for the replica to catch up to the master ...
|
// Give time for the replica to catch up to the master ...
|
||||||
@ -835,8 +837,8 @@ public void shouldUseMultipleOverriddenSelectStatementsDuringSnapshotting() thro
|
|||||||
|
|
||||||
String tables=String.format("%s.products,%s.products_on_hand", DATABASE.getDatabaseName(), DATABASE.getDatabaseName());
|
String tables=String.format("%s.products,%s.products_on_hand", DATABASE.getDatabaseName(), DATABASE.getDatabaseName());
|
||||||
config = Configuration.create()
|
config = Configuration.create()
|
||||||
.with(MySqlConnectorConfig.HOSTNAME, System.getProperty("database.replica.hostname"))
|
.with(MySqlConnectorConfig.HOSTNAME, System.getProperty("database.replica.hostname", "localhost"))
|
||||||
.with(MySqlConnectorConfig.PORT, System.getProperty("database.replica.port"))
|
.with(MySqlConnectorConfig.PORT, System.getProperty("database.replica.port", "3306"))
|
||||||
.with(MySqlConnectorConfig.USER, "snapper")
|
.with(MySqlConnectorConfig.USER, "snapper")
|
||||||
.with(MySqlConnectorConfig.PASSWORD, "snapperpass")
|
.with(MySqlConnectorConfig.PASSWORD, "snapperpass")
|
||||||
.with(MySqlConnectorConfig.SERVER_ID, 28765)
|
.with(MySqlConnectorConfig.SERVER_ID, 28765)
|
||||||
@ -1126,7 +1128,7 @@ public void shouldNotParseQueryIfServerOptionDisabled() throws Exception {
|
|||||||
start(MySqlConnector.class, config);
|
start(MySqlConnector.class, config);
|
||||||
|
|
||||||
// Flush all existing records not related to the test.
|
// Flush all existing records not related to the test.
|
||||||
consumeRecords(INITIAL_EVENT_COUNT, null);
|
consumeRecords(PRODUCTS_TABLE_EVENT_COUNT, null);
|
||||||
|
|
||||||
// Define insert query we want to validate.
|
// Define insert query we want to validate.
|
||||||
final String insertSqlStatement = "INSERT INTO products VALUES (default,'robot','Toy robot',1.304)";
|
final String insertSqlStatement = "INSERT INTO products VALUES (default,'robot','Toy robot',1.304)";
|
||||||
@ -1177,7 +1179,7 @@ public void shouldNotParseQueryIfConnectorNotConfiguredTo() throws Exception {
|
|||||||
start(MySqlConnector.class, config);
|
start(MySqlConnector.class, config);
|
||||||
|
|
||||||
// Flush all existing records not related to the test.
|
// Flush all existing records not related to the test.
|
||||||
consumeRecords(INITIAL_EVENT_COUNT, null);
|
consumeRecords(PRODUCTS_TABLE_EVENT_COUNT, null);
|
||||||
|
|
||||||
// Define insert query we want to validate.
|
// Define insert query we want to validate.
|
||||||
final String insertSqlStatement = "INSERT INTO products VALUES (default,'robot','Toy robot',1.304)";
|
final String insertSqlStatement = "INSERT INTO products VALUES (default,'robot','Toy robot',1.304)";
|
||||||
@ -1229,7 +1231,7 @@ public void shouldParseQueryIfAvailableAndConnectorOptionEnabled() throws Except
|
|||||||
start(MySqlConnector.class, config);
|
start(MySqlConnector.class, config);
|
||||||
|
|
||||||
// Flush all existing records not related to the test.
|
// Flush all existing records not related to the test.
|
||||||
consumeRecords(INITIAL_EVENT_COUNT, null);
|
consumeRecords(PRODUCTS_TABLE_EVENT_COUNT, null);
|
||||||
|
|
||||||
// Define insert query we want to validate.
|
// Define insert query we want to validate.
|
||||||
final String insertSqlStatement = "INSERT INTO products VALUES (default,'robot','Toy robot',1.304)";
|
final String insertSqlStatement = "INSERT INTO products VALUES (default,'robot','Toy robot',1.304)";
|
||||||
@ -1281,7 +1283,7 @@ public void parseMultipleInsertStatements() throws Exception {
|
|||||||
start(MySqlConnector.class, config);
|
start(MySqlConnector.class, config);
|
||||||
|
|
||||||
// Flush all existing records not related to the test.
|
// Flush all existing records not related to the test.
|
||||||
consumeRecords(INITIAL_EVENT_COUNT, null);
|
consumeRecords(PRODUCTS_TABLE_EVENT_COUNT, null);
|
||||||
|
|
||||||
// Define insert query we want to validate.
|
// Define insert query we want to validate.
|
||||||
final String insertSqlStatement1 = "INSERT INTO products VALUES (default,'robot','Toy robot',1.304)";
|
final String insertSqlStatement1 = "INSERT INTO products VALUES (default,'robot','Toy robot',1.304)";
|
||||||
@ -1344,7 +1346,7 @@ public void parseMultipleRowInsertStatement() throws Exception {
|
|||||||
start(MySqlConnector.class, config);
|
start(MySqlConnector.class, config);
|
||||||
|
|
||||||
// Flush all existing records not related to the test.
|
// Flush all existing records not related to the test.
|
||||||
consumeRecords(INITIAL_EVENT_COUNT, null);
|
consumeRecords(PRODUCTS_TABLE_EVENT_COUNT, null);
|
||||||
|
|
||||||
// Define insert query we want to validate.
|
// Define insert query we want to validate.
|
||||||
final String insertSqlStatement = "INSERT INTO products VALUES (default,'robot','Toy robot',1.304), (default,'toaster','Toaster',3.33)";
|
final String insertSqlStatement = "INSERT INTO products VALUES (default,'robot','Toy robot',1.304), (default,'toaster','Toaster',3.33)";
|
||||||
@ -1405,7 +1407,7 @@ public void parseDeleteQuery() throws Exception {
|
|||||||
start(MySqlConnector.class, config);
|
start(MySqlConnector.class, config);
|
||||||
|
|
||||||
// Flush all existing records not related to the test.
|
// Flush all existing records not related to the test.
|
||||||
consumeRecords(INITIAL_EVENT_COUNT, null);
|
consumeRecords(ORDERS_TABLE_EVENT_COUNT, null);
|
||||||
|
|
||||||
// Define insert query we want to validate.
|
// Define insert query we want to validate.
|
||||||
final String deleteSqlStatement = "DELETE FROM orders WHERE order_number=10001 LIMIT 1";
|
final String deleteSqlStatement = "DELETE FROM orders WHERE order_number=10001 LIMIT 1";
|
||||||
@ -1456,7 +1458,7 @@ public void parseMultiRowDeleteQuery() throws Exception {
|
|||||||
start(MySqlConnector.class, config);
|
start(MySqlConnector.class, config);
|
||||||
|
|
||||||
// Flush all existing records not related to the test.
|
// Flush all existing records not related to the test.
|
||||||
consumeRecords(INITIAL_EVENT_COUNT, null);
|
consumeRecords(ORDERS_TABLE_EVENT_COUNT, null);
|
||||||
|
|
||||||
// Define insert query we want to validate.
|
// Define insert query we want to validate.
|
||||||
final String deleteSqlStatement = "DELETE FROM orders WHERE purchaser=1002";
|
final String deleteSqlStatement = "DELETE FROM orders WHERE purchaser=1002";
|
||||||
@ -1515,7 +1517,7 @@ public void parseUpdateQuery() throws Exception {
|
|||||||
start(MySqlConnector.class, config);
|
start(MySqlConnector.class, config);
|
||||||
|
|
||||||
// Flush all existing records not related to the test.
|
// Flush all existing records not related to the test.
|
||||||
consumeRecords(INITIAL_EVENT_COUNT, null);
|
consumeRecords(PRODUCTS_TABLE_EVENT_COUNT, null);
|
||||||
|
|
||||||
// Define insert query we want to validate.
|
// Define insert query we want to validate.
|
||||||
final String updateSqlStatement = "UPDATE products set name='toaster' where id=109 LIMIT 1";
|
final String updateSqlStatement = "UPDATE products set name='toaster' where id=109 LIMIT 1";
|
||||||
@ -1566,7 +1568,7 @@ public void parseMultiRowUpdateQuery() throws Exception {
|
|||||||
start(MySqlConnector.class, config);
|
start(MySqlConnector.class, config);
|
||||||
|
|
||||||
// Flush all existing records not related to the test.
|
// Flush all existing records not related to the test.
|
||||||
consumeRecords(INITIAL_EVENT_COUNT, null);
|
consumeRecords(ORDERS_TABLE_EVENT_COUNT, null);
|
||||||
|
|
||||||
// Define insert query we want to validate.
|
// Define insert query we want to validate.
|
||||||
final String updateSqlStatement = "UPDATE orders set quantity=0 where order_number in (10001, 10004)";
|
final String updateSqlStatement = "UPDATE orders set quantity=0 where order_number in (10001, 10004)";
|
||||||
|
@ -549,14 +549,13 @@ public void shouldConsumeAllEventsFromDatabaseUsingSnapshot() throws SQLExceptio
|
|||||||
// Consume all of the events due to startup and initialization of the database
|
// Consume all of the events due to startup and initialization of the database
|
||||||
// ---------------------------------------------------------------------------------------------------------------
|
// ---------------------------------------------------------------------------------------------------------------
|
||||||
// Testing.Debug.enable();
|
// Testing.Debug.enable();
|
||||||
int numCreateDatabase = 1;
|
|
||||||
int numTables = 11;
|
int numTables = 11;
|
||||||
int numDataRecords = 20;
|
int numDataRecords = 20;
|
||||||
int numDdlRecords = numTables * 2 + 3; // for each table (1 drop + 1 create) + for each db (1 create + 1 drop + 1 use)
|
int numDdlRecords = numTables * 2 + 3; // for each table (1 drop + 1 create) + for each db (1 create + 1 drop + 1 use)
|
||||||
int numCreateDefiner = 1;
|
int numCreateDefiner = 1;
|
||||||
int numSetVariables = 1;
|
int numSetVariables = 1;
|
||||||
SourceRecords records =
|
SourceRecords records =
|
||||||
consumeRecordsByTopic(numDdlRecords + numSetVariables + numDataRecords + numCreateDefiner + numCreateDatabase);
|
consumeRecordsByTopic(numDdlRecords + numSetVariables + numDataRecords);
|
||||||
stopConnector();
|
stopConnector();
|
||||||
assertThat(records).isNotNull();
|
assertThat(records).isNotNull();
|
||||||
assertThat(records.recordsForTopic(DATABASE.getServerName()).size())
|
assertThat(records.recordsForTopic(DATABASE.getServerName()).size())
|
||||||
|
Loading…
Reference in New Issue
Block a user