DBZ-5554 Updated SqlServer configuration used in system testsuite to reflect multi task changes

This commit is contained in:
jcechace 2022-08-29 12:59:57 +03:00 committed by Jakub Cechacek
parent 2bb9d5263a
commit 8037c797e9
4 changed files with 36 additions and 22 deletions

View File

@ -85,9 +85,9 @@
<database.sqlserver.port>5432</database.sqlserver.port>
<database.sqlserver.username>sa</database.sqlserver.username>
<database.sqlserver.password>Debezium1$</database.sqlserver.password>
<database.sqlserver.dbz.username>sa</database.sqlserver.dbz.username>
<database.sqlserver.dbz.password>Debezium$1</database.sqlserver.dbz.password>
<database.sqlserver.dbname>TestDB</database.sqlserver.dbname>
<database.sqlserver.dbz.username>${database.db2.username}</database.sqlserver.dbz.username>
<database.sqlserver.dbz.password>${database.db2.password}</database.sqlserver.dbz.password>
<database.sqlserver.dbnames>TestDB</database.sqlserver.dbnames>
<!--MongoDB configuration-->
<database.mongo.port>27017</database.mongo.port>
@ -499,6 +499,15 @@
<test.database.postgresql.dbz.password>${database.postgresql.dbz.password}</test.database.postgresql.dbz.password>
<test.database.postgresql.dbname>${database.postgresql.dbname}</test.database.postgresql.dbname>
<!--SqlServer configuration-->
<test.database.sqlserver.host>${database.sqlserver.host}</test.database.sqlserver.host>
<test.database.sqlserver.port>${database.sqlserver.port}</test.database.sqlserver.port>
<test.database.sqlserver.username>${database.sqlserver.username}</test.database.sqlserver.username>
<test.database.sqlserver.password>${database.sqlserver.password}</test.database.sqlserver.password>
<test.database.sqlserver.dbz.username>${database.sqlserver.dbz.username}</test.database.sqlserver.dbz.username>
<test.database.sqlserver.dbz.password>${database.sqlserver.dbz.password}</test.database.sqlserver.dbz.password>
<test.database.sqlserver.dbnames>${database.sqlserver.dbnames}</test.database.sqlserver.dbnames>
<!--Mongo configuration-->
<test.database.mongo.host>${database.mongo.host}</test.database.mongo.host>
<test.database.mongo.port>${database.mongo.port}</test.database.mongo.port>

View File

@ -81,7 +81,7 @@ private ConfigProperties() {
public static final String DATABASE_SQLSERVER_SA_PASSWORD = System.getProperty("test.database.sqlserver.password", "Debezium1$");
public static final String DATABASE_SQLSERVER_DBZ_USERNAME = System.getProperty("test.database.sqlserver.dbz.username", DATABASE_SQLSERVER_USERNAME);
public static final String DATABASE_SQLSERVER_DBZ_PASSWORD = System.getProperty("test.database.sqlserver.dbz.password", DATABASE_SQLSERVER_SA_PASSWORD);
public static final String DATABASE_SQLSERVER_DBZ_DBNAME = System.getProperty("test.database.sqlserver.dbname", "testDB");
public static final String DATABASE_SQLSERVER_DBZ_DBNAMES = System.getProperty("test.database.sqlserver.dbnames", "testDB");
public static final Optional<String> DATABASE_SQLSERVER_HOST = stringOptionalProperty("test.database.sqlserver.host");
public static final int DATABASE_SQLSERVER_PORT = Integer.parseInt(System.getProperty("test.database.sqlserver.port", "1433"));

View File

@ -79,7 +79,8 @@ public ConnectorConfigBuilder sqlserver(SqlDatabaseController controller, String
.put("database.port", dbPort)
.put("database.user", ConfigProperties.DATABASE_SQLSERVER_DBZ_USERNAME)
.put("database.password", ConfigProperties.DATABASE_SQLSERVER_DBZ_PASSWORD)
.put("database.dbname", ConfigProperties.DATABASE_SQLSERVER_DBZ_DBNAME)
.put("database.names", ConfigProperties.DATABASE_SQLSERVER_DBZ_DBNAMES)
.put("database.encrypt", false)
.put("database.history.kafka.bootstrap.servers", kafka.getBootstrapAddress())
.put("database.history.kafka.topic", "schema-changes.inventory")
.addOperationRouterForTable("u", "customers");

View File

@ -6,7 +6,7 @@
package io.debezium.testing.system.tests.sqlserver;
import static io.debezium.testing.system.assertions.KafkaAssertions.awaitAssert;
import static io.debezium.testing.system.tools.ConfigProperties.DATABASE_SQLSERVER_DBZ_DBNAME;
import static io.debezium.testing.system.tools.ConfigProperties.DATABASE_SQLSERVER_DBZ_DBNAMES;
import static io.debezium.testing.system.tools.ConfigProperties.DATABASE_SQLSERVER_DBZ_PASSWORD;
import static io.debezium.testing.system.tools.ConfigProperties.DATABASE_SQLSERVER_DBZ_USERNAME;
import static org.assertj.core.api.Assertions.assertThat;
@ -45,13 +45,19 @@ public void insertCustomer(
throws SQLException {
SqlDatabaseClient client = dbController.getDatabaseClient(DATABASE_SQLSERVER_DBZ_USERNAME, DATABASE_SQLSERVER_DBZ_PASSWORD);
String sql = "INSERT INTO customers (first_name, last_name, email) VALUES ('" + firstName + "', '" + lastName + "', '" + email + "')";
client.execute(DATABASE_SQLSERVER_DBZ_DBNAME, sql);
client.execute(DATABASE_SQLSERVER_DBZ_DBNAMES, sql);
}
private String topic(String key) {
String prefix = connectorConfig.getDbServerName() + "." +
connectorConfig.getAsString("database.names");
return prefix + "." + key;
}
public void renameCustomer(SqlDatabaseController dbController, String oldName, String newName) throws SQLException {
SqlDatabaseClient client = dbController.getDatabaseClient(DATABASE_SQLSERVER_DBZ_USERNAME, DATABASE_SQLSERVER_DBZ_PASSWORD);
String sql = "UPDATE customers SET first_name = '" + newName + "' WHERE first_name = '" + oldName + "'";
client.execute(DATABASE_SQLSERVER_DBZ_DBNAME, sql);
client.execute(DATABASE_SQLSERVER_DBZ_DBNAMES, sql);
}
@Test
@ -70,12 +76,11 @@ public void shouldHaveRegisteredConnector() {
@Test
@Order(20)
public void shouldCreateKafkaTopics() {
String prefix = connectorConfig.getDbServerName();
assertions.assertTopicsExist(
prefix + ".dbo.customers",
prefix + ".dbo.orders",
prefix + ".dbo.products",
prefix + ".dbo.products_on_hand");
topic("dbo.customers"),
topic("dbo.orders"),
topic("dbo.products"),
topic("dbo.products_on_hand"));
}
@Test
@ -83,7 +88,7 @@ public void shouldCreateKafkaTopics() {
public void shouldSnapshotChanges() {
connectController.getMetricsReader().waitForSqlServerSnapshot(connectorConfig.getDbServerName());
String topic = connectorConfig.getDbServerName() + ".dbo.customers";
String topic = topic("dbo.customers");
awaitAssert(() -> assertions.assertRecordsCount(topic, 4));
}
@ -92,7 +97,7 @@ public void shouldSnapshotChanges() {
public void shouldStreamChanges(SqlDatabaseController dbController) throws SQLException {
insertCustomer(dbController, "Tom", "Tester", "tom@test.com");
String topic = connectorConfig.getDbServerName() + ".dbo.customers";
String topic = topic("dbo.customers");
awaitAssert(() -> assertions.assertRecordsCount(topic, 5));
awaitAssert(() -> assertions.assertRecordsContain(topic, "tom@test.com"));
}
@ -102,9 +107,8 @@ public void shouldStreamChanges(SqlDatabaseController dbController) throws SQLEx
public void shouldRerouteUpdates(SqlDatabaseController dbController) throws SQLException {
renameCustomer(dbController, "Tom", "Thomas");
String prefix = connectorConfig.getDbServerName();
String updatesTopic = prefix + ".u.customers";
awaitAssert(() -> assertions.assertRecordsCount(prefix + ".dbo.customers", 5));
String updatesTopic = topic("u.customers");
awaitAssert(() -> assertions.assertRecordsCount(topic("dbo.customers"), 5));
awaitAssert(() -> assertions.assertRecordsCount(updatesTopic, 1));
awaitAssert(() -> assertions.assertRecordsContain(updatesTopic, "Thomas"));
}
@ -115,7 +119,7 @@ public void shouldBeDown(SqlDatabaseController dbController) throws Exception {
connectController.undeployConnector(connectorConfig.getConnectorName());
insertCustomer(dbController, "Jerry", "Tester", "jerry@test.com");
String topic = connectorConfig.getDbServerName() + ".dbo.customers";
String topic = topic("dbo.customers");
awaitAssert(() -> assertions.assertRecordsCount(topic, 5));
}
@ -124,7 +128,7 @@ public void shouldBeDown(SqlDatabaseController dbController) throws Exception {
public void shouldResumeStreamingAfterRedeployment() throws Exception {
connectController.deployConnector(connectorConfig);
String topic = connectorConfig.getDbServerName() + ".dbo.customers";
String topic = topic("dbo.customers");
awaitAssert(() -> assertions.assertRecordsCount(topic, 6));
awaitAssert(() -> assertions.assertRecordsContain(topic, "jerry@test.com"));
}
@ -135,7 +139,7 @@ public void shouldBeDownAfterCrash(SqlDatabaseController dbController) throws SQ
connectController.destroy();
insertCustomer(dbController, "Nibbles", "Tester", "nibbles@test.com");
String topic = connectorConfig.getDbServerName() + ".dbo.customers";
String topic = topic("dbo.customers");
awaitAssert(() -> assertions.assertRecordsCount(topic, 6));
}
@ -144,7 +148,7 @@ public void shouldBeDownAfterCrash(SqlDatabaseController dbController) throws SQ
public void shouldResumeStreamingAfterCrash() throws InterruptedException {
connectController.restore();
String topic = connectorConfig.getDbServerName() + ".dbo.customers";
String topic = topic("dbo.customers");
awaitAssert(() -> assertions.assertMinimalRecordsCount(topic, 7));
awaitAssert(() -> assertions.assertRecordsContain(topic, "nibbles@test.com"));
}