DBZ-1051 Handle object and db name letter case

This commit is contained in:
Jiri Pechanec 2019-01-15 19:28:02 +01:00 committed by Gunnar Morling
parent 51468556a2
commit 16bc8ac490
4 changed files with 164 additions and 5 deletions

View File

@ -40,6 +40,8 @@
*/
public class SqlServerConnection extends JdbcConnection {
private static final String GET_DATABASE_NAME = "SELECT db_name()";
private static Logger LOGGER = LoggerFactory.getLogger(SqlServerConnection.class);
private static final String STATEMENTS_PLACEHOLDER = "#";
@ -59,6 +61,8 @@ public class SqlServerConnection extends JdbcConnection {
SQLServerDriver.class.getName(),
SqlServerConnection.class.getClassLoader());
private volatile String realDatabaseName;
private static interface ResultSetExtractor<T> {
T apply(ResultSet rs) throws SQLException;
}
@ -244,7 +248,7 @@ public Set<ChangeTable> listOfChangeTables() throws SQLException {
while (rs.next()) {
changeTables.add(
new ChangeTable(
new TableId(database(), rs.getString(1), rs.getString(2)),
new TableId(getRealDatabaseName(), rs.getString(1), rs.getString(2)),
rs.getString(3),
rs.getInt(4),
Lsn.valueOf(rs.getBytes(6)),
@ -284,7 +288,7 @@ public Table getTableSchemaFromTable(ChangeTable changeTable) throws SQLExceptio
List<Column> columns = new ArrayList<>();
try (ResultSet rs = metadata.getColumns(
database(),
getRealDatabaseName(),
changeTable.getSourceTableId().schema(),
changeTable.getSourceTableId().table(),
null)
@ -308,7 +312,7 @@ public Table getTableSchemaFromChangeTable(ChangeTable changeTable) throws SQLEx
final TableId changeTableId = changeTable.getChangeTableId();
List<ColumnEditor> columnEditors = new ArrayList<>();
try (ResultSet rs = metadata.getColumns(database(), changeTableId.schema(), changeTableId.table(), null)) {
try (ResultSet rs = metadata.getColumns(getRealDatabaseName(), changeTableId.schema(), changeTableId.table(), null)) {
while (rs.next()) {
readTableColumn(rs, changeTableId, null).ifPresent(columnEditors::add);
}
@ -342,4 +346,11 @@ public synchronized void rollback() throws SQLException {
public String getNameOfChangeTable(String captureName) {
return captureName + "_CT";
}
public String getRealDatabaseName() throws SQLException {
if (realDatabaseName == null) {
realDatabaseName = queryAndMap(GET_DATABASE_NAME, singleResultMapper(rs -> rs.getString(1), "Could not retrieve database name"));
}
return realDatabaseName;
}
}

View File

@ -74,7 +74,7 @@ protected SnapshottingTask getSnapshottingTask(OffsetContext previousOffset) {
@Override
protected SnapshotContext prepare(ChangeEventSourceContext context) throws Exception {
return new SqlServerSnapshotContext(connectorConfig.getDatabaseName());
return new SqlServerSnapshotContext(jdbcConnection.getRealDatabaseName());
}
@Override

View File

@ -0,0 +1,148 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.connector.sqlserver;
import java.sql.SQLException;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;
import org.fest.assertions.Assertions;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import io.debezium.config.Configuration;
import io.debezium.connector.sqlserver.SqlServerConnectorConfig.SnapshotMode;
import io.debezium.connector.sqlserver.util.TestHelper;
import io.debezium.embedded.AbstractConnectorTest;
import io.debezium.util.Testing;
/**
* Integration test to verify behaviour of detabase with and without case sensistive names.
*
* @author Jiri Pechanec
*/
public class CaseSensitivenessIT extends AbstractConnectorTest {
private SqlServerConnection connection;
@Before
public void before() throws SQLException {
TestHelper.createTestDatabase();
connection = TestHelper.testConnection();
initializeConnectorTestFramework();
Testing.Files.delete(TestHelper.DB_HISTORY_PATH);
}
@After
public void after() throws SQLException {
if (connection != null) {
connection.close();
}
}
@Test
public void caseInsensitiveDatabase() throws Exception {
connection.execute(
"CREATE TABLE MyTableOne (Id int primary key, ColA varchar(30))",
"INSERT INTO MyTableOne VALUES(1, 'a')"
);
TestHelper.enableTableCdc(connection, "MyTableOne");
testDatabase();
}
@Test
public void caseSensitiveDatabase() throws Exception {
connection.execute(
"ALTER DATABASE testDB COLLATE Latin1_General_BIN",
"CREATE TABLE MyTableOne (Id int primary key, ColA varchar(30))",
"INSERT INTO MyTableOne VALUES(1, 'a')"
);
TestHelper.enableTableCdc(connection, "MyTableOne");
testDatabase();
}
private void testDatabase() throws Exception {
final Configuration config = TestHelper.defaultConfig()
.with(SqlServerConnectorConfig.SNAPSHOT_MODE, SnapshotMode.INITIAL)
.build();
start(SqlServerConnector.class, config);
assertConnectorIsRunning();
SourceRecords records = consumeRecordsByTopic(1);
Assertions.assertThat(records.recordsForTopic("server1.dbo.MyTableOne")).hasSize(1);
SourceRecord record = records.recordsForTopic("server1.dbo.MyTableOne").get(0);
assertSchemaMatchesStruct(
(Struct)((Struct)record.value()).get("after"),
SchemaBuilder.struct()
.optional()
.name("server1.testDB.dbo.MyTableOne.Value")
.field("Id", Schema.INT32_SCHEMA)
.field("ColA", Schema.OPTIONAL_STRING_SCHEMA)
.build()
);
assertSchemaMatchesStruct(
(Struct)record.key(),
SchemaBuilder.struct()
.name("server1.testDB.dbo.MyTableOne.Key")
.field("Id", Schema.INT32_SCHEMA)
.build()
);
Assertions.assertThat(((Struct)((Struct)record.value()).get("after")).getInt32("Id")).isEqualTo(1);
connection.execute("INSERT INTO MyTableOne VALUES(2, 'b')");
records = consumeRecordsByTopic(1);
Assertions.assertThat(records.recordsForTopic("server1.dbo.MyTableOne")).hasSize(1);
record = records.recordsForTopic("server1.dbo.MyTableOne").get(0);
assertSchemaMatchesStruct(
(Struct)((Struct)record.value()).get("after"),
SchemaBuilder.struct()
.optional()
.name("server1.testDB.dbo.MyTableOne.Value")
.field("Id", Schema.INT32_SCHEMA)
.field("ColA", Schema.OPTIONAL_STRING_SCHEMA)
.build()
);
assertSchemaMatchesStruct(
(Struct)record.key(),
SchemaBuilder.struct()
.name("server1.testDB.dbo.MyTableOne.Key")
.field("Id", Schema.INT32_SCHEMA)
.build()
);
Assertions.assertThat(((Struct)((Struct)record.value()).get("after")).getInt32("Id")).isEqualTo(2);
connection.execute(
"CREATE TABLE MyTableTwo (Id int primary key, ColB varchar(30))"
);
TestHelper.enableTableCdc(connection, "MyTableTwo");
connection.execute("INSERT INTO MyTableTwo VALUES(3, 'b')");
records = consumeRecordsByTopic(1);
Assertions.assertThat(records.recordsForTopic("server1.dbo.MyTableTwo")).hasSize(1);
record = records.recordsForTopic("server1.dbo.MyTableTwo").get(0);
assertSchemaMatchesStruct(
(Struct)((Struct)record.value()).get("after"),
SchemaBuilder.struct()
.optional()
.name("server1.testDB.dbo.MyTableTwo.Value")
.field("Id", Schema.INT32_SCHEMA)
.field("ColB", Schema.OPTIONAL_STRING_SCHEMA)
.build()
);
assertSchemaMatchesStruct(
(Struct)record.key(),
SchemaBuilder.struct()
.name("server1.testDB.dbo.MyTableTwo.Key")
.field("Id", Schema.INT32_SCHEMA)
.build()
);
Assertions.assertThat(((Struct)((Struct)record.value()).get("after")).getInt32("Id")).isEqualTo(3);
}
}

View File

@ -24,7 +24,7 @@
public class TestHelper {
public static final Path DB_HISTORY_PATH = Testing.Files.createTestingPath("file-db-history-connect.txt").toAbsolutePath();
public static final String TEST_DATABASE = "testDB";
public static final String TEST_DATABASE = "testdb";
private static final String STATEMENTS_PLACEHOLDER = "#";