From 050c192dd73aa0703ef4395c208e3026d189b4d1 Mon Sep 17 00:00:00 2001 From: Gunnar Morling Date: Thu, 15 Nov 2018 14:45:34 +0100 Subject: [PATCH] DBZ-812 Moving test-only methods to TestHelper --- .../connector/sqlserver/SourceInfo.java | 4 +- .../sqlserver/SqlServerConnection.java | 88 --------------- .../SqlServerStreamingChangeEventSource.java | 4 +- .../AbstractSqlServerDatatypesTest.java | 2 +- .../sqlserver/SQLServerNumericColumnIT.java | 8 +- .../connector/sqlserver/SnapshotIT.java | 4 +- .../sqlserver/SqlServerChangeTableSetIT.java | 20 ++-- .../sqlserver/SqlServerConnectionIT.java | 6 +- .../sqlserver/SqlServerConnectorIT.java | 4 +- .../connector/sqlserver/util/TestHelper.java | 100 +++++++++++++++++- .../resources/generate_cdc_wrappers.sql | 0 11 files changed, 123 insertions(+), 117 deletions(-) rename debezium-connector-sqlserver/src/{main => test}/resources/generate_cdc_wrappers.sql (100%) diff --git a/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SourceInfo.java b/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SourceInfo.java index 052fea957..66d8060c8 100644 --- a/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SourceInfo.java +++ b/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SourceInfo.java @@ -7,14 +7,14 @@ import java.time.Instant; -import io.debezium.annotation.NotThreadSafe; import org.apache.kafka.connect.data.Schema; import org.apache.kafka.connect.data.Struct; +import io.debezium.annotation.NotThreadSafe; import io.debezium.connector.AbstractSourceInfo; /** - * Coordinates from the database log to establis relation between the change streamed and the source log position. + * Coordinates from the database log to establish the relation between the change streamed and the source log position. * Maps to {@code source} field in {@code Envelope}. * * @author Jiri Pechanec diff --git a/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerConnection.java b/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerConnection.java index a0efcab9b..161df62d5 100644 --- a/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerConnection.java +++ b/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerConnection.java @@ -15,7 +15,6 @@ import java.util.Collections; import java.util.HashSet; import java.util.List; -import java.util.Objects; import java.util.Set; import java.util.stream.Collectors; @@ -31,7 +30,6 @@ import io.debezium.relational.ColumnEditor; import io.debezium.relational.Table; import io.debezium.relational.TableId; -import io.debezium.util.IoUtil; /** * {@link JdbcConnection} extension to be used with Microsoft SQL Server @@ -44,15 +42,6 @@ public class SqlServerConnection extends JdbcConnection { private static Logger LOGGER = LoggerFactory.getLogger(SqlServerConnection.class); private static final String STATEMENTS_PLACEHOLDER = "#"; - private static final String ENABLE_DB_CDC = "IF EXISTS(select 1 from sys.databases where name='#' AND is_cdc_enabled=0)\n" - + "EXEC sys.sp_cdc_enable_db"; - private static final String DISABLE_DB_CDC = "IF EXISTS(select 1 from sys.databases where name='#' AND is_cdc_enabled=1)\n" - + "EXEC sys.sp_cdc_disable_db"; - private static final String ENABLE_TABLE_CDC = "IF EXISTS(select 1 from sys.tables where name = '#' AND is_tracked_by_cdc=0)\n" - + "EXEC sys.sp_cdc_enable_table @source_schema = N'dbo', @source_name = N'#', @role_name = NULL, @supports_net_changes = 0"; - private static final String ENABLE_TABLE_CDC_WITH_CUSTOM_CAPTURE = "EXEC sys.sp_cdc_enable_table @source_schema = N'dbo', @source_name = N'%s', @capture_instance = N'%s', @role_name = NULL, @supports_net_changes = 0"; - private static final String DISABLE_TABLE_CDC = "EXEC sys.sp_cdc_disable_table @source_schema = N'dbo', @source_name = N'#', @capture_instance = 'all'"; - private static final String CDC_WRAPPERS_DML; private static final String GET_MAX_LSN = "SELECT sys.fn_cdc_get_max_lsn()"; private static final String LOCK_TABLE = "SELECT * FROM # WITH (TABLOCKX)"; private static final String LSN_TO_TIMESTAMP = "SELECT sys.fn_cdc_map_lsn_to_time(?)"; @@ -69,16 +58,6 @@ public class SqlServerConnection extends JdbcConnection { SQLServerDriver.class.getName(), SqlServerConnection.class.getClassLoader()); - static { - try { - ClassLoader classLoader = SqlServerConnection.class.getClassLoader(); - CDC_WRAPPERS_DML = IoUtil.read(classLoader.getResourceAsStream("generate_cdc_wrappers.sql")); - } - catch (Exception e) { - throw new RuntimeException("Cannot load SQL Server statements", e); - } - } - private static interface ResultSetExtractor { T apply(ResultSet rs) throws SQLException; } @@ -93,74 +72,7 @@ public SqlServerConnection(Configuration config) { super(config, FACTORY); } - /** - * Enables CDC for a given database, if not already enabled. - * - * @param name - * the name of the DB, may not be {@code null} - * @throws SQLException - * if anything unexpected fails - */ - public void enableDbCdc(String name) throws SQLException { - Objects.requireNonNull(name); - execute(ENABLE_DB_CDC.replace(STATEMENTS_PLACEHOLDER, name)); - } - /** - * Disables CDC for a given database, if not already disabled. - * - * @param name - * the name of the DB, may not be {@code null} - * @throws SQLException - * if anything unexpected fails - */ - public void disableDbCdc(String name) throws SQLException { - Objects.requireNonNull(name); - execute(DISABLE_DB_CDC.replace(STATEMENTS_PLACEHOLDER, name)); - } - - /** - * Enables CDC for a table if not already enabled and generates the wrapper - * functions for that table. - * - * @param name - * the name of the table, may not be {@code null} - * @throws SQLException if anything unexpected fails - */ - public void enableTableCdc(String name) throws SQLException { - Objects.requireNonNull(name); - String enableCdcForTableStmt = ENABLE_TABLE_CDC.replace(STATEMENTS_PLACEHOLDER, name); - String generateWrapperFunctionsStmts = CDC_WRAPPERS_DML.replaceAll(STATEMENTS_PLACEHOLDER, name); - execute(enableCdcForTableStmt, generateWrapperFunctionsStmts); - } - - /** - * Enables CDC for a table with a custom capture name - * functions for that table. - * - * @param name - * the name of the table, may not be {@code null} - * @throws SQLException if anything unexpected fails - */ - public void enableTableCdc(String tableName, String captureName) throws SQLException { - Objects.requireNonNull(tableName); - Objects.requireNonNull(captureName); - String enableCdcForTableStmt = String.format(ENABLE_TABLE_CDC_WITH_CUSTOM_CAPTURE, tableName, captureName); - execute(enableCdcForTableStmt); - } - - /** - * Disables CDC for a table for which it was enabled before. - * - * @param name - * the name of the table, may not be {@code null} - * @throws SQLException if anything unexpected fails - */ - public void disableTableCdc(String name) throws SQLException { - Objects.requireNonNull(name); - String disableCdcForTableStmt = DISABLE_TABLE_CDC.replace(STATEMENTS_PLACEHOLDER, name); - execute(disableCdcForTableStmt); - } /** * @return the current largest log sequence number diff --git a/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerStreamingChangeEventSource.java b/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerStreamingChangeEventSource.java index 0065fda63..0bb9137ed 100644 --- a/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerStreamingChangeEventSource.java +++ b/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerStreamingChangeEventSource.java @@ -197,7 +197,7 @@ public void execute(ChangeEventSourceContext context) throws InterruptedExceptio } }); lastProcessedLsn = currentMaxLsn; - // Terminate the transaction otherwise CDC could not be disabled for tables + // Terminate the transaction otherwise CDC could not be disabled for tables connection.rollback(); } catch (SQLException e) { @@ -326,7 +326,7 @@ public boolean next() throws SQLException { completed = !resultSet.next(); currentChangeLsn = completed ? Lsn.NULL : Lsn.valueOf(resultSet.getBytes(COL_ROW_LSN)); if (completed) { - LOGGER.trace("Closing result set of change table for table {}", changeTable); + LOGGER.trace("Closing result set of change tables for table {}", changeTable); resultSet.close(); } return !completed; diff --git a/debezium-connector-sqlserver/src/test/java/io/debezium/connector/sqlserver/AbstractSqlServerDatatypesTest.java b/debezium-connector-sqlserver/src/test/java/io/debezium/connector/sqlserver/AbstractSqlServerDatatypesTest.java index 01f8c471b..47a913064 100644 --- a/debezium-connector-sqlserver/src/test/java/io/debezium/connector/sqlserver/AbstractSqlServerDatatypesTest.java +++ b/debezium-connector-sqlserver/src/test/java/io/debezium/connector/sqlserver/AbstractSqlServerDatatypesTest.java @@ -160,7 +160,7 @@ public static void createTables() throws SQLException { try (SqlServerConnection connection = TestHelper.testConnection()) { connection.execute(ALL_DDLS); for (String table: ALL_TABLES) { - connection.enableTableCdc(table); + TestHelper.enableTableCdc(connection, table); } connection.execute( "INSERT INTO type_int VALUES (0, 1, 22, 333, 4444, 55555)", diff --git a/debezium-connector-sqlserver/src/test/java/io/debezium/connector/sqlserver/SQLServerNumericColumnIT.java b/debezium-connector-sqlserver/src/test/java/io/debezium/connector/sqlserver/SQLServerNumericColumnIT.java index df719ad6b..afab2012e 100644 --- a/debezium-connector-sqlserver/src/test/java/io/debezium/connector/sqlserver/SQLServerNumericColumnIT.java +++ b/debezium-connector-sqlserver/src/test/java/io/debezium/connector/sqlserver/SQLServerNumericColumnIT.java @@ -51,10 +51,10 @@ public void before() throws SQLException { "CREATE TABLE tablenumb (id int IDENTITY(1,1) primary key, cola DECIMAL(8, 4),colb DECIMAL, colc numeric(8,1), cold numeric)", "CREATE TABLE tablenumc (id int IDENTITY(1,1) primary key, cola DECIMAL(8, 4),colb DECIMAL, colc numeric(8,1), cold numeric)", "CREATE TABLE tablenumd (id int IDENTITY(1,1) primary key, cola DECIMAL(8, 4),colb DECIMAL, colc numeric(8,1), cold numeric)"); - connection.enableTableCdc("tablea"); - connection.enableTableCdc("tableb"); - connection.enableTableCdc("tablec"); - connection.enableTableCdc("tabled"); + TestHelper.enableTableCdc(connection, "tablea"); + TestHelper.enableTableCdc(connection, "tableb"); + TestHelper.enableTableCdc(connection, "tablec"); + TestHelper.enableTableCdc(connection, "tabled"); initializeConnectorTestFramework(); Testing.Files.delete(TestHelper.DB_HISTORY_PATH); diff --git a/debezium-connector-sqlserver/src/test/java/io/debezium/connector/sqlserver/SnapshotIT.java b/debezium-connector-sqlserver/src/test/java/io/debezium/connector/sqlserver/SnapshotIT.java index ff99c266f..98da7e7b5 100644 --- a/debezium-connector-sqlserver/src/test/java/io/debezium/connector/sqlserver/SnapshotIT.java +++ b/debezium-connector-sqlserver/src/test/java/io/debezium/connector/sqlserver/SnapshotIT.java @@ -14,7 +14,6 @@ import java.util.List; import java.util.Map; -import io.debezium.connector.sqlserver.SqlServerConnectorConfig.SnapshotLockingMode; import org.apache.kafka.connect.data.Decimal; import org.apache.kafka.connect.data.Schema; import org.apache.kafka.connect.data.Struct; @@ -26,6 +25,7 @@ import org.junit.Test; import io.debezium.config.Configuration; +import io.debezium.connector.sqlserver.SqlServerConnectorConfig.SnapshotLockingMode; import io.debezium.connector.sqlserver.SqlServerConnectorConfig.SnapshotMode; import io.debezium.connector.sqlserver.util.TestHelper; import io.debezium.data.SchemaAndValueField; @@ -60,7 +60,7 @@ public void before() throws SQLException { ); } - connection.enableTableCdc("table1"); + TestHelper.enableTableCdc(connection, "table1"); initializeConnectorTestFramework(); Testing.Files.delete(TestHelper.DB_HISTORY_PATH); diff --git a/debezium-connector-sqlserver/src/test/java/io/debezium/connector/sqlserver/SqlServerChangeTableSetIT.java b/debezium-connector-sqlserver/src/test/java/io/debezium/connector/sqlserver/SqlServerChangeTableSetIT.java index 3c3ef4e23..ecc946c75 100644 --- a/debezium-connector-sqlserver/src/test/java/io/debezium/connector/sqlserver/SqlServerChangeTableSetIT.java +++ b/debezium-connector-sqlserver/src/test/java/io/debezium/connector/sqlserver/SqlServerChangeTableSetIT.java @@ -39,8 +39,8 @@ public void before() throws SQLException { "CREATE TABLE tableb (id int primary key, colb varchar(30))", "CREATE TABLE tablec (id int primary key, colc varchar(30))" ); - connection.enableTableCdc("tablea"); - connection.enableTableCdc("tableb"); + TestHelper.enableTableCdc(connection, "tablea"); + TestHelper.enableTableCdc(connection, "tableb"); initializeConnectorTestFramework(); Testing.Files.delete(TestHelper.DB_HISTORY_PATH); @@ -80,13 +80,13 @@ public void addTable() throws Exception { Assertions.assertThat(records.recordsForTopic("server1.dbo.tableb")).hasSize(RECORDS_PER_TABLE); // Enable CDC for already existing table - connection.enableTableCdc("tablec"); + TestHelper.enableTableCdc(connection, "tablec"); // CDC for newly added table connection.execute( "CREATE TABLE tabled (id int primary key, cold varchar(30))" ); - connection.enableTableCdc("tabled"); + TestHelper.enableTableCdc(connection, "tabled"); for (int i = 0; i < RECORDS_PER_TABLE; i++) { final int id = ID_START + i; @@ -152,7 +152,7 @@ public void removeTable() throws Exception { Assertions.assertThat(records.recordsForTopic("server1.dbo.tableb")).hasSize(RECORDS_PER_TABLE); // Disable CDC for a table - connection.disableTableCdc("tableb"); + TestHelper.disableTableCdc(connection, "tableb"); for (int i = 0; i < RECORDS_PER_TABLE; i++) { final int id = ID_START_2 + i; @@ -245,7 +245,7 @@ private void addColumnToTable(boolean pauseAfterCaptureChange) throws Exception ); }); - connection.enableTableCdc("tableb", "after_change"); + TestHelper.enableTableCdc(connection, "tableb", "after_change"); if (pauseAfterCaptureChange) { Thread.sleep(5_000); } @@ -343,7 +343,7 @@ public void removeColumnFromTable() throws Exception { // Enable a second capture instance connection.execute("ALTER TABLE dbo.tableb DROP COLUMN colb"); - connection.enableTableCdc("tableb", "after_change"); + TestHelper.enableTableCdc(connection, "tableb", "after_change"); for (int i = 0; i < RECORDS_PER_TABLE; i++) { final int id = ID_START_2 + i; @@ -433,10 +433,10 @@ public void renameColumn() throws Exception { }); // CDC must be disabled, otherwise rename fails - connection.disableTableCdc("tableb"); + TestHelper.disableTableCdc(connection, "tableb"); // Enable a second capture instance connection.execute("exec sp_rename 'tableb.colb', 'newcolb';"); - connection.enableTableCdc("tableb", "after_change"); + TestHelper.enableTableCdc(connection, "tableb", "after_change"); for (int i = 0; i < RECORDS_PER_TABLE; i++) { final int id = ID_START_2 + i; @@ -533,7 +533,7 @@ public void changeColumn() throws Exception { // Enable a second capture instance connection.execute("ALTER TABLE dbo.tableb ALTER COLUMN colb INT"); - connection.enableTableCdc("tableb", "after_change"); + TestHelper.enableTableCdc(connection, "tableb", "after_change"); for (int i = 0; i < RECORDS_PER_TABLE; i++) { final int id = ID_START_2 + i; diff --git a/debezium-connector-sqlserver/src/test/java/io/debezium/connector/sqlserver/SqlServerConnectionIT.java b/debezium-connector-sqlserver/src/test/java/io/debezium/connector/sqlserver/SqlServerConnectionIT.java index 856306603..c341447d4 100644 --- a/debezium-connector-sqlserver/src/test/java/io/debezium/connector/sqlserver/SqlServerConnectionIT.java +++ b/debezium-connector-sqlserver/src/test/java/io/debezium/connector/sqlserver/SqlServerConnectionIT.java @@ -34,7 +34,7 @@ public void shouldEnableCdcForDatabase() throws Exception { connection.execute("CREATE DATABASE testDB"); connection.execute("USE testDB"); // NOTE: you cannot enable CDC on master - connection.enableDbCdc("testDB"); + TestHelper.enableDbCdc(connection, "testDB"); } } @@ -45,7 +45,7 @@ public void shouldEnableCdcWithWrapperFunctionsForTable() throws Exception { connection.execute("CREATE DATABASE testDB"); connection.execute("USE testDB"); // NOTE: you cannot enable CDC on master - connection.enableDbCdc("testDB"); + TestHelper.enableDbCdc(connection, "testDB"); // create table if exists String sql = "IF EXISTS (select 1 from sys.objects where name = 'testTable' and type = 'u')\n" @@ -54,7 +54,7 @@ public void shouldEnableCdcWithWrapperFunctionsForTable() throws Exception { connection.execute(sql); // then enable CDC and wrapper functions - connection.enableTableCdc("testTable"); + TestHelper.enableTableCdc(connection, "testTable"); // insert some data connection.execute("INSERT INTO testTable (NUMBER, TEXT) values (1, 'aaa')\n" diff --git a/debezium-connector-sqlserver/src/test/java/io/debezium/connector/sqlserver/SqlServerConnectorIT.java b/debezium-connector-sqlserver/src/test/java/io/debezium/connector/sqlserver/SqlServerConnectorIT.java index e99b91173..fb1effd2c 100644 --- a/debezium-connector-sqlserver/src/test/java/io/debezium/connector/sqlserver/SqlServerConnectorIT.java +++ b/debezium-connector-sqlserver/src/test/java/io/debezium/connector/sqlserver/SqlServerConnectorIT.java @@ -46,8 +46,8 @@ public void before() throws SQLException { "CREATE TABLE tableb (id int primary key, colb varchar(30))", "INSERT INTO tablea VALUES(1, 'a')" ); - connection.enableTableCdc("tablea"); - connection.enableTableCdc("tableb"); + TestHelper.enableTableCdc(connection, "tablea"); + TestHelper.enableTableCdc(connection, "tableb"); initializeConnectorTestFramework(); Testing.Files.delete(TestHelper.DB_HISTORY_PATH); diff --git a/debezium-connector-sqlserver/src/test/java/io/debezium/connector/sqlserver/util/TestHelper.java b/debezium-connector-sqlserver/src/test/java/io/debezium/connector/sqlserver/util/TestHelper.java index 776ebc239..0793169da 100644 --- a/debezium-connector-sqlserver/src/test/java/io/debezium/connector/sqlserver/util/TestHelper.java +++ b/debezium-connector-sqlserver/src/test/java/io/debezium/connector/sqlserver/util/TestHelper.java @@ -8,21 +8,46 @@ import java.nio.file.Path; import java.sql.SQLException; +import java.util.Objects; import io.debezium.config.Configuration; import io.debezium.connector.sqlserver.SqlServerConnection; import io.debezium.connector.sqlserver.SqlServerConnectorConfig; import io.debezium.jdbc.JdbcConfiguration; import io.debezium.relational.history.FileDatabaseHistory; +import io.debezium.util.IoUtil; import io.debezium.util.Testing; /** * @author Horia Chiorean (hchiorea@redhat.com) */ public class TestHelper { + public static final Path DB_HISTORY_PATH = Testing.Files.createTestingPath("file-db-history-connect.txt").toAbsolutePath(); public static final String TEST_DATABASE = "testDB"; + private static final String STATEMENTS_PLACEHOLDER = "#"; + + private static final String ENABLE_DB_CDC = "IF EXISTS(select 1 from sys.databases where name='#' AND is_cdc_enabled=0)\n" + + "EXEC sys.sp_cdc_enable_db"; + private static final String DISABLE_DB_CDC = "IF EXISTS(select 1 from sys.databases where name='#' AND is_cdc_enabled=1)\n" + + "EXEC sys.sp_cdc_disable_db"; + private static final String ENABLE_TABLE_CDC = "IF EXISTS(select 1 from sys.tables where name = '#' AND is_tracked_by_cdc=0)\n" + + "EXEC sys.sp_cdc_enable_table @source_schema = N'dbo', @source_name = N'#', @role_name = NULL, @supports_net_changes = 0"; + private static final String ENABLE_TABLE_CDC_WITH_CUSTOM_CAPTURE = "EXEC sys.sp_cdc_enable_table @source_schema = N'dbo', @source_name = N'%s', @capture_instance = N'%s', @role_name = NULL, @supports_net_changes = 0"; + private static final String DISABLE_TABLE_CDC = "EXEC sys.sp_cdc_disable_table @source_schema = N'dbo', @source_name = N'#', @capture_instance = 'all'"; + private static final String CDC_WRAPPERS_DML; + + static { + try { + ClassLoader classLoader = TestHelper.class.getClassLoader(); + CDC_WRAPPERS_DML = IoUtil.read(classLoader.getResourceAsStream("generate_cdc_wrappers.sql")); + } + catch (Exception e) { + throw new RuntimeException("Cannot load SQL Server statements", e); + } + } + public static JdbcConfiguration adminJdbcConfig() { return JdbcConfiguration.copy(Configuration.fromSystemProperties("database.")) .withDefault(JdbcConfiguration.DATABASE, "master") @@ -67,7 +92,7 @@ public static void createTestDatabase() { connection.connect(); try { connection.execute("USE testDB"); - connection.disableDbCdc("testDB"); + disableDbCdc(connection, "testDB"); } catch (SQLException e) { } @@ -78,7 +103,7 @@ public static void createTestDatabase() { connection.execute("USE testDB"); connection.execute("ALTER DATABASE testDB SET ALLOW_SNAPSHOT_ISOLATION ON"); // NOTE: you cannot enable CDC on master - connection.enableDbCdc("testDB"); + enableDbCdc(connection, "testDB"); } catch (SQLException e) { throw new IllegalStateException("Error while initiating test database", e); @@ -90,7 +115,7 @@ public static void dropTestDatabase() { connection.connect(); try { connection.execute("USE testDB"); - connection.disableDbCdc("testDB"); + disableDbCdc(connection, "testDB"); } catch (SQLException e) { } @@ -110,4 +135,73 @@ public static SqlServerConnection adminConnection() { public static SqlServerConnection testConnection() { return new SqlServerConnection(TestHelper.defaultJdbcConfig()); } + + /** + * Enables CDC for a given database, if not already enabled. + * + * @param name + * the name of the DB, may not be {@code null} + * @throws SQLException + * if anything unexpected fails + */ + public static void enableDbCdc(SqlServerConnection connection, String name) throws SQLException { + Objects.requireNonNull(name); + connection.execute(ENABLE_DB_CDC.replace(STATEMENTS_PLACEHOLDER, name)); + } + + /** + * Disables CDC for a given database, if not already disabled. + * + * @param name + * the name of the DB, may not be {@code null} + * @throws SQLException + * if anything unexpected fails + */ + protected static void disableDbCdc(SqlServerConnection connection, String name) throws SQLException { + Objects.requireNonNull(name); + connection.execute(DISABLE_DB_CDC.replace(STATEMENTS_PLACEHOLDER, name)); + } + + /** + * Enables CDC for a table if not already enabled and generates the wrapper + * functions for that table. + * + * @param name + * the name of the table, may not be {@code null} + * @throws SQLException if anything unexpected fails + */ + public static void enableTableCdc(SqlServerConnection connection, String name) throws SQLException { + Objects.requireNonNull(name); + String enableCdcForTableStmt = ENABLE_TABLE_CDC.replace(STATEMENTS_PLACEHOLDER, name); + String generateWrapperFunctionsStmts = CDC_WRAPPERS_DML.replaceAll(STATEMENTS_PLACEHOLDER, name); + connection.execute(enableCdcForTableStmt, generateWrapperFunctionsStmts); + } + + /** + * Enables CDC for a table with a custom capture name + * functions for that table. + * + * @param name + * the name of the table, may not be {@code null} + * @throws SQLException if anything unexpected fails + */ + public static void enableTableCdc(SqlServerConnection connection, String tableName, String captureName) throws SQLException { + Objects.requireNonNull(tableName); + Objects.requireNonNull(captureName); + String enableCdcForTableStmt = String.format(ENABLE_TABLE_CDC_WITH_CUSTOM_CAPTURE, tableName, captureName); + connection.execute(enableCdcForTableStmt); + } + + /** + * Disables CDC for a table for which it was enabled before. + * + * @param name + * the name of the table, may not be {@code null} + * @throws SQLException if anything unexpected fails + */ + public static void disableTableCdc(SqlServerConnection connection, String name) throws SQLException { + Objects.requireNonNull(name); + String disableCdcForTableStmt = DISABLE_TABLE_CDC.replace(STATEMENTS_PLACEHOLDER, name); + connection.execute(disableCdcForTableStmt); + } } diff --git a/debezium-connector-sqlserver/src/main/resources/generate_cdc_wrappers.sql b/debezium-connector-sqlserver/src/test/resources/generate_cdc_wrappers.sql similarity index 100% rename from debezium-connector-sqlserver/src/main/resources/generate_cdc_wrappers.sql rename to debezium-connector-sqlserver/src/test/resources/generate_cdc_wrappers.sql