DBZ-776 Removing redundant method;

Adjusting test for database property pass-through, as the connection isn't established any longer synchronously within start().
This commit is contained in:
Gunnar Morling 2018-11-16 12:02:07 +01:00 committed by Jiri Pechanec
parent 1f4f36872c
commit 770644b9ec
3 changed files with 14 additions and 50 deletions

View File

@ -5,15 +5,9 @@
*/ */
package io.debezium.connector.sqlserver; package io.debezium.connector.sqlserver;
import java.sql.SQLException;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import io.debezium.annotation.ThreadSafe;
import io.debezium.relational.HistorizedRelationalDatabaseSchema; import io.debezium.relational.HistorizedRelationalDatabaseSchema;
import io.debezium.relational.Table; import io.debezium.relational.Table;
import io.debezium.relational.TableId; import io.debezium.relational.TableId;
@ -29,15 +23,11 @@
* Logical representation of SQL Server schema. * Logical representation of SQL Server schema.
* *
* @author Jiri Pechanec * @author Jiri Pechanec
*
*/ */
public class SqlServerDatabaseSchema extends HistorizedRelationalDatabaseSchema { public class SqlServerDatabaseSchema extends HistorizedRelationalDatabaseSchema {
private static final Logger LOGGER = LoggerFactory.getLogger(SqlServerDatabaseSchema.class); private static final Logger LOGGER = LoggerFactory.getLogger(SqlServerDatabaseSchema.class);
@ThreadSafe
private final Set<TableId> capturedTables;
public SqlServerDatabaseSchema(SqlServerConnectorConfig connectorConfig, SchemaNameAdjuster schemaNameAdjuster, TopicSelector<TableId> topicSelector, SqlServerConnection connection) { public SqlServerDatabaseSchema(SqlServerConnectorConfig connectorConfig, SchemaNameAdjuster schemaNameAdjuster, TopicSelector<TableId> topicSelector, SqlServerConnection connection) {
super(connectorConfig, topicSelector, connectorConfig.getTableFilters().dataCollectionFilter(), null, super(connectorConfig, topicSelector, connectorConfig.getTableFilters().dataCollectionFilter(), null,
new TableSchemaBuilder( new TableSchemaBuilder(
@ -46,12 +36,6 @@ public SqlServerDatabaseSchema(SqlServerConnectorConfig connectorConfig, SchemaN
SourceInfo.SCHEMA SourceInfo.SCHEMA
), ),
false); false);
try {
this.capturedTables = Collections.unmodifiableSet(determineCapturedTables(connectorConfig, connection));
}
catch (SQLException e) {
throw new IllegalStateException("Could not obtain the list of captured tables", e);
}
} }
@Override @Override
@ -72,27 +56,6 @@ public void applySchemaChange(SchemaChangeEvent schemaChange) {
record(schemaChange, tableChanges); record(schemaChange, tableChanges);
} }
public Set<TableId> getCapturedTables() {
return capturedTables;
}
private static Set<TableId> determineCapturedTables(SqlServerConnectorConfig connectorConfig, SqlServerConnection connection) throws SQLException {
final Set<TableId> allTableIds = connection.readTableNames(connectorConfig.getDatabaseName(), null, null, new String[] {"TABLE"} );
final Set<TableId> capturedTables = new HashSet<>();
for (TableId tableId : allTableIds) {
if (connectorConfig.getTableFilters().dataCollectionFilter().isIncluded(tableId)) {
capturedTables.add(tableId);
}
else {
LOGGER.trace("Skipping table {} as it's not included in the filter configuration", tableId);
}
}
return capturedTables;
}
@Override @Override
protected DdlParser getDdlParser() { protected DdlParser getDdlParser() {
return null; return null;

View File

@ -58,7 +58,7 @@ public SqlServerStreamingChangeEventSource(SqlServerConnectorConfig connectorCon
public void execute(ChangeEventSourceContext context) throws InterruptedException { public void execute(ChangeEventSourceContext context) throws InterruptedException {
final Metronome metronome = Metronome.sleeper(pollInterval, clock); final Metronome metronome = Metronome.sleeper(pollInterval, clock);
try { try {
final TableId[] tables = schema.getCapturedTables().toArray(new TableId[schema.getCapturedTables().size()]); final TableId[] tables = schema.tableIds().toArray(new TableId[0]);
final Lsn lastProcessedLsnOnStart = offsetContext.getChangeLsn(); final Lsn lastProcessedLsnOnStart = offsetContext.getChangeLsn();
LOGGER.info("Last LSN recorded in offsets is {}", lastProcessedLsnOnStart); LOGGER.info("Last LSN recorded in offsets is {}", lastProcessedLsnOnStart);
Lsn lastProcessedLsn = offsetContext.getChangeLsn(); Lsn lastProcessedLsn = offsetContext.getChangeLsn();

View File

@ -20,8 +20,6 @@
import org.junit.Before; import org.junit.Before;
import org.junit.Test; import org.junit.Test;
import com.google.common.base.Throwables;
import io.debezium.config.Configuration; import io.debezium.config.Configuration;
import io.debezium.connector.sqlserver.SqlServerConnectorConfig.SnapshotMode; import io.debezium.connector.sqlserver.SqlServerConnectorConfig.SnapshotMode;
import io.debezium.connector.sqlserver.util.TestHelper; import io.debezium.connector.sqlserver.util.TestHelper;
@ -323,25 +321,28 @@ public void blacklistTable() throws Exception {
} }
/** /**
* If integratedSecurity and authenticationScheme are propagated to the driver as expected, an exception * Passing the "applicationName" property which can be asserted from the connected sessions".
* will be raised due to the lack of Kerberos infrastructure
*/ */
@Test @Test
@FixFor("DBZ-964") @FixFor("DBZ-964")
public void shouldPropagateDatabaseDriverProperties() throws Exception { public void shouldPropagateDatabaseDriverProperties() throws Exception {
final Configuration config = TestHelper.defaultConfig() final Configuration config = TestHelper.defaultConfig()
.with(SqlServerConnectorConfig.SNAPSHOT_MODE, SnapshotMode.INITIAL_SCHEMA_ONLY) .with(SqlServerConnectorConfig.SNAPSHOT_MODE, SnapshotMode.INITIAL_SCHEMA_ONLY)
.with("database.integratedSecurity", "true") .with("database.applicationName", "Debezium App DBZ-964")
.with("database.authenticationScheme", "JavaKerberos")
.build(); .build();
start(SqlServerConnector.class, config, (success, msg, error) -> { start(SqlServerConnector.class, config);
assertThat(success).isFalse(); assertConnectorIsRunning();
assertThat(Throwables.getRootCause(error))
.hasMessage("Invalid name provided (Mechanism level: KrbException: Cannot locate default realm)");
});
assertConnectorNotRunning(); // consuming one record to make sure the connector establishes the DB connection which happens asynchronously
// after the start() call
connection.execute("INSERT INTO tablea VALUES(964, 'a')");
consumeRecordsByTopic(1);
connection.query("select count(1) from sys.dm_exec_sessions where program_name = 'Debezium App DBZ-964'", rs -> {
rs.next();
assertThat(rs.getInt(1)).isEqualTo(1);
});
} }
private void assertRecord(Struct record, List<SchemaAndValueField> expected) { private void assertRecord(Struct record, List<SchemaAndValueField> expected) {