DBZ-4125 Add test covering special characters in database name

DBZ-4125 Remove unused import

DBZ-4125 Clean up variable usage
This commit is contained in:
cburch824 2021-10-12 14:58:11 -05:00 committed by Gunnar Morling
parent 766139ab05
commit 1de72deafa
2 changed files with 87 additions and 14 deletions

View File

@ -14,7 +14,6 @@
import org.apache.kafka.connect.source.SourceRecord; import org.apache.kafka.connect.source.SourceRecord;
import org.fest.assertions.Assertions; import org.fest.assertions.Assertions;
import org.junit.After; import org.junit.After;
import org.junit.Before;
import org.junit.Test; import org.junit.Test;
import io.debezium.config.Configuration; import io.debezium.config.Configuration;
@ -33,15 +32,6 @@ public class SpecialCharsInNamesIT extends AbstractConnectorTest {
private SqlServerConnection connection; private SqlServerConnection connection;
@Before
public void before() throws SQLException {
TestHelper.createTestDatabase();
connection = TestHelper.testConnection();
initializeConnectorTestFramework();
Testing.Files.delete(TestHelper.DB_HISTORY_PATH);
}
@After @After
public void after() throws SQLException { public void after() throws SQLException {
if (connection != null) { if (connection != null) {
@ -52,6 +42,12 @@ public void after() throws SQLException {
@Test @Test
@FixFor("DBZ-1546") @FixFor("DBZ-1546")
public void shouldParseWhitespaceChars() throws Exception { public void shouldParseWhitespaceChars() throws Exception {
TestHelper.createTestDatabase();
connection = TestHelper.testConnection();
initializeConnectorTestFramework();
Testing.Files.delete(TestHelper.DB_HISTORY_PATH);
final Configuration config = TestHelper.defaultConfig() final Configuration config = TestHelper.defaultConfig()
.with(SqlServerConnectorConfig.SNAPSHOT_MODE, SnapshotMode.INITIAL) .with(SqlServerConnectorConfig.SNAPSHOT_MODE, SnapshotMode.INITIAL)
.with(SqlServerConnectorConfig.TABLE_INCLUDE_LIST, "dbo\\.UAT WAG CZ\\$Fixed Asset.*, dbo\\.UAT WAG CZ\\$Fixed Prop.*") .with(SqlServerConnectorConfig.TABLE_INCLUDE_LIST, "dbo\\.UAT WAG CZ\\$Fixed Asset.*, dbo\\.UAT WAG CZ\\$Fixed Prop.*")
@ -116,6 +112,12 @@ public void shouldParseWhitespaceChars() throws Exception {
@Test @Test
@FixFor("DBZ-1153") @FixFor("DBZ-1153")
public void shouldParseSpecialChars() throws Exception { public void shouldParseSpecialChars() throws Exception {
TestHelper.createTestDatabase();
connection = TestHelper.testConnection();
initializeConnectorTestFramework();
Testing.Files.delete(TestHelper.DB_HISTORY_PATH);
final Configuration config = TestHelper.defaultConfig() final Configuration config = TestHelper.defaultConfig()
.with(SqlServerConnectorConfig.SNAPSHOT_MODE, SnapshotMode.INITIAL) .with(SqlServerConnectorConfig.SNAPSHOT_MODE, SnapshotMode.INITIAL)
.with(SqlServerConnectorConfig.TABLE_INCLUDE_LIST, "dbo\\.UAT WAG CZ\\$Fixed Asset.*") .with(SqlServerConnectorConfig.TABLE_INCLUDE_LIST, "dbo\\.UAT WAG CZ\\$Fixed Asset.*")
@ -243,4 +245,32 @@ record = records.recordsForTopic("server1.dbo.UAT_WAG_CZ_Fixed_Asset").get(0);
.build()); .build());
Assertions.assertThat(((Struct) record.value()).getStruct("after").getInt32("id")).isEqualTo(4); Assertions.assertThat(((Struct) record.value()).getStruct("after").getInt32("id")).isEqualTo(4);
} }
@Test
@FixFor("DBZ-4125")
public void shouldHandleSpecialCharactersInDatabaseNames() throws Exception {
final String databaseName = "test-db";
TestHelper.createTestDatabase(databaseName);
connection = TestHelper.testConnection(databaseName);
initializeConnectorTestFramework();
Testing.Files.delete(TestHelper.DB_HISTORY_PATH);
final Configuration config = TestHelper.defaultConfig()
.with(SqlServerConnectorConfig.SNAPSHOT_MODE, SnapshotMode.INITIAL)
.with(SqlServerConnectorConfig.DATABASE_NAME, databaseName)
.with(SqlServerConnectorConfig.SANITIZE_FIELD_NAMES, false)
.build();
connection.execute(
"CREATE TABLE tablea (id int primary key, cola varchar(30))",
"INSERT INTO tablea VALUES(1, 'a')");
TestHelper.enableTableCdc(connection, "tablea");
start(SqlServerConnector.class, config);
assertConnectorIsRunning();
// Wait for snapshot completion
consumeRecordsByTopic(1);
TestHelper.waitForMaxLsnAvailable(connection, databaseName);
}
} }

View File

@ -147,12 +147,31 @@ public static void createTestDatabase() {
try (SqlServerConnection connection = adminConnection()) { try (SqlServerConnection connection = adminConnection()) {
connection.connect(); connection.connect();
dropTestDatabase(connection); dropTestDatabase(connection);
String sql = "CREATE DATABASE testDB\n"; String sql = String.format("CREATE DATABASE %s\n", TEST_DATABASE);
connection.execute(sql); connection.execute(sql);
connection.execute("USE testDB"); connection.execute(String.format("USE %s", TEST_DATABASE));
connection.execute("ALTER DATABASE testDB SET ALLOW_SNAPSHOT_ISOLATION ON"); connection.execute(String.format("ALTER DATABASE %s SET ALLOW_SNAPSHOT_ISOLATION ON", TEST_DATABASE));
// NOTE: you cannot enable CDC on master // NOTE: you cannot enable CDC on master
enableDbCdc(connection, "testDB"); enableDbCdc(connection, TEST_DATABASE);
}
catch (SQLException e) {
LOGGER.error("Error while initiating test database", e);
throw new IllegalStateException("Error while initiating test database", e);
}
}
public static void createTestDatabase(String databaseName) {
// NOTE: you cannot enable CDC for the "master" db (the default one) so
// all tests must use a separate database...
try (SqlServerConnection connection = adminConnection()) {
connection.connect();
dropTestDatabase(connection);
String sql = String.format("CREATE DATABASE [%s]\n", databaseName);
connection.execute(sql);
connection.execute(String.format("USE [%s]", databaseName));
connection.execute(String.format("ALTER DATABASE [%s] SET ALLOW_SNAPSHOT_ISOLATION ON", databaseName));
// NOTE: you cannot enable CDC on master
enableDbCdc(connection, databaseName);
} }
catch (SQLException e) { catch (SQLException e) {
LOGGER.error("Error while initiating test database", e); LOGGER.error("Error while initiating test database", e);
@ -236,6 +255,17 @@ public static SqlServerConnection testConnection() {
Collections.emptySet(), true); Collections.emptySet(), true);
} }
public static SqlServerConnection testConnection(String databaseName) {
Configuration config = defaultJdbcConfig()
.edit()
.with(JdbcConfiguration.ON_CONNECT_STATEMENTS, "USE [" + databaseName + "]")
.build();
return new SqlServerConnection(config, SourceTimestampMode.getDefaultMode(),
new SqlServerValueConverters(JdbcValueConverters.DecimalMode.PRECISE, TemporalPrecisionMode.ADAPTIVE, null), () -> TestHelper.class.getClassLoader(),
Collections.emptySet(), true);
}
/** /**
* Enables CDC for a given database, if not already enabled. * Enables CDC for a given database, if not already enabled.
* *
@ -410,6 +440,19 @@ public static void waitForMaxLsnAvailable(SqlServerConnection connection) throws
} }
} }
public static void waitForMaxLsnAvailable(SqlServerConnection connection, String databaseName) throws Exception {
try {
Awaitility.await("Max LSN not available")
.atMost(60, TimeUnit.SECONDS)
.pollDelay(Duration.ofSeconds(0))
.pollInterval(Duration.ofMillis(100))
.until(() -> connection.getMaxLsn(databaseName).isAvailable());
}
catch (ConditionTimeoutException e) {
throw new IllegalArgumentException("A max LSN was not available", e);
}
}
private static ObjectName getObjectName(String context, String serverName) throws MalformedObjectNameException { private static ObjectName getObjectName(String context, String serverName) throws MalformedObjectNameException {
return new ObjectName("debezium.sql_server:type=connector-metrics,context=" + context + ",server=" + serverName); return new ObjectName("debezium.sql_server:type=connector-metrics,context=" + context + ",server=" + serverName);
} }