From 770644b9ecd1475e112b42c2e918be413e5bf19e Mon Sep 17 00:00:00 2001 From: Gunnar Morling Date: Fri, 16 Nov 2018 12:02:07 +0100 Subject: [PATCH] DBZ-776 Removing redundant method; Adjusting test for database property pass-through, as the connection isn't established any longer synchronously within start(). --- .../sqlserver/SqlServerDatabaseSchema.java | 37 ------------------- .../SqlServerStreamingChangeEventSource.java | 2 +- .../sqlserver/SqlServerConnectorIT.java | 25 +++++++------ 3 files changed, 14 insertions(+), 50 deletions(-) diff --git a/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerDatabaseSchema.java b/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerDatabaseSchema.java index 21c67523e..3290458ca 100644 --- a/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerDatabaseSchema.java +++ b/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerDatabaseSchema.java @@ -5,15 +5,9 @@ */ package io.debezium.connector.sqlserver; -import java.sql.SQLException; -import java.util.Collections; -import java.util.HashSet; -import java.util.Set; - import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import io.debezium.annotation.ThreadSafe; import io.debezium.relational.HistorizedRelationalDatabaseSchema; import io.debezium.relational.Table; import io.debezium.relational.TableId; @@ -29,15 +23,11 @@ * Logical representation of SQL Server schema. * * @author Jiri Pechanec - * */ public class SqlServerDatabaseSchema extends HistorizedRelationalDatabaseSchema { private static final Logger LOGGER = LoggerFactory.getLogger(SqlServerDatabaseSchema.class); - @ThreadSafe - private final Set capturedTables; - public SqlServerDatabaseSchema(SqlServerConnectorConfig connectorConfig, SchemaNameAdjuster schemaNameAdjuster, TopicSelector topicSelector, SqlServerConnection connection) { super(connectorConfig, topicSelector, connectorConfig.getTableFilters().dataCollectionFilter(), null, new TableSchemaBuilder( @@ -46,12 +36,6 @@ public SqlServerDatabaseSchema(SqlServerConnectorConfig connectorConfig, SchemaN SourceInfo.SCHEMA ), false); - try { - this.capturedTables = Collections.unmodifiableSet(determineCapturedTables(connectorConfig, connection)); - } - catch (SQLException e) { - throw new IllegalStateException("Could not obtain the list of captured tables", e); - } } @Override @@ -72,27 +56,6 @@ public void applySchemaChange(SchemaChangeEvent schemaChange) { record(schemaChange, tableChanges); } - public Set getCapturedTables() { - return capturedTables; - } - - private static Set determineCapturedTables(SqlServerConnectorConfig connectorConfig, SqlServerConnection connection) throws SQLException { - final Set allTableIds = connection.readTableNames(connectorConfig.getDatabaseName(), null, null, new String[] {"TABLE"} ); - - final Set capturedTables = new HashSet<>(); - - for (TableId tableId : allTableIds) { - if (connectorConfig.getTableFilters().dataCollectionFilter().isIncluded(tableId)) { - capturedTables.add(tableId); - } - else { - LOGGER.trace("Skipping table {} as it's not included in the filter configuration", tableId); - } - } - - return capturedTables; - } - @Override protected DdlParser getDdlParser() { return null; diff --git a/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerStreamingChangeEventSource.java b/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerStreamingChangeEventSource.java index 2214db102..591c07fde 100644 --- a/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerStreamingChangeEventSource.java +++ b/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerStreamingChangeEventSource.java @@ -58,7 +58,7 @@ public SqlServerStreamingChangeEventSource(SqlServerConnectorConfig connectorCon public void execute(ChangeEventSourceContext context) throws InterruptedException { final Metronome metronome = Metronome.sleeper(pollInterval, clock); try { - final TableId[] tables = schema.getCapturedTables().toArray(new TableId[schema.getCapturedTables().size()]); + final TableId[] tables = schema.tableIds().toArray(new TableId[0]); final Lsn lastProcessedLsnOnStart = offsetContext.getChangeLsn(); LOGGER.info("Last LSN recorded in offsets is {}", lastProcessedLsnOnStart); Lsn lastProcessedLsn = offsetContext.getChangeLsn(); diff --git a/debezium-connector-sqlserver/src/test/java/io/debezium/connector/sqlserver/SqlServerConnectorIT.java b/debezium-connector-sqlserver/src/test/java/io/debezium/connector/sqlserver/SqlServerConnectorIT.java index 6a64a6c33..e99b91173 100644 --- a/debezium-connector-sqlserver/src/test/java/io/debezium/connector/sqlserver/SqlServerConnectorIT.java +++ b/debezium-connector-sqlserver/src/test/java/io/debezium/connector/sqlserver/SqlServerConnectorIT.java @@ -20,8 +20,6 @@ import org.junit.Before; import org.junit.Test; -import com.google.common.base.Throwables; - import io.debezium.config.Configuration; import io.debezium.connector.sqlserver.SqlServerConnectorConfig.SnapshotMode; import io.debezium.connector.sqlserver.util.TestHelper; @@ -323,25 +321,28 @@ public void blacklistTable() throws Exception { } /** - * If integratedSecurity and authenticationScheme are propagated to the driver as expected, an exception - * will be raised due to the lack of Kerberos infrastructure + * Passing the "applicationName" property which can be asserted from the connected sessions". */ @Test @FixFor("DBZ-964") public void shouldPropagateDatabaseDriverProperties() throws Exception { final Configuration config = TestHelper.defaultConfig() .with(SqlServerConnectorConfig.SNAPSHOT_MODE, SnapshotMode.INITIAL_SCHEMA_ONLY) - .with("database.integratedSecurity", "true") - .with("database.authenticationScheme", "JavaKerberos") + .with("database.applicationName", "Debezium App DBZ-964") .build(); - start(SqlServerConnector.class, config, (success, msg, error) -> { - assertThat(success).isFalse(); - assertThat(Throwables.getRootCause(error)) - .hasMessage("Invalid name provided (Mechanism level: KrbException: Cannot locate default realm)"); - }); + start(SqlServerConnector.class, config); + assertConnectorIsRunning(); - assertConnectorNotRunning(); + // consuming one record to make sure the connector establishes the DB connection which happens asynchronously + // after the start() call + connection.execute("INSERT INTO tablea VALUES(964, 'a')"); + consumeRecordsByTopic(1); + + connection.query("select count(1) from sys.dm_exec_sessions where program_name = 'Debezium App DBZ-964'", rs -> { + rs.next(); + assertThat(rs.getInt(1)).isEqualTo(1); + }); } private void assertRecord(Struct record, List expected) {