DBZ-812 Moving test-only methods to TestHelper

This commit is contained in:
Gunnar Morling 2018-11-15 14:45:34 +01:00 committed by Jiri Pechanec
parent fdeaa1cff8
commit 050c192dd7
11 changed files with 123 additions and 117 deletions

View File

@ -7,14 +7,14 @@
import java.time.Instant;
import io.debezium.annotation.NotThreadSafe;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.Struct;
import io.debezium.annotation.NotThreadSafe;
import io.debezium.connector.AbstractSourceInfo;
/**
* Coordinates from the database log to establis relation between the change streamed and the source log position.
* Coordinates from the database log to establish the relation between the change streamed and the source log position.
* Maps to {@code source} field in {@code Envelope}.
*
* @author Jiri Pechanec

View File

@ -15,7 +15,6 @@
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Objects;
import java.util.Set;
import java.util.stream.Collectors;
@ -31,7 +30,6 @@
import io.debezium.relational.ColumnEditor;
import io.debezium.relational.Table;
import io.debezium.relational.TableId;
import io.debezium.util.IoUtil;
/**
* {@link JdbcConnection} extension to be used with Microsoft SQL Server
@ -44,15 +42,6 @@ public class SqlServerConnection extends JdbcConnection {
private static Logger LOGGER = LoggerFactory.getLogger(SqlServerConnection.class);
private static final String STATEMENTS_PLACEHOLDER = "#";
private static final String ENABLE_DB_CDC = "IF EXISTS(select 1 from sys.databases where name='#' AND is_cdc_enabled=0)\n"
+ "EXEC sys.sp_cdc_enable_db";
private static final String DISABLE_DB_CDC = "IF EXISTS(select 1 from sys.databases where name='#' AND is_cdc_enabled=1)\n"
+ "EXEC sys.sp_cdc_disable_db";
private static final String ENABLE_TABLE_CDC = "IF EXISTS(select 1 from sys.tables where name = '#' AND is_tracked_by_cdc=0)\n"
+ "EXEC sys.sp_cdc_enable_table @source_schema = N'dbo', @source_name = N'#', @role_name = NULL, @supports_net_changes = 0";
private static final String ENABLE_TABLE_CDC_WITH_CUSTOM_CAPTURE = "EXEC sys.sp_cdc_enable_table @source_schema = N'dbo', @source_name = N'%s', @capture_instance = N'%s', @role_name = NULL, @supports_net_changes = 0";
private static final String DISABLE_TABLE_CDC = "EXEC sys.sp_cdc_disable_table @source_schema = N'dbo', @source_name = N'#', @capture_instance = 'all'";
private static final String CDC_WRAPPERS_DML;
private static final String GET_MAX_LSN = "SELECT sys.fn_cdc_get_max_lsn()";
private static final String LOCK_TABLE = "SELECT * FROM # WITH (TABLOCKX)";
private static final String LSN_TO_TIMESTAMP = "SELECT sys.fn_cdc_map_lsn_to_time(?)";
@ -69,16 +58,6 @@ public class SqlServerConnection extends JdbcConnection {
SQLServerDriver.class.getName(),
SqlServerConnection.class.getClassLoader());
static {
try {
ClassLoader classLoader = SqlServerConnection.class.getClassLoader();
CDC_WRAPPERS_DML = IoUtil.read(classLoader.getResourceAsStream("generate_cdc_wrappers.sql"));
}
catch (Exception e) {
throw new RuntimeException("Cannot load SQL Server statements", e);
}
}
private static interface ResultSetExtractor<T> {
T apply(ResultSet rs) throws SQLException;
}
@ -93,74 +72,7 @@ public SqlServerConnection(Configuration config) {
super(config, FACTORY);
}
/**
* Enables CDC for a given database, if not already enabled.
*
* @param name
* the name of the DB, may not be {@code null}
* @throws SQLException
* if anything unexpected fails
*/
public void enableDbCdc(String name) throws SQLException {
Objects.requireNonNull(name);
execute(ENABLE_DB_CDC.replace(STATEMENTS_PLACEHOLDER, name));
}
/**
* Disables CDC for a given database, if not already disabled.
*
* @param name
* the name of the DB, may not be {@code null}
* @throws SQLException
* if anything unexpected fails
*/
public void disableDbCdc(String name) throws SQLException {
Objects.requireNonNull(name);
execute(DISABLE_DB_CDC.replace(STATEMENTS_PLACEHOLDER, name));
}
/**
* Enables CDC for a table if not already enabled and generates the wrapper
* functions for that table.
*
* @param name
* the name of the table, may not be {@code null}
* @throws SQLException if anything unexpected fails
*/
public void enableTableCdc(String name) throws SQLException {
Objects.requireNonNull(name);
String enableCdcForTableStmt = ENABLE_TABLE_CDC.replace(STATEMENTS_PLACEHOLDER, name);
String generateWrapperFunctionsStmts = CDC_WRAPPERS_DML.replaceAll(STATEMENTS_PLACEHOLDER, name);
execute(enableCdcForTableStmt, generateWrapperFunctionsStmts);
}
/**
* Enables CDC for a table with a custom capture name
* functions for that table.
*
* @param name
* the name of the table, may not be {@code null}
* @throws SQLException if anything unexpected fails
*/
public void enableTableCdc(String tableName, String captureName) throws SQLException {
Objects.requireNonNull(tableName);
Objects.requireNonNull(captureName);
String enableCdcForTableStmt = String.format(ENABLE_TABLE_CDC_WITH_CUSTOM_CAPTURE, tableName, captureName);
execute(enableCdcForTableStmt);
}
/**
* Disables CDC for a table for which it was enabled before.
*
* @param name
* the name of the table, may not be {@code null}
* @throws SQLException if anything unexpected fails
*/
public void disableTableCdc(String name) throws SQLException {
Objects.requireNonNull(name);
String disableCdcForTableStmt = DISABLE_TABLE_CDC.replace(STATEMENTS_PLACEHOLDER, name);
execute(disableCdcForTableStmt);
}
/**
* @return the current largest log sequence number

View File

@ -197,7 +197,7 @@ public void execute(ChangeEventSourceContext context) throws InterruptedExceptio
}
});
lastProcessedLsn = currentMaxLsn;
// Terminate the transaction otherwise CDC could not be disabled for tables
// Terminate the transaction otherwise CDC could not be disabled for tables
connection.rollback();
}
catch (SQLException e) {
@ -326,7 +326,7 @@ public boolean next() throws SQLException {
completed = !resultSet.next();
currentChangeLsn = completed ? Lsn.NULL : Lsn.valueOf(resultSet.getBytes(COL_ROW_LSN));
if (completed) {
LOGGER.trace("Closing result set of change table for table {}", changeTable);
LOGGER.trace("Closing result set of change tables for table {}", changeTable);
resultSet.close();
}
return !completed;

View File

@ -160,7 +160,7 @@ public static void createTables() throws SQLException {
try (SqlServerConnection connection = TestHelper.testConnection()) {
connection.execute(ALL_DDLS);
for (String table: ALL_TABLES) {
connection.enableTableCdc(table);
TestHelper.enableTableCdc(connection, table);
}
connection.execute(
"INSERT INTO type_int VALUES (0, 1, 22, 333, 4444, 55555)",

View File

@ -51,10 +51,10 @@ public void before() throws SQLException {
"CREATE TABLE tablenumb (id int IDENTITY(1,1) primary key, cola DECIMAL(8, 4),colb DECIMAL, colc numeric(8,1), cold numeric)",
"CREATE TABLE tablenumc (id int IDENTITY(1,1) primary key, cola DECIMAL(8, 4),colb DECIMAL, colc numeric(8,1), cold numeric)",
"CREATE TABLE tablenumd (id int IDENTITY(1,1) primary key, cola DECIMAL(8, 4),colb DECIMAL, colc numeric(8,1), cold numeric)");
connection.enableTableCdc("tablea");
connection.enableTableCdc("tableb");
connection.enableTableCdc("tablec");
connection.enableTableCdc("tabled");
TestHelper.enableTableCdc(connection, "tablea");
TestHelper.enableTableCdc(connection, "tableb");
TestHelper.enableTableCdc(connection, "tablec");
TestHelper.enableTableCdc(connection, "tabled");
initializeConnectorTestFramework();
Testing.Files.delete(TestHelper.DB_HISTORY_PATH);

View File

@ -14,7 +14,6 @@
import java.util.List;
import java.util.Map;
import io.debezium.connector.sqlserver.SqlServerConnectorConfig.SnapshotLockingMode;
import org.apache.kafka.connect.data.Decimal;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.Struct;
@ -26,6 +25,7 @@
import org.junit.Test;
import io.debezium.config.Configuration;
import io.debezium.connector.sqlserver.SqlServerConnectorConfig.SnapshotLockingMode;
import io.debezium.connector.sqlserver.SqlServerConnectorConfig.SnapshotMode;
import io.debezium.connector.sqlserver.util.TestHelper;
import io.debezium.data.SchemaAndValueField;
@ -60,7 +60,7 @@ public void before() throws SQLException {
);
}
connection.enableTableCdc("table1");
TestHelper.enableTableCdc(connection, "table1");
initializeConnectorTestFramework();
Testing.Files.delete(TestHelper.DB_HISTORY_PATH);

View File

@ -39,8 +39,8 @@ public void before() throws SQLException {
"CREATE TABLE tableb (id int primary key, colb varchar(30))",
"CREATE TABLE tablec (id int primary key, colc varchar(30))"
);
connection.enableTableCdc("tablea");
connection.enableTableCdc("tableb");
TestHelper.enableTableCdc(connection, "tablea");
TestHelper.enableTableCdc(connection, "tableb");
initializeConnectorTestFramework();
Testing.Files.delete(TestHelper.DB_HISTORY_PATH);
@ -80,13 +80,13 @@ public void addTable() throws Exception {
Assertions.assertThat(records.recordsForTopic("server1.dbo.tableb")).hasSize(RECORDS_PER_TABLE);
// Enable CDC for already existing table
connection.enableTableCdc("tablec");
TestHelper.enableTableCdc(connection, "tablec");
// CDC for newly added table
connection.execute(
"CREATE TABLE tabled (id int primary key, cold varchar(30))"
);
connection.enableTableCdc("tabled");
TestHelper.enableTableCdc(connection, "tabled");
for (int i = 0; i < RECORDS_PER_TABLE; i++) {
final int id = ID_START + i;
@ -152,7 +152,7 @@ public void removeTable() throws Exception {
Assertions.assertThat(records.recordsForTopic("server1.dbo.tableb")).hasSize(RECORDS_PER_TABLE);
// Disable CDC for a table
connection.disableTableCdc("tableb");
TestHelper.disableTableCdc(connection, "tableb");
for (int i = 0; i < RECORDS_PER_TABLE; i++) {
final int id = ID_START_2 + i;
@ -245,7 +245,7 @@ private void addColumnToTable(boolean pauseAfterCaptureChange) throws Exception
);
});
connection.enableTableCdc("tableb", "after_change");
TestHelper.enableTableCdc(connection, "tableb", "after_change");
if (pauseAfterCaptureChange) {
Thread.sleep(5_000);
}
@ -343,7 +343,7 @@ public void removeColumnFromTable() throws Exception {
// Enable a second capture instance
connection.execute("ALTER TABLE dbo.tableb DROP COLUMN colb");
connection.enableTableCdc("tableb", "after_change");
TestHelper.enableTableCdc(connection, "tableb", "after_change");
for (int i = 0; i < RECORDS_PER_TABLE; i++) {
final int id = ID_START_2 + i;
@ -433,10 +433,10 @@ public void renameColumn() throws Exception {
});
// CDC must be disabled, otherwise rename fails
connection.disableTableCdc("tableb");
TestHelper.disableTableCdc(connection, "tableb");
// Enable a second capture instance
connection.execute("exec sp_rename 'tableb.colb', 'newcolb';");
connection.enableTableCdc("tableb", "after_change");
TestHelper.enableTableCdc(connection, "tableb", "after_change");
for (int i = 0; i < RECORDS_PER_TABLE; i++) {
final int id = ID_START_2 + i;
@ -533,7 +533,7 @@ public void changeColumn() throws Exception {
// Enable a second capture instance
connection.execute("ALTER TABLE dbo.tableb ALTER COLUMN colb INT");
connection.enableTableCdc("tableb", "after_change");
TestHelper.enableTableCdc(connection, "tableb", "after_change");
for (int i = 0; i < RECORDS_PER_TABLE; i++) {
final int id = ID_START_2 + i;

View File

@ -34,7 +34,7 @@ public void shouldEnableCdcForDatabase() throws Exception {
connection.execute("CREATE DATABASE testDB");
connection.execute("USE testDB");
// NOTE: you cannot enable CDC on master
connection.enableDbCdc("testDB");
TestHelper.enableDbCdc(connection, "testDB");
}
}
@ -45,7 +45,7 @@ public void shouldEnableCdcWithWrapperFunctionsForTable() throws Exception {
connection.execute("CREATE DATABASE testDB");
connection.execute("USE testDB");
// NOTE: you cannot enable CDC on master
connection.enableDbCdc("testDB");
TestHelper.enableDbCdc(connection, "testDB");
// create table if exists
String sql = "IF EXISTS (select 1 from sys.objects where name = 'testTable' and type = 'u')\n"
@ -54,7 +54,7 @@ public void shouldEnableCdcWithWrapperFunctionsForTable() throws Exception {
connection.execute(sql);
// then enable CDC and wrapper functions
connection.enableTableCdc("testTable");
TestHelper.enableTableCdc(connection, "testTable");
// insert some data
connection.execute("INSERT INTO testTable (NUMBER, TEXT) values (1, 'aaa')\n"

View File

@ -46,8 +46,8 @@ public void before() throws SQLException {
"CREATE TABLE tableb (id int primary key, colb varchar(30))",
"INSERT INTO tablea VALUES(1, 'a')"
);
connection.enableTableCdc("tablea");
connection.enableTableCdc("tableb");
TestHelper.enableTableCdc(connection, "tablea");
TestHelper.enableTableCdc(connection, "tableb");
initializeConnectorTestFramework();
Testing.Files.delete(TestHelper.DB_HISTORY_PATH);

View File

@ -8,21 +8,46 @@
import java.nio.file.Path;
import java.sql.SQLException;
import java.util.Objects;
import io.debezium.config.Configuration;
import io.debezium.connector.sqlserver.SqlServerConnection;
import io.debezium.connector.sqlserver.SqlServerConnectorConfig;
import io.debezium.jdbc.JdbcConfiguration;
import io.debezium.relational.history.FileDatabaseHistory;
import io.debezium.util.IoUtil;
import io.debezium.util.Testing;
/**
* @author Horia Chiorean (hchiorea@redhat.com)
*/
public class TestHelper {
public static final Path DB_HISTORY_PATH = Testing.Files.createTestingPath("file-db-history-connect.txt").toAbsolutePath();
public static final String TEST_DATABASE = "testDB";
private static final String STATEMENTS_PLACEHOLDER = "#";
private static final String ENABLE_DB_CDC = "IF EXISTS(select 1 from sys.databases where name='#' AND is_cdc_enabled=0)\n"
+ "EXEC sys.sp_cdc_enable_db";
private static final String DISABLE_DB_CDC = "IF EXISTS(select 1 from sys.databases where name='#' AND is_cdc_enabled=1)\n"
+ "EXEC sys.sp_cdc_disable_db";
private static final String ENABLE_TABLE_CDC = "IF EXISTS(select 1 from sys.tables where name = '#' AND is_tracked_by_cdc=0)\n"
+ "EXEC sys.sp_cdc_enable_table @source_schema = N'dbo', @source_name = N'#', @role_name = NULL, @supports_net_changes = 0";
private static final String ENABLE_TABLE_CDC_WITH_CUSTOM_CAPTURE = "EXEC sys.sp_cdc_enable_table @source_schema = N'dbo', @source_name = N'%s', @capture_instance = N'%s', @role_name = NULL, @supports_net_changes = 0";
private static final String DISABLE_TABLE_CDC = "EXEC sys.sp_cdc_disable_table @source_schema = N'dbo', @source_name = N'#', @capture_instance = 'all'";
private static final String CDC_WRAPPERS_DML;
static {
try {
ClassLoader classLoader = TestHelper.class.getClassLoader();
CDC_WRAPPERS_DML = IoUtil.read(classLoader.getResourceAsStream("generate_cdc_wrappers.sql"));
}
catch (Exception e) {
throw new RuntimeException("Cannot load SQL Server statements", e);
}
}
public static JdbcConfiguration adminJdbcConfig() {
return JdbcConfiguration.copy(Configuration.fromSystemProperties("database."))
.withDefault(JdbcConfiguration.DATABASE, "master")
@ -67,7 +92,7 @@ public static void createTestDatabase() {
connection.connect();
try {
connection.execute("USE testDB");
connection.disableDbCdc("testDB");
disableDbCdc(connection, "testDB");
}
catch (SQLException e) {
}
@ -78,7 +103,7 @@ public static void createTestDatabase() {
connection.execute("USE testDB");
connection.execute("ALTER DATABASE testDB SET ALLOW_SNAPSHOT_ISOLATION ON");
// NOTE: you cannot enable CDC on master
connection.enableDbCdc("testDB");
enableDbCdc(connection, "testDB");
}
catch (SQLException e) {
throw new IllegalStateException("Error while initiating test database", e);
@ -90,7 +115,7 @@ public static void dropTestDatabase() {
connection.connect();
try {
connection.execute("USE testDB");
connection.disableDbCdc("testDB");
disableDbCdc(connection, "testDB");
}
catch (SQLException e) {
}
@ -110,4 +135,73 @@ public static SqlServerConnection adminConnection() {
public static SqlServerConnection testConnection() {
return new SqlServerConnection(TestHelper.defaultJdbcConfig());
}
/**
* Enables CDC for a given database, if not already enabled.
*
* @param name
* the name of the DB, may not be {@code null}
* @throws SQLException
* if anything unexpected fails
*/
public static void enableDbCdc(SqlServerConnection connection, String name) throws SQLException {
Objects.requireNonNull(name);
connection.execute(ENABLE_DB_CDC.replace(STATEMENTS_PLACEHOLDER, name));
}
/**
* Disables CDC for a given database, if not already disabled.
*
* @param name
* the name of the DB, may not be {@code null}
* @throws SQLException
* if anything unexpected fails
*/
protected static void disableDbCdc(SqlServerConnection connection, String name) throws SQLException {
Objects.requireNonNull(name);
connection.execute(DISABLE_DB_CDC.replace(STATEMENTS_PLACEHOLDER, name));
}
/**
* Enables CDC for a table if not already enabled and generates the wrapper
* functions for that table.
*
* @param name
* the name of the table, may not be {@code null}
* @throws SQLException if anything unexpected fails
*/
public static void enableTableCdc(SqlServerConnection connection, String name) throws SQLException {
Objects.requireNonNull(name);
String enableCdcForTableStmt = ENABLE_TABLE_CDC.replace(STATEMENTS_PLACEHOLDER, name);
String generateWrapperFunctionsStmts = CDC_WRAPPERS_DML.replaceAll(STATEMENTS_PLACEHOLDER, name);
connection.execute(enableCdcForTableStmt, generateWrapperFunctionsStmts);
}
/**
* Enables CDC for a table with a custom capture name
* functions for that table.
*
* @param name
* the name of the table, may not be {@code null}
* @throws SQLException if anything unexpected fails
*/
public static void enableTableCdc(SqlServerConnection connection, String tableName, String captureName) throws SQLException {
Objects.requireNonNull(tableName);
Objects.requireNonNull(captureName);
String enableCdcForTableStmt = String.format(ENABLE_TABLE_CDC_WITH_CUSTOM_CAPTURE, tableName, captureName);
connection.execute(enableCdcForTableStmt);
}
/**
* Disables CDC for a table for which it was enabled before.
*
* @param name
* the name of the table, may not be {@code null}
* @throws SQLException if anything unexpected fails
*/
public static void disableTableCdc(SqlServerConnection connection, String name) throws SQLException {
Objects.requireNonNull(name);
String disableCdcForTableStmt = DISABLE_TABLE_CDC.replace(STATEMENTS_PLACEHOLDER, name);
connection.execute(disableCdcForTableStmt);
}
}